19 #include <unordered_set> 22 #include "boost/algorithm/string.hpp" 23 #include "boost/thread.hpp" 24 #include "boost/tokenizer.hpp" 26 #include "tbb/task_scheduler_init.h" 31 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) ) 32 #define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) ) 35 struct DataObjIDSorter {
72 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
77 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
94 warning() <<
"No CondSvc found, or not enabled. " 95 <<
"Will not manage CondAlgorithms" <<
endmsg;
103 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
109 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
116 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
124 fatal() <<
"Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." <<
endmsg;
135 const unsigned int algsNumber = algos.
size();
136 if ( algsNumber != 0 ) {
137 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
154 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." <<
endmsg;
158 auto r = globalOutp.
insert(
id );
160 warning() <<
"multiple algorithms declare " <<
id 161 <<
" as output! could be a single instance in multiple paths " 162 "though, or control flow may guarantee only one runs...!" 169 ostdd <<
"Data Dependencies for Algorithms:";
174 if (
nullptr == algoPtr ) {
175 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
176 <<
": this will result in a crash." <<
endmsg;
180 ostdd <<
"\n " << algoPtr->
name();
186 ostdd <<
"\n o INPUT " << id;
187 if (
id.key().find(
":" ) != std::string::npos ) {
188 ostdd <<
" contains alternatives which require resolution...\n";
189 auto tokens = boost::tokenizer<boost::char_separator<char>>{
id.key(), boost::char_separator<char>{
":"}};
193 if ( itok != tokens.end() ) {
194 ostdd <<
"found matching output for " << *itok <<
" -- updating scheduler info\n";
195 id.updateKey( *itok );
197 error() <<
"failed to find alternate in global output list" 198 <<
" for id: " <<
id <<
" in Alg " << algoPtr->
name() <<
endmsg;
202 algoDependencies.
insert(
id );
206 ostdd <<
"\n o OUTPUT " << *id;
207 if ( id->key().find(
":" ) != std::string::npos ) {
208 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id 216 algosDependenciesMap[algoPtr->
name()] = algoDependencies;
225 for (
auto o : globalInp )
226 if ( globalOutp.
find( o ) == globalOutp.
end() ) unmetDep.
insert( o );
228 if ( unmetDep.
size() > 0 ) {
231 for (
const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
232 ost <<
"\n o " << *o <<
" required by Algorithm: ";
234 for (
const auto& p : algosDependenciesMap )
235 if ( p.second.find( *o ) != p.second.end() ) ost <<
"\n * " << p.first;
244 dataLoaderAlg = algo;
248 if ( dataLoaderAlg ==
nullptr ) {
250 <<
"\" found, and unmet INPUT dependencies " 256 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/" 257 << dataLoaderAlg->name() <<
"\" Algorithm" << ost.
str() <<
endmsg;
260 Gaudi::Algorithm* dataAlg = dynamic_cast<Gaudi::Algorithm*>( dataLoaderAlg );
262 fatal() <<
"Unable to dcast DataLoader \"" <<
m_useDataLoader.value() <<
"\" IAlg to Gaudi::Algorithm" 267 for (
auto&
id : unmetDep ) {
268 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/" 269 << dataLoaderAlg->name() <<
endmsg;
274 fatal() <<
"Auto DataLoading not requested, " 275 <<
"and the following unmet INPUT dependencies were found:" << ost.
str() <<
endmsg;
280 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
287 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
292 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
307 if ( !messageSvc.
isValid() )
error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
319 info() <<
"Concurrency level information:" <<
endmsg;
345 info() <<
"Joining Scheduler thread" <<
endmsg;
350 error() <<
"problems in scheduler thread" <<
endmsg;
373 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
391 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
400 error() <<
"Problems terminating thread pool" <<
endmsg;
459 if ( !eventContext ) {
474 const unsigned int thisSlotNum = eventContext->
slot();
477 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
482 thisSlot.
reset( eventContext );
490 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
495 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
504 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
517 for (
auto context : eventContexts ) {
556 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")" 577 const int source_slot ) {
588 eventSlotsPtrs.
reserve( eventsSlotsSize );
590 if ( !slotIt->complete ) eventSlotsPtrs.
push_back( &( *slotIt ) );
598 for (
EventSlot* thisSlotPtr : eventSlotsPtrs ) {
606 if ( algo_index >= 0 ) {
610 if ( sub_slot == -1 || iSlot != source_slot ) {
612 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << iSlot <<
endmsg;
617 error() <<
"Failed to call IPrecedenceSvc::iterate for sub-slot " << sub_slot <<
" of " << iSlot <<
endmsg;
627 auto comp_nodes = [
this](
const uint& i,
const uint& j ) {
632 for (
auto it = thisAlgsStates.
begin( AState::DATAREADY ); it != thisAlgsStates.
end( AState::DATAREADY ); ++it )
634 while ( !buffer.
empty() ) {
635 bool IOBound =
false;
644 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " 651 for (
auto it = thisAlgsStates.
begin( AState::DATAREADY ); it != thisAlgsStates.
end( AState::DATAREADY ); ++it ) {
654 bool IOBound =
false;
663 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " <<
index2algname( algIndex )
664 <<
" on processing slot " << iSlot <<
endmsg;
669 for (
auto& subslot : thisSlot.allSubSlots ) {
670 auto& subslotStates = subslot.algsStates;
671 for (
auto it = subslotStates.begin( AState::DATAREADY ); it != subslotStates.end( AState::DATAREADY ); ++it ) {
683 s << ( algo_index != -1 ?
index2algname( algo_index ) :
"START" ) <<
", " 684 << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", " 688 :
std::to_string( tbb::task_scheduler_init::default_num_threads() );
690 myfile.
open(
"IntraEventFSMOccupancy_" +
threads +
"T.csv", std::ios::app );
697 !thisSlot.algsStates.containsAny( {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
698 !subSlotAlgsInStates( thisSlot, {AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED} ) &&
699 !thisSlot.complete ) {
701 thisSlot.complete =
true;
705 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot " 706 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
713 thisSlot.eventContext.reset(
nullptr );
739 !subSlotAlgsInStates( slot, {AState::DATAREADY, AState::SCHEDULED} ) ) {
755 const uint slotIdx = eventContext->
slot();
757 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
780 outputMS <<
"Dumping scheduler state\n" 781 <<
"=========================================================================================\n" 782 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n" 783 <<
"=========================================================================================\n\n";
787 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping " 788 <<
"------------------\n\n";
792 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
793 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
799 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED ); ++it )
804 for (
auto it = slot.algsStates.begin( AState::SCHEDULED ); it != slot.algsStates.end( AState::SCHEDULED );
809 outputMS <<
" task: " <<
std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/" 810 << slot.eventContext->slot();
813 if ( timelineSvc.isValid() ) {
816 te.slot = slot.eventContext->slot();
817 te.event = slot.eventContext->evt();
819 if ( timelineSvc->getTimelineEvent( te ) )
822 outputMS <<
" thread.id: [unknown]";
834 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping " 835 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
840 if ( slot.complete )
continue;
842 outputMS <<
"[ slot: " 843 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
845 << ( slot.eventContext->valid() ?
std::to_string( slot.eventContext->evt() ) :
"[ctx invalid]" )
848 if ( 0 > iSlot || iSlot == slotCount ) {
855 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
"\n\n";
856 auto slotID = slot.eventContext->valid() ?
std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
857 for (
auto& ss : slot.allSubSlots ) {
858 outputMS <<
"[ slot: " << slotID <<
", sub-slot: " 859 << ( ss.eventContext->valid() ?
std::to_string( ss.eventContext->subSlot() ) :
"[ctx invalid]" )
860 <<
", entry: " << ss.entryPoint <<
", event: " 861 << ( ss.eventContext->valid() ?
std::to_string( ss.eventContext->evt() ) :
"[ctx invalid]" )
872 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
876 outputMS <<
"\n=========================================================================================\n" 877 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n" 878 <<
"=========================================================================================\n\n";
893 if ( sc.isSuccess() ) {
896 auto promote2ExecutedClosure = [
this, iAlgo, ialgoPtr, eventContext]() {
897 this->
m_actionsQueue.push( [
this, iAlgo, ialgoPtr, eventContext]() {
906 tbb::task* algoTask =
new ( tbb::task::allocate_root() )
909 tbb::task::enqueue( *algoTask );
913 promote2ExecutedClosure );
917 ON_DEBUG debug() <<
"Algorithm " << algName <<
" was submitted on event " << eventContext->
evt() <<
" in slot " 925 size_t const subSlotIndex = eventContext->
subSlot();
926 updateSc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
929 updateSc = thisSlot.
algsStates.
set( iAlgo, AState::SCHEDULED );
956 if ( sc.isSuccess() ) {
959 auto promote2ExecutedClosure = [
this, iAlgo, ialgoPtr, eventContext]() {
960 this->
m_actionsQueue.push( [
this, iAlgo, ialgoPtr, eventContext]() {
987 ON_DEBUG debug() <<
"[Asynchronous] Algorithm " << algName <<
" was submitted on event " << eventContext->
evt()
995 size_t const subSlotIndex = eventContext->
subSlot();
996 updateSc = thisSlot.
allSubSlots[subSlotIndex].algsStates.set( iAlgo, AState::SCHEDULED );
999 updateSc = thisSlot.
algsStates.
set( iAlgo, AState::SCHEDULED );
1003 <<
"[Asynchronous] Promoting " << algName <<
" to SCHEDULED on slot " << si <<
endmsg;
1007 <<
" on slot " << si <<
endmsg;
1023 error() <<
"[Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1024 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1032 ON_DEBUG debug() <<
"Trying to handle execution result of " << algo->name() <<
" on slot " << si <<
endmsg;
1036 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1040 int subSlotIndex = -1;
1043 subSlotIndex = eventContext->
subSlot();
1051 <<
"Promoting " << algo->name() <<
" on slot " << si <<
" to " <<
state <<
endmsg;
1053 ON_DEBUG debug() <<
"Algorithm " << algo->name() <<
" executed in slot " << si <<
". Algorithms scheduled are " 1060 return this->
updateStates( -1, iAlgo, subSlotIndex, si );
1077 error() <<
"[Asynchronous] [Event " << eventContext->
evt() <<
", Slot " << eventContext->
slot() <<
"] " 1078 <<
"Instance of algorithm " << algo->name() <<
" could not be properly put back." <<
endmsg;
1086 ON_DEBUG debug() <<
"[Asynchronous] Trying to handle execution result of " << algo->name() <<
" on slot " << si
1091 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1095 int subSlotIndex = -1;
1098 subSlotIndex = eventContext->
subSlot();
1106 <<
"[Asynchronous] Promoting " << algo->name() <<
" on slot " << si <<
" to " <<
state <<
endmsg;
1108 ON_DEBUG debug() <<
"[Asynchronous] Algorithm " << algo->name() <<
" executed in slot " << si
1115 return this->
updateStates( -1, iAlgo, subSlotIndex, si );
1129 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1137 auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.
release(),
1142 if ( viewContextPtr ) {
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
Gaudi::Property< bool > m_showDataFlow
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
StatusCode initialize() override
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Class representing an event slot.
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
Gaudi::Property< std::string > m_whiteboardSvcName
unsigned int m_IOBoundAlgosInFlight
Number of algorithms presently in flight.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
virtual StatusCode iterate(EventSlot &, const Cause &)=0
Infer the precedence effect caused by an execution flow event.
std::vector< unsigned int > m_actionsCounts
Bookkeeping of the number of actions in flight per slot.
bool containsAny(std::initializer_list< State > l) const
check if the collection contains at least one state of any listed types
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container Contact B.
StatusCode finalize() override
virtual const std::string & type() const =0
The type of the algorithm.
Gaudi::Property< bool > m_dumpIntraEventDynamics
Gaudi::Property< bool > m_showDataDeps
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one)
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
A service to resolve the task execution precedence.
bool isValid() const
Allow for check if smart pointer is valid.
constexpr static const auto SUCCESS
bool filterPassed() const
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_useDataLoader
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
Gaudi::Property< std::string > m_optimizationMode
virtual StatusCode simulate(EventSlot &) const =0
Simulate execution flow.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
virtual bool CFRulesResolved(EventSlot &) const =0
Check if control flow rules are resolved.
virtual std::list< IAlgorithm * > getFlatAlgList()=0
Get the flat list of algorithms.
TYPE * get() const
Get interface pointer.
This class represents an entry point to all the event specific data.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
std::string fullKey() const
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
tbb::task * execute() override
Gaudi::Property< bool > m_checkDeps
Gaudi::Property< bool > m_useIOBoundAlgScheduler
virtual const AlgExecState & algExecState(const Gaudi::StringKey &algName, const EventContext &ctx) const =0
#define DECLARE_COMPONENT(type)
const unsigned int & getAlgoIndex() const
Get algorithm index.
std::atomic_int m_freeSlots
Atomic to account for asyncronous updates by the scheduler wrt the rest.
virtual StatusCode terminatePool()=0
Finalize the thread pool.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
StatusCode service(const Gaudi::Utils::TypeNameString &name, T *&svc, bool createIf=true)
Templated method to access a service by name.
const std::string & name() const override
Retrieve name of the service.
unsigned int m_algosInFlight
Number of algorithms presently in flight.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
Gaudi::Property< bool > m_verboseSubSlots
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
StatusCode promoteToScheduled(unsigned int iAlgo, int si, EventContext *)
Algorithm promotion.
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
virtual void dumpPrecedenceRules(EventSlot &)=0
Dump precedence rules.
Gaudi::Property< bool > m_showControlFlow
This class is used for returning status codes from appropriate routines.
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
const StatusCode & execStatus() const
size_t sizeOfSubset(State state) const
StatusCode set(unsigned int iAlgo, State newState)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
Gaudi::Property< std::string > m_IOBoundAlgSchedulerSvcName
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
StatusCode finalize() override
Finalise.
virtual size_t getNumberOfStores() const =0
Get the number of 'slots'.
Gaudi::Property< int > m_threadPoolSize
virtual void dumpDataFlow() const =0
SmartIF< IThreadPoolSvc > m_threadPoolSvc
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
The IAlgorithm is the interface implemented by the Algorithm base class.
State
Execution states of the algorithms.
GAUDI_API void setCurrentContext(const EventContext *ctx)
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si, EventContext *)
ContextID_t subSlot() const
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
const DataObjIDColl & outputDataObjs() const override
const StatusCode & ignore() const
Ignore/check StatusCode.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
Gaudi::Property< bool > m_simulateExecution
Iterator begin(State kind)
Gaudi::Property< bool > m_enableCondSvc
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Base class from which all concrete algorithm classes should be derived.
size_t m_maxAlgosInFlight
virtual uint getPriority(const std::string &) const =0
Get task priority.
constexpr static const auto FAILURE
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
unsigned int freeSlots() override
Get free slots number.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
bool complete
Flags completion of the event.
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
virtual StatusCode push(IAlgTask &task)=0
const DataObjIDColl & inputDataObjs() const override
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
StatusCode updateStates(int si=-1, int algo_index=-1, int sub_slot=-1, int source_slot=-1)
Loop on algorithm in the slots and promote them to successive states (-1 for algo_index means skippin...
virtual void dumpControlFlow() const =0
Dump precedence rules.
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
virtual bool isBlocking(const std::string &) const =0
Check if a task is CPU-blocking.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
virtual const std::string printState(EventSlot &) const =0
size_t m_maxEventsInFlight
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
const std::string & name() const override
The identifying name of the algorithm object.
AlgsExecutionStates algsStates
Vector of algorithms states.
T emplace_back(T... args)
std::thread m_thread
The thread in which the activate function runs.