2 #include "GaudiKernel/SvcFactory.h"
3 #include "GaudiKernel/IAlgorithm.h"
4 #include "GaudiKernel/Algorithm.h"
5 #include <GaudiAlg/GaudiAlgorithm.h>
6 #include <GaudiKernel/IDataManagerSvc.h>
10 #include <unordered_set>
24 #include "tbb/task_scheduler_init.h"
38 declareProperty(
"MaxEventsInFlight", m_maxEventsInFlight = 0 );
39 declareProperty(
"ThreadPoolSize", m_threadPoolSize = -1 );
40 declareProperty(
"WhiteboardSvc", m_whiteboardSvcName =
"EventDataSvc" );
42 declareProperty(
"AlgosDependencies", m_algosDependencies);
43 declareProperty(
"MaxAlgosInFlight", m_maxAlgosInFlight = 0,
"Taken from the whiteboard. Deprecated" );
45 declareProperty(
"useGraphFlowManagement", m_CFNext =
false );
46 declareProperty(
"DataFlowManagerNext", m_DFNext =
false );
47 declareProperty(
"SimulateExecution", m_simulateExecution =
false );
48 declareProperty(
"Optimizer", m_optimizationMode =
"",
"The following modes are currently available: PCE, COD, DRE, E" );
49 declareProperty(
"DumpIntraEventDynamics", m_dumpIntraEventDynamics =
false,
"Dump intra-event concurrency dynamics to csv file" );
66 warning () <<
"Base class could not be initialized" <<
endmsg;
71 error() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
76 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
82 warning() <<
"Property MaxEventsInFlight was set. This works but it's deprecated. "
83 <<
"Please migrate your code options files." <<
endmsg;
86 warning() <<
"In addition, the number of events in flight ("
88 << numberOfWBSlots <<
"). Setting the number of events in flight to "
89 << numberOfWBSlots <<
endmsg;
101 const unsigned int algsNumber = algos.size();
102 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
105 info() <<
"Algodependecies size is " << algosDependenciesSize <<
endmsg;
112 if (algosDependenciesSize == 0) {
115 if (
nullptr == algoPtr)
116 fatal() <<
"Could not convert IAlgorithm into Algorithm: this will result in a crash." <<
endmsg;
118 #pragma GCC diagnostic push
119 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
120 const std::vector<MinimalDataObjectHandle*>& algoHandles(algoPtr->handles());
121 #pragma GCC diagnostic pop
122 std::vector<std::string> algoDependencies;
123 if (!algoHandles.empty()) {
124 info() <<
"Algorithm " << algoPtr->
name() <<
" data dependencies:" <<
endmsg;
126 if (handlePtr->isValid()) {
128 const std::string& productName = handlePtr->dataProductName();
129 info() <<
" o READ Handle found for product " << productName <<
endmsg;
130 algoDependencies.emplace_back(productName);
133 if (handlePtr->alternativeDataProductNames().size() != 0) {
134 info() <<
"\t\t alternative locations";
135 for (
auto s : handlePtr->alternativeDataProductNames())
141 info() <<
" o WRITE Handle found for product " << handlePtr->dataProductName() <<
endmsg;
146 info() <<
"Algorithm " << algoPtr->
name() <<
" has no data dependencies." <<
endmsg;
151 if (algsNumber != algosDependenciesSize){
152 error() <<
"number of Algorithms is different from size of Data Dependency list!" <<
endmsg;
159 unsigned int index=0;
161 const std::string&
name = algo->name();
170 fatal() <<
"Execution optimization is only available with the graph-based execution flow management" <<
endmsg;
179 error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
182 std::for_each(m_eventSlots.begin(),m_eventSlots.end(),[](
EventSlot& slot){slot.complete=
true;});
185 info() <<
"Concurrency level information:" <<
endmsg;
197 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
213 warning () <<
"Base class could not be finalized" <<
endmsg;
217 warning () <<
"Scheduler could not be deactivated" <<
endmsg;
219 info() <<
"Joining Scheduler thread" <<
endmsg;
244 tbb::task_scheduler_init* TBBSchedInit =
nullptr;
252 if (thePoolSize == -1)
253 debug() <<
"...default TBB thread pool size amounts to " << tbb::task_scheduler_init::default_num_threads()<<
endmsg;
254 if (thePoolSize != -1)
256 TBBSchedInit =
new tbb::task_scheduler_init (thePoolSize);
258 debug() <<
"Thread pool size is one. Pool not initialised." <<
endmsg;
265 info() <<
"Start checking the actionsQueue" <<
endmsg;
270 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
272 verbose() <<
"Action succeeded." <<
endmsg;
327 fatal() <<
"Event context is nullptr" <<
endmsg;
333 debug() <<
"A free processing slot could not be found." <<
endmsg;
342 const unsigned int thisSlotNum = eventContext->
slot();
345 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
346 info() <<
"A free processing slot was found." <<
endmsg;
347 thisSlot.
reset(eventContext);
359 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
370 for (
auto context : eventContexts){
388 unsigned int slotNum=0;
390 if (not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete){
409 debug() <<
"Popped slot " << eventContext->
slot() <<
"(event "
422 debug() <<
"Try Pop successful slot " << eventContext->
slot()
423 <<
"(event " << eventContext->
evt() <<
")" <<
endmsg;
441 fatal() <<
"*** Event " << eventContext->
evt() <<
" on slot "
442 << eventContext->
slot() <<
" failed! ***" <<
endmsg;
498 std::vector<EventSlot*> eventSlotsPtrs;
503 eventSlotsPtrs.reserve(eventsSlotsSize);
505 if (!slotIt->complete)
506 eventSlotsPtrs.push_back(&(*slotIt));
508 std::sort(eventSlotsPtrs.begin(),
509 eventSlotsPtrs.end(),
515 for (
EventSlot* thisSlotPtr : eventSlotsPtrs) {
516 int iSlot = thisSlotPtr->eventContext->slot();
527 if (!algo_name.empty())
557 for(
auto it = thisAlgsStates.
begin(AlgsExecutionStates::State::CONTROLREADY);
558 it != thisAlgsStates.
end(AlgsExecutionStates::State::CONTROLREADY); ++it) {
564 debug() <<
"Could not apply transition from "
566 <<
" for algorithm " <<
index2algname(algIndex) <<
" on processing slot " << iSlot <<
endmsg;
572 auto comp_nodes = [
this] (
const uint&
i,
const uint& j) {
576 std::priority_queue<uint,std::vector<uint>,std::function<bool(const uint&,const uint&)>> buffer(comp_nodes,std::vector<uint>());
577 for(
auto it = thisAlgsStates.
begin(AlgsExecutionStates::State::DATAREADY);
578 it != thisAlgsStates.
end(AlgsExecutionStates::State::DATAREADY); ++it)
588 while (!buffer.empty()) {
592 debug() <<
"Could not apply transition from "
594 <<
" for algorithm " <<
index2algname(buffer.top()) <<
" on processing slot " << iSlot <<
endmsg;
599 for(
auto it = thisAlgsStates.
begin(AlgsExecutionStates::State::DATAREADY);
600 it != thisAlgsStates.
end(AlgsExecutionStates::State::DATAREADY); ++it) {
605 debug() <<
"Could not apply transition from "
607 <<
" for algorithm " <<
index2algname(algIndex) <<
" on processing slot " << iSlot <<
endmsg;
614 s << algo_name <<
", " << thisAlgsStates.
sizeOfSubset(State::CONTROLREADY)
615 <<
", " << thisAlgsStates.
sizeOfSubset(State::DATAREADY)
616 <<
", " << thisAlgsStates.
sizeOfSubset(State::SCHEDULED) <<
"\n";
618 :
std::to_string(tbb::task_scheduler_init::default_num_threads());
619 std::ofstream myfile;
620 myfile.open(
"IntraEventConcurrencyDynamics_" + threads +
"T.csv",
std::ios::app);
627 if (!thisSlot.complete &&
633 thisSlot.complete=
true;
636 if (!thisSlot.eventContext->evtFail()) {
639 debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot "
640 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
644 std::stringstream ss;
646 debug() << ss.str() <<
endmsg;
649 thisSlot.eventContext=
nullptr;
657 verbose() <<
"States Updated." <<
endmsg;
678 info() <<
"About to declare a stall" <<
endmsg;
679 fatal() <<
"*** Stall detected! ***\n" <<
endmsg;
698 std::stringstream outputMessageStream;
703 outputMessageStream.str(std::string());
704 if ( thisSlot.complete )
707 outputMessageStream <<
"Dump of Scheduler State for slot " << thisSlot.eventContext->evt() << std::endl;
709 if ( 0 > iSlot or iSlot == slotCount) {
710 outputMessageStream <<
"Algorithms states for event " << thisSlot.eventContext->evt() << std::endl;
712 const std::vector<std::string>& wbSlotContent ( thisSlot.dataFlowMgr.content() );
713 for (
unsigned int algoIdx=0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
716 <<
". Its data dependencies are ";
717 std::vector<std::string> deps (thisSlot.dataFlowMgr.dataDependencies(algoIdx));
718 const int depsSize=deps.size();
720 outputMessageStream <<
" none.";
722 for (
int i=0;
i<depsSize;++
i)
723 outputMessageStream << deps[
i] << (
i==(depsSize-1) ?
"" :
", ");
727 std::vector<std::string>::iterator missinngDepsEndIt =
728 std::remove_if(deps.begin(),
730 [&wbSlotContent] (std::string dep) {
731 return std::count(wbSlotContent.begin(),wbSlotContent.end(),dep)!=0;
734 if (deps.begin() != missinngDepsEndIt) {
735 outputMessageStream <<
". The following are missing: ";
736 for (std::vector<std::string>::iterator missingDep=deps.begin();missingDep!=missinngDepsEndIt;++missingDep)
737 outputMessageStream << *missingDep << (missingDep==(missinngDepsEndIt-1)?
"":
", ");
740 outputMessageStream << std::endl;
743 fatal() << outputMessageStream.str() <<
endmsg;
744 outputMessageStream.str(std::string());
747 outputMessageStream <<
"The content of the whiteboard for this event was:\n";
748 for (
auto& product : wbSlotContent )
749 outputMessageStream <<
" o " << product << std::endl;
751 fatal() << outputMessageStream.str()<<
endmsg;
752 outputMessageStream.str(std::string());
755 outputMessageStream <<
"The status of the control flow for this event was:\n";
756 std::stringstream cFlowStateStringStream;
759 outputMessageStream << cFlowStateStringStream.str();
761 fatal() << outputMessageStream.str() <<
endmsg;
787 sc =
m_eventSlots[si].dataFlowMgr.canAlgorithmRun(iAlgo);
816 if (
sc.isSuccess()) {
820 fatal() <<
"Event context for algorithm " << algName <<
" is a nullptr (slot " << si<<
")" <<
endmsg;
826 tbb::task* t =
new( tbb::task::allocate_root() )
AlgoExecutionTask(ialgoPtr, iAlgo, serviceLocator(),
this);
827 tbb::task::enqueue( *t);
834 debug() <<
"Algorithm " << algName <<
" was submitted on event " << eventContext->evt()
839 if (updateSc.isSuccess())
845 debug() <<
"Could not acquire instance for algorithm " <<
index2algname(iAlgo) <<
" on slot " << si <<
endmsg;
860 fatal() <<
"The casting did not succeed!" <<
endmsg;
870 error() <<
"[Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] "
871 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
885 std::vector<std::string> new_products;
887 for (
const auto& new_product : new_products)
889 debug() <<
"Found in WB: " << new_product <<
endmsg;
894 debug() <<
"Algorithm " << algo->name() <<
" executed. Algorithms scheduled are " <<
m_algosInFlight <<
endmsg;
906 debug() <<
"Trying to handle execution result of " <<
index2algname(iAlgo) <<
"." <<
endmsg;
909 state = State::EVTACCEPTED;
911 state = State::EVTREJECTED;
918 debug() <<
"Promoting " <<
index2algname(iAlgo) <<
" on slot " << si <<
" to "
StatusCode deactivate()
Deactivate scheduler.
bool algsPresent(State state) const
void simulateExecutionFlow(IGraphVisitor &visitor) const
virtual StatusCode getNewDataObjects(std::vector< std::string > &products)=0
Get the latest new data objects registered in store.
StatusCode initialize() override
string to_string(const T &value)
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
unsigned int getControlFlowNodeCounter() const
Get total number of graph nodes.
virtual StatusCode initialize()
Initialise.
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
StatusCode finalize() override
bool algoDataDependenciesSatisfied(const std::string &algo_name, const int &slotNum) const
Check all data dependencies of an algorithm are satisfied.
virtual StatusCode pushNewEvent(EventContext *eventContext)
Make an event available to the scheduler.
bool m_isActive
Flag to track if the scheduler is active or not.
AlgsExecutionStates algsStates
Vector of algorithms states.
bool isSuccess() const
Test for a status code of SUCCESS.
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
EventContext * eventContext
Cache for the eventContext.
std::string m_optimizationMode
void touchReadyAlgorithms(IGraphVisitor &visitor) const
Promote all algorithms, ready to be executed, to DataReady state.
virtual StatusCode popFinishedEvent(EventContext *&eventContext)
Blocks until an event is available.
unsigned int m_maxAlgosInFlight
Maximum number of simultaneous algorithms.
virtual size_t getNumberOfStores()=0
Get the number of 'slots'.
virtual tbb::task * execute()
The SchedulerSvc implements the IScheduler interface.
size_t sizeOfSubset(State state) const
A visitor, performing full top-down traversals of a graph.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
This class represents an entry point to all the event specific data.
bool isFailure() const
Test for a status code of FAILURE.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo)
The call to this method is triggered only from within the AlgoExecutionTask.
virtual StatusCode tryPopFinishedEvent(EventContext *&eventContext)
Try to fetch an event from the scheduler.
StatusCode m_drain()
Drain the actions present in the queue.
void setContext(EventContext *context)
set the context
const std::string & name() const override
The identifying name of the algorithm object.
TYPE * get() const
Get interface pointer.
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler wrt the rest.
void updateDataObjectsCatalog(const std::vector< std::string > &newProducts)
Update the catalog of available products in the slot.
~ForwardSchedulerSvc()
Destructor.
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
bool rootDecisionResolved(const std::vector< int > &node_decisions) const
Check whether root decision was resolved.
const float & getRank() const
Get Algorithm rank.
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate a given 'slot' for all subsequent calls within the same thread id.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
unsigned int m_algosInFlight
Number of algorithms presently in flight.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::thread m_thread
The thread in which the activate function runs.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
std::string m_whiteboardSvcName
The whiteboard name.
virtual StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts)
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
#define DECLARE_SERVICE_FACTORY(x)
virtual concurrency::ExecutionFlowGraph * getExecutionFlowGraph() const
bool complete
Flags completion of the event.
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithms in the slots and promote them to successive states (-1 means all slots...
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
The IAlgorithm is the interface implemented by the Algorithm base class.
virtual unsigned int freeSlots()
Get free slots number.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
bool m_updateNeeded
Keep track of update actions scheduled.
std::function< StatusCode()> action
Base class from which all concrete algorithm classes should be derived.
void activate()
Activate scheduler.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
bool isValid() const
Allow for check if smart pointer is valid.
Base class used to extend a class implementing other interfaces.
int m_maxEventsInFlight
Maximum number of events processed simultaneously.
Iterator begin(State kind)
StatusCode promoteToControlReady(unsigned int iAlgo, int si)
Algorithm promotion: Accepted by the control flow.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Class representing the event slot.
bool m_dumpIntraEventDynamics
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
virtual StatusCode finalize()
Finalise.
friend class AlgoExecutionTask
ExecutionFlowGraph * getExecutionFlowGraph() const
Get the flow graph instance.
StatusCode initialize(ExecutionFlowGraph *CFGraph, const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize the control flow manager. It greps the topalg list and the index map for the algo names...
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
std::vector< std::vector< std::string > > m_algosDependencies
Ugly, will disappear when the deps are declared only within the C++ code of the algos.
StatusCode promoteToDataReady(unsigned int iAlgo, int si)
static std::map< State, std::string > stateNames
EventContext * getContext()
get the context
StatusCode updateState(unsigned int iAlgo, State newState)