36#include <unordered_map> 
   40#include <tbb/concurrent_priority_queue.h> 
   41#include <tbb/concurrent_queue.h> 
   42#include <tbb/task_arena.h> 
  119  using extends::extends;
 
  147                                        std::unique_ptr<EventContext> viewContext ) 
override;
 
  152  virtual void recordOccupancy( 
int samplePeriod, std::function<
void( OccupancySnapshot )> callback ) 
override;
 
  156                            const std::map<std::string, DataObjIDColl>& outDeps ) 
const;
 
  165  std::chrono::duration<int64_t, std::milli> 
m_snapshotInterval = std::chrono::duration<int64_t, std::milli>::min();
 
  166  std::chrono::system_clock::time_point      
m_lastSnapshot     = std::chrono::system_clock::now();
 
  170      this, 
"ThreadPoolSize", -1,
 
  171      "Size of the global thread pool initialised by TBB; a value of -1 requests to use" 
  172      "all available hardware threads; -100 requests to bypass TBB executing " 
  173      "all algorithms in the scheduler's thread." };
 
 
  175      this, 
"maxParallelismExtra", 0,
 
  176      "Allows to add some extra threads to the maximum parallelism set in TBB" 
  177      "The TBB max parallelism is set as: ThreadPoolSize + maxParallelismExtra + 1" };
 
 
  180      this, 
"MaxBlockingAlgosInFlight", 0, 
"Maximum allowed number of simultaneously running CPU-blocking algorithms" };
 
 
  182      this, 
"SimulateExecution", 
false,
 
  183      "Flag to perform single-pass simulation of execution flow before the actual execution" };
 
 
  185                                                   "The following modes are currently available: PCE, COD, DRE,  E" };
 
 
  187                                                  "Dump intra-event concurrency dynamics to csv file" };
 
 
  189      this, 
"PreemptiveBlockingTasks", 
false,
 
  190      "Enable preemptive scheduling of CPU-blocking algorithms. Blocking algorithms must be flagged accordingly." };
 
 
  192      this, 
"NumOffloadThreads", 2,
 
  193      "Number of threads to use for CPU portion of asynchronous algorithms. Asynchronous algorithms must be flagged " 
  194      "and use Boost Fiber functionality to suspend while waiting for offloaded work." };
 
 
  196                                     "Runtime check of Algorithm Input Data Dependencies" };
 
 
  198                                       "Runtime check of Algorithm Output Data usage" };
 
 
  201      "CheckOutputUsageIgnoreList",
 
  203      "Ignore outputs of the Algorithms of this name when doing the check",
 
  204      "OrderedSet<std::string>" };
 
 
  207                                                "Attribute unmet input dependencies to this DataLoader Algorithm" };
 
 
  212                                        "Show the INPUT and OUTPUT data dependencies of Algorithms" };
 
 
  215                                        "Show the configuration of DataFlow between Algorithms" };
 
 
  218                                           "Show the configuration of all Algorithms and Sequences" };
 
 
  223      this, 
"DataDepsGraphFile", 
"",
 
  224      "Name of the output file (.dot or .md extensions allowed) containing the data dependency graph for some selected " 
 
  228      this, 
"DataDepsGraphAlgPattern", 
".*",
 
  229      "Regex pattern for selecting desired Algorithms by name, whose data dependency has to be included in the data " 
 
  233      this, 
"DataDepsGraphObjectPattern", 
".*",
 
  234      "Regex pattern for selecting desired input or output by their full key" };
 
 
 
State
Execution states of the algorithms. Must have contiguous integer values 0, 1, ..., N.
 
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
 
Gaudi::Property< std::vector< std::string > > m_checkOutputIgnoreList
 
SmartIF< IThreadPoolSvc > m_threadPoolSvc
 
Gaudi::Property< std::string > m_useDataLoader
 
void dumpState() override
Dump scheduler state for all slots.
 
void activate()
Activate scheduler.
 
Gaudi::Property< std::string > m_optimizationMode
 
bool next(TaskSpec &ts, bool asynchronous)
 
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
 
size_t m_maxAlgosInFlight
 
Gaudi::Property< bool > m_dumpIntraEventDynamics
 
std::chrono::system_clock::time_point m_lastSnapshot
 
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
 
Gaudi::Property< int > m_threadPoolSize
 
StatusCode finalize() override
Finalise.
 
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledQueue
Queues for scheduled algorithms.
 
std::function< void(OccupancySnapshot)> m_snapshotCallback
 
std::queue< TaskSpec > m_retryQueue
 
Gaudi::Property< bool > m_verboseSubSlots
 
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
 
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
 
