ParallelSequentialSchedulerSvc Class Reference

This SchedulerSvc implements the IScheduler interface. More...

#include <src/ParallelSequentialSchedulerSvc.h>


Public Member Functions

 ParallelSequentialSchedulerSvc (const std::string &name, ISvcLocator *svc)
 Constructor. More...
 
 ~ParallelSequentialSchedulerSvc ()
 Destructor. More...
 
virtual StatusCode initialize ()
 Initialise. More...
 
virtual StatusCode finalize ()
 Finalise. More...
 
virtual StatusCode pushNewEvent (EventContext *eventContext)
 Make an event available to the scheduler. More...
 
virtual StatusCode pushNewEvents (std::vector< EventContext * > &eventContexts)
 
virtual StatusCode popFinishedEvent (EventContext *&eventContext)
 Blocks until an event is available. More...
 
virtual StatusCode tryPopFinishedEvent (EventContext *&eventContext)
 Try to fetch an event from the scheduler. More...
 
virtual unsigned int freeSlots ()
 Get the number of free slots. More...
 
- Public Member Functions inherited from extends< BASE, Interfaces >
void * i_cast (const InterfaceID &tid) const override
 Implementation of IInterface::i_cast. More...
 
StatusCode queryInterface (const InterfaceID &ti, void **pp) override
 Implementation of IInterface::queryInterface. More...
 
std::vector< std::string > getInterfaceNames () const override
 Implementation of IInterface::getInterfaceNames. More...
 
 ~extends () override=default
 Virtual destructor. More...
 
- Public Member Functions inherited from extend_interfaces< Interfaces...>
 ~extend_interfaces () override=default
 Virtual destructor. More...
 

Private Attributes

bool m_useTopAlgList
 Decide if the top alglist or its flat version has to be used. More...
 
std::list< IAlgorithm * > m_algList
 Cache the list of algs to be executed. More...
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
 Queue of finished events. More...
 
std::atomic_int m_freeSlots
 Atomic to account for asynchronous updates by the scheduler with respect to the rest. More...
 
SmartIF< IHiveWhiteBoard > m_whiteboard
 A shortcut to the whiteboard. More...
 
std::string m_whiteboardSvcName
 The whiteboard name. More...
 
SmartIF< IAlgResourcePool > m_algResourcePool
 Cache for the algorithm resource pool. More...
 
int m_threadPoolSize
 Size of the thread pool initialised by TBB; if left at the default of -1, initialize() sets it to the number of whiteboard stores. More...
 
std::unique_ptr< tbb::task_scheduler_init > m_tbb_sched
 
concurrency::ExecutionFlowManager m_controlFlow
 
std::vector< std::string > m_algname_vect
 Vector to bookkeep the information necessary to the index2name conversion. More...
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
 Map to bookkeep the information necessary to the name2index conversion. More...
 
std::vector< std::vector< std::string > > m_algosDependencies
 Ugly, will disappear when the deps are declared only within the C++ code of the algos. More...
 

Friends

class SequentialTask
 

Additional Inherited Members

- Public Types inherited from extends< BASE, Interfaces >
using base_class = extends
 Typedef to this class. More...
 
using extend_interfaces_base = extend_interfaces< Interfaces...>
 Typedef to the base of this class. More...
 
- Public Types inherited from extend_interfaces< Interfaces...>
using ext_iids = typename Gaudi::interface_list_cat< typename Interfaces::ext_iids...>::type
 take union of the ext_iids of all Interfaces... More...
 

Detailed Description

This SchedulerSvc implements the IScheduler interface.

It executes all the algorithms in sequence for each event, with several events in flight at the same time. It pulls the algorithms from the AlgResourcePool.

Author
Daniel Funke
Version
0.1

Definition at line 41 of file ParallelSequentialSchedulerSvc.h.

Constructor & Destructor Documentation

ParallelSequentialSchedulerSvc::ParallelSequentialSchedulerSvc ( const std::string &  name,
ISvcLocator *  svc 
)

Constructor.

Definition at line 27 of file ParallelSequentialSchedulerSvc.cpp.

27  :
28  base_class(name,svcLoc) {
29 
30  // Will disappear when dependencies are properly propagated into the C++ code of the algos
31  declareProperty("AlgosDependencies", m_algosDependencies);
32  declareProperty("UseTopAlgList", m_useTopAlgList = false);
33  declareProperty("ThreadPoolSize", m_threadPoolSize = -1);
34  declareProperty("WhiteboardSvc", m_whiteboardSvcName = "EventDataSvc");
35 
36 }
ParallelSequentialSchedulerSvc::~ParallelSequentialSchedulerSvc ( )

Destructor.

Definition at line 39 of file ParallelSequentialSchedulerSvc.cpp.

39 {}

Member Function Documentation

StatusCode ParallelSequentialSchedulerSvc::finalize ( )
virtual

Finalise.

Definition at line 147 of file ParallelSequentialSchedulerSvc.cpp.

147  {
148  m_tbb_sched.reset();
149 
150  StatusCode sc(Service::finalize());
151  if (!sc.isSuccess())
152  warning () << "Base class could not be finalized" << endmsg;
153  return sc;
154 }
unsigned int ParallelSequentialSchedulerSvc::freeSlots ( )
virtual

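Get the number of free slots.

Returns the current value of m_freeSlots. initialize() sets it to the number of whiteboard stores; it is decremented for each event enqueued by pushNewEvents() and incremented for each event retrieved by popFinishedEvent() or tryPopFinishedEvent().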

Definition at line 219 of file ParallelSequentialSchedulerSvc.cpp.

219 {return m_freeSlots;}
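
The slot accounting described above can be modelled without Gaudi or TBB. The following is only a minimal sketch under stated assumptions: SlotCounter, tryAcquire() and release() are hypothetical names that do not exist in the scheduler, and the model assumes, like the scheduler, that a single thread performs the push.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the bookkeeping done on m_freeSlots.
class SlotCounter {
public:
  explicit SlotCounter(int nSlots) : m_capacity(nSlots), m_free(nSlots) {
    assert(nSlots >= 0);
  }

  // Push path: take a slot only if one is free.
  // Check-then-decrement is safe only with a single pushing thread.
  bool tryAcquire() {
    if (m_free.load() > 0) {
      --m_free;
      return true;
    }
    return false;
  }

  // Pop path: a retrieved event gives its slot back.
  void release() {
    ++m_free;
    assert(m_free.load() <= m_capacity);
  }

  int free() const { return m_free.load(); }

private:
  const int m_capacity;     // number of stores, fixed at construction
  std::atomic<int> m_free;  // currently unused slots
};
```

With two stores, two consecutive tryAcquire() calls succeed, a third fails, and one release() makes a slot available again.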
StatusCode ParallelSequentialSchedulerSvc::initialize ( )
virtual

Initialise.

Definition at line 42 of file ParallelSequentialSchedulerSvc.cpp.

42  {
43 
44  // Initialise mother class (read properties, ...)
45  StatusCode sc(Service::initialize());
46  if (!sc.isSuccess())
47  warning () << "Base class could not be initialized" << endmsg;
48 
49  // Get the algo resource pool
50  m_algResourcePool = serviceLocator()->service("AlgResourcePool");
51  if (!m_algResourcePool.isValid()){
52  error() << "Error retrieving AlgResourcePool" << endmsg;
53  return StatusCode::FAILURE;
54  }
55 
56  // Get the list of algorithms
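57  m_algList = (m_useTopAlgList) ? m_algResourcePool->getTopAlgList() : m_algResourcePool->getFlatAlgList();
58  info() << "Found " << m_algList.size() << " algorithms" << endmsg;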
59 
60  // Get Whiteboard
61  m_whiteboard = serviceLocator()->service(m_whiteboardSvcName);
62  if (!m_whiteboard.isValid())
63  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
64 
65  // Check the MaxEventsInFlight parameters and react
66  // Deprecated for the moment
67  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
68 
69  // Set the number of free slots
70  m_freeSlots=numberOfWBSlots;
71 
72  info() << "Allowing " << m_freeSlots << " events in flight" << endmsg;
73 
74  if(m_threadPoolSize == -1)
75  m_threadPoolSize = numberOfWBSlots;
76 
77  debug() << "Initialising a TBB thread pool of size " << m_threadPoolSize << endmsg;
78  m_tbb_sched.reset(new tbb::task_scheduler_init(m_threadPoolSize));
79 
80  // Fill the containers to convert algo names to index
81  m_algname_index_map.reserve(m_algList.size());
82  m_algname_vect.reserve(m_algList.size());
83  unsigned int index=0;
84  for (IAlgorithm* algo : m_algList){
85  const std::string& name = algo->name();
86  m_algname_index_map[name]=index;
87  m_algname_vect.emplace_back(name);
88  index++;
89  }
90 
91  //initialize control flow manager
92  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_algResourcePool.get());
93 
94  m_controlFlow.initialize(algPool->getExecutionFlowGraph(), m_algname_index_map);
95 
96  const unsigned int algosDependenciesSize=m_algosDependencies.size();
97  info() << "Algodependecies size is " << algosDependenciesSize << endmsg;
98 
99  //get algorithm dependencies
100  /* Dependencies
101  0) Read deps from config file
102  1) Look for handles in algo, if none
103  2) Assume none are required
104  */
105  if (algosDependenciesSize == 0){
106  // Get the event root from the IDataManagerSvc interface of the WhiteBoard
107  SmartIF<IDataManagerSvc> dataMgrSvc(m_whiteboard);
108  std::string rootInTESName(dataMgrSvc->rootName());
109  if ("" != rootInTESName && '/'!=rootInTESName[rootInTESName.size()-1]){
110  rootInTESName = rootInTESName+"/";
111  }
112 
113  for (IAlgorithm* ialgoPtr : m_algList){
114  Algorithm* algoPtr = dynamic_cast<Algorithm*> (ialgoPtr);
115  if (nullptr == algoPtr){
116  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
117  }
118 
119 #pragma GCC diagnostic push
120 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
121  const std::vector<MinimalDataObjectHandle*>& algoHandles(algoPtr->handles());
122 #pragma GCC diagnostic pop
123  std::vector<std::string> algoDependencies;
124  if (!algoHandles.empty()){
125 
126  info() << "Algorithm " << algoPtr->name() << " data dependencies:" << endmsg;
127  for (MinimalDataObjectHandle* handlePtr : algoHandles ){
128  if (handlePtr->accessType() == MinimalDataObjectHandle::AccessType::READ){
129  const std::string& productName = rootInTESName + handlePtr->dataProductName();
130  info() << " o READ Handle found for product " << productName << endmsg;
131  algoDependencies.emplace_back(productName);
132  }
133  }
134  } else {
135  info() << "Algorithm " << algoPtr->name() << " has no data dependencies." << endmsg;
136  }
137 
138  m_algosDependencies.emplace_back(algoDependencies);
139  }
140  }
141 
142  return StatusCode::SUCCESS;
143 
144 }
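
The name-to-index bookkeeping filled in by initialize() can be illustrated in isolation. This is a sketch rather than the scheduler's code: AlgNameIndex, nameOf() and indexOf() are hypothetical names, and the sketch assumes both containers are filled in the same loop so that positions agree.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical helper holding both lookup directions.
struct AlgNameIndex {
  std::vector<std::string> names;                       // index -> name
  std::unordered_map<std::string, unsigned int> index;  // name -> index

  explicit AlgNameIndex(const std::vector<std::string>& algNames) {
    names.reserve(algNames.size());
    index.reserve(algNames.size());
    unsigned int i = 0;
    for (const std::string& name : algNames) {
      index[name] = i;           // name2index
      names.emplace_back(name);  // index2name, same position
      ++i;
    }
  }

  const std::string& nameOf(unsigned int i) const {
    assert(i < names.size());
    return names[i];
  }

  // Throws std::out_of_range for a name that was never registered.
  unsigned int indexOf(const std::string& name) const { return index.at(name); }
};
```

Because both containers are filled in list order, nameOf(indexOf(n)) returns n for every registered name, provided the names are unique.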
StatusCode ParallelSequentialSchedulerSvc::popFinishedEvent ( EventContext *&  eventContext)
virtual

Blocks until an event is available.

Get a finished event or block until one becomes available.

Definition at line 191 of file ParallelSequentialSchedulerSvc.cpp.

191  {
192 
193  m_finishedEvents.pop(eventContext);
194  debug() << "Popped slot " << eventContext->slot() << "(event "
195  << eventContext->evt() << ")" << endmsg;
196  m_freeSlots++;
197  return StatusCode::SUCCESS;
198 }
StatusCode ParallelSequentialSchedulerSvc::pushNewEvent ( EventContext *  eventContext)
virtual

Make an event available to the scheduler.

The event's fail flag is cleared and the event is forwarded to pushNewEvents(), which enqueues it for execution if a slot is free.

Definition at line 159 of file ParallelSequentialSchedulerSvc.cpp.

159  {
160  std::vector<EventContext*> eventContexts;
161  eventContexts.push_back(eventContext);
162  eventContext->setFail(false);
163  return pushNewEvents(eventContexts);
164 }
StatusCode ParallelSequentialSchedulerSvc::pushNewEvents ( std::vector< EventContext * > &  eventContexts)
virtual

Definition at line 166 of file ParallelSequentialSchedulerSvc.cpp.

166  {
167 
168  for(auto evt : eventContexts){
169  if(m_freeSlots.load() > 0){
170  //only one thread executes scheduler --> m_freeSlots can only grow if other thread finishes
171  m_freeSlots--;
172 
173  debug() << "Enqueuing event " << evt->evt() << " @ " << evt->slot() << endmsg;
174 
175  tbb::task* t = new( tbb::task::allocate_root() )
176  SequentialTask(serviceLocator(), evt, this, m_algResourcePool);
177  tbb::task::enqueue( *t);
178  } else {
179  return StatusCode::FAILURE;
180  }
181  }
182 
183  return StatusCode::SUCCESS;
184 
185 }
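
One consequence of the loop above is that a batch can be accepted only in part: the function returns StatusCode::FAILURE at the first event that finds no free slot, while the events before it have already been enqueued. The sketch below models that behaviour with plain standard-library types. acceptBatch() and its parameters are hypothetical and only illustrate the control flow; they are not part of the scheduler.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical model of the batch loop. Returns how many events were
// accepted before the first one that found no free slot.
std::size_t acceptBatch(const std::vector<long>& events, int& freeSlots,
                        std::vector<long>& enqueued) {
  assert(freeSlots >= 0);
  std::size_t accepted = 0;
  for (long evt : events) {
    if (freeSlots <= 0) break;  // corresponds to returning FAILURE mid-batch
    --freeSlots;
    enqueued.push_back(evt);    // stands in for enqueuing the task
    ++accepted;
  }
  return accepted;
}
```

A caller that needs all-or-nothing behaviour could compare the batch size with freeSlots() before pushing; that is a suggestion for illustration, not something the listing above does.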
StatusCode ParallelSequentialSchedulerSvc::tryPopFinishedEvent ( EventContext *&  eventContext)
virtual

Try to fetch an event from the scheduler.

Try to get a finished event; if none is available, return a failure.

Definition at line 204 of file ParallelSequentialSchedulerSvc.cpp.

204  {
205  if (m_finishedEvents.try_pop(eventContext)){
206  debug() << "Try Pop successful slot " << eventContext->slot()
207  << "(event " << eventContext->evt() << ")" << endmsg;
208  m_freeSlots++;
209  return StatusCode::SUCCESS;
210  }
211  return StatusCode::FAILURE;
212 }
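
Because the call never blocks, a caller can use it to collect whatever has finished so far and then continue with other work. The following single-threaded sketch shows such a drain loop. Ctx, FinishedSource and drain() are hypothetical stand-ins, and the real scheduler uses a TBB concurrent queue rather than std::queue.

```cpp
#include <cassert>
#include <queue>
#include <vector>

// Hypothetical stand-in for an event context: only what the sketch needs.
struct Ctx {
  long evt;
  unsigned slot;
};

// Hypothetical non-blocking source of finished events (single-threaded model).
class FinishedSource {
public:
  void finish(Ctx* c) { m_done.push(c); }

  // Returns false and leaves 'out' untouched when nothing has finished.
  bool tryPop(Ctx*& out) {
    if (m_done.empty()) return false;
    out = m_done.front();
    m_done.pop();
    ++m_freeSlots;  // a retrieved event frees its slot
    return true;
  }

  int freeSlots() const { return m_freeSlots; }

private:
  std::queue<Ctx*> m_done;
  int m_freeSlots = 0;
};

// Collect everything that is finished right now, without waiting.
std::vector<Ctx*> drain(FinishedSource& src) {
  std::vector<Ctx*> out;
  Ctx* c = nullptr;
  while (src.tryPop(c)) {
    assert(c != nullptr);
    out.push_back(c);
  }
  return out;
}
```

The loop ends on the first failed tryPop(), which here means only that no further event is ready at that moment.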

Friends And Related Function Documentation

friend class SequentialTask
friend

Definition at line 114 of file ParallelSequentialSchedulerSvc.h.

Member Data Documentation

std::list<IAlgorithm*> ParallelSequentialSchedulerSvc::m_algList
private

Cache the list of algs to be executed.

Definition at line 77 of file ParallelSequentialSchedulerSvc.h.

std::unordered_map<std::string,unsigned int> ParallelSequentialSchedulerSvc::m_algname_index_map
private

Map to bookkeep the information necessary to the name2index conversion.

Definition at line 108 of file ParallelSequentialSchedulerSvc.h.

std::vector<std::string> ParallelSequentialSchedulerSvc::m_algname_vect
private

Vector to bookkeep the information necessary to the index2name conversion.

Definition at line 105 of file ParallelSequentialSchedulerSvc.h.

std::vector<std::vector<std::string> > ParallelSequentialSchedulerSvc::m_algosDependencies
private

Ugly, will disappear when the deps are declared only within the C++ code of the algos.

Definition at line 111 of file ParallelSequentialSchedulerSvc.h.

SmartIF<IAlgResourcePool> ParallelSequentialSchedulerSvc::m_algResourcePool
private

Cache for the algorithm resource pool.

Definition at line 93 of file ParallelSequentialSchedulerSvc.h.

concurrency::ExecutionFlowManager ParallelSequentialSchedulerSvc::m_controlFlow
private

Definition at line 102 of file ParallelSequentialSchedulerSvc.h.

tbb::concurrent_bounded_queue<EventContext*> ParallelSequentialSchedulerSvc::m_finishedEvents
private

Queue of finished events.

Definition at line 80 of file ParallelSequentialSchedulerSvc.h.

std::atomic_int ParallelSequentialSchedulerSvc::m_freeSlots
private

Atomic to account for asynchronous updates by the scheduler with respect to the rest.

Definition at line 84 of file ParallelSequentialSchedulerSvc.h.

std::unique_ptr<tbb::task_scheduler_init> ParallelSequentialSchedulerSvc::m_tbb_sched
private

Definition at line 99 of file ParallelSequentialSchedulerSvc.h.

int ParallelSequentialSchedulerSvc::m_threadPoolSize
private

Size of the thread pool initialised by TBB; if left at the default of -1, initialize() sets it to the number of whiteboard stores.

Definition at line 96 of file ParallelSequentialSchedulerSvc.h.

bool ParallelSequentialSchedulerSvc::m_useTopAlgList
private

Decide if the top alglist or its flat version has to be used.

Definition at line 74 of file ParallelSequentialSchedulerSvc.h.

SmartIF<IHiveWhiteBoard> ParallelSequentialSchedulerSvc::m_whiteboard
private

A shortcut to the whiteboard.

Definition at line 87 of file ParallelSequentialSchedulerSvc.h.

std::string ParallelSequentialSchedulerSvc::m_whiteboardSvcName
private

The whiteboard name.

Definition at line 90 of file ParallelSequentialSchedulerSvc.h.


The documentation for this class was generated from the following files: