IOBoundAlgSchedulerSvc.cpp
Go to the documentation of this file.
1 #include <unordered_set>
2 #include <algorithm>
3 #include <map>
4 #include <sstream>
5 #include <queue>
6 #include <thread>
7 
8 // Framework includes
10 #include "GaudiKernel/IAlgorithm.h"
11 #include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface
14 
15 // Local
16 #include "IOBoundAlgSchedulerSvc.h"
17 
18 // Instantiation of a static factory class used by clients to create instances of this service
20 
21 //===========================================================================
22 // Infrastructure methods
23 
25  base_class(name,svcLoc),
26  m_isActive(false)
27 {}
28 
29 //---------------------------------------------------------------------------
31 //---------------------------------------------------------------------------
32 
38 
39  // Initialise mother class (read properties, ...)
41  if (!sc.isSuccess())
42  warning () << "Base class could not be initialized" << endmsg;
43 
44  // Activate the scheduler in another thread.
45  info() << "Activating scheduler in a separate thread" << endmsg;
47  this));
48 
49  return sc;
50 
51 }
52 //---------------------------------------------------------------------------
53 
58 
60  if (!sc.isSuccess())
61  warning () << "Base class could not be finalized" << endmsg;
62 
63  sc = deactivate();
64  if (!sc.isSuccess())
65  warning () << "Scheduler could not be deactivated" << endmsg;
66 
67  info() << "Joining preemptive scheduler's thread" << endmsg;
68  m_thread.join();
69 
70  return sc;
71 
72  }
73 //---------------------------------------------------------------------------
81 
82  // Now it's running
83  m_isActive=true;
84 
85  // Wait for actions pushed into the queue by finishing tasks.
86  action thisAction;
88 
89  // Continue to wait if the scheduler is running or there is something to do
90  info() << "Start checking the queue of I/O-bound algorithm tasks.." << endmsg;
91  while(m_isActive or m_actionsQueue.size() > 0){
92  m_actionsQueue.pop(thisAction);
93  std::thread th(thisAction);
94  th.detach();
95  }
96 }
97 
98 //---------------------------------------------------------------------------
99 
107 
108  if (m_isActive){
109  // Drain the scheduler
110  //m_actionsQueue.push(std::bind(&IOBoundSchedulerSvc::m_drain,
111  // this));
112  // we set the flag in this thread, not in the last action, to avoid stall,
113  // since we execute tasks asynchronously, in a detached thread, and it's possible that
114  // the task doesn't complete by the time the last while-iteration is entered
115  m_isActive = false;
116  // This would be the last (empty) action, just to trigger one last while-iteration
117  m_actionsQueue.push([this]() -> StatusCode {return StatusCode::SUCCESS;});
118  }
119 
120  return StatusCode::SUCCESS;
121 }
122 
124 
125  // the temporary lambda should be moved into the queue in here
126  auto actionn = [&](){
127  debug() << " .. launching I/O-bound algo-closure .. " << endmsg;
128  return task.execute();
129  };
130 
131  m_actionsQueue.push(actionn);
132 
133  // until the queue to accelerator becomes limited
134  return StatusCode::SUCCESS;
135 }
136 
137 //===========================================================================
StatusCode initialize() override
Definition: Service.cpp:64
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:25
StatusCode finalize() override
Definition: Service.cpp:174
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
bool m_isActive
Flag to track if the scheduler is active or not.
Header file for class GaudiAlgorithm.
std::thread m_thread
The thread in which the activate function runs.
~IOBoundAlgSchedulerSvc() override
Destructor.
STL namespace.
virtual StatusCode execute()=0
StatusCode initialize() override
Initialise.
General interface for a wrapper around Gaudi algorithm.
Definition: IAlgTask.h:15
T join(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
StatusCode deactivate()
Deactivate scheduler.
T bind(T...args)
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:242
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
void activate()
Activate scheduler.
StatusCode push(IAlgTask &task) override
Add an algorithm to local queue to run on accelerator.
T detach(T...args)
StatusCode finalize() override
Finalise.
Please refer to the full documentation of the methods for more details.
tbb::concurrent_bounded_queue< action > m_actionsQueue
This is done since the copy of the lambda storage is too expensive.
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244