The Gaudi Framework  v29r0 (ff2e7097)
AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
2 #include "AlgoExecutionTask.h"
3 #include "IOBoundAlgTask.h"
4 
5 // Framework includes
7 #include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface
10 #include "GaudiKernel/IAlgorithm.h"
12 #include "GaudiKernel/SvcFactory.h"
14 
15 // C++
16 #include <algorithm>
17 #include <map>
18 #include <queue>
19 #include <sstream>
20 #include <unordered_set>
21 
22 // External libs
23 #include "boost/algorithm/string.hpp"
24 #include "boost/thread.hpp"
25 #include "boost/tokenizer.hpp"
26 // DP waiting for the TBB service
27 #include "tbb/task_scheduler_init.h"
28 
31 
32 // Instantiation of a static factory class used by clients to create instances of this service
34 
35 namespace
36 {
37  struct DataObjIDSorter {
38  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
39  };
40 
41  // Sort a DataObjIDColl in a well-defined, reproducible manner.
42  // Used for making debugging dumps.
43  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll )
44  {
46  v.reserve( coll.size() );
47  for ( const DataObjID& id : coll ) v.push_back( &id );
48  std::sort( v.begin(), v.end(), DataObjIDSorter() );
49  return v;
50  }
51 }
52 
53 //===========================================================================
54 // Infrastructure methods
55 
62 {
63 
64  // Initialise mother class (read properties, ...)
66  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
67 
68  // Get hold of the TBBSvc. This should initialize the thread pool
69  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
70  if ( !m_threadPoolSvc.isValid() ) {
71  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
72  return StatusCode::FAILURE;
73  }
74 
75  // Activate the scheduler in another thread.
76  info() << "Activating scheduler in a separate thread" << endmsg;
78 
79  while ( m_isActive != ACTIVE ) {
80  if ( m_isActive == FAILURE ) {
81  fatal() << "Terminating initialization" << endmsg;
82  return StatusCode::FAILURE;
83  } else {
84  info() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
85  sleep( 1 );
86  }
87  }
88 
89  if ( m_enableCondSvc ) {
90  // Get hold of the CondSvc
91  m_condSvc = serviceLocator()->service( "CondSvc" );
92  if ( !m_condSvc.isValid() ) {
93  warning() << "No CondSvc found, or not enabled. "
94  << "Will not manage CondAlgorithms" << endmsg;
95  m_enableCondSvc = false;
96  }
97  }
98 
99  // Get the algo resource pool
100  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
101  if ( !m_algResourcePool.isValid() ) {
102  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
103  return StatusCode::FAILURE;
104  }
105 
106  // Get the precedence service
107  m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
108  if ( !m_precSvc.isValid() ) {
109  fatal() << "Error retrieving PrecedenceSvc" << endmsg;
110  return StatusCode::FAILURE;
111  }
112  const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
113  if ( !precSvc ) {
114  fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
115  return StatusCode::FAILURE;
116  }
117 
118  m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
119  if ( !m_algExecStateSvc.isValid() ) {
120  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
121  return StatusCode::FAILURE;
122  }
123 
124  // Get Whiteboard
125  m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
126  if ( !m_whiteboard.isValid() ) {
127  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
128  return StatusCode::FAILURE;
129  }
130 
131  // Get dedicated scheduler for I/O-bound algorithms
132  if ( m_useIOBoundAlgScheduler ) {
133  m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
134  if ( !m_IOBoundAlgScheduler.isValid() )
135  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
136  }
137 
138  // Set the MaxEventsInFlight parameters from the number of WB stores
139  m_maxEventsInFlight = m_whiteboard->getNumberOfStores();
140 
141  // Set the number of free slots
142  m_freeSlots = m_maxEventsInFlight;
143 
144  // set global concurrency flags
146 
147  // Get the list of algorithms
148  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
149  const unsigned int algsNumber = algos.size();
150  info() << "Found " << algsNumber << " algorithms" << endmsg;
151 
152  /* Dependencies
153  1) Look for handles in algo, if none
154  2) Assume none are required
155  */
156 
157  DataObjIDColl globalInp, globalOutp;
158 
159  // figure out all outputs
160  for ( IAlgorithm* ialgoPtr : algos ) {
161  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
162  if ( !algoPtr ) {
163  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
164  }
165  for ( auto id : algoPtr->outputDataObjs() ) {
166  auto r = globalOutp.insert( id );
167  if ( !r.second ) {
168  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths "
169  "though, or control flow may guarantee only one runs...!"
170  << endmsg;
171  }
172  }
173  }
174 
175  std::ostringstream ostdd;
176  ostdd << "Data Dependencies for Algorithms:";
177 
178  std::vector<DataObjIDColl> m_algosDependencies;
179  for ( IAlgorithm* ialgoPtr : algos ) {
180  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
181  if ( nullptr == algoPtr ) {
182  fatal() << "Could not convert IAlgorithm into Algorithm for " << ialgoPtr->name()
183  << ": this will result in a crash." << endmsg;
184  return StatusCode::FAILURE;
185  }
186 
187  ostdd << "\n " << algoPtr->name();
188 
189  DataObjIDColl algoDependencies;
190  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
191  for ( const DataObjID* idp : sortedDataObjIDColl( algoPtr->inputDataObjs() ) ) {
192  DataObjID id = *idp;
193  ostdd << "\n o INPUT " << id;
194  if ( id.key().find( ":" ) != std::string::npos ) {
195  ostdd << " contains alternatives which require resolution...\n";
196  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
197  auto itok = std::find_if( tokens.begin(), tokens.end(), [&]( const std::string& t ) {
198  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
199  } );
200  if ( itok != tokens.end() ) {
201  ostdd << "found matching output for " << *itok << " -- updating scheduler info\n";
202  id.updateKey( *itok );
203  } else {
204  error() << "failed to find alternate in global output list"
205  << " for id: " << id << " in Alg " << algoPtr->name() << endmsg;
206  m_showDataDeps = true;
207  }
208  }
209  algoDependencies.insert( id );
210  globalInp.insert( id );
211  }
212  for ( const DataObjID* id : sortedDataObjIDColl( algoPtr->outputDataObjs() ) ) {
213  ostdd << "\n o OUTPUT " << *id;
214  if ( id->key().find( ":" ) != std::string::npos ) {
215  error() << " in Alg " << algoPtr->name() << " alternatives are NOT allowed for outputs! id: " << *id
216  << endmsg;
217  m_showDataDeps = true;
218  }
219  }
220  } else {
221  ostdd << "\n none";
222  }
223  m_algosDependencies.emplace_back( algoDependencies );
224  }
225 
226  if ( m_showDataDeps ) {
227  info() << ostdd.str() << endmsg;
228  }
229 
230  // Fill the containers to convert algo names to index
231  m_algname_vect.resize( algsNumber );
232  IAlgorithm* dataLoaderAlg( nullptr );
233  for ( IAlgorithm* algo : algos ) {
234  const std::string& name = algo->name();
235  auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
236  m_algname_index_map[name] = index;
237  m_algname_vect.at( index ) = name;
238  if ( algo->name() == m_useDataLoader ) {
239  dataLoaderAlg = algo;
240  }
241  }
242 
243  // Check if we have unmet global input dependencies
244  if ( m_checkDeps ) {
245  DataObjIDColl unmetDep;
246  for ( auto o : globalInp ) {
247  if ( globalOutp.find( o ) == globalOutp.end() ) {
248  unmetDep.insert( o );
249  }
250  }
251 
252  if ( unmetDep.size() > 0 ) {
253 
254  std::ostringstream ost;
255  for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
256  ost << "\n o " << *o << " required by Algorithm: ";
257  for ( size_t i = 0; i < m_algosDependencies.size(); ++i ) {
258  if ( m_algosDependencies[i].find( *o ) != m_algosDependencies[i].end() ) {
259  ost << "\n * " << m_algname_vect[i];
260  }
261  }
262  }
263 
264  if ( m_useDataLoader != "" ) {
265  // Find the DataLoader Alg
266  if ( dataLoaderAlg == nullptr ) {
267  fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
268  << "\" found, and unmet INPUT dependencies "
269  << "detected:\n"
270  << ost.str() << endmsg;
271  return StatusCode::FAILURE;
272  }
273 
274  info() << "Will attribute the following unmet INPUT dependencies to \"" << dataLoaderAlg->type() << "/"
275  << dataLoaderAlg->name() << "\" Algorithm" << ost.str() << endmsg;
276 
277  // Set the property Load of DataLoader Alg
278  Algorithm* dataAlg = dynamic_cast<Algorithm*>( dataLoaderAlg );
279  if ( !dataAlg ) {
280  fatal() << "Unable to dcast DataLoader \"" << m_useDataLoader.value() << "\" IAlg to Algorithm" << endmsg;
281  return StatusCode::FAILURE;
282  }
283 
284  for ( auto& id : unmetDep ) {
285  debug() << "adding OUTPUT dep \"" << id << "\" to " << dataLoaderAlg->type() << "/" << dataLoaderAlg->name()
286  << endmsg;
288  }
289 
290  } else {
291  fatal() << "Auto DataLoading not requested, "
292  << "and the following unmet INPUT dependencies were found:" << ost.str() << endmsg;
293  return StatusCode::FAILURE;
294  }
295 
296  } else {
297  info() << "No unmet INPUT data dependencies were found" << endmsg;
298  }
299  }
300 
301  // Shortcut for the message service
302  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
303  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
304 
305  m_eventSlots.assign( m_maxEventsInFlight, EventSlot( m_algosDependencies, algsNumber,
306  precSvc->getRules()->getControlFlowNodeCounter(), messageSvc ) );
307  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
308 
309  if ( m_threadPoolSize > 1 ) {
310  m_maxAlgosInFlight = (size_t)m_threadPoolSize;
311  }
312 
313  // Clearly inform about the level of concurrency
314  info() << "Concurrency level information:" << endmsg;
315  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
316  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
317 
318  if ( m_showControlFlow ) m_precSvc->dumpControlFlow();
319 
320  if ( m_showDataFlow ) m_precSvc->dumpDataFlow();
321 
322  // Simulate execution flow
323  if ( m_simulateExecution ) m_precSvc->simulate( m_eventSlots[0] );
324 
325  return sc;
326 }
327 //---------------------------------------------------------------------------
328 
333 {
334 
336  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
337 
338  sc = deactivate();
339  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
340 
341  info() << "Joining Scheduler thread" << endmsg;
342  m_thread.join();
343 
344  // Final error check after thread pool termination
345  if ( m_isActive == FAILURE ) {
346  error() << "problems in scheduler thread" << endmsg;
347  return StatusCode::FAILURE;
348  }
349 
350  return sc;
351 }
352 //---------------------------------------------------------------------------
364 {
365 
366  if ( msgLevel( MSG::DEBUG ) ) debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
367 
368  if ( m_threadPoolSvc->initPool( m_threadPoolSize ).isFailure() ) {
369  error() << "problems initializing ThreadPoolSvc" << endmsg;
370  m_isActive = FAILURE;
371  return;
372  }
373 
374  // Wait for actions pushed into the queue by finishing tasks.
375  action thisAction;
377 
378  m_isActive = ACTIVE;
379 
380  // Continue to wait if the scheduler is running or there is something to do
381  info() << "Start checking the actionsQueue" << endmsg;
382  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
383  m_actionsQueue.pop( thisAction );
384  sc = thisAction();
385  if ( sc != StatusCode::SUCCESS )
386  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
387  else
388  verbose() << "Action succeeded." << endmsg;
389  }
390 
391  info() << "Terminating thread-pool resources" << endmsg;
392  if ( m_threadPoolSvc->terminatePool().isFailure() ) {
393  error() << "Problems terminating thread pool" << endmsg;
394  m_isActive = FAILURE;
395  }
396 }
397 
398 //---------------------------------------------------------------------------
399 
407 {
408 
409  if ( m_isActive == ACTIVE ) {
410  // Drain the scheduler
411  m_actionsQueue.push( std::bind( &AvalancheSchedulerSvc::m_drain, this ) );
412  // This would be the last action
413  m_actionsQueue.push( [this]() -> StatusCode {
414  m_isActive = INACTIVE;
415  return StatusCode::SUCCESS;
416  } );
417  }
418 
419  return StatusCode::SUCCESS;
420 }
421 
422 //===========================================================================
423 
424 //===========================================================================
425 // Utils and shortcuts
426 
427 inline const std::string& AvalancheSchedulerSvc::index2algname( unsigned int index ) { return m_algname_vect[index]; }
428 
429 //---------------------------------------------------------------------------
430 
431 inline unsigned int AvalancheSchedulerSvc::algname2index( const std::string& algoname )
432 {
433  unsigned int index = m_algname_index_map[algoname];
434  return index;
435 }
436 
437 //===========================================================================
438 // EventSlot management
446 {
447 
448  if ( m_first ) {
449  m_first = false;
450  }
451 
452  if ( !eventContext ) {
453  fatal() << "Event context is nullptr" << endmsg;
454  return StatusCode::FAILURE;
455  }
456 
457  if ( m_freeSlots.load() == 0 ) {
458  if ( msgLevel( MSG::DEBUG ) ) debug() << "A free processing slot could not be found." << endmsg;
459  return StatusCode::FAILURE;
460  }
461 
462  // no problem as push new event is only called from one thread (event loop manager)
463  m_freeSlots--;
464 
465  auto action = [this, eventContext]() -> StatusCode {
466  // Event processing slot forced to be the same as the wb slot
467  const unsigned int thisSlotNum = eventContext->slot();
468  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
469  if ( !thisSlot.complete ) {
470  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
471  return StatusCode::FAILURE;
472  }
473 
474  debug() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
475  thisSlot.reset( eventContext );
476 
477  // promote to CR and DR the initial set of algorithms
478  Cause cs = {Cause::source::Root, "RootDecisionHub"};
479  m_precSvc->iterate( thisSlot, cs );
480 
481  return this->updateStates( thisSlotNum );
482  }; // end of lambda
483 
484  // Kick off the scheduling!
485  if ( msgLevel( MSG::VERBOSE ) ) {
486  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
487  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
488  }
489  m_actionsQueue.push( action );
490 
491  return StatusCode::SUCCESS;
492 }
493 
494 //---------------------------------------------------------------------------
496 {
497  StatusCode sc;
498  for ( auto context : eventContexts ) {
499  sc = pushNewEvent( context );
500  if ( sc != StatusCode::SUCCESS ) return sc;
501  }
502  return sc;
503 }
504 
505 //---------------------------------------------------------------------------
506 unsigned int AvalancheSchedulerSvc::freeSlots() { return std::max( m_freeSlots.load(), 0 ); }
507 
508 //---------------------------------------------------------------------------
513 {
514 
515  unsigned int slotNum = 0;
516  for ( auto& thisSlot : m_eventSlots ) {
517  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
518  updateStates( slotNum );
519  }
520  slotNum++;
521  }
522  return StatusCode::SUCCESS;
523 }
524 
525 //---------------------------------------------------------------------------
530 {
531  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
532  if ( m_freeSlots.load() == (int)m_maxEventsInFlight or m_isActive == INACTIVE ) {
533  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
534  // << " active: " << m_isActive << endmsg;
535  return StatusCode::FAILURE;
536  } else {
537  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
538  // << " active: " << m_isActive << endmsg;
539  m_finishedEvents.pop( eventContext );
540  m_freeSlots++;
541  if ( msgLevel( MSG::DEBUG ) )
542  debug() << "Popped slot " << eventContext->slot() << "(event " << eventContext->evt() << ")" << endmsg;
543  return StatusCode::SUCCESS;
544  }
545 }
546 
547 //---------------------------------------------------------------------------
552 {
553  if ( m_finishedEvents.try_pop( eventContext ) ) {
554  if ( msgLevel( MSG::DEBUG ) )
555  debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
556  << endmsg;
557  m_freeSlots++;
558  return StatusCode::SUCCESS;
559  }
560  return StatusCode::FAILURE;
561 }
562 
563 //---------------------------------------------------------------------------
570 {
571 
572  // Set the number of slots available to an error code
573  m_freeSlots.store( 0 );
574 
575  fatal() << "*** Event " << eventContext->evt() << " on slot " << eventContext->slot() << " failed! ***" << endmsg;
576 
577  std::ostringstream ost;
578  m_algExecStateSvc->dump( ost, *eventContext );
579 
580  info() << "Dumping Alg Exec State for slot " << eventContext->slot() << ":\n" << ost.str() << endmsg;
581 
582  dumpSchedulerState( -1 );
583 
584  // Empty queue and deactivate the service
585  action thisAction;
586  while ( m_actionsQueue.try_pop( thisAction ) ) {
587  };
588  deactivate();
589 
590  // Push into the finished events queue the failed context
591  EventContext* thisEvtContext;
592  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
593  m_finishedEvents.push( thisEvtContext );
594  };
595  m_finishedEvents.push( eventContext );
596 
597  return StatusCode::FAILURE;
598 }
599 
600 //===========================================================================
601 
602 //===========================================================================
603 // States Management
604 
615 {
616 
617  m_updateNeeded = true;
618 
619  StatusCode global_sc( StatusCode::FAILURE, true );
620 
621  // Sort from the oldest to the newest event
622  // Prepare a vector of pointers to the slots to avoid copies
623  std::vector<EventSlot*> eventSlotsPtrs;
624 
625  // Consider all slots if si <0 or just one otherwise
626  if ( si < 0 ) {
627  const int eventsSlotsSize( m_eventSlots.size() );
628  eventSlotsPtrs.reserve( eventsSlotsSize );
629  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
630  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
631  }
632  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
633  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
634  } else {
635  eventSlotsPtrs.push_back( &m_eventSlots[si] );
636  }
637 
638  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
639  int iSlot = thisSlotPtr->eventContext->slot();
640 
641  // Cache the states of the algos to improve readability and performance
642  auto& thisSlot = m_eventSlots[iSlot];
643  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
644 
645  // Perform the I->CR->DR transitions
646  if ( !algo_name.empty() ) {
647  Cause cs = {Cause::source::Task, algo_name};
648  m_precSvc->iterate( thisSlot, cs );
649  }
650 
651  StatusCode partial_sc( StatusCode::FAILURE, true );
652 
653  // Perform DR->SCHEDULED
654  if ( !m_optimizationMode.empty() ) {
655  auto comp_nodes = [this]( const uint& i, const uint& j ) {
656  return ( m_precSvc->getPriority( index2algname( i ) ) < m_precSvc->getPriority( index2algname( j ) ) );
657  };
659  comp_nodes, std::vector<uint>() );
660  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
661  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it )
662  buffer.push( *it );
663  /*std::stringstream s;
664  auto buffer2 = buffer;
665  while (!buffer2.empty()) {
666  s << m_precSvc->getPriority(index2algname(buffer2.top())) << ", ";
667  buffer2.pop();
668  }
669  info() << "DRBuffer is: [ " << s.str() << " ] <--" << algo_name << " executed" << endmsg;*/
670 
671  /*while (!buffer.empty()) {
672  partial_sc = promoteToScheduled(buffer.top(), iSlot);
673  if (partial_sc.isFailure()) {
674  if (msgLevel(MSG::VERBOSE))
675  verbose() << "Could not apply transition from "
676  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
677  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
678  if (m_useIOBoundAlgScheduler) {
679  partial_sc = promoteToAsyncScheduled(buffer.top(), iSlot);
680  if (msgLevel(MSG::VERBOSE))
681  if (partial_sc.isFailure())
682  verbose() << "[Asynchronous] Could not apply transition from "
683  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
684  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot <<
685  endmsg;
686  }
687  }
688  buffer.pop();
689  }*/
690  while ( !buffer.empty() ) {
691  bool IOBound = false;
692  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.top() ) );
693 
694  if ( !IOBound )
695  partial_sc = promoteToScheduled( buffer.top(), iSlot );
696  else
697  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot );
698 
699  if ( msgLevel( MSG::VERBOSE ) )
700  if ( partial_sc.isFailure() )
701  verbose() << "Could not apply transition from "
702  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY] << " for algorithm "
703  << index2algname( buffer.top() ) << " on processing slot " << iSlot << endmsg;
704 
705  buffer.pop();
706  }
707 
708  } else {
709  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
710  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
711  uint algIndex = *it;
712 
713  bool IOBound = false;
714  if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( algIndex ) );
715 
716  if ( !IOBound )
717  partial_sc = promoteToScheduled( algIndex, iSlot );
718  else
719  partial_sc = promoteToAsyncScheduled( algIndex, iSlot );
720 
721  if ( msgLevel( MSG::VERBOSE ) )
722  if ( partial_sc.isFailure() )
723  verbose() << "Could not apply transition from "
724  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY] << " for algorithm "
725  << index2algname( algIndex ) << " on processing slot " << iSlot << endmsg;
726  }
727  }
728 
729  if ( m_dumpIntraEventDynamics ) {
731  s << algo_name << ", " << thisAlgsStates.sizeOfSubset( State::CONTROLREADY ) << ", "
732  << thisAlgsStates.sizeOfSubset( State::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( State::SCHEDULED )
733  << ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
734  auto threads = ( m_threadPoolSize != -1 ) ? std::to_string( m_threadPoolSize )
735  : std::to_string( tbb::task_scheduler_init::default_num_threads() );
736  std::ofstream myfile;
737  myfile.open( "IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app );
738  myfile << s.str();
739  myfile.close();
740  }
741 
742  // Not complete because this would mean that the slot is already free!
743  if ( !thisSlot.complete && m_precSvc->CFRulesResolved( thisSlot ) &&
744  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
745  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
746  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
747 
748  thisSlot.complete = true;
749  // if the event did not fail, add it to the finished events
750  // otherwise it is taken care of in the error handling already
751  if ( m_algExecStateSvc->eventStatus( *thisSlot.eventContext ) == EventStatus::Success ) {
752  m_finishedEvents.push( thisSlot.eventContext );
753  if ( msgLevel( MSG::DEBUG ) )
754  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot " << thisSlot.eventContext->slot()
755  << ")." << endmsg;
756  }
757 
758  // now let's return the fully evaluated result of the control flow
759  if ( msgLevel( MSG::DEBUG ) ) debug() << m_precSvc->printState( thisSlot ) << endmsg;
760 
761  thisSlot.eventContext = nullptr;
762  } else {
763  StatusCode eventStalledSC = isStalled( iSlot );
764  if ( !eventStalledSC.isSuccess() ) {
765  m_algExecStateSvc->setEventStatus( EventStatus::AlgStall, *thisSlot.eventContext );
766  eventFailed( thisSlot.eventContext ).ignore();
767  }
768  }
769  } // end loop on slots
770 
771  verbose() << "States Updated." << endmsg;
772 
773  return global_sc;
774 }
775 
776 //---------------------------------------------------------------------------
777 
785 {
786  // Get the slot
787  EventSlot& thisSlot = m_eventSlots[iSlot];
788 
789  if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
791 
792  info() << "About to declare a stall" << endmsg;
793  fatal() << "*** Stall detected! ***\n" << endmsg;
794  dumpSchedulerState( iSlot );
795  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
796 
797  return StatusCode::FAILURE;
798  }
799  return StatusCode::SUCCESS;
800 }
801 
802 //---------------------------------------------------------------------------
803 
810 {
811 
812  // To have just one big message
813  std::ostringstream outputMessageStream;
814 
815  outputMessageStream << "============================== Execution Task State ============================="
816  << std::endl;
817  dumpState( outputMessageStream );
818 
819  outputMessageStream << std::endl
820  << "============================== Scheduler State ================================="
821  << std::endl;
822 
823  int slotCount = -1;
824  for ( auto& thisSlot : m_eventSlots ) {
825  slotCount++;
826  if ( thisSlot.complete ) continue;
827 
828  // dump temporal and topological precedence analysis (if enabled in the PrecedenceSvc)
829  if ( msgLevel( MSG::DEBUG ) ) m_precSvc->dumpPrecedenceRules( thisSlot );
830 
831  outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
832  << " event: " << thisSlot.eventContext->evt() << " -----------" << std::endl;
833 
834  if ( 0 > iSlot or iSlot == slotCount ) {
835  outputMessageStream << "Algorithms states:" << std::endl;
836 
837  const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
838  for ( unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
839  outputMessageStream << " o " << index2algname( algoIdx ) << " ["
840  << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]] << "] Data deps: ";
841  DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
842  const int depsSize = deps.size();
843  if ( depsSize == 0 ) outputMessageStream << " none";
844 
845  DataObjIDColl missing;
846  for ( auto d : deps ) {
847  outputMessageStream << d << " ";
848  if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
849  // outputMessageStream << "[missing] ";
850  missing.insert( d );
851  }
852  }
853 
854  if ( !missing.empty() ) {
855  outputMessageStream << ". The following are missing: ";
856  for ( auto d : missing ) {
857  outputMessageStream << d << " ";
858  }
859  }
860 
861  outputMessageStream << std::endl;
862  }
863 
864  // Snapshot of the WhiteBoard
865  outputMessageStream << "\nWhiteboard contents: " << std::endl;
866  for ( auto& product : wbSlotContent ) outputMessageStream << " o " << product << std::endl;
867 
868  // Snapshot of the ControlFlow
869  outputMessageStream << "\nControl Flow:" << std::endl;
870  outputMessageStream << m_precSvc->printState( thisSlot ) << std::endl;
871  }
872  }
873 
874  outputMessageStream << "=================================== END ======================================" << std::endl;
875 
876  info() << "Dumping Scheduler State " << std::endl << outputMessageStream.str() << endmsg;
877 }
878 
879 //---------------------------------------------------------------------------
880 
882 {
883 
884  if ( m_algosInFlight == m_maxAlgosInFlight ) return StatusCode::FAILURE;
885 
886  const std::string& algName( index2algname( iAlgo ) );
887  IAlgorithm* ialgoPtr = nullptr;
888  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
889 
890  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
891  EventContext* eventContext( m_eventSlots[si].eventContext );
892  if ( !eventContext ) {
893  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si << ")" << endmsg;
894  return StatusCode::FAILURE;
895  }
896 
897  ++m_algosInFlight;
898  auto promote2ExecutedClosure = std::bind( &AvalancheSchedulerSvc::promoteToExecuted, this, iAlgo,
899  eventContext->slot(), ialgoPtr, eventContext );
900  // Avoid to use tbb if the pool size is 1 and run in this thread
901  if ( -100 != m_threadPoolSize ) {
902 
903  // this parent task is needed to promote an Algorithm as EXECUTED,
904  // it will be started as soon as the child task (see below) is completed
905  tbb::task* triggerAlgoStateUpdate =
906  new ( tbb::task::allocate_root() ) enqueueSchedulerActionTask( this, promote2ExecutedClosure );
907  // setting parent's refcount to 1 is made here only for consistency
908  // (in this case since it is not scheduled explicitly and there it has only one child task)
909  triggerAlgoStateUpdate->set_ref_count( 1 );
910  // the child task that executes an Algorithm
911  tbb::task* algoTask = new ( triggerAlgoStateUpdate->allocate_child() )
912  AlgoExecutionTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
913  // schedule the algoTask
914  tbb::task::enqueue( *algoTask );
915 
916  } else {
917  AlgoExecutionTask theTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
918  theTask.execute();
919  promote2ExecutedClosure();
920  }
921 
922  if ( msgLevel( MSG::DEBUG ) )
923  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot " << si
924  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
925 
926  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
927 
928  if ( msgLevel( MSG::VERBOSE ) ) dumpSchedulerState( -1 );
929 
930  if ( updateSc.isSuccess() )
931  if ( msgLevel( MSG::VERBOSE ) )
932  verbose() << "Promoting " << index2algname( iAlgo ) << " to SCHEDULED on slot " << si << endmsg;
933  return updateSc;
934  } else {
935  if ( msgLevel( MSG::DEBUG ) )
936  debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si << endmsg;
937  return sc;
938  }
939 }
940 
941 //---------------------------------------------------------------------------
942 
944 {
945 
946  if ( m_IOBoundAlgosInFlight == m_maxIOBoundAlgosInFlight ) return StatusCode::FAILURE;
947 
948  // bool IOBound = m_precSvc->isBlocking(algName);
949 
950  const std::string& algName( index2algname( iAlgo ) );
951  IAlgorithm* ialgoPtr = nullptr;
952  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
953 
954  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
955  EventContext* eventContext( m_eventSlots[si].eventContext );
956  if ( !eventContext ) {
957  fatal() << "[Asynchronous] Event context for algorithm " << algName << " is a nullptr (slot " << si << ")"
958  << endmsg;
959  return StatusCode::FAILURE;
960  }
961 
962  ++m_IOBoundAlgosInFlight;
963  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from
964  // tbb::task)? it seems it works..
965  IOBoundAlgTask* theTask = new ( tbb::task::allocate_root() )
966  IOBoundAlgTask( ialgoPtr, eventContext, serviceLocator(), m_algExecStateSvc );
967  m_IOBoundAlgScheduler->push( *theTask );
968 
969  if ( msgLevel( MSG::DEBUG ) )
970  debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event " << eventContext->evt()
971  << " in slot " << si << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
972 
973  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
974 
975  if ( updateSc.isSuccess() )
976  if ( msgLevel( MSG::VERBOSE ) )
977  verbose() << "[Asynchronous] Promoting " << index2algname( iAlgo ) << " to SCHEDULED on slot " << si << endmsg;
978  return updateSc;
979  } else {
980  if ( msgLevel( MSG::DEBUG ) )
981  debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot "
982  << si << endmsg;
983  return sc;
984  }
985 }
986 
987 //---------------------------------------------------------------------------
992  EventContext* eventContext )
993 {
994  // Put back the instance
995  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
996  if ( !castedAlgo ) fatal() << "The casting did not succeed!" << endmsg;
997  // EventContext* eventContext = castedAlgo->getContext();
998 
999  // Check if the execution failed
1000  if ( m_algExecStateSvc->eventStatus( *eventContext ) != EventStatus::Success ) eventFailed( eventContext ).ignore();
1001 
1002  Gaudi::Hive::setCurrentContext( eventContext );
1003  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1004 
1005  if ( !sc.isSuccess() ) {
1006  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1007  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1008  return StatusCode::FAILURE;
1009  }
1010 
1011  m_algosInFlight--;
1012 
1013  EventSlot& thisSlot = m_eventSlots[si];
1014 
1015  if ( msgLevel( MSG::DEBUG ) )
1016  debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
1017  << m_algosInFlight << endmsg;
1018 
1019  // Schedule an update of the status of the algorithms
1020  auto updateAction = std::bind( &AvalancheSchedulerSvc::updateStates, this, -1, algo->name() );
1021  m_actionsQueue.push( updateAction );
1022  m_updateNeeded = false;
1023 
1024  if ( msgLevel( MSG::DEBUG ) )
1025  debug() << "Trying to handle execution result of " << index2algname( iAlgo ) << " on slot " << si << endmsg;
1026  State state;
1027  if ( algo->filterPassed() ) {
1028  state = State::EVTACCEPTED;
1029  } else {
1030  state = State::EVTREJECTED;
1031  }
1032 
1033  sc = thisSlot.algsStates.updateState( iAlgo, state );
1034 
1035  if ( sc.isSuccess() )
1036  if ( msgLevel( MSG::VERBOSE ) )
1037  verbose() << "Promoting " << index2algname( iAlgo ) << " on slot " << si << " to "
1039 
1040  return sc;
1041 }
1042 
1043 //---------------------------------------------------------------------------
1048  EventContext* eventContext )
1049 {
1050  // Put back the instance
1051  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
1052  if ( !castedAlgo ) fatal() << "[Asynchronous] The casting did not succeed!" << endmsg;
1053  // EventContext* eventContext = castedAlgo->getContext();
1054 
1055  // Check if the execution failed
1056  if ( m_algExecStateSvc->eventStatus( *eventContext ) != EventStatus::Success ) eventFailed( eventContext ).ignore();
1057 
1058  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1059 
1060  if ( !sc.isSuccess() ) {
1061  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1062  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1063  return StatusCode::FAILURE;
1064  }
1065 
1066  m_IOBoundAlgosInFlight--;
1067 
1068  EventSlot& thisSlot = m_eventSlots[si];
1069 
1070  if ( msgLevel( MSG::DEBUG ) )
1071  debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1072  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1073 
1074  // Schedule an update of the status of the algorithms
1075  auto updateAction = std::bind( &AvalancheSchedulerSvc::updateStates, this, -1, algo->name() );
1076  m_actionsQueue.push( updateAction );
1077  m_updateNeeded = false;
1078 
1079  if ( msgLevel( MSG::DEBUG ) )
1080  debug() << "[Asynchronous] Trying to handle execution result of " << index2algname( iAlgo ) << " on slot " << si
1081  << endmsg;
1082  State state;
1083  if ( algo->filterPassed() ) {
1084  state = State::EVTACCEPTED;
1085  } else {
1086  state = State::EVTREJECTED;
1087  }
1088 
1089  sc = thisSlot.algsStates.updateState( iAlgo, state );
1090 
1091  if ( sc.isSuccess() )
1092  if ( msgLevel( MSG::VERBOSE ) )
1093  verbose() << "[Asynchronous] Promoting " << index2algname( iAlgo ) << " on slot " << si << " to "
1095 
1096  return sc;
1097 }
1098 
1099 //===========================================================================
1101 {
1102 
1103  std::lock_guard<std::mutex> lock( m_ssMut );
1104  m_sState.push_back( SchedulerState( a, e, t ) );
1105 }
1106 
1107 //===========================================================================
1109 {
1110 
1111  std::lock_guard<std::mutex> lock( m_ssMut );
1112 
1113  for ( std::list<SchedulerState>::iterator itr = m_sState.begin(); itr != m_sState.end(); ++itr ) {
1114  if ( *itr == a ) {
1115  m_sState.erase( itr );
1116  return true;
1117  }
1118  }
1119 
1120  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
1121  return false;
1122 }
1123 
1124 //===========================================================================
1126 {
1127 
1128  std::lock_guard<std::mutex> lock( m_ssMut );
1129 
1130  for ( auto it : m_sState ) {
1131  ost << " " << it << std::endl;
1132  }
1133 }
1134 
1135 //===========================================================================
1137 {
1138 
1139  std::lock_guard<std::mutex> lock( m_ssMut );
1140 
1141  std::ostringstream ost;
1142  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
1143  dumpState( ost );
1144 
1145  info() << ost.str() << endmsg;
1146 }
bool algsPresent(State state) const
const concurrency::PrecedenceRulesGraph * getRules() const
Precedence rules accessor.
Definition: PrecedenceSvc.h:68
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
StatusCode initialize() override
Definition: Service.cpp:64
const unsigned int & getAlgoIndex() const
Get algorithm index.
T empty(T...args)
T open(T...args)
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:731
StatusCode finalize() override
Definition: Service.cpp:174
ContextID_t slot() const
Definition: EventContext.h:40
StatusCode initialize() override
Initialise.
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
Algorithm promotion.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:75
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:32
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
A service to resolve the task execution precedence.
Definition: PrecedenceSvc.h:21
Header file for class GaudiAlgorithm.
T to_string(T...args)
T endl(T...args)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
void activate()
Activate scheduler.
T end(T...args)
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
size_t sizeOfSubset(State state) const
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si)
This class represents an entry point to all the event specific data.
Definition: EventContext.h:24
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:86
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
void addAlg(Algorithm *, EventContext *, pthread_t)
virtual const std::string & type() const =0
The type of the algorithm.
tbb::task * execute() override
ContextEvt_t evt() const
Definition: EventContext.h:39
STL class.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
T push_back(T...args)
static std::list< SchedulerState > m_sState
STL class.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:28
const DataObjIDColl & inputDataObjs() const override
T close(T...args)
T bind(T...args)
StatusCode finalize() override
Finalise.
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:211
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
T max(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
GAUDI_API void setCurrentContext(const EventContext *ctx)
T insert(T...args)
void addDependency(const DataObjID &id, const Gaudi::DataHandle::Mode &mode) override
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find_if(T...args)
T size(T...args)
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
STL class.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:25
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:68
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
T begin(T...args)
Iterator begin(State kind)
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
Definition: EventSlot.h:11
string s
Definition: gaudirun.py:253
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
unsigned int freeSlots() override
Get free slots number.
T sort(T...args)
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
State
Execution states of the algorithms.
T for_each(T...args)
std::string fullKey() const
Definition: DataObjID.cpp:54
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:209
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
unsigned int getControlFlowNodeCounter() const
Get total number of control flow graph nodes.
T reserve(T...args)
static std::map< State, std::string > stateNames
T emplace_back(T...args)
StatusCode m_drain()
Drain the actions present in the queue.
Iterator end(State kind)
StatusCode updateState(unsigned int iAlgo, State newState)