9 #include "boost/thread.hpp"
12 #include <unordered_set>
29 #include "tbb/task_scheduler_init.h"
42 m_first(true), m_checkDeps(false)
45 declareProperty(
"MaxEventsInFlight", m_maxEventsInFlight = 0 );
46 declareProperty(
"ThreadPoolSize", m_threadPoolSize = -1 );
47 declareProperty(
"WhiteboardSvc", m_whiteboardSvcName =
"EventDataSvc" );
48 declareProperty(
"MaxAlgosInFlight", m_maxAlgosInFlight = 0,
"Taken from the whiteboard. Deprecated" );
50 declareProperty(
"useGraphFlowManagement", m_CFNext =
false );
51 declareProperty(
"DataFlowManagerNext", m_DFNext =
false );
52 declareProperty(
"SimulateExecution", m_simulateExecution =
false );
53 declareProperty(
"Optimizer", m_optimizationMode =
"",
54 "The following modes are currently available: PCE, COD, DRE, E" );
55 declareProperty(
"DumpIntraEventDynamics", m_dumpIntraEventDynamics =
false,
56 "Dump intra-event concurrency dynamics to csv file" );
59 declareProperty(
"AlgosDependencies", m_algosDependencies);
61 declareProperty(
"CheckDependencies", m_checkDeps =
false);
79 warning () <<
"Base class could not be initialized" <<
endmsg;
84 fatal() <<
"Error retrieving ThreadPoolSvc" << endreq;
90 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
99 info() <<
"Waiting for ForwardSchedulerSvc to activate" <<
endmsg;
107 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
114 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard."
123 warning() <<
"Property MaxEventsInFlight was set. This works but it's deprecated. "
124 <<
"Please migrate your code options files." <<
endmsg;
127 warning() <<
"In addition, the number of events in flight ("
129 << numberOfWBSlots <<
"). Setting the number of events in flight to "
130 << numberOfWBSlots <<
endmsg;
141 warning() <<
" ##### Property AlgosDependencies is deprecated and ignored."
142 <<
" FIX your job options #####" <<
endmsg;
147 const unsigned int algsNumber = algos.
size();
148 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
157 info() <<
"Data Dependencies for Algorithms:";
162 if (
nullptr == algoPtr)
163 fatal() <<
"Could not convert IAlgorithm into Algorithm: this will result in a crash." <<
endmsg;
171 info() <<
"\n o INPUT " << id;
172 algoDependencies.
insert(
id);
176 info() <<
"\n o OUTPUT " << id;
188 unsigned int index=0;
200 for (
auto o : globalInp) {
201 if (globalOutp.
find(o) == globalOutp.
end()) {
206 if (unmetDep.
size() > 0) {
207 fatal() <<
"The following unmet INPUT data dependencies were found: ";
208 for (
auto &o : unmetDep) {
209 fatal() <<
"\n o " << o <<
" required by Algorithm: ";
210 for (
size_t i =0;
i<m_algosDependencies.
size(); ++
i) {
211 if ( m_algosDependencies[
i].find( o ) != m_algosDependencies[
i].
end() ) {
219 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
226 fatal() <<
"Execution optimization is only available with the graph-based execution flow management" <<
endmsg;
233 unsigned int controlFlowNodeNumber =
239 error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
242 controlFlowNodeNumber,messageSvc));
244 [](
EventSlot& slot){slot.complete=
true;});
247 info() <<
"Concurrency level information:" <<
endmsg;
274 warning () <<
"Scheduler could not be deactivated" <<
endmsg;
276 info() <<
"Joining Scheduler thread" <<
endmsg;
297 debug() <<
"ForwardSchedulerSvc::activate()" <<
endmsg;
300 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
313 info() <<
"Start checking the actionsQueue" <<
endmsg;
318 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
383 debug() <<
"A free processing slot could not be found." <<
endmsg;
392 const unsigned int thisSlotNum = eventContext->
slot();
395 fatal() <<
"The slot " << thisSlotNum
396 <<
" is supposed to be a finished event but it's not" <<
endmsg;
400 info() <<
"Executing event " << eventContext->
evt() <<
" on slot "
402 thisSlot.
reset(eventContext);
414 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
425 for (
auto context : eventContexts){
443 unsigned int slotNum=0;
445 if (not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete){
469 debug() <<
"Popped slot " << eventContext->
slot() <<
"(event "
482 debug() <<
"Try Pop successful slot " << eventContext->
slot()
483 <<
"(event " << eventContext->
evt() <<
")" <<
endmsg;
501 fatal() <<
"*** Event " << eventContext->
evt() <<
" on slot "
502 << eventContext->
slot() <<
" failed! ***" <<
endmsg;
562 eventSlotsPtrs.
reserve(eventsSlotsSize);
564 if (!slotIt->complete)
568 eventSlotsPtrs.
end(),
574 for (
EventSlot* thisSlotPtr : eventSlotsPtrs) {
586 if (!algo_name.
empty())
616 for(
auto it = thisAlgsStates.
begin(AlgsExecutionStates::State::CONTROLREADY);
617 it != thisAlgsStates.
end(AlgsExecutionStates::State::CONTROLREADY); ++it) {
623 verbose() <<
"Could not apply transition from "
625 <<
" for algorithm " <<
index2algname(algIndex) <<
" on processing slot " << iSlot <<
endmsg;
631 auto comp_nodes = [
this] (
const uint&
i,
const uint& j) {
636 for(
auto it = thisAlgsStates.
begin(AlgsExecutionStates::State::DATAREADY);
637 it != thisAlgsStates.
end(AlgsExecutionStates::State::DATAREADY); ++it)
647 while (!buffer.
empty()) {
651 verbose() <<
"Could not apply transition from "
658 for(
auto it = thisAlgsStates.
begin(AlgsExecutionStates::State::DATAREADY);
659 it != thisAlgsStates.
end(AlgsExecutionStates::State::DATAREADY); ++it) {
664 verbose() <<
"Could not apply transition from "
666 <<
" for algorithm " <<
index2algname(algIndex) <<
" on processing slot " << iSlot <<
endmsg;
673 s << algo_name <<
", " << thisAlgsStates.
sizeOfSubset(State::CONTROLREADY)
674 <<
", " << thisAlgsStates.
sizeOfSubset(State::DATAREADY)
675 <<
", " << thisAlgsStates.
sizeOfSubset(State::SCHEDULED) <<
"\n";
677 :
std::to_string(tbb::task_scheduler_init::default_num_threads());
679 myfile.
open(
"IntraEventConcurrencyDynamics_" + threads +
"T.csv",
std::ios::app);
686 if (!thisSlot.complete &&
692 thisSlot.complete=
true;
695 if (!thisSlot.eventContext->evtFail()) {
698 debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot "
699 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
708 thisSlot.eventContext=
nullptr;
737 info() <<
"About to declare a stall" <<
endmsg;
760 <<
"============================== Execution Task State ============================="
766 <<
"============================== Scheduler State ================================="
772 if ( thisSlot.complete )
775 outputMessageStream <<
"----------- slot: " << thisSlot.eventContext->slot()
776 <<
" event: " << thisSlot.eventContext->evt()
779 if ( 0 > iSlot or iSlot == slotCount) {
780 outputMessageStream <<
"Algorithms states:" <<
std::endl;
782 const DataObjIDColl& wbSlotContent ( thisSlot.dataFlowMgr.content() );
783 for (
unsigned int algoIdx=0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
787 DataObjIDColl deps (thisSlot.dataFlowMgr.dataDependencies(algoIdx));
788 const int depsSize=deps.
size();
790 outputMessageStream <<
" none";
794 outputMessageStream << d <<
" ";
795 if ( wbSlotContent.find(d) == wbSlotContent.end() ) {
801 if (! missing.
empty()) {
802 outputMessageStream <<
". The following are missing: ";
803 for (
auto d: missing) {
804 outputMessageStream << d <<
" ";
812 outputMessageStream <<
"\nWhiteboard contents: "<<
std::endl;
813 for (
auto& product : wbSlotContent )
814 outputMessageStream <<
" o " << product <<
std::endl;
817 outputMessageStream <<
"\nControl Flow:" <<
std::endl;
821 outputMessageStream << cFlowStateStringStream.
str() <<
std::endl;
826 <<
"=================================== END ======================================"
829 info() <<
"Dumping Scheduler State " << std::endl
855 sc =
m_eventSlots[si].dataFlowMgr.canAlgorithmRun(iAlgo);
885 if (
sc.isSuccess()) {
889 fatal() <<
"Event context for algorithm " << algName <<
" is a nullptr (slot " << si<<
")" <<
endmsg;
896 tbb::task::enqueue( *t);
903 debug() <<
"Algorithm " << algName <<
" was submitted on event "
904 << eventContext->evt() <<
" in slot " << si
911 if (updateSc.isSuccess())
933 fatal() <<
"The casting did not succeed!" <<
endmsg;
943 error() <<
"[Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot()
944 <<
"] " <<
"Instance of algorithm " << algo->name()
945 <<
" could not be properly put back." <<
endmsg;
961 for (
const auto& new_product : new_products)
963 debug() <<
"Found in WB [" << si <<
"]: " << new_product <<
endmsg;
968 debug() <<
"Algorithm " << algo->name() <<
" executed in slot " << si
982 <<
" on slot " << si <<
endmsg;
985 state = State::EVTACCEPTED;
987 state = State::EVTREJECTED;
1023 error() <<
"could not find Alg " << a->
name() <<
" in Scheduler!" <<
endmsg;
virtual StatusCode initPool(const int &poolSize)=0
StatusCode deactivate()
Deactivate scheduler.
bool algsPresent(State state) const
void simulateExecutionFlow(IGraphVisitor &visitor) const
StatusCode initialize() override
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
The ISvcLocator is the interface implemented by the Service Factory in the Application Manager to loc...
unsigned int getControlFlowNodeCounter() const
Get total number of graph nodes.
virtual StatusCode initialize()
Initialise.
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
void updateDataObjectsCatalog(const DataObjIDColl &newProducts)
Update the catalog of available products in the slot.
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
StatusCode finalize() override
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
bool algoDataDependenciesSatisfied(const std::string &algo_name, const int &slotNum) const
Check all data dependencies of an algorithm are satisfied.
virtual StatusCode pushNewEvent(EventContext *eventContext)
Make an event available to the scheduler.
const DataObjIDColl & inputDataObjs() const
AlgsExecutionStates algsStates
Vector of algorithms states.
bool isSuccess() const
Test for a status code of SUCCESS.
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
EventContext * eventContext
Cache for the eventContext.
std::string m_optimizationMode
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
SmartIF< IThreadPoolSvc > m_threadPoolSvc
void touchReadyAlgorithms(IGraphVisitor &visitor) const
Promote all algorithms, ready to be executed, to DataReady state.
virtual StatusCode popFinishedEvent(EventContext *&eventContext)
Blocks until an event is available.
unsigned int m_maxAlgosInFlight
Maximum number of simultaneous algorithms.
virtual size_t getNumberOfStores()=0
Get the number of 'slots'.
virtual tbb::task * execute()
The SchedulerSvc implements the IScheduler interface.
EventContext * getContext() const
get the context
size_t sizeOfSubset(State state) const
A visitor, performing full top-down traversals of a graph.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
This class represents an entry point to all the event specific data.
bool isFailure() const
Test for a status code of FAILURE.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo)
The call to this method is triggered only from within the AlgoExecutionTask.
virtual StatusCode tryPopFinishedEvent(EventContext *&eventContext)
Try to fetch an event from the scheduler.
StatusCode m_drain()
Drain the actions present in the queue.
void setContext(EventContext *context)
set the context
const std::string & name() const override
The identifying name of the algorithm object.
TYPE * get() const
Get interface pointer.
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
~ForwardSchedulerSvc()
Destructor.
int m_threadPoolSize
Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose...
void addAlg(Algorithm *, EventContext *, pthread_t)
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
const std::string & name() const override
Retrieve name of the service.
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
bool rootDecisionResolved(const std::vector< int > &node_decisions) const
Check whether root decision was resolved.
const float & getRank() const
Get Algorithm rank.
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate a given 'slot' for all subsequent calls within the same thread id.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
unsigned int m_algosInFlight
Number of algorithms presently in flight.
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::thread m_thread
The thread in which the activate function runs.
virtual StatusCode getNewDataObjects(DataObjIDColl &products)=0
Get the latest new data objects registered in the store.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
static std::list< SchedulerState > m_sState
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
std::string m_whiteboardSvcName
The whiteboard name.
virtual StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts)
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
virtual concurrency::ExecutionFlowGraph * getExecutionFlowGraph() const
#define DECLARE_SERVICE_FACTORY(x)
bool complete
Flags completion of the event.
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithms in the slots and promote them to successive states (-1 means all slots...
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
The IAlgorithm is the interface implemented by the Algorithm base class.
virtual unsigned int freeSlots()
Get free slots number.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
bool m_updateNeeded
Keep track of update actions scheduled.
Base class from which all concrete algorithm classes should be derived.
void activate()
Activate scheduler.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
int m_maxEventsInFlight
Maximum number of event processed simultaneously.
Iterator begin(State kind)
StatusCode promoteToControlReady(unsigned int iAlgo, int si)
Algorithm promotion: Accepted by the control flow.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Class representing the event slot.
bool m_dumpIntraEventDynamics
static std::mutex m_ssMut
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
virtual StatusCode finalize()
Finalise.
const DataObjIDColl & outputDataObjs() const
friend class AlgoExecutionTask
ExecutionFlowGraph * getExecutionFlowGraph() const
Get the flow graph instance.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
StatusCode initialize(ExecutionFlowGraph *CFGraph, const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize the control flow manager. It greps the topalg list and the index map for the algo names...
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
std::vector< std::vector< std::string > > m_algosDependencies
DEPRECATED!
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
StatusCode promoteToDataReady(unsigned int iAlgo, int si)
static std::map< State, std::string > stateNames
StatusCode updateState(unsigned int iAlgo, State newState)