36#include <unordered_map>
40#include <tbb/concurrent_priority_queue.h>
41#include <tbb/concurrent_queue.h>
42#include <tbb/task_arena.h>
119 using extends::extends;
147 std::unique_ptr<EventContext> viewContext )
override;
152 virtual void recordOccupancy(
int samplePeriod, std::function<
void( OccupancySnapshot )> callback )
override;
156 const std::map<std::string, DataObjIDColl>& outDeps )
const;
165 std::chrono::duration<int64_t, std::milli>
m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min();
166 std::chrono::system_clock::time_point
m_lastSnapshot = std::chrono::system_clock::now();
170 this,
"ThreadPoolSize", -1,
171 "Size of the global thread pool initialised by TBB; a value of -1 requests to use"
172 "all available hardware threads; -100 requests to bypass TBB executing "
173 "all algorithms in the scheduler's thread." };
175 this,
"maxParallelismExtra", 0,
176 "Allows to add some extra threads to the maximum parallelism set in TBB"
177 "The TBB max parallelism is set as: ThreadPoolSize + maxParallelismExtra + 1" };
180 this,
"MaxBlockingAlgosInFlight", 0,
"Maximum allowed number of simultaneously running CPU-blocking algorithms" };
182 this,
"SimulateExecution",
false,
183 "Flag to perform single-pass simulation of execution flow before the actual execution" };
185 "The following modes are currently available: PCE, COD, DRE, E" };
187 "Dump intra-event concurrency dynamics to csv file" };
189 this,
"PreemptiveBlockingTasks",
false,
190 "Enable preemptive scheduling of CPU-blocking algorithms. Blocking algorithms must be flagged accordingly." };
192 this,
"NumOffloadThreads", 2,
193 "Number of threads to use for CPU portion of asynchronous algorithms. Asynchronous algorithms must be flagged "
194 "and use Boost Fiber functionality to suspend while waiting for offloaded work." };
196 "Runtime check of Algorithm Input Data Dependencies" };
198 "Runtime check of Algorithm Output Data usage" };
201 "CheckOutputUsageIgnoreList",
203 "Ignore outputs of the Algorithms of this name when doing the check",
204 "OrderedSet<std::string>" };
207 "Attribute unmet input dependencies to this DataLoader Algorithm" };
212 "Show the INPUT and OUTPUT data dependencies of Algorithms" };
215 "Show the configuration of DataFlow between Algorithms" };
218 "Show the configuration of all Algorithms and Sequences" };
223 this,
"DataDepsGraphFile",
"",
224 "Name of the output file (.dot or .md extensions allowed) containing the data dependency graph for some selected "
228 this,
"DataDepsGraphAlgPattern",
".*",
229 "Regex pattern for selecting desired Algorithms by name, whose data dependency has to be included in the data "
233 this,
"DataDepsGraphObjectPattern",
".*",
234 "Regex pattern for selecting desired input or output by their full key" };
State
Execution states of the algorithms. Must have contiguous integer values 0, 1, ..., N.
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
SmartIF< IThreadPoolSvc > m_threadPoolSvc
Gaudi::Property< std::string > m_useDataLoader
void dumpState() override
Dump scheduler state for all slots.
void activate()
Activate scheduler.
Gaudi::Property< std::string > m_optimizationMode
bool next(TaskSpec &ts, bool asynchronous)
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
size_t m_maxAlgosInFlight
Gaudi::Property< bool > m_dumpIntraEventDynamics
std::chrono::system_clock::time_point m_lastSnapshot
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
Gaudi::Property< int > m_threadPoolSize
StatusCode finalize() override
Finalise.
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
std::function< void(OccupancySnapshot)> m_snapshotCallback
std::queue< TaskSpec > m_retryQueue
Gaudi::Property< bool > m_verboseSubSlots
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
AlgsExecutionStates::State AState
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
StatusCode deactivate()
Deactivate scheduler.
size_t m_maxEventsInFlight
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
unsigned int m_algosInFlight
Number of algorithms presently in flight.
unsigned int m_blockingAlgosInFlight
Number of CPU-blocking algorithms presently in flight.
Gaudi::Property< std::string > m_dataDepsGraphObjectPattern
Gaudi::Property< bool > m_showDataFlow
StatusCode schedule(TaskSpec &&)
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
Gaudi::Property< bool > m_checkDeps
std::chrono::duration< int64_t, std::milli > m_snapshotInterval
Gaudi::Property< std::string > m_dataDepsGraphFile
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
Gaudi::Property< bool > m_showControlFlow
Gaudi::Property< bool > m_simulateExecution
StatusCode dumpGraphFile(const std::map< std::string, DataObjIDColl > &inDeps, const std::map< std::string, DataObjIDColl > &outDeps) const
Gaudi::Property< std::string > m_whiteboardSvcName
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
std::atomic< bool > m_needsUpdate
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
Gaudi::Property< int > m_maxParallelismExtra
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
std::function< StatusCode()> action
Gaudi::Property< std::string > m_dataDepsGraphAlgoPattern
Gaudi::Property< int > m_numOffloadThreads
Gaudi::Property< bool > m_checkOutput
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
StatusCode initialize() override
Initialise.
Gaudi::Property< bool > m_enableCondSvc
virtual void recordOccupancy(int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
Sample occupancy at fixed interval (ms). Negative value to deactivate, 0 to snapshot every change. Each...
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
tbb::task_arena * m_arena
unsigned int freeSlots() override
Get free slots number.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
std::unique_ptr< FiberManager > m_fiberManager
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledAsynchronousQueue
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
Gaudi::Property< bool > m_showDataDeps
std::thread m_thread
The thread in which the activate function runs.
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
This class represents an entry point to all the event specific data.
Implementation of property with value of concrete type.
The IAlgorithm is the interface implemented by the Algorithm base class.
Small smart pointer class with automatic reference counting for IInterface.
This class is used for returning status codes from appropriate routines.
Base class used to extend a class implementing other interfaces.
Comparison operator to sort the queues.
bool operator()(const TaskSpec &i, const TaskSpec &j) const
Struct to hold entries in the alg queues.
TaskSpec & operator=(const TaskSpec &)=delete
Assignment operator.
EventContext * contextPtr
TaskSpec(IAlgorithm *algPtr, unsigned int algIndex, const std::string &algName, unsigned int algRank, bool asynchronous, int slotIndex, EventContext *eventContext)
TaskSpec(const TaskSpec &)=default
Copy constructor (to keep a lambda capturing a TaskSpec storable as a std::function value)
TaskSpec()
Default constructor.
TaskSpec & operator=(TaskSpec &&)=default
Move assignment.
TaskSpec(TaskSpec &&)=default
Move constructor.
Class representing an event slot.