The Gaudi Framework  v29r0 (ff2e7097)
IOBoundAlgSchedulerSvc.cpp
Go to the documentation of this file.
1 #include <algorithm>
2 #include <map>
3 #include <queue>
4 #include <sstream>
5 #include <thread>
6 #include <unordered_set>
7 
8 // Framework includes
10 #include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface
11 #include "GaudiKernel/IAlgorithm.h"
13 #include "GaudiKernel/SvcFactory.h"
14 
15 // Local
16 #include "IOBoundAlgSchedulerSvc.h"
17 
18 // Instantiation of a static factory class used by clients to create instances of this service
20 
21 //===========================================================================
22 // Infrastructure methods
23 
25  : base_class( name, svcLoc ), m_isActive( false )
    // Constructor body: forwards name and svcLoc to base_class and starts with the
    // scheduler flagged as inactive (m_isActive = false).
    // NOTE(review): the constructor's signature line (listing line 24) is absent from this listing.
26 {
27 }
28 
29 //---------------------------------------------------------------------------
31 //---------------------------------------------------------------------------
32 
38 {
39 
    // Body of initialize(): reports the base-class initialisation status, announces the
    // scheduler start-up and returns the base-class status code `sc`.
    // NOTE(review): listing line 41 is missing here; presumably it defines `sc` from the
    // base-class initialize() call — confirm against the real source.
40  // Initialise mother class (read properties, ...)
    // A base-class failure is only logged as a warning; execution continues and the
    // failing `sc` is what gets returned at the end.
42  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
43 
44  // Activate the scheduler in another thread.
45  info() << "Activating scheduler in a separate thread" << endmsg;
    // NOTE(review): listing line 46 is missing; presumably it launches m_thread running
    // activate() — TODO confirm. Nothing visible here starts the thread.
47 
48  return sc;
49 }
50 //---------------------------------------------------------------------------
51 
56 {
57 
    // Body of finalize(): reports the base-class finalisation status, deactivates the
    // scheduler, then joins the scheduler thread.
    // NOTE(review): listing line 58 is missing here; presumably it defines `sc` from the
    // base-class finalize() call — confirm against the real source.
59  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
60 
    // `sc` is overwritten here, so a base-class failure reported above is not reflected
    // in the value returned by this function.
61  sc = deactivate();
62  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
63 
64  info() << "Joining preemptive scheduler's thread" << endmsg;
    // join() is called unconditionally. NOTE(review): std::thread::join throws
    // std::system_error when the thread is not joinable (e.g. never started or already
    // joined); consider guarding with m_thread.joinable().
65  m_thread.join();
66 
67  return sc;
68 }
69 //---------------------------------------------------------------------------
77 {
78 
    // Body of activate(): the scheduler loop. Marks the scheduler active, then repeatedly
    // takes one action from m_actionsQueue and runs it in a freshly created, detached thread.
79  // Now it's running
    // NOTE(review): m_isActive is declared as a plain bool and is written in deactivate(),
    // whose comments say it is set from a different thread than the one running tasks;
    // unless synchronised elsewhere this is a data race — consider std::atomic<bool>.
80  m_isActive = true;
81 
82  // Wait for actions pushed into the queue by finishing tasks.
83  action thisAction;
    // NOTE(review): listing line 84 is absent from this listing.
85 
86  // Continue to wait if the scheduler is running or there is something to do
87  info() << "Start checking the queue of I/O-bound algorithm tasks.." << endmsg;
88  while ( m_isActive or m_actionsQueue.size() > 0 ) {
    // m_actionsQueue is a tbb::concurrent_bounded_queue; its pop() waits until an item is
    // available, which is why deactivate() pushes one last empty action to wake this loop.
89  m_actionsQueue.pop( thisAction );
    // The action is copied into the new thread and executed there; its StatusCode result
    // is not collected here.
90  std::thread th( thisAction );
    // Detached threads are not tracked: nothing in this function waits for them to finish.
91  th.detach();
92  }
93 }
94 
95 //---------------------------------------------------------------------------
96 
104 {
105 
    // Body of deactivate(): if the scheduler is active, clears the active flag and pushes
    // one no-op action so that the loop in activate() performs a final iteration.
    // Returns StatusCode::SUCCESS in every case, including when the scheduler was not active.
106  if ( m_isActive ) {
107  // Drain the scheduler
108  // m_actionsQueue.push(std::bind(&IOBoundSchedulerSvc::m_drain,
109  // this));
110  // we set the flag in this thread, not in the last action, to avoid stall,
111  // since we execute tasks asynchronously, in a detached thread, and it's possible that
112  // the task doesn't complete by the time the last while-iteration is entered
113  m_isActive = false;
114  // This would be the last (empty) action, just to trigger one last while-iteration
    // The lambda captures `this` but does not use it; it only returns SUCCESS.
115  m_actionsQueue.push( [this]() -> StatusCode { return StatusCode::SUCCESS; } );
116  }
117 
118  return StatusCode::SUCCESS;
119 }
120 
122 {
123 
    // Body of push(): wraps the given task in a closure that logs a debug message and
    // calls task.execute(), then enqueues that closure on m_actionsQueue.
    // Always returns StatusCode::SUCCESS; the task's own status is produced later, when the
    // closure is run.
124  // the temporary lambda should be moved into the queue in here
    // NOTE(review): the closure captures by reference ([&]), so it refers to `task` (and
    // `this`) rather than owning a copy. It is executed later by the scheduler loop in a
    // separate thread, so the task object must still be alive at that point — confirm that
    // callers guarantee this.
125  auto actionn = [&]() {
126  debug() << " .. launching I/O-bound algo-closure .. " << endmsg;
127  return task.execute();
128  };
129 
    // NOTE(review): `actionn` is passed as an lvalue, so it is copied into the queue, not
    // moved as the comment above suggests.
130  m_actionsQueue.push( actionn );
131 
132  // until the queue to accelerator becomes limited
133  return StatusCode::SUCCESS;
134 }
135 
136 //===========================================================================
StatusCode initialize() override
Definition: Service.cpp:64
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to locate services in the framework.
Definition: ISvcLocator.h:25
StatusCode finalize() override
Definition: Service.cpp:174
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:75
bool m_isActive
Flag to track if the scheduler is active or not.
Header file for class GaudiAlgorithm.
std::thread m_thread
The thread in which the activate function runs.
~IOBoundAlgSchedulerSvc() override
Destructor.
STL namespace.
virtual StatusCode execute()=0
StatusCode initialize() override
Initialise.
General interface for a wrapper around Gaudi algorithm.
Definition: IAlgTask.h:15
T join(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
StatusCode deactivate()
Deactivate scheduler.
T bind(T...args)
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:211
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
void activate()
Activate scheduler.
StatusCode push(IAlgTask &task) override
Add an algorithm to local queue to run on accelerator.
T detach(T...args)
StatusCode finalize() override
Finalise.
Please refer to the full documentation of the methods for more details.
tbb::concurrent_bounded_queue< action > m_actionsQueue
This is done since the copy of the lambda storage is too expensive.
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209