SmartIF< ICondSvc > m_condSvc
A shortcut to service for Conditions handling.
 
AlgsExecutionStates::State AState
 
bool isStalled(const EventSlot &) const
Check if scheduling in a particular slot is in a stall.
 
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
 
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
 
StatusCode revise(unsigned int iAlgo, EventContext *contextPtr, AState state, bool iterate=false)
 
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
 
StatusCode deactivate()
Deactivate scheduler.
 
size_t m_maxEventsInFlight
 
Gaudi::Property< unsigned int > m_maxBlockingAlgosInFlight
 
unsigned int m_algosInFlight
Number of algorithms presently in flight.
 
unsigned int m_blockingAlgosInFlight
Number of CPU-blocking algorithms presently in flight.
 
Gaudi::Property< std::string > m_dataDepsGraphObjectPattern
 
Gaudi::Property< bool > m_showDataFlow
 
StatusCode schedule(TaskSpec &&)
 
SmartIF< IPrecedenceSvc > m_precSvc
A shortcut to the Precedence Service.
 
Gaudi::Property< bool > m_checkDeps
 
std::chrono::duration< int64_t, std::milli > m_snapshotInterval
 
Gaudi::Property< std::string > m_dataDepsGraphFile
 
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
 
Gaudi::Property< bool > m_showControlFlow
 
Gaudi::Property< bool > m_simulateExecution
 
StatusCode dumpGraphFile(const std::map< std::string, DataObjIDColl > &inDeps, const std::map< std::string, DataObjIDColl > &outDeps) const
 
Gaudi::Property< std::string > m_whiteboardSvcName
 
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
 
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
 
std::atomic< bool > m_needsUpdate
 
virtual StatusCode scheduleEventView(const EventContext *sourceContext, const std::string &nodeName, std::unique_ptr< EventContext > viewContext) override
Method to inform the scheduler about event views.
 
Gaudi::Property< int > m_maxParallelismExtra
 
StatusCode signoff(const TaskSpec &)
The call to this method is triggered only from within the AlgTask.
 
std::function< StatusCode()> action
 
Gaudi::Property< std::string > m_dataDepsGraphAlgoPattern
 
Gaudi::Property< int > m_numOffloadThreads
 
Gaudi::Property< bool > m_checkOutput
 
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
 
StatusCode initialize() override
Initialise.
 
Gaudi::Property< bool > m_enableCondSvc
 
virtual void recordOccupancy(int samplePeriod, std::function< void(OccupancySnapshot)> callback) override
Sample occupancy at fixed interval (ms). Negative value to deactivate, 0 to snapshot every change. Each...
 
void eventFailed(EventContext *eventContext)
Method to execute if an event failed.
 
Gaudi::Property< bool > m_enablePreemptiveBlockingTasks
 
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
 
tbb::task_arena * m_arena
 
unsigned int freeSlots() override
Get free slots number.
 
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
 
std::unique_ptr< FiberManager > m_fiberManager
 
SmartIF< IHiveWhiteBoard > m_whiteboard
A shortcut to the whiteboard.
 
tbb::concurrent_priority_queue< TaskSpec, AlgQueueSort > m_scheduledAsynchronousQueue
 
StatusCode iterate()
Loop on all slots to schedule DATAREADY algorithms and sign off ready events.
 
Gaudi::Property< bool > m_showDataDeps
 
std::thread m_thread
The thread in which the activate function runs.
 
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
 
std::vector< EventSlot > m_eventSlots
Vector of events slots.
 
This class represents an entry point to all the event specific data.
 
Implementation of property with value of concrete type.
 
The IAlgorithm is the interface implemented by the Algorithm base class.
 
Small smart pointer class with automatic reference counting for IInterface.
 
This class is used for returning status codes from appropriate routines.
 
Base class used to extend a class implementing other interfaces.
 
Comparison operator to sort the queues.
 
bool operator()(const TaskSpec &i, const TaskSpec &j) const
 
Struct to hold entries in the alg queues.
 
TaskSpec & operator=(const TaskSpec &)=delete
Assignment operator.
 
EventContext * contextPtr
 
TaskSpec(IAlgorithm *algPtr, unsigned int algIndex, const std::string &algName, unsigned int algRank, bool asynchronous, int slotIndex, EventContext *eventContext)
 
TaskSpec(const TaskSpec &)=default
Copy constructor (to keep a lambda capturing a TaskSpec storable as a std::function value)
 
TaskSpec()
Default constructor.
 
TaskSpec & operator=(TaskSpec &&)=default
Move assignment.
 
TaskSpec(TaskSpec &&)=default
Move constructor.
 
Class representing an event slot.