AvalancheSchedulerSvc.cpp
Go to the documentation of this file.
2 #include "AlgResourcePool.h"
3 #include "AlgoExecutionTask.h"
4 #include "PRGraphVisitors.h"
5 #include "IOBoundAlgTask.h"
6 
7 // Framework includes
9 #include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface
11 #include "GaudiKernel/IAlgorithm.h"
13 #include "GaudiKernel/SvcFactory.h"
16 
17 // C++
18 #include <algorithm>
19 #include <map>
20 #include <queue>
21 #include <sstream>
22 #include <unordered_set>
23 
24 // External libs
25 #include "boost/thread.hpp"
26 #include "boost/tokenizer.hpp"
27 #include "boost/algorithm/string.hpp"
28 // DP waiting for the TBB service
29 #include "tbb/task_scheduler_init.h"
30 
33 
34 // Instantiation of a static factory class used by clients to create instances of this service
36 
37 //===========================================================================
38 // Infrastructure methods
39 
40 
46 
47  // Initialise mother class (read properties, ...)
49  if ( !sc.isSuccess() ) warning() << "Base class could not be initialized" << endmsg;
50 
51  // Get hold of the TBBSvc. This should initialize the thread pool
52  m_threadPoolSvc = serviceLocator()->service( "ThreadPoolSvc" );
53  if ( !m_threadPoolSvc.isValid() ) {
54  fatal() << "Error retrieving ThreadPoolSvc" << endmsg;
55  return StatusCode::FAILURE;
56  }
57 
58  // Activate the scheduler in another thread.
59  info() << "Activating scheduler in a separate thread" << endmsg;
61 
62  while ( m_isActive != ACTIVE ) {
63  if ( m_isActive == FAILURE ) {
64  fatal() << "Terminating initialization" << endmsg;
65  return StatusCode::FAILURE;
66  } else {
67  info() << "Waiting for AvalancheSchedulerSvc to activate" << endmsg;
68  sleep( 1 );
69  }
70  }
71 
72  // Get the algo resource pool
73  m_algResourcePool = serviceLocator()->service( "AlgResourcePool" );
74  if ( !m_algResourcePool.isValid() ) {
75  fatal() << "Error retrieving AlgoResourcePool" << endmsg;
76  return StatusCode::FAILURE;
77  }
78 
79  m_algExecStateSvc = serviceLocator()->service("AlgExecStateSvc");
80  if (!m_algExecStateSvc.isValid()) {
81  fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
82  return StatusCode::FAILURE;
83  }
84 
85  // Get Whiteboard
86  m_whiteboard = serviceLocator()->service( m_whiteboardSvcName );
87  if ( !m_whiteboard.isValid() ) {
88  fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg;
89  return StatusCode::FAILURE;
90  }
91 
92  // Get dedicated scheduler for I/O-bound algorithms
93  if ( m_useIOBoundAlgScheduler ) {
94  m_IOBoundAlgScheduler = serviceLocator()->service( m_IOBoundAlgSchedulerSvcName );
95  if ( !m_IOBoundAlgScheduler.isValid() )
96  fatal() << "Error retrieving IOBoundSchedulerAlgSvc interface IAccelerator." << endmsg;
97  }
98 
99  // Check the MaxEventsInFlight parameters and react
100  // Deprecated for the moment
101  size_t numberOfWBSlots = m_whiteboard->getNumberOfStores();
102  if ( m_maxEventsInFlight != 0 ) {
103  warning() << "Property MaxEventsInFlight was set. This works but it's deprecated. "
104  << "Please migrate your code options files." << endmsg;
105 
106  if ( m_maxEventsInFlight != (int)numberOfWBSlots ) {
107  warning() << "In addition, the number of events in flight (" << m_maxEventsInFlight
108  << ") differs from the slots in the whiteboard (" << numberOfWBSlots
109  << "). Setting the number of events in flight to " << numberOfWBSlots << endmsg;
110  }
111  }
112 
113  // Align the two quantities
114  m_maxEventsInFlight = numberOfWBSlots;
115 
116  // Set the number of free slots
117  m_freeSlots = m_maxEventsInFlight;
118 
119  // set global concurrency flags
121 
122  if ( m_algosDependencies.size() != 0 ) {
123  warning() << " ##### Property AlgosDependencies is deprecated and ignored."
124  << " FIX your job options #####" << endmsg;
125  }
126 
127  // Get the list of algorithms
128  const std::list<IAlgorithm*>& algos = m_algResourcePool->getFlatAlgList();
129  const unsigned int algsNumber = algos.size();
130  info() << "Found " << algsNumber << " algorithms" << endmsg;
131 
132  /* Dependencies
133  1) Look for handles in algo, if none
134  2) Assume none are required
135  */
136 
137  DataObjIDColl globalInp, globalOutp;
138 
139  // figure out all outputs
140  for (IAlgorithm* ialgoPtr : algos) {
141  Algorithm* algoPtr = dynamic_cast<Algorithm*>(ialgoPtr);
142  if (!algoPtr) {
143  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
144  }
145  for (auto id : algoPtr->outputDataObjs()) {
146  auto r = globalOutp.insert(id);
147  if (!r.second) {
148  warning() << "multiple algorithms declare " << id << " as output! could be a single instance in multiple paths though, or control flow may guarantee only one runs...!" << endmsg;
149  }
150  }
151  }
152  info() << "outputs:\n" ;
153  for (const auto& i : globalOutp ) {
154  info() << i << '\n' ;
155  }
156  info() << endmsg;
157 
158 
159 
160  info() << "Data Dependencies for Algorithms:";
161 
162  std::vector<DataObjIDColl> m_algosDependencies;
163  for ( IAlgorithm* ialgoPtr : algos ) {
164  Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
165  if ( nullptr == algoPtr )
166  fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg;
167 
168  info() << "\n " << algoPtr->name();
169 
170  // FIXME
171  DataObjIDColl algoDependencies;
172  if ( !algoPtr->inputDataObjs().empty() || !algoPtr->outputDataObjs().empty() ) {
173  for ( auto id : algoPtr->inputDataObjs() ) {
174  info() << "\n o INPUT " << id;
175  if (id.key().find(":")!=std::string::npos) {
176  info() << " contains alternatives which require resolution... " << endmsg;
177  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(),boost::char_separator<char>{":"}};
178  auto itok = std::find_if( tokens.begin(), tokens.end(),
179  [&](const std::string& t) {
180  return globalOutp.find( DataObjID{t} ) != globalOutp.end();
181  } );
182  if (itok!=tokens.end()) {
183  info() << "found matching output for " << *itok << " -- updating scheduler info" << endmsg;
184  id.updateKey(*itok);
185  } else {
186  error() << "failed to find alternate in global output list" << endmsg;
187  }
188  }
189  algoDependencies.insert( id );
190  globalInp.insert( id );
191  }
192  for ( auto id : algoPtr->outputDataObjs() ) {
193  info() << "\n o OUTPUT " << id;
194  if (id.key().find(":")!=std::string::npos) {
195  info() << " alternatives are NOT allowed for outputs..." << endmsg;
196  }
197  }
198  } else {
199  info() << "\n none";
200  }
201  m_algosDependencies.emplace_back( algoDependencies );
202  }
203  info() << endmsg;
204 
205  // Fill the containers to convert algo names to index
206  m_algname_vect.reserve( algsNumber );
207  unsigned int index = 0;
208  for ( IAlgorithm* algo : algos ) {
209  const std::string& name = algo->name();
210  m_algname_index_map[name] = index;
211  m_algname_vect.emplace_back( name );
212  index++;
213  }
214 
215  // Check if we have unmet global input dependencies
216  if ( m_checkDeps ) {
217  DataObjIDColl unmetDep;
218  for ( auto o : globalInp ) {
219  if ( globalOutp.find( o ) == globalOutp.end() ) {
220  unmetDep.insert( o );
221  }
222  }
223 
224  if ( unmetDep.size() > 0 ) {
225  fatal() << "The following unmet INPUT data dependencies were found: ";
226  for ( auto& o : unmetDep ) {
227  fatal() << "\n o " << o << " required by Algorithm: ";
228  for ( size_t i = 0; i < m_algosDependencies.size(); ++i ) {
229  if ( m_algosDependencies[i].find( o ) != m_algosDependencies[i].end() ) {
230  fatal() << "\n * " << m_algname_vect[i];
231  }
232  }
233  }
234  fatal() << endmsg;
235  return StatusCode::FAILURE;
236  } else {
237  info() << "No unmet INPUT data dependencies were found" << endmsg;
238  }
239  }
240 
241  // prepare the control flow part
242  const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>( m_algResourcePool.get() );
243  sc = m_efManager.initialize( algPool->getPRGraph(), m_algname_index_map, m_eventSlots, m_optimizationMode );
244  unsigned int controlFlowNodeNumber = m_efManager.getPrecedenceRulesGraph()->getControlFlowNodeCounter();
245 
246  // Shortcut for the message service
247  SmartIF<IMessageSvc> messageSvc( serviceLocator() );
248  if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
249 
250  m_eventSlots.assign( m_maxEventsInFlight,
251  EventSlot( m_algosDependencies, algsNumber, controlFlowNodeNumber, messageSvc ) );
252  std::for_each( m_eventSlots.begin(), m_eventSlots.end(), []( EventSlot& slot ) { slot.complete = true; } );
253 
254  // Clearly inform about the level of concurrency
255  info() << "Concurrency level information:" << endmsg;
256  info() << " o Number of events in flight: " << m_maxEventsInFlight << endmsg;
257  info() << " o Number of algorithms in flight: " << m_maxAlgosInFlight << endmsg;
258  info() << " o TBB thread pool size: " << m_threadPoolSize << endmsg;
259 
260  // Simulating execution flow by only analyzing the graph topology and logic
261  if ( m_simulateExecution ) {
262  auto vis = concurrency::RunSimulator( 0 );
263  m_efManager.simulateExecutionFlow( vis );
264  }
265 
266  return sc;
267 }
268 //---------------------------------------------------------------------------
269 
274 
276  if ( !sc.isSuccess() ) warning() << "Base class could not be finalized" << endmsg;
277 
278  sc = deactivate();
279  if ( !sc.isSuccess() ) warning() << "Scheduler could not be deactivated" << endmsg;
280 
281  info() << "Joining Scheduler thread" << endmsg;
282  m_thread.join();
283 
284  // Final error check after thread pool termination
285  if ( m_isActive == FAILURE ) {
286  error() << "problems in scheduler thread" << endmsg;
287  return StatusCode::FAILURE;
288  }
289 
290  // m_efManager.getPrecedenceRulesGraph()->dumpExecutionPlan();
291 
292  return sc;
293 }
294 //---------------------------------------------------------------------------
306 
307  if (msgLevel(MSG::DEBUG))
308  debug() << "AvalancheSchedulerSvc::activate()" << endmsg;
309 
311  error() << "problems initializing ThreadPoolSvc" << endmsg;
313  return;
314  }
315 
316  // Wait for actions pushed into the queue by finishing tasks.
317  action thisAction;
319 
320  m_isActive = ACTIVE;
321 
322  // Continue to wait if the scheduler is running or there is something to do
323  info() << "Start checking the actionsQueue" << endmsg;
324  while ( m_isActive == ACTIVE or m_actionsQueue.size() != 0 ) {
325  m_actionsQueue.pop( thisAction );
326  sc = thisAction();
327  if ( sc != StatusCode::SUCCESS )
328  verbose() << "Action did not succeed (which is not bad per se)." << endmsg;
329  else
330  verbose() << "Action succeeded." << endmsg;
331  }
332 
333  info() << "Terminating thread-pool resources" << endmsg;
335  error() << "Problems terminating thread pool" << endmsg;
337  }
338 }
339 
340 //---------------------------------------------------------------------------
341 
349 
350  if ( m_isActive == ACTIVE ) {
351  // Drain the scheduler
353  // This would be the last action
354  m_actionsQueue.push( [this]() -> StatusCode {
356  return StatusCode::SUCCESS;
357  } );
358  }
359 
360  return StatusCode::SUCCESS;
361 }
362 
363 //===========================================================================
364 
365 //===========================================================================
366 // Utils and shortcuts
367 
368 inline const std::string& AvalancheSchedulerSvc::index2algname( unsigned int index ) {
369  return m_algname_vect[index];
370 }
371 
372 //---------------------------------------------------------------------------
373 
374 inline unsigned int AvalancheSchedulerSvc::algname2index( const std::string& algoname ) {
375  unsigned int index = m_algname_index_map[algoname];
376  return index;
377 }
378 
379 //===========================================================================
380 // EventSlot management
388 
389  if ( m_first ) {
390  m_first = false;
391  }
392 
393  if ( !eventContext ) {
394  fatal() << "Event context is nullptr" << endmsg;
395  return StatusCode::FAILURE;
396  }
397 
398  if ( m_freeSlots.load() == 0 ) {
399  if ( msgLevel( MSG::DEBUG ) ) debug() << "A free processing slot could not be found." << endmsg;
400  return StatusCode::FAILURE;
401  }
402 
403  // no problem as push new event is only called from one thread (event loop manager)
404  m_freeSlots--;
405 
406  auto action = [this, eventContext]() -> StatusCode {
407  // Event processing slot forced to be the same as the wb slot
408  const unsigned int thisSlotNum = eventContext->slot();
409  EventSlot& thisSlot = m_eventSlots[thisSlotNum];
410  if ( !thisSlot.complete ) {
411  fatal() << "The slot " << thisSlotNum << " is supposed to be a finished event but it's not" << endmsg;
412  return StatusCode::FAILURE;
413  }
414 
415  info() << "Executing event " << eventContext->evt() << " on slot " << thisSlotNum << endmsg;
416  thisSlot.reset( eventContext );
417 
418  // promote to CR and DR the initial set of algorithms
419  auto vis = concurrency::Trigger( thisSlotNum );
421 
422  return this->updateStates( thisSlotNum );
423  }; // end of lambda
424 
425  // Kick off the scheduling!
426  if ( msgLevel( MSG::VERBOSE ) ) {
427  verbose() << "Pushing the action to update the scheduler for slot " << eventContext->slot() << endmsg;
428  verbose() << "Free slots available " << m_freeSlots.load() << endmsg;
429  }
430  m_actionsQueue.push( action );
431 
432  return StatusCode::SUCCESS;
433 }
434 
435 //---------------------------------------------------------------------------
437  StatusCode sc;
438  for ( auto context : eventContexts ) {
439  sc = pushNewEvent( context );
440  if ( sc != StatusCode::SUCCESS ) return sc;
441  }
442  return sc;
443 }
444 
445 //---------------------------------------------------------------------------
447  return std::max( m_freeSlots.load(), 0 );
448 }
449 
450 //---------------------------------------------------------------------------
455 
456  unsigned int slotNum = 0;
457  for ( auto& thisSlot : m_eventSlots ) {
458  if ( not thisSlot.algsStates.allAlgsExecuted() and not thisSlot.complete ) {
459  updateStates( slotNum );
460  }
461  slotNum++;
462  }
463  return StatusCode::SUCCESS;
464 }
465 
466 //---------------------------------------------------------------------------
471  // debug() << "popFinishedEvent: queue size: " << m_finishedEvents.size() << endmsg;
472  if ( m_freeSlots.load() == m_maxEventsInFlight or m_isActive == INACTIVE ) {
473  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
474  // << " active: " << m_isActive << endmsg;
475  return StatusCode::FAILURE;
476  } else {
477  // debug() << "freeslots: " << m_freeSlots << "/" << m_maxEventsInFlight
478  // << " active: " << m_isActive << endmsg;
479  m_finishedEvents.pop( eventContext );
480  m_freeSlots++;
481  if (msgLevel(MSG::DEBUG))
482  debug() << "Popped slot " << eventContext->slot() << "(event "
483  << eventContext->evt() << ")" << endmsg;
484  return StatusCode::SUCCESS;
485  }
486 }
487 
488 //---------------------------------------------------------------------------
493  if ( m_finishedEvents.try_pop( eventContext ) ) {
494  if ( msgLevel( MSG::DEBUG ) )
495  debug() << "Try Pop successful slot " << eventContext->slot() << "(event " << eventContext->evt() << ")"
496  << endmsg;
497  m_freeSlots++;
498  return StatusCode::SUCCESS;
499  }
500  return StatusCode::FAILURE;
501 }
502 
503 //---------------------------------------------------------------------------
510 
511  // Set the number of slots available to an error code
512  m_freeSlots.store( 0 );
513 
514  fatal() << "*** Event " << eventContext->evt() << " on slot "
515  << eventContext->slot() << " failed! ***" << endmsg;
516 
517  std::ostringstream ost;
518  m_algExecStateSvc->dump(ost, *eventContext);
519 
520  info() << "Dumping Alg Exec State for slot " << eventContext->slot()
521  << ":\n" << ost.str() << endmsg;
522 
523  dumpSchedulerState(-1);
524 
525  // Empty queue and deactivate the service
526  action thisAction;
527  while ( m_actionsQueue.try_pop( thisAction ) ) {
528  };
529  deactivate();
530 
531  // Push into the finished events queue the failed context
532  EventContext* thisEvtContext;
533  while ( m_finishedEvents.try_pop( thisEvtContext ) ) {
534  m_finishedEvents.push( thisEvtContext );
535  };
536  m_finishedEvents.push( eventContext );
537 
538  return StatusCode::FAILURE;
539 }
540 
541 //===========================================================================
542 
543 //===========================================================================
544 // States Management
545 
556 
557  m_updateNeeded = true;
558 
559  StatusCode global_sc( StatusCode::FAILURE, true );
560 
561  // Sort from the oldest to the newest event
562  // Prepare a vector of pointers to the slots to avoid copies
563  std::vector<EventSlot*> eventSlotsPtrs;
564 
565  // Consider all slots if si <0 or just one otherwise
566  if ( si < 0 ) {
567  const int eventsSlotsSize( m_eventSlots.size() );
568  eventSlotsPtrs.reserve( eventsSlotsSize );
569  for ( auto slotIt = m_eventSlots.begin(); slotIt != m_eventSlots.end(); slotIt++ ) {
570  if ( !slotIt->complete ) eventSlotsPtrs.push_back( &( *slotIt ) );
571  }
572  std::sort( eventSlotsPtrs.begin(), eventSlotsPtrs.end(),
573  []( EventSlot* a, EventSlot* b ) { return a->eventContext->evt() < b->eventContext->evt(); } );
574  } else {
575  eventSlotsPtrs.push_back( &m_eventSlots[si] );
576  }
577 
578  for ( EventSlot* thisSlotPtr : eventSlotsPtrs ) {
579  int iSlot = thisSlotPtr->eventContext->slot();
580 
581  // Cache the states of the algos to improve readability and performance
582  auto& thisSlot = m_eventSlots[iSlot];
583  AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
584 
585  // Take care of the control ready update
586  if ( !algo_name.empty() )
587  m_efManager.updateDecision( algo_name, iSlot, thisAlgsStates, thisSlot.controlFlowState );
588 
589  StatusCode partial_sc( StatusCode::FAILURE, true );
590  // first update CONTROLREADY to DATAREADY
591 
592  // now update DATAREADY to SCHEDULED
593  if ( !m_optimizationMode.empty() ) {
594  auto comp_nodes = [this]( const uint& i, const uint& j ) {
597  };
599  comp_nodes, std::vector<uint>() );
600  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
601  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it )
602  buffer.push( *it );
603  /*std::stringstream s;
604  auto buffer2 = buffer;
605  while (!buffer2.empty()) {
606  s << m_efManager.getPrecedenceRulesGraph()->getAlgorithmNode(index2algname(buffer2.top()))->getRank() << ", ";
607  buffer2.pop();
608  }
609  info() << "DRBuffer is: [ " << s.str() << " ] <--" << algo_name << " executed" << endmsg;*/
610 
611  /*while (!buffer.empty()) {
612  partial_sc = promoteToScheduled(buffer.top(), iSlot);
613  if (partial_sc.isFailure()) {
614  if (msgLevel(MSG::VERBOSE))
615  verbose() << "Could not apply transition from "
616  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
617  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
618  if (m_useIOBoundAlgScheduler) {
619  partial_sc = promoteToAsyncScheduled(buffer.top(), iSlot);
620  if (msgLevel(MSG::VERBOSE))
621  if (partial_sc.isFailure())
622  verbose() << "[Asynchronous] Could not apply transition from "
623  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
624  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
625  }
626  }
627  buffer.pop();
628  }*/
629  while ( !buffer.empty() ) {
630  bool IOBound = false;
633 
634  if ( !IOBound )
635  partial_sc = promoteToScheduled( buffer.top(), iSlot );
636  else
637  partial_sc = promoteToAsyncScheduled( buffer.top(), iSlot );
638 
639  if (msgLevel(MSG::VERBOSE))
640  if (partial_sc.isFailure())
641  verbose() << "Could not apply transition from "
642  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
643  << " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
644 
645  buffer.pop();
646  }
647 
648  } else {
649  for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
650  it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it ) {
651  uint algIndex = *it;
652 
653  bool IOBound = false;
656 
657  if ( !IOBound )
658  partial_sc = promoteToScheduled( algIndex, iSlot );
659  else
660  partial_sc = promoteToAsyncScheduled( algIndex, iSlot );
661 
662  if (msgLevel(MSG::VERBOSE))
663  if (partial_sc.isFailure())
664  verbose() << "Could not apply transition from "
665  << AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
666  << " for algorithm " << index2algname(algIndex) << " on processing slot " << iSlot << endmsg;
667  }
668  }
669 
671  auto now = std::chrono::system_clock::now();
673  s << algo_name << ", " << thisAlgsStates.sizeOfSubset(State::CONTROLREADY) << ", "
674  << thisAlgsStates.sizeOfSubset(State::DATAREADY) << ", "
675  << thisAlgsStates.sizeOfSubset(State::SCHEDULED) << ", "
677  << "\n";
678  auto threads = (m_threadPoolSize != -1) ? std::to_string(m_threadPoolSize)
679  : std::to_string(tbb::task_scheduler_init::default_num_threads());
680  std::ofstream myfile;
681  myfile.open( "IntraEventConcurrencyDynamics_" + threads + "T.csv", std::ios::app );
682  myfile << s.str();
683  myfile.close();
684  }
685 
686  // Not complete because this would mean that the slot is already free!
687  if ( !thisSlot.complete && m_efManager.rootDecisionResolved( thisSlot.controlFlowState ) &&
688  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::CONTROLREADY ) &&
689  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::DATAREADY ) &&
690  !thisSlot.algsStates.algsPresent( AlgsExecutionStates::SCHEDULED ) ) {
691 
692  thisSlot.complete = true;
693  // if the event did not fail, add it to the finished events
694  // otherwise it is taken care of in the error handling already
695  if(m_algExecStateSvc->eventStatus(*thisSlot.eventContext) == EventStatus::Success) {
696  m_finishedEvents.push(thisSlot.eventContext);
697  if (msgLevel(MSG::DEBUG))
698  debug() << "Event " << thisSlot.eventContext->evt() << " finished (slot "
699  << thisSlot.eventContext->slot() << ")." << endmsg;
700  }
701  // now let's return the fully evaluated result of the control flow
702  if ( msgLevel( MSG::DEBUG ) ) {
704  m_efManager.printEventState( ss, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
705  debug() << ss.str() << endmsg;
706  }
707 
708  thisSlot.eventContext = nullptr;
709  } else {
710  StatusCode eventStalledSC = isStalled(iSlot);
711  if (! eventStalledSC.isSuccess()) {
712  m_algExecStateSvc->setEventStatus(EventStatus::AlgStall, *thisSlot.eventContext);
713  eventFailed(thisSlot.eventContext).ignore();
714  }
715  }
716  } // end loop on slots
717 
718  verbose() << "States Updated." << endmsg;
719 
720  return global_sc;
721 }
722 
723 //---------------------------------------------------------------------------
724 
732  // Get the slot
733  EventSlot& thisSlot = m_eventSlots[iSlot];
734 
735  if ( m_actionsQueue.empty() && m_algosInFlight == 0 && m_IOBoundAlgosInFlight == 0 &&
737 
738  info() << "About to declare a stall" << endmsg;
739  fatal() << "*** Stall detected! ***\n" << endmsg;
740  dumpSchedulerState( iSlot );
741  // throw GaudiException ("Stall detected",name(),StatusCode::FAILURE);
742 
743  return StatusCode::FAILURE;
744  }
745  return StatusCode::SUCCESS;
746 }
747 
748 //---------------------------------------------------------------------------
749 
756 
757  // To have just one big message
758  std::ostringstream outputMessageStream;
759 
760  outputMessageStream << "============================== Execution Task State ============================="
761  << std::endl;
762  dumpState( outputMessageStream );
763 
764  outputMessageStream << std::endl
765  << "============================== Scheduler State ================================="
766  << std::endl;
767 
768  int slotCount = -1;
769  for ( auto thisSlot : m_eventSlots ) {
770  slotCount++;
771  if ( thisSlot.complete ) continue;
772 
773  outputMessageStream << "----------- slot: " << thisSlot.eventContext->slot()
774  << " event: " << thisSlot.eventContext->evt() << " -----------" << std::endl;
775 
776  if ( 0 > iSlot or iSlot == slotCount ) {
777  outputMessageStream << "Algorithms states:" << std::endl;
778 
779  const DataObjIDColl& wbSlotContent( thisSlot.dataFlowMgr.content() );
780  for ( unsigned int algoIdx = 0; algoIdx < thisSlot.algsStates.size(); ++algoIdx ) {
781  outputMessageStream << " o " << index2algname( algoIdx ) << " ["
782  << AlgsExecutionStates::stateNames[thisSlot.algsStates[algoIdx]] << "] Data deps: ";
783  DataObjIDColl deps( thisSlot.dataFlowMgr.dataDependencies( algoIdx ) );
784  const int depsSize = deps.size();
785  if ( depsSize == 0 ) outputMessageStream << " none";
786 
787  DataObjIDColl missing;
788  for ( auto d : deps ) {
789  outputMessageStream << d << " ";
790  if ( wbSlotContent.find( d ) == wbSlotContent.end() ) {
791  // outputMessageStream << "[missing] ";
792  missing.insert( d );
793  }
794  }
795 
796  if ( !missing.empty() ) {
797  outputMessageStream << ". The following are missing: ";
798  for ( auto d : missing ) {
799  outputMessageStream << d << " ";
800  }
801  }
802 
803  outputMessageStream << std::endl;
804  }
805 
806  // Snapshot of the WhiteBoard
807  outputMessageStream << "\nWhiteboard contents: " << std::endl;
808  for ( auto& product : wbSlotContent ) outputMessageStream << " o " << product << std::endl;
809 
810  // Snapshot of the ControlFlow
811  outputMessageStream << "\nControl Flow:" << std::endl;
812  std::stringstream cFlowStateStringStream;
813  m_efManager.printEventState( cFlowStateStringStream, thisSlot.algsStates, thisSlot.controlFlowState, 0 );
814 
815  outputMessageStream << cFlowStateStringStream.str() << std::endl;
816  }
817  }
818 
819  outputMessageStream << "=================================== END ======================================" << std::endl;
820 
821  info() << "Dumping Scheduler State " << std::endl << outputMessageStream.str() << endmsg;
822 }
823 
824 //---------------------------------------------------------------------------
825 
827 
829 
830  const std::string& algName( index2algname( iAlgo ) );
831  IAlgorithm* ialgoPtr = nullptr;
832  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
833 
834  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
835  EventContext* eventContext( m_eventSlots[si].eventContext );
836  if ( !eventContext )
837  fatal() << "Event context for algorithm " << algName << " is a nullptr (slot " << si << ")" << endmsg;
838 
839  ++m_algosInFlight;
840  auto promote2ExecutedClosure = std::bind(&AvalancheSchedulerSvc::promoteToExecuted,
841  this,
842  iAlgo,
843  eventContext->slot(),
844  ialgoPtr,
845  eventContext);
846  // Avoid to use tbb if the pool size is 1 and run in this thread
847  if (-100 != m_threadPoolSize) {
848 
849  // this parent task is needed to promote an Algorithm as EXECUTED,
850  // it will be started as soon as the child task (see below) is completed
851  tbb::task* triggerAlgoStateUpdate = new(tbb::task::allocate_root())
852  enqueueSchedulerActionTask(this, promote2ExecutedClosure);
853  // setting parent's refcount to 1 is made here only for consistency
854  // (in this case since it is not scheduled explicitly and there it has only one child task)
855  triggerAlgoStateUpdate->set_ref_count(1);
856  // the child task that executes an Algorithm
857  tbb::task* algoTask = new(triggerAlgoStateUpdate->allocate_child())
858  AlgoExecutionTask(ialgoPtr, iAlgo, eventContext, serviceLocator(), m_algExecStateSvc);
859  // schedule the algoTask
860  tbb::task::enqueue( *algoTask);
861 
862  } else {
863  AlgoExecutionTask theTask(ialgoPtr, iAlgo, eventContext, serviceLocator(), m_algExecStateSvc);
864  theTask.execute();
865  promote2ExecutedClosure();
866  }
867 
868  if ( msgLevel( MSG::DEBUG ) )
869  debug() << "Algorithm " << algName << " was submitted on event " << eventContext->evt() << " in slot " << si
870  << ". Algorithms scheduled are " << m_algosInFlight << endmsg;
871 
872  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
873 
874  if ( msgLevel( MSG::VERBOSE ) ) dumpSchedulerState( -1 );
875 
876  if (updateSc.isSuccess())
877  if (msgLevel(MSG::VERBOSE))
878  verbose() << "Promoting " << index2algname(iAlgo) << " to SCHEDULED on slot "
879  << si << endmsg;
880  return updateSc;
881  } else {
882  if ( msgLevel( MSG::DEBUG ) )
883  debug() << "Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot " << si << endmsg;
884  return sc;
885  }
886 }
887 
888 //---------------------------------------------------------------------------
889 
891 
893 
894  // bool IOBound = m_efManager.getPrecedenceRulesGraph()->getAlgorithmNode(algName)->isIOBound();
895 
896  const std::string& algName( index2algname( iAlgo ) );
897  IAlgorithm* ialgoPtr = nullptr;
898  StatusCode sc( m_algResourcePool->acquireAlgorithm( algName, ialgoPtr ) );
899 
900  if ( sc.isSuccess() ) { // if we managed to get an algorithm instance try to schedule it
901  EventContext* eventContext( m_eventSlots[si].eventContext );
902  if ( !eventContext )
903  fatal() << "[Asynchronous] Event context for algorithm " << algName << " is a nullptr (slot " << si << ")"
904  << endmsg;
905 
907  // Can we use tbb-based overloaded new-operator for a "custom" task (an algorithm wrapper, not derived from tbb::task)? it seems it works..
908  IOBoundAlgTask* theTask = new( tbb::task::allocate_root() )
909  IOBoundAlgTask(ialgoPtr, iAlgo, eventContext, serviceLocator(), m_algExecStateSvc);
910  m_IOBoundAlgScheduler->push(*theTask);
911 
912  if (msgLevel(MSG::DEBUG))
913  debug() << "[Asynchronous] Algorithm " << algName << " was submitted on event "
914  << eventContext->evt() << " in slot " << si
915  << ". algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
916 
917  StatusCode updateSc( m_eventSlots[si].algsStates.updateState( iAlgo, AlgsExecutionStates::SCHEDULED ) );
918 
919  if (updateSc.isSuccess())
920  if (msgLevel(MSG::VERBOSE))
921  verbose() << "[Asynchronous] Promoting " << index2algname(iAlgo)
922  << " to SCHEDULED on slot " << si << endmsg;
923  return updateSc;
924  } else {
925  if ( msgLevel( MSG::DEBUG ) )
926  debug() << "[Asynchronous] Could not acquire instance for algorithm " << index2algname( iAlgo ) << " on slot "
927  << si << endmsg;
928  return sc;
929  }
930 }
931 
932 //---------------------------------------------------------------------------
937  EventContext* eventContext ) {
938  // Put back the instance
939  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
940  if ( !castedAlgo ) fatal() << "The casting did not succeed!" << endmsg;
941  // EventContext* eventContext = castedAlgo->getContext();
942 
943  // Check if the execution failed
944  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
945  eventFailed(eventContext).ignore();
946 
947  Gaudi::Hive::setCurrentContext(eventContext);
948  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
949 
950  if ( !sc.isSuccess() ) {
951  error() << "[Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
952  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
953  return StatusCode::FAILURE;
954  }
955 
956  m_algosInFlight--;
957 
958  EventSlot& thisSlot = m_eventSlots[si];
959 
960  if ( msgLevel( MSG::DEBUG ) )
961  debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
962  << m_algosInFlight << endmsg;
963 
964  // Schedule an update of the status of the algorithms
965  auto updateAction = std::bind( &AvalancheSchedulerSvc::updateStates, this, -1, algo->name() );
966  m_actionsQueue.push( updateAction );
967  m_updateNeeded = false;
968 
969  if ( msgLevel( MSG::DEBUG ) )
970  debug() << "Trying to handle execution result of " << index2algname( iAlgo ) << " on slot " << si << endmsg;
971  State state;
972  if ( algo->filterPassed() ) {
973  state = State::EVTACCEPTED;
974  } else {
975  state = State::EVTREJECTED;
976  }
977 
978  sc = thisSlot.algsStates.updateState( iAlgo, state );
979 
980  if (sc.isSuccess())
981  if (msgLevel(MSG::VERBOSE))
982  verbose() << "Promoting " << index2algname(iAlgo) << " on slot " << si << " to "
984 
985  return sc;
986 }
987 
988 //---------------------------------------------------------------------------
993  EventContext* eventContext ) {
994  // Put back the instance
995  Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
996  if ( !castedAlgo ) fatal() << "[Asynchronous] The casting did not succeed!" << endmsg;
997  // EventContext* eventContext = castedAlgo->getContext();
998 
999  // Check if the execution failed
1000  if (m_algExecStateSvc->eventStatus(*eventContext) != EventStatus::Success)
1001  eventFailed(eventContext).ignore();
1002 
1003  StatusCode sc = m_algResourcePool->releaseAlgorithm( algo->name(), algo );
1004 
1005  if ( !sc.isSuccess() ) {
1006  error() << "[Asynchronous] [Event " << eventContext->evt() << ", Slot " << eventContext->slot() << "] "
1007  << "Instance of algorithm " << algo->name() << " could not be properly put back." << endmsg;
1008  return StatusCode::FAILURE;
1009  }
1010 
1012 
1013  EventSlot& thisSlot = m_eventSlots[si];
1014 
1015  if (msgLevel(MSG::DEBUG))
1016  debug() << "[Asynchronous] Algorithm " << algo->name() << " executed in slot " << si
1017  << ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
1018 
1019  // Schedule an update of the status of the algorithms
1020  auto updateAction = std::bind( &AvalancheSchedulerSvc::updateStates, this, -1, algo->name() );
1021  m_actionsQueue.push( updateAction );
1022  m_updateNeeded = false;
1023 
1024  if (msgLevel(MSG::DEBUG))
1025  debug() << "[Asynchronous] Trying to handle execution result of "
1026  << index2algname(iAlgo) << " on slot " << si << endmsg;
1027  State state;
1028  if ( algo->filterPassed() ) {
1029  state = State::EVTACCEPTED;
1030  } else {
1031  state = State::EVTREJECTED;
1032  }
1033 
1034  sc = thisSlot.algsStates.updateState( iAlgo, state );
1035 
1036  if (sc.isSuccess())
1037  if (msgLevel(MSG::VERBOSE))
1038  verbose() << "[Asynchronous] Promoting " << index2algname(iAlgo) << " on slot "
1039  << si << " to " << AlgsExecutionStates::stateNames[state] << endmsg;
1040 
1041  return sc;
1042 }
1043 
1044 //===========================================================================
1046 {
1047 
1049  m_sState.push_back( SchedulerState( a, e, t ) );
1050 }
1051 
1052 //===========================================================================
1054 {
1055 
1057 
1058  for ( std::list<SchedulerState>::iterator itr = m_sState.begin(); itr != m_sState.end(); ++itr ) {
1059  if ( *itr == a ) {
1060  m_sState.erase( itr );
1061  return true;
1062  }
1063  }
1064 
1065  error() << "could not find Alg " << a->name() << " in Scheduler!" << endmsg;
1066  return false;
1067 }
1068 
1069 //===========================================================================
1071 {
1072 
1074 
1075  for ( auto it : m_sState ) {
1076  ost << " " << it << std::endl;
1077  }
1078 }
1079 
1080 //===========================================================================
1082 {
1083 
1085 
1086  std::ostringstream ost;
1087  ost << "dumping Executing Threads: [" << m_sState.size() << "]" << std::endl;
1088  dumpState( ost );
1089 
1090  info() << ost.str() << endmsg;
1091 }
virtual StatusCode initPool(const int &poolSize)=0
Initializes the thread pool.
bool algsPresent(State state) const
Wrapper around I/O-bound Gaudi-algorithms.
StatusCode tryPopFinishedEvent(EventContext *&eventContext) override
Try to fetch an event from the scheduler.
StatusCode initialize() override
Definition: Service.cpp:64
virtual StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false)=0
Acquire a certain algorithm using its name.
T empty(T...args)
const std::chrono::system_clock::time_point getInitTime() const
unsigned int m_IOBoundAlgosInFlight
Number of I/O-bound algorithms presently in flight.
T open(T...args)
void printEventState(std::stringstream &ss, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print the state of the control flow for a given event.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:715
virtual concurrency::PrecedenceRulesGraph * getPRGraph() const
StatusCode finalize() override
Definition: Service.cpp:174
ContextID_t slot() const
Definition: EventContext.h:41
Gaudi::Property< bool > m_dumpIntraEventDynamics
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
void dumpSchedulerState(int iSlot)
Dump the state of the scheduler.
StatusCode promoteToScheduled(unsigned int iAlgo, int si)
Algorithm promotion.
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
virtual void dump(std::ostringstream &ost, const EventContext &ctx) const =0
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
EventContext * eventContext
Cache for the eventContext.
Definition: EventSlot.h:32
StatusCode isStalled(int si)
Check if the scheduling is in a stall.
Header file for class GaudiAlgorithm.
T to_string(T...args)
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
T endl(T...args)
virtual bool filterPassed() const =0
Did this algorithm pass or fail its filter criterion for the last event?
void touchReadyAlgorithms(IGraphVisitor &visitor) const
Promote all algorithms, ready to be executed, to DataReady state.
T duration_cast(T...args)
void activate()
Activate scheduler.
T end(T...args)
Gaudi::Property< std::string > m_optimizationMode
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using the graph index.
size_t sizeOfSubset(State state) const
A visitor, performing full top-down traversals of a graph.
StatusCode promoteToAsyncScheduled(unsigned int iAlgo, int si)
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
This class represents an entry point to all the event specific data.
Definition: EventContext.h:25
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:84
unsigned int algname2index(const std::string &algoname)
Convert a name to an integer.
bool isIOBound() const
Check if algorithm is I/O-bound.
void addAlg(Algorithm *, EventContext *, pthread_t)
std::atomic< ActivationState > m_isActive
Flag to track if the scheduler is active or not.
std::unordered_map< std::string, unsigned int > m_algname_index_map
Map to bookkeep the information necessary to the name2index conversion.
tbb::task * execute() override
ContextEvt_t evt() const
Definition: EventContext.h:40
STL class.
Gaudi::Property< bool > m_useIOBoundAlgScheduler
std::atomic_int m_freeSlots
Atomic to account for asynchronous updates by the scheduler with respect to the rest.
virtual StatusCode terminatePool()=0
Finalize the thread pool.
StatusCode pushNewEvents(std::vector< EventContext * > &eventContexts) override
unsigned int m_algosInFlight
Number of algorithms presently in flight.
T push_back(T...args)
static std::list< SchedulerState > m_sState
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
STL class.
bool rootDecisionResolved(const std::vector< int > &node_decisions) const
Check whether root decision was resolved.
const float & getRank() const
Get Algorithm rank.
SmartIF< IAlgResourcePool > m_algResourcePool
Cache for the algorithm resource pool.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
StatusCode popFinishedEvent(EventContext *&eventContext) override
Blocks until an event is available.
T join(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
const DataObjIDColl & inputDataObjs() const override
tbb::concurrent_bounded_queue< EventContext * > m_finishedEvents
Queue of finished events.
std::vector< std::string > m_algname_vect
Vector to bookkeep the information necessary to the index2name conversion.
T close(T...args)
virtual void setEventStatus(const EventStatus::Status &sc, const EventContext &ctx)=0
T bind(T...args)
StatusCode finalize() override
Finalise.
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:242
concurrency::ExecutionFlowManager m_efManager
Member to take care of the control flow.
bool complete
Flags completion of the event.
Definition: EventSlot.h:39
Gaudi::Property< int > m_threadPoolSize
SmartIF< IThreadPoolSvc > m_threadPoolSvc
SmartIF< IAccelerator > m_IOBoundAlgScheduler
A shortcut to IO-bound algorithm scheduler.
T max(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
GAUDI_API void setCurrentContext(const EventContext *ctx)
bool m_updateNeeded
Keep track of update actions scheduled.
T insert(T...args)
Gaudi::Property< int > m_maxEventsInFlight
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find_if(T...args)
T size(T...args)
StatusCode pushNewEvent(EventContext *eventContext) override
Make an event available to the scheduler.
Gaudi::Property< unsigned int > m_maxAlgosInFlight
STL class.
void reset(EventContext *theeventContext)
Reset all resources in order to reuse the slot.
Definition: EventSlot.h:26
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
StatusCode eventFailed(EventContext *eventContext)
Method to check if an event failed and take appropriate actions.
T begin(T...args)
Iterator begin(State kind)
virtual const EventStatus::Status & eventStatus(const EventContext &ctx) const =0
const std::string & index2algname(unsigned int index)
Convert an integer to a name.
Class representing the event slot.
Definition: EventSlot.h:11
string s
Definition: gaudirun.py:245
StatusCode promoteToExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the AlgoExecutionTask.
unsigned int freeSlots() override
Get free slots number.
std::vector< EventSlot > m_eventSlots
Vector of events slots.
T sort(T...args)
StatusCode promoteToAsyncExecuted(unsigned int iAlgo, int si, IAlgorithm *algo, EventContext *)
The call to this method is triggered only from within the IOBoundAlgTask.
StatusCode deactivate()
Deactivate scheduler.
virtual StatusCode push(IAlgTask &task)=0
void ignore() const
Definition: StatusCode.h:106
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
PrecedenceRulesGraph * getPrecedenceRulesGraph() const
Get the flow graph instance.
StatusCode updateStates(int si=-1, const std::string &algo_name=std::string())
Loop on algorithm in the slots and promote them to successive states (-1 means all slots...
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
State
Execution states of the algorithms.
T for_each(T...args)
virtual StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo)=0
Release a certain algorithm.
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
STL class.
SmartIF< IAlgExecStateSvc > m_algExecStateSvc
Algorithm execution state manager.
Gaudi::Property< unsigned int > m_maxIOBoundAlgosInFlight
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
tbb::concurrent_bounded_queue< action > m_actionsQueue
Queue where closures are stored and picked for execution.
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
T reserve(T...args)
static std::map< State, std::string > stateNames
T emplace_back(T...args)
std::thread m_thread
The thread in which the activate function runs.
StatusCode m_drain()
Drain the actions present in the queue.
Iterator end(State kind)
StatusCode updateState(unsigned int iAlgo, State newState)