The Gaudi Framework  v30r3 (a5ef0a68)
IOBoundAlgSchedulerSvc.cpp
Go to the documentation of this file.
1 #include <algorithm>
2 #include <map>
3 #include <queue>
4 #include <sstream>
5 #include <thread>
6 #include <unordered_set>
7 
8 // Framework includes
10 #include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface
11 #include "GaudiKernel/IAlgorithm.h"
13 
14 // Local
15 #include "IOBoundAlgSchedulerSvc.h"
16 
17 // Instantiation of a static factory class used by clients to create instances of this service
19 
20 
25 {
26 
27  // Initialise mother class (read properties, ...)
29  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
30 
31  // Activate the scheduler in another thread.
32  info() << "Activating scheduler in a separate thread" << endmsg;
34 
35  return sc;
36 }
37 //---------------------------------------------------------------------------
38 
43 {
44 
46  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
47 
48  sc = deactivate();
49  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
50 
51  info() << "Joining preemptive scheduler's thread" << endmsg;
52  m_thread.join();
53 
54  return sc;
55 }
56 //---------------------------------------------------------------------------
64 {
65 
66  // Now it's running
67  m_isActive = true;
68 
69  // Wait for actions pushed into the queue by finishing tasks.
70  action thisAction;
72 
73  // Continue to wait if the scheduler is running or there is something to do
74  info() << "Start checking the queue of I/O-bound algorithm tasks.." << endmsg;
75  while ( m_isActive or m_actionsQueue.size() > 0 ) {
76  m_actionsQueue.pop( thisAction );
77  std::thread th( thisAction );
78  th.detach();
79  }
80 }
81 
82 //---------------------------------------------------------------------------
83 
91 {
92 
93  if ( m_isActive ) {
94  // Drain the scheduler
95  // m_actionsQueue.push(std::bind(&IOBoundSchedulerSvc::m_drain,
96  // this));
97  // we set the flag in this thread, not in the last action, to avoid stall,
98  // since we execute tasks asynchronously, in a detached thread, and it's possible that
99  // the task doesn't complete by the time the last while-iteration is entered
100  m_isActive = false;
101  // This would be the last (empty) action, just to trigger one last while-iteration
102  m_actionsQueue.push( []() -> StatusCode { return StatusCode::SUCCESS; } );
103  }
104 
105  return StatusCode::SUCCESS;
106 }
107 
109 {
110 
111  // the temporary lambda should be moved into the queue in here
112  auto actionn = [&]() {
113  debug() << " .. launching I/O-bound algo-closure .. " << endmsg;
114  return task.execute();
115  };
116 
117  m_actionsQueue.push( actionn );
118 
119  // until the queue to accelerator becomes limited
120  return StatusCode::SUCCESS;
121 }
122 
123 //===========================================================================
StatusCode initialize() override
Definition: Service.cpp:63
StatusCode finalize() override
Definition: Service.cpp:173
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
bool isSuccess() const
Definition: StatusCode.h:287
bool m_isActive
Flag to track if the scheduler is active or not.
Header file for class GaudiAlgorithm.
std::thread m_thread
The thread in which the activate function runs.
virtual StatusCode execute()=0
#define DECLARE_COMPONENT(type)
General interface for a wrapper around Gaudi algorithm.
Definition: IAlgTask.h:15
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
T join(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:51
StatusCode deactivate()
Deactivate scheduler.
T bind(T...args)
constexpr static const auto SUCCESS
Definition: StatusCode.h:87
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
void activate()
Activate scheduler.
StatusCode push(IAlgTask &task) override
Add an algorithm to local queue to run on accelerator.
T detach(T...args)
StatusCode finalize() override
Finalise.
Please refer to the full documentation of the methods for more details.
tbb::concurrent_bounded_queue< action > m_actionsQueue
This is done since the copy of the lambda storage is too expensive.
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209