Loading [MathJax]/extensions/tex2jax.js
The Gaudi Framework  v31r0 (aeb156f0)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
IOBoundAlgSchedulerSvc.cpp
Go to the documentation of this file.
1 #include <algorithm>
2 #include <map>
3 #include <queue>
4 #include <sstream>
5 #include <thread>
6 #include <unordered_set>
7 
8 // Framework includes
10 #include "GaudiKernel/IAlgorithm.h"
12 #include <Gaudi/Algorithm.h> // will be IAlgorithm if context getter promoted to interface
13 
14 // Local
15 #include "IOBoundAlgSchedulerSvc.h"
16 
17 // Instantiation of a static factory class used by clients to create instances of this service
19 
20 
25 
26  // Initialise mother class (read properties, ...)
28  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
29 
30  // Activate the scheduler in another thread.
31  info() << "Activating scheduler in a separate thread" << endmsg;
33 
34  return sc;
35 }
36 //---------------------------------------------------------------------------
37 
42 
44  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
45 
46  sc = deactivate();
47  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
48 
49  info() << "Joining preemptive scheduler's thread" << endmsg;
50  m_thread.join();
51 
52  return sc;
53 }
54 //---------------------------------------------------------------------------
62 
63  // Now it's running
64  m_isActive = true;
65 
66  // Wait for actions pushed into the queue by finishing tasks.
67  action thisAction;
69 
70  // Continue to wait if the scheduler is running or there is something to do
71  info() << "Start checking the queue of I/O-bound algorithm tasks.." << endmsg;
72  while ( m_isActive or m_actionsQueue.size() > 0 ) {
73  m_actionsQueue.pop( thisAction );
74  std::thread th( thisAction );
75  th.detach();
76  }
77 }
78 
79 //---------------------------------------------------------------------------
80 
88 
89  if ( m_isActive ) {
90  // Drain the scheduler
91  // m_actionsQueue.push(std::bind(&IOBoundSchedulerSvc::m_drain,
92  // this));
93  // we set the flag in this thread, not in the last action, to avoid stall,
94  // since we execute tasks asynchronously, in a detached thread, and it's possible that
95  // the task doesn't complete by the time the last while-iteration is entered
96  m_isActive = false;
97  // This would be the last (empty) action, just to trigger one last while-iteration
98  m_actionsQueue.push( []() -> StatusCode { return StatusCode::SUCCESS; } );
99  }
100 
101  return StatusCode::SUCCESS;
102 }
103 
105 
106  // the temporary lambda should be moved into the queue in here
107  auto actionn = [&]() {
108  debug() << " .. launching I/O-bound algo-closure .. " << endmsg;
109  return task.execute();
110  };
111 
112  m_actionsQueue.push( actionn );
113 
114  // until the queue to accelerator becomes limited
115  return StatusCode::SUCCESS;
116 }
117 
118 //===========================================================================
StatusCode initialize() override
Definition: Service.cpp:60
StatusCode finalize() override
Definition: Service.cpp:164
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
bool isSuccess() const
Definition: StatusCode.h:267
bool m_isActive
Flag to track if the scheduler is active or not.
Header file for class GaudiAlgorithm.
std::thread m_thread
The thread in which the activate function runs.
constexpr static const auto SUCCESS
Definition: StatusCode.h:85
virtual StatusCode execute()=0
#define DECLARE_COMPONENT(type)
General interface for a wrapper around Gaudi algorithm.
Definition: IAlgTask.h:15
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
T join(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
StatusCode deactivate()
Deactivate scheduler.
T bind(T...args)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
void activate()
Activate scheduler.
StatusCode push(IAlgTask &task) override
Add an algorithm to local queue to run on accelerator.
T detach(T...args)
StatusCode finalize() override
Finalise.
Please refer to the full documentation of the methods for more details.
tbb::concurrent_bounded_queue< action > m_actionsQueue
This is done since the copy of the lambda storage is too expensive.
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192