22 #include <unordered_set> 32 #include "boost/thread.hpp" 33 #include "boost/tokenizer.hpp" 34 #include "boost/algorithm/string.hpp" 37 #include "tbb/task_scheduler_init.h" 58 if ( !sc.
isSuccess() ) warning() <<
"Base class could not be initialized" <<
endmsg;
61 m_threadPoolSvc = serviceLocator()->service(
"ThreadPoolSvc" );
62 if ( !m_threadPoolSvc.isValid() ) {
63 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
68 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
71 while ( m_isActive != ACTIVE ) {
73 fatal() <<
"Terminating initialization" <<
endmsg;
76 info() <<
"Waiting for ForwardSchedulerSvc to activate" <<
endmsg;
82 m_algResourcePool = serviceLocator()->service(
"AlgResourcePool" );
83 if ( !m_algResourcePool.isValid() ) {
84 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
88 m_algExecStateSvc = serviceLocator()->service(
"AlgExecStateSvc");
89 if (!m_algExecStateSvc.isValid()) {
90 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
95 m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
96 if ( !m_whiteboard.isValid() ) {
97 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
103 size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
104 if ( m_maxEventsInFlight != 0 ) {
105 warning() <<
"Property MaxEventsInFlight was set. This works but it's deprecated. " 106 <<
"Please migrate your code options files." <<
endmsg;
108 if ( m_maxEventsInFlight != (
int)numberOfWBSlots ) {
109 warning() <<
"In addition, the number of events in flight (" << m_maxEventsInFlight
110 <<
") differs from the slots in the whiteboard (" << numberOfWBSlots
111 <<
"). Setting the number of events in flight to " << numberOfWBSlots <<
endmsg;
116 if ( m_useIOBoundAlgScheduler ) {
117 m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
118 if ( !m_IOBoundAlgScheduler.isValid() )
119 fatal() <<
"Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." <<
endmsg;
122 m_maxEventsInFlight = numberOfWBSlots;
125 m_freeSlots = m_maxEventsInFlight;
127 if ( m_algosDependencies.size() != 0 ) {
128 warning() <<
" ##### Property AlgosDependencies is deprecated and ignored." 129 <<
" FIX your job options #####" <<
endmsg;
134 const unsigned int algsNumber = algos.
size();
135 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
148 fatal() <<
"Could not convert IAlgorithm into Algorithm: this will result in a crash." <<
endmsg;
151 auto r = globalOutp.
insert(
id);
153 warning() <<
"multiple algorithms declare " <<
id <<
" as output! could be a single instance in multiple paths though, or control flow may guarantee only one runs...!" <<
endmsg;
157 info() <<
"outputs:\n" ;
158 for (
const auto& i : globalOutp ) {
159 info() << i <<
'\n' ;
165 info() <<
"Data Dependencies for Algorithms:";
170 if (
nullptr == algoPtr )
171 fatal() <<
"Could not convert IAlgorithm into Algorithm: this will result in a crash." <<
endmsg;
173 info() <<
"\n " << algoPtr->
name();
179 info() <<
"\n o INPUT " << id;
180 if (
id.key().find(
":")!=std::string::npos) {
181 info() <<
" contains alternatives which require resolution... " <<
endmsg;
182 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(),boost::char_separator<char>{
":"}};
185 return globalOutp.find(
DataObjID{t} ) != globalOutp.end();
187 if (itok!=tokens.end()) {
188 info() <<
"found matching output for " << *itok <<
" -- updating scheduler info" <<
endmsg;
191 error() <<
"failed to find alternate in global output list" <<
endmsg;
194 algoDependencies.
insert(
id );
198 info() <<
"\n o OUTPUT " << id;
199 if (
id.key().find(
":")!=std::string::npos) {
200 info() <<
" alternatives are NOT allowed for outputs..." <<
endmsg;
211 m_algname_vect.reserve( algsNumber );
212 unsigned int index = 0;
215 m_algname_index_map[
name] = index;
216 m_algname_vect.emplace_back( name );
223 for (
auto o : globalInp ) {
224 if ( globalOutp.find( o ) == globalOutp.end() ) {
229 if ( unmetDep.
size() > 0 ) {
230 fatal() <<
"The following unmet INPUT data dependencies were found: ";
231 for (
auto& o : unmetDep ) {
232 fatal() <<
"\n o " << o <<
" required by Algorithm: ";
233 for (
size_t i = 0; i < m_algosDependencies.
size(); ++i ) {
234 if ( m_algosDependencies[i].find( o ) != m_algosDependencies[i].
end() ) {
235 fatal() <<
"\n * " << m_algname_vect[i];
242 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
247 if ( m_CFNext ) m_DFNext =
true;
248 if ( !m_CFNext && !m_optimizationMode.empty() ) {
249 fatal() <<
"Execution optimization is only available with the graph-based execution flow management" <<
endmsg;
254 m_efManager.initialize( algPool->
getExecutionFlowGraph(), m_algname_index_map, m_eventSlots, m_optimizationMode );
255 unsigned int controlFlowNodeNumber = m_efManager.getExecutionFlowGraph()->getControlFlowNodeCounter();
259 if ( !messageSvc.
isValid() ) error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
261 m_eventSlots.assign( m_maxEventsInFlight,
262 EventSlot( m_algosDependencies, algsNumber, controlFlowNodeNumber, messageSvc ) );
263 std::for_each( m_eventSlots.begin(), m_eventSlots.end(), [](
EventSlot& slot ) { slot.complete =
true; } );
266 info() <<
"Concurrency level information:" <<
endmsg;
267 info() <<
" o Number of events in flight: " << m_maxEventsInFlight <<
endmsg;
268 info() <<
" o Number of algorithms in flight: " << m_maxAlgosInFlight <<
endmsg;
269 info() <<
" o TBB thread pool size: " << m_threadPoolSize <<
endmsg;
272 if ( m_simulateExecution ) {
274 m_efManager.simulateExecutionFlow( vis );
293 info() <<
"Joining Scheduler thread" <<
endmsg;
298 error() <<
"problems in scheduler thread" <<
endmsg;
321 debug() <<
"ForwardSchedulerSvc::activate()" <<
endmsg;
324 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
336 info() <<
"Start checking the actionsQueue" <<
endmsg;
341 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
346 info() <<
"Terminating thread-pool resources" <<
endmsg;
348 error() <<
"Problems terminating thread pool" <<
endmsg;
407 if ( !eventContext ) {
422 const unsigned int thisSlotNum = eventContext->
slot();
425 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
429 info() <<
"Executing event " << eventContext->
evt() <<
" on slot " << thisSlotNum <<
endmsg;
430 thisSlot.
reset( eventContext );
442 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
454 for (
auto context : eventContexts ) {
471 unsigned int slotNum = 0;
473 if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
498 debug() <<
"Popped slot " << eventContext->
slot() <<
"(event " 512 debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" 532 fatal() <<
"*** Event " << eventContext->
evt() <<
" on slot " 533 << eventContext->
slot() <<
" failed! ***" <<
endmsg;
538 info() <<
"Dumping Alg Exec State for slot " << eventContext->
slot()
602 eventSlotsPtrs.
reserve( eventsSlotsSize );
604 if ( !slotIt->complete ) eventSlotsPtrs.
push_back( &( *slotIt ) );
612 for (
EventSlot* thisSlotPtr : eventSlotsPtrs ) {
624 if ( !algo_name.
empty() )
652 for (
auto it = thisAlgsStates.
begin( AlgsExecutionStates::State::CONTROLREADY );
653 it != thisAlgsStates.
end( AlgsExecutionStates::State::CONTROLREADY ); ++it ) {
659 verbose() <<
"Could not apply transition from " 661 <<
" for algorithm " <<
index2algname(algIndex) <<
" on processing slot " << iSlot <<
endmsg;
667 auto comp_nodes = [
this](
const uint& i,
const uint& j ) {
673 for (
auto it = thisAlgsStates.
begin( AlgsExecutionStates::State::DATAREADY );
674 it != thisAlgsStates.
end( AlgsExecutionStates::State::DATAREADY ); ++it )
702 while ( !buffer.
empty() ) {
703 bool IOBound =
false;
713 if (partial_sc.isFailure())
714 verbose() <<
"Could not apply transition from " 722 for (
auto it = thisAlgsStates.
begin( AlgsExecutionStates::State::DATAREADY );
723 it != thisAlgsStates.
end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
726 bool IOBound =
false;
737 verbose() <<
"Could not apply transition from " 739 <<
" for algorithm " <<
index2algname(algIndex) <<
" on processing slot " << iSlot <<
endmsg;
746 s << algo_name <<
", " << thisAlgsStates.
sizeOfSubset(State::CONTROLREADY) <<
", " 747 << thisAlgsStates.
sizeOfSubset(State::DATAREADY) <<
", " 748 << thisAlgsStates.
sizeOfSubset(State::SCHEDULED) <<
", " 752 :
std::to_string(tbb::task_scheduler_init::default_num_threads());
754 myfile.
open(
"IntraEventConcurrencyDynamics_" + threads +
"T.csv",
std::ios::app );
765 thisSlot.complete =
true;
771 debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot " 772 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
781 thisSlot.eventContext =
nullptr;
812 info() <<
"About to declare a stall" <<
endmsg;
835 outputMessageStream <<
"============================== Execution Task State =============================" 839 outputMessageStream << std::endl
840 <<
"============================== Scheduler State =================================" 846 if ( thisSlot.complete )
continue;
848 outputMessageStream <<
"----------- slot: " << thisSlot.eventContext->slot()
849 <<
" event: " << thisSlot.eventContext->evt() <<
" -----------" <<
std::endl;
851 if ( 0 > iSlot or iSlot == slotCount ) {
852 outputMessageStream <<
"Algorithms states:" <<
std::endl;
854 const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
855 for (
unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
856 outputMessageStream <<
" o " <<
index2algname( algoIdx ) <<
" [" 858 DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
859 const int depsSize = deps.
size();
860 if ( depsSize == 0 ) outputMessageStream <<
" none";
863 for (
auto d : deps ) {
864 outputMessageStream << d <<
" ";
865 if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
871 if ( !missing.
empty() ) {
872 outputMessageStream <<
". The following are missing: ";
873 for (
auto d : missing ) {
874 outputMessageStream << d <<
" ";
882 outputMessageStream <<
"\nWhiteboard contents: " <<
std::endl;
883 for (
auto& product : wbSlotContent ) outputMessageStream <<
" o " << product <<
std::endl;
886 outputMessageStream <<
"\nControl Flow:" <<
std::endl;
890 outputMessageStream << cFlowStateStringStream.
str() <<
std::endl;
894 outputMessageStream <<
"=================================== END ======================================" <<
std::endl;
896 info() <<
"Dumping Scheduler State " << std::endl << outputMessageStream.
str() <<
endmsg;
921 sc =
m_eventSlots[si].dataFlowMgr.canAlgorithmRun( iAlgo );
949 if ( sc.isSuccess() ) {
952 fatal() <<
"Event context for algorithm " << algName <<
" is a nullptr (slot " << si <<
")" <<
endmsg;
957 tbb::task* t =
new( tbb::task::allocate_root() )
960 tbb::task::enqueue( *t);
968 debug() <<
"Algorithm " << algName <<
" was submitted on event " << eventContext->
evt() <<
" in slot " << si
975 if (updateSc.isSuccess())
1000 if ( sc.isSuccess() ) {
1002 if ( !eventContext )
1003 fatal() <<
"[Asynchronous] Event context for algorithm " << algName <<
" is a nullptr (slot " << si <<
")" 1014 debug() <<
"[Asynchronous] Algorithm " << algName <<
" was submitted on event " 1015 << eventContext->
evt() <<
" in slot " << si
1020 if (updateSc.isSuccess())
1023 <<
" to SCHEDULED on slot " << si <<
endmsg;
1027 debug() <<
"[Asynchronous] Could not acquire instance for algorithm " <<
index2algname( iAlgo ) <<
" on slot " 1043 if ( !castedAlgo )
fatal() <<
"The casting did not succeed!" <<
endmsg;
1054 error() <<
"[Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1055 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1071 for (
const auto& new_product : new_products )
1077 debug() <<
"Algorithm " << algo->name() <<
" executed in slot " << si <<
". Algorithms scheduled are " 1094 state = State::EVTACCEPTED;
1096 state = State::EVTREJECTED;
1119 if ( !castedAlgo )
fatal() <<
"[Asynchronous] The casting did not succeed!" <<
endmsg;
1129 error() <<
"[Asynchronous] [Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1130 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1146 for (
const auto& new_product : new_products)
1148 debug() <<
"Found in WB [" << si <<
"]: " << new_product <<
endmsg;
1153 debug() <<
"[Asynchronous] Algorithm " << algo->name() <<
" executed in slot " << si
1167 debug() <<
"[Asynchronous] Trying to handle execution result of " 1171 state = State::EVTACCEPTED;
1173 state = State::EVTREJECTED;
1207 error() <<
"could not find Alg " << a->
name() <<
" in Scheduler!" <<
endmsg;
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
StatusCode deactivate()
Deactivate scheduler.
bool algsPresent(State state) const
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode initialize() override
Gaudi::Property< bool > m_CFNext
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si)
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
void updateEventState(AlgsExecutionStates &algo_states, std::vector< int > &node_decisions) const
Update the state of algorithms to controlready, where possible.
void updateDataObjectsCatalog(const DataObjIDColl &newProducts)
Update the catalog of available products in the slot.
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
const std::string & name() const override
The identifying name of the algorithm object.
StatusCode finalize() override
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
bool algoDataDependenciesSatisfied(const std::string &algo_name, const int &slotNum) const
Check all data dependencies of an algorithm are satisfied.
AlgsExecutionStates algsStates
Vector of algorithms states.
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
bool isSuccess() const
Test for a status code of SUCCESS.
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
const DataObjIDColl & inputDataObjs() const override
EventContext * eventContext
Cache for the eventContext.
Header file for class GaudiAlgorithm.
StatusCode finalize() override
Finalise.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
SmartIF< IThreadPoolSvc > m_threadPoolSvc
void touchReadyAlgorithms(IGraphVisitor &visitor) const
Promote all algorithms, ready to be executed, to DataReady state.
T duration_cast(T...args)
The SchedulerSvc implements the IScheduler interface.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
size_t sizeOfSubset(State state) const
A visitor, performing full top-down traversals of a graph.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
This class represents an entry point to all the event specific data.
bool isFailure() const
Test for a status code of FAILURE.
bool isIOBound() const
Check if algorithm is I/O-bound.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
StatusCode m_drain()
Drain the actions present in the queue.
Gaudi::Property< unsigned int > m_maxAlgosInFlight
Gaudi::Property< std::string > m_optimizationMode
tbb::task * execute() override
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
virtual StatusCode terminatePool()=0
Finalize the thread pool.
void addAlg(Algorithm *, EventContext *, pthread_t)
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
bool rootDecisionResolved(const std::vector< int > &node_decisions) const
Check whether root decision was resolved.
const float & getRank() const
Get Algorithm rank.
DataFlowManager dataFlowMgr
DataFlowManager of this slot.
virtual StatusCode selectStore(size_t partitionIndex)=0
Activate a given 'slot' for all subsequent calls within the same thread ID.
The AlgsExecutionStates class encodes the state machine for the execution of algorithms within a single event.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::thread m_thread
The thread in which the activate function runs.
virtual StatusCode getNewDataObjects(DataObjIDColl &products)=0
Get the latest new data objects registered in the store.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
static std::list< SchedulerState > m_sState
std::vector< EventSlot > m_eventSlots
Vector of events slots.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
This class is used for returning status codes from appropriate routines.
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
const DataObjIDColl & outputDataObjs() const override
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
unsigned int freeSlots() override
Get free slots number.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
virtual concurrency::ExecutionFlowGraph * getExecutionFlowGraph() const
#define DECLARE_SERVICE_FACTORY(x)
bool complete
Flags completion of the event.
Gaudi::Property< int > m_maxEventsInFlight
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop over the algorithms in the slots and promote them to successive states (-1 means all slots).
The IAlgorithm is the interface implemented by the Algorithm base class.
GAUDI_API void setCurrentContext(const EventContext *ctx)
Gaudi::Property< bool > m_DFNext
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
bool m_updateNeeded
Keep track of update actions scheduled.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
Base class from which all concrete algorithm classes should be derived.
Gaudi::Property< bool > m_useIOBoundAlgScheduler
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
void activate()
Activate scheduler.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Iterator begin(State kind)
StatusCode promoteToControlReady(unsigned int iAlgo, int si)
Algorithm promotion: Accepted by the control flow.
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
Class representing the event slot.
Gaudi::Property< bool > m_dumpIntraEventDynamics
static std::mutex m_ssMut
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
friend class IOBoundAlgTask
friend class AlgoExecutionTask
virtual StatusCode push(IAlgTask &task)=0
ExecutionFlowGraph * getExecutionFlowGraph() const
Get the flow graph instance.
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
Gaudi::Property< int > m_threadPoolSize
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name, using the graph index.
void dumpState() override
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
const std::chrono::system_clock::time_point getInitTime() const
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
StatusCode promoteToDataReady(unsigned int iAlgo, int si)
static std::map< State, std::string > stateNames
StatusCode updateState(unsigned int iAlgo, State newState)