RoundRobinSchedulerSvc Class Reference

The RoundRobinSchedulerSvc implements the IScheduler interface.

#include <src/RoundRobinSchedulerSvc.h>

Public Member Functions

 RoundRobinSchedulerSvc (const std::string &name, ISvcLocator *svc)
 Constructor.
 
 ~RoundRobinSchedulerSvc ()
 Destructor.
 
virtual StatusCode initialize ()
 Initialise.
 
virtual StatusCode finalize ()
 Finalise.
 
virtual StatusCode pushNewEvent (EventContext *eventContext)
 Make an event available to the scheduler.
 
virtual StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts)
 
virtual StatusCode popFinishedEvent (EventContext *&eventContext)
 Blocks until an event is available.
 
virtual StatusCode tryPopFinishedEvent (EventContext *&eventContext)
 Try to fetch an event from the scheduler.
 
virtual unsigned int freeSlots ()
 Get free slots number.
 
- Public Member Functions inherited from extends< BASE, Interfaces >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast.
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface.
 
std::vector< std::string > getInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames.
 
 ~extends () override=default
 Virtual destructor.
 
- Public Member Functions inherited from extend_interfaces< Interfaces...>
 ~extend_interfaces () override=default
 Virtual destructor.
 

Private Member Functions

StatusCode processEvents ()
 

Private Attributes

bool m_useTopAlgList
 Decide if the top alglist or its flat version has to be used.
 
SmartIF< IAlgResourcePool > m_algResourcePool
 
concurrency::ExecutionFlowManager m_controlFlow
 
std::vector< std::string > m_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion.
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion.
 
std::vector< std::vector< std::string > > m_algosDependencies
 Ugly, will disappear when the deps are declared only within the C++ code of the algos.
 
std::list< IAlgorithm * > m_algList
 Cache the list of algs to be executed.
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events.
 
unsigned int m_freeSlots
 The number of free slots (0 or 1)
 
std::vector< EventContext * > m_evtCtx_buffer
 

Additional Inherited Members

- Public Types inherited from extends< BASE, Interfaces >
using base_class = extends
 Typedef to this class.
 
using extend_interfaces_base = extend_interfaces< Interfaces...>
 Typedef to the base of this class.
 
- Public Types inherited from extend_interfaces< Interfaces...>
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids...>::type
 take union of the ext_iids of all Interfaces...
 

Detailed Description

The RoundRobinSchedulerSvc implements the IScheduler interface.

It buffers multiple events and processes them algorithm by algorithm: each algorithm is run over every buffered event before the next algorithm starts, all on a single thread. It serves as a simple implementation to compare against the concurrent state machine and provides a test of instruction cache locality.

Author
Benedikt Hegner
Version
1.0

Definition at line 39 of file RoundRobinSchedulerSvc.h.
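
The processing order given in the detailed description can be illustrated with a minimal, framework-independent sketch. The names ToyEvent, ToyAlgorithm and runRoundRobin are hypothetical and are not part of Gaudi; the sketch only shows the loop nesting (algorithms outside, events inside), not the real control-flow handling.

```cpp
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration; these are NOT Gaudi types.
struct ToyEvent { int id; };

struct ToyAlgorithm {
  std::string name;
  std::function<void(ToyEvent&)> execute;
};

// Run each algorithm over every buffered event before moving on to the
// next algorithm. Returns the visit order as "algName:eventId" strings.
std::vector<std::string> runRoundRobin(std::vector<ToyAlgorithm>& algs,
                                       std::vector<ToyEvent>& events) {
  std::vector<std::string> trace;
  for (auto& alg : algs) {      // outer loop: algorithms
    for (auto& evt : events) {  // inner loop: buffered events
      alg.execute(evt);
      trace.push_back(alg.name + ":" + std::to_string(evt.id));
    }
  }
  return trace;
}
```

Keeping the algorithm fixed in the outer loop means the same code runs back to back for all events, which is the instruction cache locality effect the class is meant to test.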

Constructor & Destructor Documentation

RoundRobinSchedulerSvc::RoundRobinSchedulerSvc ( const std::string &  name,
ISvcLocator * svc 
)

Constructor.

Definition at line 26 of file RoundRobinSchedulerSvc.cpp.

26  :
27  base_class(name,svcLoc){
28  declareProperty("UseTopAlgList", m_useTopAlgList=true);
29  declareProperty("SimultaneousEvents", m_freeSlots=1);
30 }
RoundRobinSchedulerSvc::~RoundRobinSchedulerSvc ( )

Destructor.

Definition at line 33 of file RoundRobinSchedulerSvc.cpp.

33 {}

Member Function Documentation

StatusCode RoundRobinSchedulerSvc::finalize ( )
virtual

Finalise.

Definition at line 78 of file RoundRobinSchedulerSvc.cpp.

78  {
79  StatusCode sc(Service::finalize());
80  if (!sc.isSuccess())
81  warning () << "Base class could not be finalized" << endmsg;
82  return sc;
83 }
unsigned int RoundRobinSchedulerSvc::freeSlots ( )
virtual

Get free slots number.

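Returns the current number of free slots (m_freeSlots). Because the scheduler is sequential and its methods are non-reentrant, events are only buffered on push: the counter starts at the value of the SimultaneousEvents property (default 1), decreases when events are pushed, and increases when finished events are popped.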

Definition at line 280 of file RoundRobinSchedulerSvc.cpp.

280 {return m_freeSlots;}
StatusCode RoundRobinSchedulerSvc::initialize ( )
virtual

Initialise.

Definition at line 36 of file RoundRobinSchedulerSvc.cpp.

36  {
37 
38  // Initialise mother class (read properties, ...)
39  StatusCode sc(Service::initialize());
40  if (!sc.isSuccess())
41  warning () << "Base class could not be initialized" << endmsg;
42 
43  // Get the algo resource pool
44  m_algResourcePool = serviceLocator()->service("AlgResourcePool");
45  if (!m_algResourcePool.isValid()){
46  error() << "Error retrieving AlgResourcePool" << endmsg;
47  return StatusCode::FAILURE;
48  }
49 
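50  // Get the list of algorithms
51  m_algList = m_useTopAlgList ? m_algResourcePool->getTopAlgList() : m_algResourcePool->getFlatAlgList();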
52  info() << "Found " << m_algList.size() << " algorithms" << endmsg;
53 
54  // Fill the containers to convert algo names to index
55  m_algname_index_map.reserve(m_algList.size());
56  m_algname_vect.reserve(m_algList.size());
57  unsigned int index=0;
58  for (IAlgorithm* algo : m_algList){
59  const std::string& name = algo->name();
60  m_algname_index_map[name] = index;
61  m_algname_vect.emplace_back(name);
62  index++;
63  }
64 
65  //initialize control flow manager
66  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_algResourcePool.get());
67 
69 
70  return StatusCode::SUCCESS;
71 
72  // prepare the event slots
73  // TODO !
74 
75 }
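
The name/index bookkeeping filled in by initialize() can be sketched on its own as follows. AlgNameIndex is a hypothetical helper written for this illustration and is not part of Gaudi; it mirrors the roles of m_algname_vect (index to name) and m_algname_index_map (name to index) under the assumption that algorithm names are unique.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical helper for illustration; not part of Gaudi.
struct AlgNameIndex {
  std::vector<std::string> index2name;                       // cf. m_algname_vect
  std::unordered_map<std::string, unsigned int> name2index;  // cf. m_algname_index_map

  explicit AlgNameIndex(const std::vector<std::string>& names) {
    index2name.reserve(names.size());
    name2index.reserve(names.size());
    unsigned int index = 0;
    for (const auto& name : names) {
      name2index[name] = index;       // name -> position in the list
      index2name.emplace_back(name);  // position -> name
      ++index;
    }
  }
};
```

Both containers are filled in the same pass, so a round trip from name to index and back returns the original name.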
StatusCode RoundRobinSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
virtual

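Blocks until an event is available. If no finished event is queued and the event buffer is not empty, processEvents() is called first.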

Definition at line 251 of file RoundRobinSchedulerSvc.cpp.

251  {
252 
253  if(m_finishedEvents.empty() && !m_evtCtx_buffer.empty())
254  processEvents();
255 
256  m_finishedEvents.pop(eventContext);
257  m_freeSlots++;
258  debug() << "Popped slot " << eventContext->slot() << "(event "
259  << eventContext->evt() << ")" << endmsg;
260  return StatusCode::SUCCESS;
261 }
StatusCode RoundRobinSchedulerSvc::processEvents ( )
private

Definition at line 119 of file RoundRobinSchedulerSvc.cpp.

119  {
120  StatusCode sc(StatusCode::SUCCESS);
121 
122  // Get the IProperty interface of the ApplicationMgr to pass it to RetCodeGuard
123  const SmartIF<IProperty> appmgr(serviceLocator());
124  SmartIF<IMessageSvc> messageSvc (serviceLocator());
125 
126  //initialize control algorithm states and decisions
127  AlgsExecutionStates algStates(m_algList.size(), messageSvc);
128  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_algResourcePool.get());
129  std::vector<int> nodeDecisions(algPool->getExecutionFlowGraph()->getControlFlowNodeCounter(), -1);
130 
131 
132  m_controlFlow.updateEventState(algStates, nodeDecisions);
133  m_controlFlow.promoteToControlReadyState(algStates, nodeDecisions);
134 
135  //initialize data flow manager
136  //DataFlowManager dataFlow(m_scheduler->m_algosDependencies);
137 
138  info() << "Got " << m_evtCtx_buffer.size() << " events, starting loop" << endmsg;
139 
140  while(algStates.algsPresent(AlgsExecutionStates::State::CONTROLREADY) ){
141 
142  debug() << "algorithms left" << endmsg;
143 
144  //std::for_each(algStates.begin(AlgsExecutionStates::State::CONTROLREADY), algStates.end(AlgsExecutionStates::State::CONTROLREADY),
145 
146  //[&] (uint algIndex) {
147  for(auto it = algStates.begin(AlgsExecutionStates::State::CONTROLREADY); it != algStates.end(AlgsExecutionStates::State::CONTROLREADY); ++it){
148 
149  uint algIndex = *it;
150 
151  std::string algName = m_algname_vect[algIndex];
152 
153  debug() << "Running algorithm [" << algIndex << "] " << algName << endmsg;
154 
155  std::vector<AlgsExecutionStates::State> algResults(m_evtCtx_buffer.size());
156 
157  //promote algorithm to data ready
158  algStates.updateState(algIndex,AlgsExecutionStates::DATAREADY);
159 
160  IAlgorithm* ialgoPtr=nullptr;
161  m_algResourcePool->acquireAlgorithm(algName, ialgoPtr);
162  //promote algorithm to scheduled
163  algStates.updateState(algIndex,AlgsExecutionStates::SCHEDULED);
164 
165  Algorithm* algoPtr = dynamic_cast<Algorithm*> (ialgoPtr); // DP: expose the setter of the context?
166  algoPtr->resetExecuted();
167 
168  for (uint i = 0; i < m_evtCtx_buffer.size(); ++i) {
169  if (false == m_evtCtx_buffer[i]->evtFail()) {
170  bool eventfailed=false;
171 
172  // m_evtCtx_buffer[i]->m_thread_id = pthread_self();
173  algoPtr->resetExecuted();
174  algoPtr->setContext(m_evtCtx_buffer[i]);
176  // Call the execute() method
177  try {
178  RetCodeGuard rcg(appmgr, Gaudi::ReturnCode::UnhandledException);
179  sc = ialgoPtr->sysExecute();
180  if (UNLIKELY(!sc.isSuccess())) {
181  warning() << "Execution of algorithm " << algName << " failed for event " << m_evtCtx_buffer[i]->evt() << endmsg;
182  eventfailed = true;
183  }
184  rcg.ignore(); // disarm the guard
185  } catch ( const GaudiException& Exception ) {
186  error() << ".executeEvent(): Exception with tag=" << Exception.tag()
187  << " thrown by " << algName << endmsg;
188  error() << Exception << endmsg;
189  } catch ( const std::exception& Exception ) {
190  fatal() << ".executeEvent(): Standard std::exception thrown by "
191  << algName << endmsg;
192  error() << Exception.what() << endmsg;
193  } catch(...) {
194  fatal() << ".executeEvent(): UNKNOWN Exception thrown by "
195  << algName << endmsg;
196  }
197  m_evtCtx_buffer[i]->setFail(eventfailed);
198  }
199 
200  if (ialgoPtr->filterPassed()){
201  algResults[i] = AlgsExecutionStates::State::EVTACCEPTED;
202  } else {
203  algResults[i] = AlgsExecutionStates::State::EVTREJECTED;
204  }
205 
206  }
207 
208  m_algResourcePool->releaseAlgorithm(algName,ialgoPtr);
209 
210  AlgsExecutionStates::State result = algResults[0];
211  bool unanimous = true;
212  for(uint i = 1; i < algResults.size(); ++i)
213  if(result != algResults[i])
214  unanimous = false;
215 
216  if(unanimous)
217  algStates.updateState(algIndex,result);
218  else{
219  fatal() << "divergent algorithm execution" << endmsg;
220  fatal() << "Algorithm results: ";
221  for(uint i =0; i < algResults.size(); ++i){
222  fatal() << i << ": " << (algResults[i] == AlgsExecutionStates::State::EVTACCEPTED ? "A" : "R") << "\t";
223  if(algResults[i] == AlgsExecutionStates::State::EVTREJECTED){
224  //std::cerr << m_evtCtx_buffer[i]->m_evt_num << std::endl;
225  }
226  }
227  fatal() << endmsg;
228 
230  }
231  }
232  //});
233 
234  if(sc.isFailure())
235  break; //abort execution of events, something went wrong
236 
237  m_controlFlow.updateEventState(algStates, nodeDecisions);
238  m_controlFlow.promoteToControlReadyState(algStates, nodeDecisions);
239  }
240  for (EventContext* eventContext : m_evtCtx_buffer) {
241  m_finishedEvents.push(eventContext);
242  }
243 
244  m_evtCtx_buffer.clear();
245 
246  return sc; //TODO: define proper return value
247 }
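
The unanimity check near the end of processEvents() (all buffered events must agree on accept or reject for a given algorithm) can be written compactly. The sketch below is a standalone illustration: Decision and isUnanimous are hypothetical names, and treating an empty result list as unanimous is an assumption of this sketch, not something the listing defines.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical enum for illustration; the real states live in AlgsExecutionStates.
enum class Decision { Accepted, Rejected };

// True when every per-event decision equals the first one.
// An empty vector is treated as unanimous in this sketch.
bool isUnanimous(const std::vector<Decision>& results) {
  return std::all_of(results.begin(), results.end(),
                     [&](Decision d) { return d == results.front(); });
}
```

The lambda is never invoked for an empty vector, so results.front() is only evaluated when at least one element exists.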
StatusCode RoundRobinSchedulerSvc::pushNewEvent ( EventContext * eventContext)
virtual

Make an event available to the scheduler.

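The event context is appended to the internal buffer and one free slot is consumed. The algorithms are not executed here; they run when popFinishedEvent() finds no finished events and calls processEvents().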

Definition at line 90 of file RoundRobinSchedulerSvc.cpp.

90  {
91 
92  // consistency check
93  if (!m_freeSlots) {
94  fatal() << "More contexts than slots provided" << m_freeSlots << endmsg;
95  return StatusCode::FAILURE;
96  }
97 
98  --m_freeSlots;
99  m_evtCtx_buffer.push_back(eventContext);
100  eventContext->setFail(false);
101 
102  return StatusCode::SUCCESS;
103 }
StatusCode RoundRobinSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
virtual

Definition at line 105 of file RoundRobinSchedulerSvc.cpp.

105  {
106  // consistency check
107  if (eventContexts.size() > m_freeSlots) {
108  fatal() << "More contexts than slots provided" << m_freeSlots << endmsg;
109  return StatusCode::FAILURE;
110  }
111  m_freeSlots -= eventContexts.size();
112 
113  m_evtCtx_buffer.insert(m_evtCtx_buffer.end(), eventContexts.begin(), eventContexts.end());
114 
115  return StatusCode::SUCCESS;
116 }
StatusCode RoundRobinSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
virtual

Try to fetch an event from the scheduler.

Try to get a finished event; if none is available, return a failure.

Definition at line 265 of file RoundRobinSchedulerSvc.cpp.

265  {
266  if (m_finishedEvents.try_pop(eventContext)){
267  debug() << "Try Pop successful slot " << eventContext->slot()
268  << "(event " << eventContext->evt() << ")" << endmsg;
269  m_freeSlots++;
270  return StatusCode::SUCCESS;
271  }
272  return StatusCode::FAILURE;
273 
274 }
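
The slot accounting shared by pushNewEvents(), popFinishedEvent() and tryPopFinishedEvent() can be modelled with a small toy class. ToySlots and all of its members are hypothetical and exist only for this sketch; events are reduced to plain integers, process() stands in for processEvents(), and pop() returns false instead of blocking when nothing is available, which differs from the blocking tbb queue used by the real service.

```cpp
#include <deque>
#include <vector>

// Hypothetical toy model for illustration; not the Gaudi implementation.
class ToySlots {
public:
  explicit ToySlots(unsigned int slots) : m_freeSlots(slots) {}

  // Accept a batch only if it fits into the free slots.
  bool push(const std::vector<int>& eventIds) {
    if (eventIds.size() > m_freeSlots) return false;
    m_freeSlots -= static_cast<unsigned int>(eventIds.size());
    m_buffer.insert(m_buffer.end(), eventIds.begin(), eventIds.end());
    return true;
  }

  // Non-blocking pop: fails if nothing has finished yet.
  bool tryPop(int& eventId) {
    if (m_finished.empty()) return false;
    eventId = m_finished.front();
    m_finished.pop_front();
    ++m_freeSlots;  // a popped event frees its slot
    return true;
  }

  // Pop that first processes the buffer when no finished event is queued.
  bool pop(int& eventId) {
    if (m_finished.empty() && !m_buffer.empty()) process();
    return tryPop(eventId);
  }

  unsigned int freeSlots() const { return m_freeSlots; }

private:
  // Stand-in for processEvents(): move everything from buffer to finished.
  void process() {
    m_finished.insert(m_finished.end(), m_buffer.begin(), m_buffer.end());
    m_buffer.clear();
  }

  unsigned int m_freeSlots;
  std::vector<int> m_buffer;
  std::deque<int> m_finished;
};
```

In this model a non-blocking pop right after a push fails, because processing is only triggered by the blocking-style pop, which matches the order of calls in the listings above.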

Member Data Documentation

std::list<IAlgorithm*> RoundRobinSchedulerSvc::m_algList
private

Cache the list of algs to be executed.

Definition at line 90 of file RoundRobinSchedulerSvc.h.

std::unordered_map<std::string,unsigned int> RoundRobinSchedulerSvc::m_algname_index_map
private

Map to bookkeep the information necessary to the name2index conversion.

Definition at line 84 of file RoundRobinSchedulerSvc.h.

std::vector<std::string> RoundRobinSchedulerSvc::m_algname_vect
private

Vector to bookkeep the information necessary to the index2name conversion.

Definition at line 81 of file RoundRobinSchedulerSvc.h.

std::vector<std::vector<std::string> > RoundRobinSchedulerSvc::m_algosDependencies
private

Ugly, will disappear when the deps are declared only within the C++ code of the algos.

Definition at line 87 of file RoundRobinSchedulerSvc.h.

SmartIF<IAlgResourcePool> RoundRobinSchedulerSvc::m_algResourcePool
private

Definition at line 75 of file RoundRobinSchedulerSvc.h.

concurrency::ExecutionFlowManager RoundRobinSchedulerSvc::m_controlFlow
private

Definition at line 78 of file RoundRobinSchedulerSvc.h.

std::vector<EventContext*> RoundRobinSchedulerSvc::m_evtCtx_buffer
private

Definition at line 97 of file RoundRobinSchedulerSvc.h.

tbb::concurrent_bounded_queue<EventContext*> RoundRobinSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 93 of file RoundRobinSchedulerSvc.h.

unsigned int RoundRobinSchedulerSvc::m_freeSlots
private

The number of free slots (0 or 1)

Definition at line 96 of file RoundRobinSchedulerSvc.h.

bool RoundRobinSchedulerSvc::m_useTopAlgList
private

Decide if the top alglist or its flat version has to be used.

Definition at line 74 of file RoundRobinSchedulerSvc.h.


The documentation for this class was generated from the following files: