RoundRobinSchedulerSvc.cpp
Go to the documentation of this file.
1 // Framework includes
2 #include "GaudiKernel/SvcFactory.h"
3 #include "GaudiKernel/IAlgorithm.h"
4 #include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface
5 #include "GaudiKernel/IProperty.h"
6 #include "GaudiKernel/AppReturnCode.h"
7 
8 #include "GaudiKernel/ContextSpecificPtr.h"
9 
10 // C++
11 #include <list>
12 #include <thread>
13 #include <csignal>
14 
15 // Local
16 #include "RoundRobinSchedulerSvc.h"
17 #include "AlgResourcePool.h"
18 #include "RetCodeGuard.h"
19 
20 // Instantiation of a static factory class used by clients to create instances of this service
22 
23 //===========================================================================
24 // Infrastructure methods
25 
// Constructor (its head is not shown in this listing): forwards the service
// name and service locator to the base class and declares the job options.
27  base_class(name,svcLoc){
// UseTopAlgList (default true): whether the top-level algorithm list or its
// flattened version is used.
28  declareProperty("UseTopAlgList", m_useTopAlgList=true);
// SimultaneousEvents (default 1): initial number of free event slots, i.e. how
// many contexts can be buffered before processing.
29  declareProperty("SimultaneousEvents", m_freeSlots=1);
30 }
31 
32 //---------------------------------------------------------------------------
34 //---------------------------------------------------------------------------
35 
37 
// initialize() body (its signature and the line producing `sc` are not shown in
// this listing). It looks up the AlgResourcePool service, reports the number of
// algorithms, builds the index -> name table (m_algname_vect) and fetches the
// concrete AlgResourcePool for the control-flow setup.
38  // Initialise mother class (read properties, ...)
// A failed base-class initialization is only logged as a warning; execution
// continues regardless.
40  if (!sc.isSuccess())
41  warning () << "Base class could not be initialized" << endmsg;
42 
43  // Get the algo resource pool
44  m_algResourcePool = serviceLocator()->service("AlgResourcePool");
45  if (!m_algResourcePool.isValid()){
46  error() << "Error retrieving AlgResourcePool" << endmsg;
47  return StatusCode::FAILURE;
48  }
49 
50  // Get the list of algorithms
// NOTE(review): the statement that fills m_algList is not visible in this
// listing; presumably it chooses getTopAlgList() or getFlatAlgList() based on
// m_useTopAlgList — confirm against the real source.
52  info() << "Found " << m_algList.size() << " algorithms" << endmsg;
53 
54  // Fill the containers to convert algo names to index
55  m_algname_index_map.reserve(m_algList.size());
56  m_algname_vect.reserve(m_algList.size());
// `index` tracks each algorithm's position in m_algList; m_algname_vect is
// filled in the same order, so m_algname_vect[index] yields the name back.
57  unsigned int index=0;
58  for (IAlgorithm* algo : m_algList){
59  const std::string& name = algo->name();
// NOTE(review): the statement recording name -> index in m_algname_index_map
// is not visible in this listing; here `index` is only incremented.
61  m_algname_vect.emplace_back(name);
62  index++;
63  }
64 
65  //initialize control flow manager
// NOTE(review): the use of algPool (presumably m_controlFlow.initialize(...))
// is not visible in this listing, and the dynamic_cast result is not
// null-checked in the lines shown.
66  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_algResourcePool.get());
67 
69 
70  return StatusCode::SUCCESS;
71 
// The notes below sit after the return: event-slot preparation is still TODO.
72  // prepare the event slots
73  // TODO !
74 
75 }
76 //---------------------------------------------------------------------------
77 
// finalize() tail (its signature and the line producing `sc` are not shown in
// this listing): a failed base-class finalization is only reported as a
// warning, and that status is returned to the caller unchanged.
80  if (!sc.isSuccess())
81  warning () << "Base class could not be finalized" << endmsg;
82  return sc;
83 }
84 
85 //---------------------------------------------------------------------------
86 
91 
// pushNewEvent() body (its signature is not shown in this listing): consumes one
// free slot, queues the context for processEvents() and clears its fail flag.
92  // consistency check
// Refuse the context when no free slot is left.
93  if (!m_freeSlots) {
94  fatal() << "More contexts than slots provided" << m_freeSlots << endmsg;
95  return StatusCode::FAILURE;
96  }
97 
98  --m_freeSlots;
99  m_evtCtx_buffer.push_back(eventContext);
// processEvents() skips contexts whose fail flag is set, so start "not failed".
100  eventContext->setFail(false);
101 
// NOTE(review): no success-path return statement is visible in this listing
// (source line 102 is missing); confirm the real source returns
// StatusCode::SUCCESS here, otherwise control flows off the end of a
// value-returning function.
103 }
104 
105 StatusCode RoundRobinSchedulerSvc::pushNewEvents(std::vector<EventContext*>& eventContexts){
106  // consistency check
107  if (eventContexts.size() > m_freeSlots) {
108  fatal() << "More contexts than slots provided" << m_freeSlots << endmsg;
109  return StatusCode::FAILURE;
110  }
111  m_freeSlots -= eventContexts.size();
112 
113  m_evtCtx_buffer.insert(m_evtCtx_buffer.end(), eventContexts.begin(), eventContexts.end());
114 
116 }
117 
118 //---------------------------------------------------------------------------
121 
// processEvents() body (its signature, the declaration of `sc` and the lines
// creating `rcg` and setting the current context are not shown in this
// listing). Runs every CONTROLREADY algorithm over all buffered events, one
// algorithm at a time across all events. It requires all events to agree on
// each algorithm's accept/reject outcome. Afterwards it moves every buffered
// context to m_finishedEvents, clears the buffer and returns `sc`.
122  // Get the IProperty interface of the ApplicationMgr to pass it to RetCodeGuard
123  const SmartIF<IProperty> appmgr(serviceLocator());
124  SmartIF<IMessageSvc> messageSvc (serviceLocator());
125 
126  //initialize control algorithm states and decisions
127  AlgsExecutionStates algStates(m_algList.size(), messageSvc);
// NOTE(review): algPool is dereferenced below without a null check on the
// dynamic_cast result.
128  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_algResourcePool.get());
129  std::vector<int> nodeDecisions(algPool->getExecutionFlowGraph()->getControlFlowNodeCounter(), -1);
130 
131 
132  m_controlFlow.updateEventState(algStates, nodeDecisions);
133  m_controlFlow.promoteToControlReadyState(algStates, nodeDecisions);
134 
135  //initialize data flow manager
136  //DataFlowManager dataFlow(m_scheduler->m_algosDependencies);
137 
138  info() << "Got " << m_evtCtx_buffer.size() << " events, starting loop" << endmsg;
139 
// Each sweep runs the algorithms currently CONTROLREADY, then lets the
// control-flow manager promote the next ones.
140  while(algStates.algsPresent(AlgsExecutionStates::State::CONTROLREADY) ){
141 
142  debug() << "algorithms left" << endmsg;
143 
144  //std::for_each(algStates.begin(AlgsExecutionStates::State::CONTROLREADY), algStates.end(AlgsExecutionStates::State::CONTROLREADY),
145 
146  //[&] (uint algIndex) {
147  for(auto it = algStates.begin(AlgsExecutionStates::State::CONTROLREADY); it != algStates.end(AlgsExecutionStates::State::CONTROLREADY); ++it){
148 
149  uint algIndex = *it;
150 
151  std::string algName = m_algname_vect[algIndex];
152 
153  debug() << "Running algorithm [" << algIndex << "] " << algName << endmsg;
154 
// One accept/reject result per buffered event, indexed like m_evtCtx_buffer.
155  std::vector<AlgsExecutionStates::State> algResults(m_evtCtx_buffer.size());
156 
157  //promote algorithm to data ready
158  algStates.updateState(algIndex,AlgsExecutionStates::DATAREADY);
159 
// NOTE(review): the StatusCode returned by acquireAlgorithm() is ignored.
160  IAlgorithm* ialgoPtr=nullptr;
161  m_algResourcePool->acquireAlgorithm(algName, ialgoPtr);
162  //promote algorithm to scheduled
163  algStates.updateState(algIndex,AlgsExecutionStates::SCHEDULED);
164 
// NOTE(review): algoPtr is used without checking that the dynamic_cast (or
// the acquisition above) succeeded.
165  Algorithm* algoPtr = dynamic_cast<Algorithm*> (ialgoPtr); // DP: expose the setter of the context?
166  algoPtr->resetExecuted();
167 
// Run the same algorithm instance over every buffered event in turn.
168  for (uint i = 0; i < m_evtCtx_buffer.size(); ++i) {
// Events already marked as failed are not executed again.
169  if (false == m_evtCtx_buffer[i]->evtFail()) {
170  bool eventfailed=false;
171 
172  // m_evtCtx_buffer[i]->m_thread_id = pthread_self();
173  algoPtr->resetExecuted();
174  algoPtr->setContext(m_evtCtx_buffer[i]);
176  // Call the execute() method
177  try {
// NOTE(review): `sc` is overwritten for every event, so the failure check
// after this sweep only sees the most recent assignment.
179  sc = ialgoPtr->sysExecute();
180  if (UNLIKELY(!sc.isSuccess())) {
181  warning() << "Execution of algorithm " << algName << " failed for event " << m_evtCtx_buffer[i]->evt() << endmsg;
182  eventfailed = true;
183  }
// `rcg` (a RetCodeGuard created on a line not shown here) sets the
// application return code on early exit; reaching this point disarms it.
184  rcg.ignore(); // disarm the guard
// NOTE(review): none of the handlers below set `eventfailed`, so an event
// whose algorithm threw is NOT flagged as failed by setFail() afterwards,
// and `sc` keeps whatever value it held before the throw.
185  } catch ( const GaudiException& Exception ) {
186  error() << ".executeEvent(): Exception with tag=" << Exception.tag()
187  << " thrown by " << algName << endmsg;
188  error() << Exception << endmsg;
189  } catch ( const std::exception& Exception ) {
190  fatal() << ".executeEvent(): Standard std::exception thrown by "
191  << algName << endmsg;
192  error() << Exception.what() << endmsg;
193  } catch(...) {
194  fatal() << ".executeEvent(): UNKNOWN Exception thrown by "
195  << algName << endmsg;
196  }
197  m_evtCtx_buffer[i]->setFail(eventfailed);
198  }
199 
// NOTE(review): this check is outside the evtFail() branch, so for skipped
// (failed) events the recorded result is whatever filterPassed() reports
// from the algorithm's last execution.
200  if (ialgoPtr->filterPassed()){
201  algResults[i] = AlgsExecutionStates::State::EVTACCEPTED;
202  } else {
203  algResults[i] = AlgsExecutionStates::State::EVTREJECTED;
204  }
205 
206  }
207 
208  m_algResourcePool->releaseAlgorithm(algName,ialgoPtr);
209 
// All events must agree on the algorithm's outcome, because a single
// execution state is kept for all of them.
// NOTE(review): algResults[0] is out of bounds if m_evtCtx_buffer is empty.
210  AlgsExecutionStates::State result = algResults[0];
211  bool unanimous = true;
212  for(uint i = 1; i < algResults.size(); ++i)
213  if(result != algResults[i])
214  unanimous = false;
215 
216  if(unanimous)
217  algStates.updateState(algIndex,result);
218  else{
219  fatal() << "divergent algorithm execution" << endmsg;
220  fatal() << "Algorithm results: ";
221  for(uint i =0; i < algResults.size(); ++i){
222  fatal() << i << ": " << (algResults[i] == AlgsExecutionStates::State::EVTACCEPTED ? "A" : "R") << "\t";
223  if(algResults[i] == AlgsExecutionStates::State::EVTREJECTED){
224  //std::cerr << m_evtCtx_buffer[i]->m_evt_num << std::endl;
225  }
226  }
227  fatal() << endmsg;
228 
229  sc = StatusCode::FAILURE;
230  }
231  }
232  //});
233 
234  if(sc.isFailure())
235  break; //abort execution of events, something went wrong
236 
237  m_controlFlow.updateEventState(algStates, nodeDecisions);
238  m_controlFlow.promoteToControlReadyState(algStates, nodeDecisions);
239  }
// Every buffered context is handed back, including after an early abort.
240  for (EventContext* eventContext : m_evtCtx_buffer) {
241  m_finishedEvents.push(eventContext);
242  }
243 
244  m_evtCtx_buffer.clear();
245 
246  return sc; //TODO: define proper return value
247 }
248 
249 //---------------------------------------------------------------------------
252 
// popFinishedEvent() body (its signature is not shown in this listing).
// Processing is lazy: buffered events are only run when a finished event is
// requested and none is ready yet.
// NOTE(review): the StatusCode returned by processEvents() is discarded here.
253  if(m_finishedEvents.empty() && !m_evtCtx_buffer.empty())
254  processEvents();
255 
// m_finishedEvents is a tbb::concurrent_bounded_queue; pop() waits until an
// element is available, so this blocks if both queue and buffer were empty.
256  m_finishedEvents.pop(eventContext);
// Handing a context back frees its slot.
257  m_freeSlots++;
258  debug() << "Popped slot " << eventContext->slot() << "(event "
259  << eventContext->evt() << ")" << endmsg;
260  return StatusCode::SUCCESS;
261 }
262 
263 //---------------------------------------------------------------------------
// tryPopFinishedEvent() body (its signature is not shown in this listing):
// non-blocking variant. Returns SUCCESS and frees a slot when a finished
// context is available, FAILURE otherwise.
// Unlike popFinishedEvent(), it never calls processEvents(), so it cannot
// trigger processing of buffered events.
266  if (m_finishedEvents.try_pop(eventContext)){
267  debug() << "Try Pop successful slot " << eventContext->slot()
268  << "(event " << eventContext->evt() << ")" << endmsg;
269  m_freeSlots++;
270  return StatusCode::SUCCESS;
271  }
272  return StatusCode::FAILURE;
273 
274 }
275 //---------------------------------------------------------------------------
276 
281 
282 //---------------------------------------------------------------------------
void resetExecuted() override
Reset the executed state of the Algorithm for the duration of the current event.
Definition: Algorithm.cpp:939
virtual StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts)
StatusCode initialize() override
Definition: Service.cpp:63
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
Define general base for Gaudi exception.
Helper class to set the application return code in case of early exit (e.g.
Definition: RetCodeGuard.h:9
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
Definition: ISvcLocator.h:25
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode finalize() override
Definition: Service.cpp:188
virtual unsigned int freeSlots()
Get free slots number.
virtual StatusCode finalize()
Finalise.
std::list< IAlgorithm * > m_algList
Cache the list of algs to be executed.
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
STL namespace.
virtual StatusCode popFinishedEvent(EventContext *&eventContext)
Blocks until an event is available.
bool m_useTopAlgList
Decide if the top alglist or its flat version has to be used.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:22
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:86
GAUDI_API void setCurrentContextId(ContextIdType newId)
Used by the framework to change the value of the current context id.
unsigned int m_freeSlots
The number of free event slots (initialised from the SimultaneousEvents property, default 1)
void setContext(EventContext *context)
set the context
Definition: Algorithm.h:556
virtual StatusCode sysExecute()=0
System execution. This method invokes the execute() method of a concrete algorithm.
SmartIF< IAlgResourcePool > m_algResourcePool
constexpr int UnhandledException
Definition: AppReturnCode.h:27
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:76
virtual std::list< IAlgorithm * > getTopAlgList()=0
Get top list of algorithms.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
virtual StatusCode tryPopFinishedEvent(EventContext *&eventContext)
Try to fetch an event from the scheduler.
void promoteToControlReadyState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions, const int &slotNum=-1) const
XXX: CF tests.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
long int evt() const
Definition: EventContext.h:37
void setFail(const bool &b=true)
Definition: EventContext.h:54
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
virtual StatusCode pushNewEvent(EventContext *eventContext)
Make an event available to the scheduler.
virtual const std::string & tag() const
name tag for the exception, or exception type
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:354
virtual concurrency::ExecutionFlowGraph * getExecutionFlowGraph() const
concurrency::ExecutionFlowManager m_controlFlow
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:23
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:77
virtual StatusCode initialize()
Initialise.
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
Base class used to extend a class implementing other interfaces.
Definition: extends.h:10
ID_type slot() const
Definition: EventContext.h:38
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
#define UNLIKELY(x)
Definition: Kernel.h:126
void ignore()
Definition: RetCodeGuard.h:13
StatusCode initialize(ExecutionFlowGraph *CFGraph, const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize the control flow manager It greps the topalg list and the index map for the algo names...
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
list i
Definition: ana.py:128
std::vector< EventContext * > m_evtCtx_buffer
The RoundRobinSchedulerSvc implements the IScheduler interface.