36#include <unordered_set>
39#include <boost/algorithm/string.hpp>
40#include <boost/thread.hpp>
41#include <boost/tokenizer.hpp>
46#define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
47#define ON_VERBOSE if ( msgLevel( MSG::VERBOSE ) )
50 struct DataObjIDSorter {
56 std::vector<const DataObjID*> sortedDataObjIDColl(
const DataObjIDColl& coll ) {
57 std::vector<const DataObjID*> v;
58 v.reserve( coll.size() );
59 for (
const DataObjID&
id : coll ) v.push_back( &
id );
60 std::sort( v.begin(), v.end(), DataObjIDSorter() );
64 bool subSlotAlgsInStates(
const EventSlot& slot, std::initializer_list<AlgsExecutionStates::State> testStates ) {
66 [testStates](
const EventSlot& ss ) { return ss.algsStates.containsAny( testStates ); } );
87 fatal() <<
"Error retrieving ThreadPoolSvc" <<
endmsg;
97 fatal() <<
"Cannot find valid TBB task_arena" <<
endmsg;
102 info() <<
"Activating scheduler in a separate thread" <<
endmsg;
103 std::binary_semaphore fiber_manager_initalized{ 0 };
104 m_thread = std::thread( [
this, &fiber_manager_initalized]() {
107 fiber_manager_initalized.release();
111 fiber_manager_initalized.acquire();
127 warning() <<
"No CondSvc found, or not enabled. "
128 <<
"Will not manage CondAlgorithms" <<
endmsg;
136 fatal() <<
"Error retrieving AlgoResourcePool" <<
endmsg;
142 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
149 fatal() <<
"Error retrieving EventDataSvc interface IHiveWhiteBoard." <<
endmsg;
161 const unsigned int algsNumber = algos.size();
162 if ( algsNumber != 0 ) {
163 info() <<
"Found " << algsNumber <<
" algorithms" <<
endmsg;
177 std::map<std::string, DataObjIDColl> algosOutputDependenciesMap;
181 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm: this will result in a crash." <<
endmsg;
187 globalOutp.insert(
id );
188 algoOutputs.insert(
id );
190 algosOutputDependenciesMap[algoPtr->
name()] = algoOutputs;
193 std::ostringstream ostdd;
194 ostdd <<
"Data Dependencies for Algorithms:";
196 std::map<std::string, DataObjIDColl> algosInputDependenciesMap;
199 if (
nullptr == algoPtr ) {
200 fatal() <<
"Could not convert IAlgorithm into Gaudi::Algorithm for " << ialgoPtr->name()
201 <<
": this will result in a crash." <<
endmsg;
209 ostdd <<
"\n " << algoPtr->
name();
211 auto write_owners = [&avis, &ostdd](
const DataObjID& id ) {
220 ostdd <<
"\n o INPUT " << id;
222 algoDependencies.insert(
id );
223 globalInp.insert(
id );
226 ostdd <<
"\n o OUTPUT " << *id;
228 if ( id->key().find(
":" ) != std::string::npos ) {
229 error() <<
" in Alg " << algoPtr->
name() <<
" alternatives are NOT allowed for outputs! id: " << *
id
237 algosInputDependenciesMap[algoPtr->
name()] = algoDependencies;
244 if (
dumpGraphFile( algosInputDependenciesMap, algosOutputDependenciesMap ).isFailure() ) {
253 std::set<std::string> requiredInputKeys;
254 for (
auto o : globalInp ) {
257 requiredInputKeys.insert( o.key() );
258 if ( globalOutp.find( o ) == globalOutp.end() ) unmetDepInp.insert( o );
261 for (
auto o : globalOutp ) {
262 if ( globalInp.find( o ) == globalInp.end() && requiredInputKeys.find( o.key() ) == requiredInputKeys.end() ) {
266 auto it = algosOutputDependenciesMap.find( algoName );
267 if ( it != algosOutputDependenciesMap.end() ) {
268 if ( it->second.find( o ) != it->second.end() ) {
274 if ( !ignored ) { unusedOutp.insert( o ); }
281 if ( unmetDepInp.size() > 0 ) {
283 auto printUnmet = [&](
auto msg ) {
284 for (
const DataObjID* o : sortedDataObjIDColl( unmetDepInp ) ) {
285 msg <<
" o " << *o <<
" required by Algorithm: " <<
endmsg;
287 for (
const auto& p : algosInputDependenciesMap )
288 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
298 dataLoaderAlg = algo;
302 if ( dataLoaderAlg ==
nullptr ) {
304 <<
"\" found, and unmet INPUT dependencies "
306 printUnmet(
fatal() );
310 info() <<
"Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->
type() <<
"/"
311 << dataLoaderAlg->name() <<
"\" Algorithm" <<
endmsg;
312 printUnmet(
info() );
317 fatal() <<
"Unable to dcast DataLoader \"" <<
m_useDataLoader.value() <<
"\" IAlg to Gaudi::Algorithm"
322 for (
auto&
id : unmetDepInp ) {
323 ON_DEBUG debug() <<
"adding OUTPUT dep \"" <<
id <<
"\" to " << dataLoaderAlg->
type() <<
"/"
324 << dataLoaderAlg->name() <<
endmsg;
329 fatal() <<
"Auto DataLoading not requested, "
330 <<
"and the following unmet INPUT dependencies were found:" <<
endmsg;
331 printUnmet(
fatal() );
336 info() <<
"No unmet INPUT data dependencies were found" <<
endmsg;
341 if ( unusedOutp.size() > 0 ) {
343 auto printUnusedOutp = [&](
auto msg ) {
344 for (
const DataObjID* o : sortedDataObjIDColl( unusedOutp ) ) {
345 msg <<
" o " << *o <<
" produced by Algorithm: " <<
endmsg;
347 for (
const auto& p : algosOutputDependenciesMap )
348 if ( p.second.find( *o ) != p.second.end() )
msg <<
" * " << p.first <<
endmsg;
352 fatal() <<
"The following unused OUTPUT items were found:" <<
endmsg;
353 printUnusedOutp(
fatal() );
356 info() <<
"No unused OUTPUT items were found" <<
endmsg;
363 fatal() <<
"Error retrieving PrecedenceSvc" <<
endmsg;
368 fatal() <<
"Unable to dcast PrecedenceSvc" <<
endmsg;
375 const std::string&
name = algo->name();
383 if ( !messageSvc.
isValid() )
error() <<
"Error retrieving MessageSvc interface IMessageSvc." <<
endmsg;
394 info() <<
"Concurrency level information:" <<
endmsg;
400 info() <<
"Task scheduling settings:" <<
endmsg;
401 info() <<
" o Avalanche generation mode: "
403 info() <<
" o Preemptive scheduling of CPU-blocking tasks: "
435 info() <<
"Joining Scheduler thread" <<
endmsg;
440 error() <<
"problems in scheduler thread" <<
endmsg;
463 error() <<
"problems initializing ThreadPoolSvc" <<
endmsg;
481 verbose() <<
"Action did not succeed (which is not bad per se)." <<
endmsg;
492 verbose() <<
"Iteration did not succeed (which is not bad per se)." <<
endmsg;
502 error() <<
"Problems terminating thread pool" <<
endmsg;
548 if ( !eventContext ) {
563 const unsigned int thisSlotNum = eventContext->
slot();
566 fatal() <<
"The slot " << thisSlotNum <<
" is supposed to be a finished event but it's not" <<
endmsg;
571 thisSlot.
reset( eventContext );
578 if (
m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
579 error() <<
"Failed to call IPrecedenceSvc::iterate for slot " << thisSlotNum <<
endmsg;
583 if ( this->
iterate().isFailure() ) {
584 error() <<
"Failed to call AvalancheSchedulerSvc::updateStates for slot " << thisSlotNum <<
endmsg;
593 verbose() <<
"Pushing the action to update the scheduler for slot " << eventContext->
slot() <<
endmsg;
606 for (
auto context : eventContexts ) {
649 ON_DEBUG debug() <<
"Try Pop successful slot " << eventContext->
slot() <<
"(event " << eventContext->
evt() <<
")"
672 for (
unsigned int retryIndex = 0; retryIndex < retries; ++retryIndex ) {
675 global_sc =
schedule( std::move( retryTS ) );
679 OccupancySnapshot nextSnap;
680 auto now = std::chrono::system_clock::now();
684 if ( !thisSlot.eventContext )
continue;
686 int iSlot = thisSlot.eventContext->slot();
698 if ( nextSnap.states.empty() ) {
704 std::vector<int>& slotStateTotals = nextSnap.states[iSlot];
705 slotStateTotals.resize( AState::MAXVALUE );
706 for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
707 slotStateTotals[state] = thisSlot.algsStates.sizeOfSubset(
AState( state ) );
711 for (
auto& subslot : thisSlot.allSubSlots ) {
712 for ( uint8_t state = 0; state < AState::MAXVALUE; ++state ) {
713 slotStateTotals[state] += subslot.algsStates.sizeOfSubset(
AState( state ) );
719 const auto& drAlgs = thisAlgsStates.
algsInState( AState::DATAREADY );
720 for ( uint algIndex : drAlgs ) {
723 bool asynchronous{
m_precSvc->isAsynchronous( algName ) };
726 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, asynchronous, iSlot, thisSlot.eventContext.get() ) );
729 <<
"Could not apply transition from " << AState::DATAREADY <<
" for algorithm " << algName
730 <<
" on processing slot " << iSlot <<
endmsg;
734 for (
auto& subslot : thisSlot.allSubSlots ) {
735 const auto& drAlgsSubSlot = subslot.algsStates.algsInState( AState::DATAREADY );
736 for ( uint algIndex : drAlgsSubSlot ) {
739 bool asynchronous{
m_precSvc->isAsynchronous( algName ) };
741 schedule(
TaskSpec(
nullptr, algIndex, algName, rank, asynchronous, iSlot, subslot.eventContext.get() ) );
747 s <<
"START, " << thisAlgsStates.
sizeOfSubset( AState::CONTROLREADY ) <<
", "
749 <<
", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() <<
"\n";
751 : std::to_string( std::thread::hardware_concurrency() );
752 std::ofstream myfile;
753 myfile.open(
"IntraEventFSMOccupancy_" + threads +
"T.csv", std::ios::app );
759 if (
m_precSvc->CFRulesResolved( thisSlot ) &&
760 !thisSlot.algsStates.containsAny(
761 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
762 !subSlotAlgsInStates( thisSlot,
763 { AState::CONTROLREADY, AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
764 !thisSlot.complete ) {
766 thisSlot.complete =
true;
770 ON_DEBUG debug() <<
"Event " << thisSlot.eventContext->evt() <<
" finished (slot "
771 << thisSlot.eventContext->slot() <<
")." <<
endmsg;
778 thisSlot.eventContext.reset(
nullptr );
788 if ( !nextSnap.states.empty() ) {
802 auto slotIndex = contextPtr->
slot();
808 auto subSlotIndex = contextPtr->
subSlot();
815 <<
", subslot:" << subSlotIndex <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
825 <<
", event:" << contextPtr->
evt() <<
"]" <<
endmsg;
843 if ( !slot.
algsStates.
containsAny( { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) &&
844 !subSlotAlgsInStates( slot, { AState::DATAREADY, AState::SCHEDULED, AState::RESOURCELESS } ) ) {
860 const uint slotIdx = eventContext->
slot();
862 error() <<
"Event " << eventContext->
evt() <<
" on slot " << slotIdx <<
" failed" <<
endmsg;
883 std::ostringstream outputMS;
885 outputMS <<
"Dumping scheduler state\n"
886 <<
"=========================================================================================\n"
887 <<
"++++++++++++++++++++++++++++++++++++ SCHEDULER STATE ++++++++++++++++++++++++++++++++++++\n"
888 <<
"=========================================================================================\n\n";
892 outputMS <<
"------------------ Last schedule: Task/Event/Slot/Thread/State Mapping "
893 <<
"------------------\n\n";
897 if ( !timelineSvc.isValid() || !timelineSvc->isEnabled() ) {
898 outputMS <<
"WARNING Enable TimelineSvc in record mode (RecordTimeline = True) to trace the mapping\n";
905 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
906 for ( uint algIndex : schedAlgs ) {
914 const auto& schedAlgs = slot.algsStates.algsInState( AState::SCHEDULED );
915 for ( uint algIndex : schedAlgs ) {
919 outputMS <<
" task: " << std::setw( indt ) << algoName <<
" evt/slot: " << slot.eventContext->evt() <<
"/"
920 << slot.eventContext->slot();
923 if ( timelineSvc.isValid() ) {
926 te.slot = slot.eventContext->slot();
927 te.event = slot.eventContext->evt();
929 if ( timelineSvc->getTimelineEvent( te ) )
930 outputMS <<
" thread.id: 0x" << std::hex << te.thread << std::dec;
932 outputMS <<
" thread.id: [unknown]";
937 outputMS <<
" state: [" <<
m_algExecStateSvc->algExecState( algoName, *( slot.eventContext ) ) <<
"]\n";
944 outputMS <<
"\n---------------------------- Task/CF/FSM Mapping "
945 << ( 0 > iSlot ?
"[all slots] --" :
"[target slot] " ) <<
"--------------------------\n\n";
948 bool wasAlgError = ( iSlot >= 0 ) ?
m_eventSlots[iSlot].algsStates.containsAny( { AState::ERROR } ) ||
949 subSlotAlgsInStates(
m_eventSlots[iSlot], { AState::ERROR } )
954 if ( slot.complete )
continue;
956 outputMS <<
"[ slot: "
957 << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]" )
959 << ( slot.eventContext->valid() ? std::to_string( slot.eventContext->evt() ) :
"[ctx invalid]" );
961 if ( slot.eventContext->eventID().isValid() ) { outputMS <<
", eventID: " << slot.eventContext->eventID(); }
962 outputMS <<
" ]:\n\n";
964 if ( 0 > iSlot || iSlot == slotCount ) {
968 outputMS <<
"ERROR alg(s):";
970 const auto& errorAlgs = slot.algsStates.algsInState( AState::ERROR );
971 for ( uint algIndex : errorAlgs ) {
975 if ( errorCount == 0 ) outputMS <<
" in subslot(s)";
979 outputMS <<
m_precSvc->printState( slot ) <<
"\n";
984 outputMS <<
"\nNumber of sub-slots: " << slot.allSubSlots.size() <<
"\n\n";
985 auto slotID = slot.eventContext->valid() ? std::to_string( slot.eventContext->slot() ) :
"[ctx invalid]";
986 for (
auto& ss : slot.allSubSlots ) {
987 outputMS <<
"[ slot: " << slotID <<
", sub-slot: "
988 << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->subSlot() ) :
"[ctx invalid]" )
989 <<
", entry: " << ss.entryPoint <<
", event: "
990 << ( ss.eventContext->valid() ? std::to_string( ss.eventContext->evt() ) :
"[ctx invalid]" )
993 outputMS <<
"ERROR alg(s):";
994 const auto& errorAlgs = ss.algsStates.algsInState( AState::ERROR );
995 for ( uint algIndex : errorAlgs ) { outputMS <<
" " <<
index2algname( algIndex ); }
999 outputMS <<
m_precSvc->printState( ss ) <<
"\n";
1008 if ( 0 <= iSlot && !wasAlgError ) {
1009 outputMS <<
"\n------------------------------ Algorithm Execution States -----------------------------\n\n";
1013 outputMS <<
"\n=========================================================================================\n"
1014 <<
"++++++++++++++++++++++++++++++++++++++ END OF DUMP ++++++++++++++++++++++++++++++++++++++\n"
1015 <<
"=========================================================================================\n\n";
1035 unsigned int algIndex{ ts.algIndex };
1036 std::string_view algName( ts.algName );
1037 unsigned int algRank{ ts.algRank };
1038 bool asynchronous{ ts.asynchronous };
1039 int slotIndex{ ts.slotIndex };
1042 if ( asynchronous ) {
1050 if ( !asynchronous ) {
1058 sc =
revise( algIndex, contextPtr, AState::SCHEDULED );
1060 ON_DEBUG debug() <<
"Scheduled " << algName <<
" [slot:" << slotIndex <<
", event:" << contextPtr->evt()
1061 <<
", rank:" << algRank <<
", asynchronous:" << ( asynchronous ?
"yes" :
"no" )
1071 sc =
revise( ts.algIndex, ts.contextPtr, AState::SCHEDULED );
1077 sc =
revise( ts.algIndex, ts.contextPtr, AState::RESOURCELESS );
1100 ? ( algstate.
filterPassed() ? AState::EVTACCEPTED : AState::EVTREJECTED )
1104 auto sc =
revise( ts.algIndex, ts.contextPtr, state,
true );
1106 ON_DEBUG debug() <<
"Executed " << ts.algName <<
" [slot:" << ts.slotIndex <<
", event:" << ts.contextPtr->evt()
1107 <<
", rank:" << ts.algRank <<
", asynchronous:" << ( ts.asynchronous ?
"yes" :
"no" )
1124 std::unique_ptr<EventContext> viewContext ) {
1127 fatal() <<
"Attempted to nest EventViews at node " << nodeName <<
": this is not supported" <<
endmsg;
1135 auto action = [
this, slotIndex = sourceContext->
slot(), viewContextPtr = viewContext.release(),
1140 if ( viewContextPtr ) {
1142 auto viewContext = std::unique_ptr<EventContext>( viewContextPtr );
1143 topSlot.
addSubSlot( std::move( viewContext ), nodeName );
1165 auto action = [
this, samplePeriod, callback = std::move( callback )]() ->
StatusCode {
1166 if ( samplePeriod < 0 ) {
1169 this->
m_snapshotInterval = std::chrono::duration<int64_t, std::milli>( samplePeriod );
1179 const std::map<std::string, DataObjIDColl>& outDeps )
const {
1181 assert( inDeps.size() == outDeps.size() );
1184 info() <<
"Dumping data dependencies graph to file: " << g.fileName() <<
endmsg;
1187 std::set<std::size_t> definedObjects;
1194 std::size_t algoIndex = 0ul;
1195 for (
const auto& [algName, ideps] : inDeps ) {
1196 if ( not std::regex_search( algName, algNameRegex ) )
continue;
1197 std::string algIndex =
"Alg_" + std::to_string( algoIndex );
1198 g.addNode( algName, algIndex );
1201 for (
const auto& dep : ideps ) {
1202 if ( not std::regex_search( dep.fullKey(), objNameRegex ) )
continue;
1204 const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1205 std::string objIndex =
"obj_" + std::to_string( dep.hash() );
1206 if ( inserted ) g.addNode( dep.key(), objIndex );
1208 g.addEdge( dep.key(), objIndex, algName, algIndex );
1211 const auto& odeps = outDeps.at( algName );
1212 for (
const auto& dep : odeps ) {
1213 if ( not std::regex_search( dep.fullKey(), objNameRegex ) )
continue;
1215 const auto [itr, inserted] = definedObjects.insert( dep.hash() );
1216 std::string objIndex =
"obj_" + std::to_string( dep.hash() );
1217 if ( inserted ) g.addNode( dep.key(), objIndex );
1219 g.addEdge( algName, algIndex, dep.key(), objIndex );
std::unordered_set< DataObjID, DataObjID_Hasher > DataObjIDColl
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
#define DECLARE_COMPONENT(type)
Provide serialization function (output only) for some common STL classes (vectors,...
wrapper on an Algorithm state.
const StatusCode & execStatus() const
bool filterPassed() const
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
const boost::container::flat_set< int > algsInState(State state) const
size_t sizeOfSubset(State state) const
bool containsAny(std::initializer_list< State > l) const
Check if the collection contains at least one state of any of the listed types.
StatusCode set(unsigned int iAlgo, State newState)
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
SmartIF< IThreadPoolSvc > m_threadPoolSvc
Gaudi::Property< std::string > m_useDataLoader
void dumpState() override
Dump scheduler state for all slots.
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_optimizationMode
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
size_t m_maxAlgosInFlight
Gaudi::Property< bool > m_dumpIntraEventDynamics
std::chrono::system_clock::time_point m_lastSnapshot
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
Gaudi::Property< int > m_threadPoolSize
StatusCode finalize() override
Finalise.
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
std::function< void(OccupancySnapshot)> m_snapshotCallback
std::queue< TaskSpec > m_retryQueue
Gaudi::Property< bool > m_verboseSubSlots
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
AlgsExecutionStates::State AState
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
StatusCode deactivate()
Deactivate scheduler.
size_t m_maxEventsInFlight
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
unsigned int m_algosInFlight
Number of algorithms presently in flight.
unsigned int m_blockingAlgosInFlight
Number of blocking algorithms presently in flight.
Gaudi::Property< std::string > m_dataDepsGraphObjectPattern
Gaudi::Property< bool > m_showDataFlow
StatusCode schedule(TaskSpec &&)
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
Gaudi::Property< bool > m_checkDeps
std::chrono::duration< int64_t, std::milli > m_snapshotInterval
Gaudi::Property< std::string > m_dataDepsGraphFile
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
Gaudi::Property< bool > m_showControlFlow
Gaudi::Property< bool > m_simulateExecution
StatusCode dumpGraphFile(const std::map< std::string, DataObjIDColl > &inDeps, const std::map< std::string, DataObjIDColl > &outDeps) const
Gaudi::Property< std::string > m_whiteboardSvcName
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
std::atomic< bool > m_needsUpdate
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
Gaudi::Property< int > m_maxParallelismExtra
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
std::function< StatusCode()> action
Gaudi::Property< std::string > m_dataDepsGraphAlgoPattern
Gaudi::Property< int > m_numOffloadThreads
Gaudi::Property< bool > m_checkOutput
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
StatusCode initialize() override
Initialise.
Gaudi::Property< bool > m_enableCondSvc
virtual void recordOccupancy(int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
Sample occupancy at fixed interval (ms). Negative value to deactivate, 0 to snapshot every change. Each...
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler wrt the rest.
tbb::task_arena * m_arena
unsigned int freeSlots() override
Get free slots number.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
std::unique_ptr< FiberManager > m_fiberManager
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledAsynchronousQueue
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Gaudi::Property< bool > m_showDataDeps
std::thread m_thread
The thread in which the activate function runs.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & msg() const
shortcut for the method msgStream(MSG::INFO)
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
MSG::Level msgLevel() const
std::vector< std::string > owners_names_of(const DataObjID &id, bool with_main=false) const
const DataObjIDColl & outputDataObjs() const override
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
const DataObjIDColl & inputDataObjs() const override
std::string fullKey() const
combination of the key and the ClassName, mostly for debugging
This class represents an entry point to all the event specific data.
ContextID_t subSlot() const
Base class from which all concrete algorithm classes should be derived.
void acceptDHVisitor(IDataHandleVisitor *) const override
const std::string & name() const override
The identifying name of the algorithm object.
utilities to dump graphs in different formats
The IAlgorithm is the interface implemented by the Algorithm base class.
virtual const std::string & type() const =0
The type of the algorithm.
virtual SmartIF< IService > & service(const Gaudi::Utils::TypeNameString &typeName, const bool createIf=true)=0
Returns a smart pointer to a service.
A service to resolve the task execution precedence.
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
StatusCode finalize() override
const std::string & name() const override
Retrieve name of the service.
StatusCode initialize() override
Small smart pointer class with automatic reference counting for IInterface.
bool isValid() const
Allow for check if smart pointer is valid.
This class is used for returning status codes from appropriate routines.
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
constexpr static const auto SUCCESS
constexpr static const auto FAILURE
A service which initializes a TBB thread pool.
unsigned int getAlgoIndex() const
Get algorithm index.
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
GAUDI_API void setCurrentContext(const EventContext *ctx)
std::ostream & operator<<(std::ostream &s, const std::pair< T1, T2 > &p)
Serialize an std::pair in a python like format. E.g. "(1, 2)".
Struct to hold entries in the alg queues.
Class representing an event slot.
std::unique_ptr< EventContext > eventContext
Cache for the eventContext.
std::vector< EventSlot > allSubSlots
Actual sub-slot instances.
bool complete
Flags completion of the event.
void addSubSlot(std::unique_ptr< EventContext > viewContext, const std::string &nodeName)
Add a subslot to the slot (this constructs a new slot and registers it with the parent one)
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot (thread-unsafe)
AlgsExecutionStates algsStates
Vector of algorithms states.
void disableSubSlots(const std::string &nodeName)
Disable event views for a given CF view node by registering an empty container. Contact B.