The Gaudi Framework  v28r3 (cc1cf868)
PrecedenceRulesGraph.cpp
Go to the documentation of this file.
1 #include "PrecedenceRulesGraph.h"
2 #include "PRGraphVisitors.h"
3 
5 
6 namespace concurrency
7 {
8 
9  //---------------------------------------------------------------------------
10  std::string ControlFlowNode::stateToString( const int& stateId ) const
11  {
12 
13  if ( 0 == stateId )
14  return "FALSE";
15  else if ( 1 == stateId )
16  return "TRUE";
17  else
18  return "UNDEFINED";
19  }
20 
21  //---------------------------------------------------------------------------
23  {
24 
25  for ( auto node : m_children ) delete node;
26  }
27 
28  //---------------------------------------------------------------------------
30  {
31 
32  for ( auto daughter : m_children ) daughter->initialize( algname_index_map );
33  }
34 
35  //---------------------------------------------------------------------------
37  {
38 
39  if ( std::find( m_parents.begin(), m_parents.end(), node ) == m_parents.end() ) m_parents.push_back( node );
40  }
41 
42  //--------------------------------------------------------------------------
44  {
45 
46  if ( std::find( m_children.begin(), m_children.end(), node ) == m_children.end() ) m_children.push_back( node );
47  }
48 
49  //---------------------------------------------------------------------------
51  const std::vector<int>& node_decisions, const unsigned int& recursionLevel ) const
52  {
53 
54  output << std::string( recursionLevel, ' ' ) << m_nodeName << " (" << m_nodeIndex << ")"
55  << ", w/ decision: " << stateToString( node_decisions[m_nodeIndex] ) << "(" << node_decisions[m_nodeIndex]
56  << ")" << std::endl;
57  for ( auto daughter : m_children ) {
58  daughter->printState( output, states, node_decisions, recursionLevel + 2 );
59  }
60  }
61 
62  //---------------------------------------------------------------------------
63  int DecisionNode::updateState( AlgsExecutionStates& states, std::vector<int>& node_decisions ) const
64  {
65  // check whether we already had a result earlier
66  // if (-1 != node_decisions[m_nodeIndex] ) { return node_decisions[m_nodeIndex]; }
67  int decision = ( ( m_allPass && m_modePromptDecision ) ? 1 : -1 );
68  bool hasUndecidedChild = false;
69  for ( auto daughter : m_children ) {
70  if ( m_modePromptDecision && ( -1 != decision || hasUndecidedChild ) ) {
71  node_decisions[m_nodeIndex] = decision;
72  return decision;
73  } // if prompt decision, return once result is known already or we can't fully evaluate right now because one daugther
74  // decision is missing still
75  auto res = daughter->updateState( states, node_decisions );
76  if ( -1 == res ) {
77  hasUndecidedChild = true;
78  } else if ( false == m_modeOR && res == 0 ) {
79  decision = 0;
80  } // "and"-mode (once first result false, the overall decision is false)
81  else if ( true == m_modeOR && res == 1 ) {
82  decision = 1;
83  } // "or"-mode (once first result true, the overall decision is true)
84  }
85  // what to do with yet undefined answers depends on whether AND or OR mode applies
86  if ( !hasUndecidedChild && -1 == decision ) {
87  // OR mode: all results known, and none true -> reject
88  if ( true == m_modeOR ) {
89  decision = 0;
90  }
91  // AND mode: all results known, and no false -> accept
92  else {
93  decision = 1;
94  }
95  }
96  // in all other cases I stay with previous decisions
97  node_decisions[m_nodeIndex] = decision;
98  if ( m_allPass ) decision = 1;
99  return decision;
100  }
101 
102  //---------------------------------------------------------------------------
103  void DecisionNode::updateDecision( const int& slotNum, AlgsExecutionStates& states, std::vector<int>& node_decisions,
104  const AlgorithmNode* requestor ) const
105  {
106 
107  int decision = ( ( m_allPass && m_modePromptDecision ) ? 1 : -1 );
108  bool keepGoing = true;
109  bool hasUndecidedChild = false;
110  // std::cout << "++++++++++++++++++++BEGIN(UPDATING)++++++++++++++++++++" << std::endl;
111  // std::cout << "UPDATING DAUGHTERS of DECISION NODE: " << m_nodeName << std::endl;
112 
113  for ( auto daughter : m_children ) {
114  // if prompt decision, return once result is known already or we can't fully evaluate
115  // right now because one daughter decision is missing still
116  // std::cout << "----UPDATING DAUGHTER: " << daughter->getNodeName() << std::endl;
117  if ( m_modePromptDecision && !keepGoing ) {
118  node_decisions[m_nodeIndex] = decision;
119  // std::cout << "STOPPING ITERATION OVER (UPDATING) DECISION NODE CHILDREN: " << m_nodeName << std::endl;
120  break;
121  // return;
122  }
123 
124  // modified
125  int& res = node_decisions[daughter->getNodeIndex()];
126  if ( -1 == res ) {
127  hasUndecidedChild = true;
128  if ( typeid( *daughter ) != typeid( concurrency::DecisionNode ) ) {
129  auto algod = (AlgorithmNode*)daughter;
130  algod->promoteToControlReadyState( slotNum, states, node_decisions );
131  bool result = algod->promoteToDataReadyState( slotNum, requestor );
132  if ( result ) keepGoing = false;
133  } else {
134  daughter->updateDecision( slotNum, states, node_decisions, requestor );
135  }
136 
137  // "and"-mode (once first result false, the overall decision is false)
138  } else if ( false == m_modeOR && res == 0 ) {
139  decision = 0;
140  keepGoing = false;
141  // "or"-mode (once first result true, the overall decision is true)
142  } else if ( true == m_modeOR && res == 1 ) {
143  decision = 1;
144  keepGoing = false;
145  }
146  }
147 
148  // what to do with yet undefined answers depends on whether AND or OR mode applies
149  if ( !hasUndecidedChild && -1 == decision ) {
150  // OR mode: all results known, and none true -> reject
151  if ( true == m_modeOR ) {
152  decision = 0;
153  // AND mode: all results known, and no false -> accept
154  } else {
155  decision = 1;
156  }
157  }
158 
159  // in all other cases I stay with previous decisions
160  node_decisions[m_nodeIndex] = decision;
161 
162  // propagate decision upwards through the decision graph
163  if ( -1 != decision )
164  for ( auto p : m_parents ) p->updateDecision( slotNum, states, node_decisions, requestor );
165 
166  // std::cout << "++++++++++++++++++++END(UPDATING)++++++++++++++++++++" << std::endl;
167  }
168 
169  //---------------------------------------------------------------------------
171  std::vector<int>& node_decisions ) const
172  {
173  // std::cout << "REACHED DECISNODE " << m_nodeName << std::endl;
174  if ( -1 != node_decisions[m_nodeIndex] ) {
175  return true;
176  }
177 
178  for ( auto daughter : m_children ) {
179  auto res = node_decisions[daughter->getNodeIndex()];
180  if ( -1 == res ) {
181  daughter->promoteToControlReadyState( slotNum, states, node_decisions );
182  if ( m_modePromptDecision ) return true;
183  } else if ( m_modePromptDecision ) {
184  if ( ( false == m_modeOR && res == 0 ) || ( true == m_modeOR && res == 1 ) ) return true;
185  }
186  }
187 
188  return true;
189  }
190 
191  //---------------------------------------------------------------------------
193  {
194 
195  if ( visitor.visitEnter( *this ) ) {
196  // try to aggregate a decision
197  bool result = visitor.visit( *this );
198 
199  // if a decision was made for this node, propagate the result upwards
200  if ( result ) {
201  for ( auto parent : m_parents ) {
202  parent->accept( visitor );
203  }
204  return false;
205  }
206 
207  // if no decision can be made yet, request further information downwards
208  for ( auto child : m_children ) {
209  bool result = child->accept( visitor );
210  if (!m_modeConcurrent)
211  if ( result ) break; //stop on first unresolved child if its decision hub is sequential
212  }
213 
214  return true; // visitor was accepted to try to aggregate the node's decision
215  }
216 
217  return false; // visitor was rejected (since the decision node has an aggregated decision already)
218  }
219 
220  //---------------------------------------------------------------------------
222  {
223 
224  for ( auto node : m_outputs ) {
225  delete node;
226  }
227  }
228 
229  //---------------------------------------------------------------------------
231  {
232 
233  m_algoIndex = algname_index_map.at( m_algoName );
234  }
235 
236  //---------------------------------------------------------------------------
237  bool AlgorithmNode::promoteToControlReadyState( const int& /*slotNum*/, AlgsExecutionStates& states,
238  std::vector<int>& /*node_decisions*/ ) const
239  {
240 
241  auto& state = states[m_algoIndex];
242  bool result = false;
243 
244  if ( State::INITIAL == state ) {
245  states.updateState( m_algoIndex, State::CONTROLREADY ).ignore();
246  // std::cout << "----> UPDATING ALGORITHM to CONTROLREADY: " << m_algoName << std::endl;
247  result = true;
248  } else if ( State::CONTROLREADY == state ) {
249  result = true;
250  }
251 
252  return result;
253  }
254 
255  //---------------------------------------------------------------------------
256  bool AlgorithmNode::promoteToDataReadyState( const int& slotNum, const AlgorithmNode* /*requestor*/ ) const
257  {
258 
259  auto& states = m_graph->getAlgoStates( slotNum );
260  auto& state = states[m_algoIndex];
261  bool result = false;
262 
263  if ( State::CONTROLREADY == state ) {
264  if ( dataDependenciesSatisfied( slotNum ) ) {
265  // std::cout << "----> UPDATING ALGORITHM to DATAREADY: " << m_algoName << std::endl;
266  states.updateState( m_algoIndex, State::DATAREADY ).ignore();
267  result = true;
268 
269  // m_graph->addEdgeToExecutionPlan(requestor, this);
270 
271  /*
272  auto xtime = std::chrono::high_resolution_clock::now();
273  std::stringstream s;
274  s << getNodeName() << ", "
275  << (xtime-m_graph->getInitTime()).count() << "\n";
276  std::ofstream myfile;
277  myfile.open("DRTiming.csv", std::ios::app);
278  myfile << s.str();
279  myfile.close();
280  */
281  }
282  } else if ( State::DATAREADY == state ) {
283  result = true;
284  } else if ( State::SCHEDULED == state ) {
285  result = true;
286  }
287 
288  return result;
289  }
290 
291  //---------------------------------------------------------------------------
292  bool AlgorithmNode::dataDependenciesSatisfied( const int& slotNum ) const
293  {
294 
295  bool result = true; //return true if an algorithm has no data inputs
296  auto& states = m_graph->getAlgoStates( slotNum );
297 
298  for ( auto dataNode : m_inputs ) {
299  // return false if the input has no producers at all (normally this case must be
300  // forbidden, and must be invalidated at configuration time)
301  result = false;
302  for ( auto algoNode : dataNode->getProducers() )
303  if ( State::EVTACCEPTED == states[algoNode->getAlgoIndex()] ) {
304  result = true;
305  break; // skip checking other producers if one was found to be executed
306  }
307 
308  if (!result) break; // skip checking other inputs if this input was not produced yet
309  }
310 
311  return result;
312  }
313 
314  //---------------------------------------------------------------------------
316  const std::vector<int>& node_decisions, const unsigned int& recursionLevel ) const
317  {
318  output << std::string( recursionLevel, ' ' ) << m_nodeName << " (" << m_nodeIndex << ")"
319  << ", w/ decision: " << stateToString( node_decisions[m_nodeIndex] ) << "(" << node_decisions[m_nodeIndex]
320  << ")"
321  << ", in state: " << AlgsExecutionStates::stateNames[states[m_algoIndex]] << std::endl;
322  }
323 
324  //---------------------------------------------------------------------------
326  {
327  // check whether we already had a result earlier
328  // if (-1 != node_decisions[m_nodeIndex] ) { return node_decisions[m_nodeIndex]; }
329  // since we reached this point in the control flow, this algorithm is supposed to run
330  // if it hasn't already
331  const State& state = states[m_algoIndex];
332  unsigned int decision = -1;
333  if ( State::INITIAL == state ) {
334  states.updateState( m_algoIndex, State::CONTROLREADY ).ignore();
335  }
336  // now derive the proper result to pass back
337  if ( true == m_allPass ) {
338  decision = 1;
339  } else if ( State::EVTACCEPTED == state ) {
340  decision = !m_inverted;
341  } else if ( State::EVTREJECTED == state ) {
342  decision = m_inverted;
343  } else {
344  decision = -1; // result not known yet
345  }
346  node_decisions[m_nodeIndex] = decision;
347  return decision;
348  }
349 
350  //---------------------------------------------------------------------------
351  void AlgorithmNode::updateDecision( const int& slotNum, AlgsExecutionStates& states, std::vector<int>& node_decisions,
352  const AlgorithmNode* /*requestor*/ ) const
353  {
354 
355  const State& state = states[m_algoIndex];
356  int decision = -1;
357  //requestor = this;
358 
359  // now derive the proper result to pass back
360  if ( true == m_allPass ) {
361  decision = 1;
362  } else if ( State::EVTACCEPTED == state ) {
363  decision = !m_inverted;
364  } else if ( State::EVTREJECTED == state ) {
365  decision = m_inverted;
366  } else {
367  decision = -1; // result not known yet
368  }
369 
370  node_decisions[m_nodeIndex] = decision;
371 
372  if ( -1 != decision ) {
373  auto& slot = (*m_graph->m_eventSlots)[slotNum];
374  auto promoter = DataReadyPromoter(slot);
375  for ( auto output : m_outputs )
376  for ( auto consumer : output->getConsumers() )
377  if (State::CONTROLREADY == states[consumer->getAlgoIndex()])
378  consumer->accept(promoter);
379 
380  auto vis = concurrency::Supervisor(slot);
381  for ( auto p : m_parents ) {
382  //p->updateDecision( slotNum, states, node_decisions, requestor );
383  p->accept(vis);
384  }
385 
386  }
387  }
388 
389  //---------------------------------------------------------------------------
391  {
392 
393  if ( visitor.visitEnter( *this ) ) {
394  visitor.visit( *this );
395  return true; // visitor was accepted to promote the algorithm
396  }
397 
398  return false; // visitor was rejected (since the algorithm already produced a decision)
399  }
400 
401  //---------------------------------------------------------------------------
403  {
404 
405  if ( std::find( m_parents.begin(), m_parents.end(), node ) == m_parents.end() ) m_parents.push_back( node );
406  }
407 
408  //---------------------------------------------------------------------------
410  {
411 
412  if ( std::find( m_outputs.begin(), m_outputs.end(), node ) == m_outputs.end() ) m_outputs.push_back( node );
413  }
414 
415  //---------------------------------------------------------------------------
417  {
418 
419  if ( std::find( m_inputs.begin(), m_inputs.end(), node ) == m_inputs.end() ) m_inputs.push_back( node );
420  }
421 
422  //---------------------------------------------------------------------------
424  {
425 
426  m_headNode->initialize( algname_index_map );
427  // StatusCode sc = buildDataDependenciesRealm();
428  StatusCode sc = buildAugmentedDataDependenciesRealm();
429 
430  if ( !sc.isSuccess() ) error() << "Could not build the data dependency realm." << endmsg;
431 
432  return sc;
433  }
434 
435  //---------------------------------------------------------------------------
437  std::vector<EventSlot>& eventSlots )
438  {
439 
440  m_eventSlots = &eventSlots;
441  m_headNode->initialize( algname_index_map );
442  // StatusCode sc = buildDataDependenciesRealm();
443  StatusCode sc = buildAugmentedDataDependenciesRealm();
444 
445  if ( !sc.isSuccess() ) error() << "Could not build the data dependency realm." << endmsg;
446 
447  if (msgLevel(MSG::DEBUG))
448  debug() << dumpDataFlow() << endmsg;
449 
450  return sc;
451  }
452 
453  //---------------------------------------------------------------------------
455  {
456 
457  const std::string& algoName = algo->name();
458 
459  m_algoNameToAlgoInputsMap[algoName] = algo->inputDataObjs();
460  m_algoNameToAlgoOutputsMap[algoName] = algo->outputDataObjs();
461 
462  if (msgLevel(MSG::DEBUG)) {
463  debug() << "Inputs of " << algoName << ": ";
464  for (auto tag : algo->inputDataObjs())
465  debug() << tag << " | ";
466  debug() << endmsg;
467 
468  debug() << "Outputs of " << algoName << ": ";
469  for (auto tag : algo->outputDataObjs())
470  debug() << tag << " | ";
471  debug() << endmsg;
472  }
473  }
474 
475  //---------------------------------------------------------------------------
477  {
478 
479  StatusCode global_sc( StatusCode::SUCCESS );
480 
481  for ( auto algo : m_algoNameToAlgoNodeMap ) {
482 
483  auto targetNode = m_algoNameToAlgoNodeMap[algo.first];
484 
485  // Find producers for all the inputs of the target node
486  auto& targetInCollection = m_algoNameToAlgoInputsMap[algo.first];
487  for (auto inputTag : targetInCollection) {
488  for (auto producer : m_algoNameToAlgoOutputsMap) {
489  auto& outputs = m_algoNameToAlgoOutputsMap[producer.first];
490  for (auto outputTag : outputs) {
491  if (inputTag == outputTag) {
492  auto& known_producers = targetNode->getSupplierNodes();
493  auto valid_producer = m_algoNameToAlgoNodeMap[producer.first];
494  auto& known_consumers = valid_producer->getConsumerNodes();
495  if ( std::find( known_producers.begin(), known_producers.end(), valid_producer ) ==
496  known_producers.end() )
497  targetNode->addSupplierNode( valid_producer );
498  if ( std::find( known_consumers.begin(), known_consumers.end(), targetNode ) == known_consumers.end() )
499  valid_producer->addConsumerNode( targetNode );
500  }
501  }
502  }
503  }
504 
505  // Find consumers for all the outputs of the target node
506  auto& targetOutCollection = m_algoNameToAlgoOutputsMap[algo.first];
507  for (auto outputTag : targetOutCollection) {
508  for (auto consumer : m_algoNameToAlgoInputsMap) {
509  auto& inputs = m_algoNameToAlgoInputsMap[consumer.first];
510  for (auto inputTag : inputs) {
511  if (inputTag == outputTag) {
512  auto& known_consumers = targetNode->getConsumerNodes();
513  auto valid_consumer = m_algoNameToAlgoNodeMap[consumer.first];
514  auto& known_producers = valid_consumer->getSupplierNodes();
515  if ( std::find( known_producers.begin(), known_producers.end(), targetNode ) == known_producers.end() )
516  valid_consumer->addSupplierNode( targetNode );
517  if ( std::find( known_consumers.begin(), known_consumers.end(), valid_consumer ) ==
518  known_consumers.end() )
519  targetNode->addConsumerNode( valid_consumer );
520  }
521  }
522  }
523  }
524  }
525  return global_sc;
526  }
527 
528  //---------------------------------------------------------------------------
530  {
531 
532  StatusCode global_sc( StatusCode::SUCCESS, true );
533 
534  // Create the DataObjects (DO) realm (represented by DataNodes in the graph),
535  // connected to DO producers (AlgorithmNodes)
536  for (auto algo : m_algoNameToAlgoNodeMap) {
537 
538  auto& outCollection = m_algoNameToAlgoOutputsMap[algo.first];
539  for (auto outputTag : outCollection) {
540  const auto sc = addDataNode(outputTag);
541  if (!sc.isSuccess()) {
542  error() << "Extra producer (" << algo.first << ") for DataObject @ "
543  << outputTag
544  << " has been detected: this is not allowed." << endmsg;
545  global_sc = sc;
546  }
547  auto dataNode = getDataNode(outputTag);
548  dataNode->addProducerNode(algo.second);
549  algo.second->addOutputDataNode(dataNode);
550  }
551  }
552 
553  // Connect previously created DO realm to DO consumers (AlgorithmNodes)
554  for ( auto algo : m_algoNameToAlgoNodeMap ) {
555  auto& inCollection = m_algoNameToAlgoInputsMap[algo.first];
556  for (auto inputTag : inCollection) {
557  DataNode* dataNode = nullptr;
558  auto primaryPath = inputTag;
559  auto itP = m_dataPathToDataNodeMap.find(primaryPath);
560  if (itP != m_dataPathToDataNodeMap.end()) {
561  dataNode = getDataNode(primaryPath);
562  //if (!inCollection[inputTag].alternativeDataProductNames().empty())
563  // warning() << "Dropping all alternative data dependencies in the graph, but '" << primaryPath
564  // << "', for algorithm " << algo.first << endmsg;
565  //} else {
566  // for (auto alterPath : inCollection[inputTag].alternativeDataProductNames()) {
567  // auto itAP = m_dataPathToDataNodeMap.find(alterPath);
568  // if (itAP != m_dataPathToDataNodeMap.end()) {
569  // dataNode = getDataNode(alterPath);
570  // warning() << "Dropping all alternative data dependencies in the graph, but '" << alterPath
571  // << "', for algorithm " << algo.first << endmsg;
572  // break;
573  // }
574  //}
575  }
576 
577  if (dataNode) {
578  dataNode->addConsumerNode(algo.second);
579  algo.second->addInputDataNode(dataNode);
580  }
581  }
582  }
583 
584  return global_sc;
585  }
586 
587  //---------------------------------------------------------------------------
588  StatusCode PrecedenceRulesGraph::addAlgorithmNode( Algorithm* algo, const std::string& parentName, bool inverted,
589  bool allPass )
590  {
591 
593 
594  auto& algoName = algo->name();
595 
596  auto itP = m_decisionNameToDecisionHubMap.find( parentName );
597  concurrency::DecisionNode* parentNode;
598  if ( itP != m_decisionNameToDecisionHubMap.end() ) {
599  parentNode = itP->second;
600  auto itA = m_algoNameToAlgoNodeMap.find( algoName );
601  concurrency::AlgorithmNode* algoNode;
602  if ( itA != m_algoNameToAlgoNodeMap.end() ) {
603  algoNode = itA->second;
604  } else {
605  algoNode = new concurrency::AlgorithmNode( *this, m_nodeCounter, algoName, inverted, allPass, algo->isIOBound() );
606  ++m_nodeCounter;
607  m_algoNameToAlgoNodeMap[algoName] = algoNode;
608  if (msgLevel(MSG::DEBUG))
609  debug() << "AlgoNode " << algoName << " added @ " << algoNode << endmsg;
610  registerIODataObjects(algo);
611  }
612 
613  parentNode->addDaughterNode( algoNode );
614  algoNode->addParentNode( parentNode );
615  } else {
616  sc = StatusCode::FAILURE;
617  error() << "Decision hub node " << parentName << ", requested to be parent, is not registered."
618  << endmsg;
619  }
620 
621  return sc;
622  }
623 
624  //---------------------------------------------------------------------------
626  {
627 
628  return m_algoNameToAlgoNodeMap.at( algoName );
629  }
630 
631  //---------------------------------------------------------------------------
633  {
634 
635  StatusCode sc;
636 
637  auto itD = m_dataPathToDataNodeMap.find( dataPath );
638  concurrency::DataNode* dataNode;
639  if ( itD != m_dataPathToDataNodeMap.end() ) {
640  dataNode = itD->second;
641  sc = StatusCode::SUCCESS;
642  } else {
643  dataNode = new concurrency::DataNode( *this, dataPath );
644  m_dataPathToDataNodeMap[dataPath] = dataNode;
645  if (msgLevel(MSG::DEBUG))
646  debug() << " DataNode for " << dataPath << " added @ " << dataNode << endmsg;
647  sc = StatusCode::SUCCESS;
648  }
649 
650  return sc;
651  }
652 
653  //---------------------------------------------------------------------------
655  {
656 
657  return m_dataPathToDataNodeMap.at( dataPath );
658  }
659 
660  //---------------------------------------------------------------------------
662  bool modeConcurrent, bool modePromptDecision, bool modeOR, bool allPass)
663  {
664 
666 
667  auto& decisionHubName = decisionHubAlgo->name();
668 
669  auto itP = m_decisionNameToDecisionHubMap.find( parentName );
670  concurrency::DecisionNode* parentNode;
671  if ( itP != m_decisionNameToDecisionHubMap.end() ) {
672  parentNode = itP->second;
673  auto itA = m_decisionNameToDecisionHubMap.find( decisionHubName );
674  concurrency::DecisionNode* decisionHubNode;
675  if ( itA != m_decisionNameToDecisionHubMap.end() ) {
676  decisionHubNode = itA->second;
677  } else {
678  decisionHubNode =
679  new concurrency::DecisionNode( *this, m_nodeCounter, decisionHubName, modeConcurrent, modePromptDecision, modeOR, allPass);
680  ++m_nodeCounter;
681  m_decisionNameToDecisionHubMap[decisionHubName] = decisionHubNode;
682  if (msgLevel(MSG::DEBUG))
683  debug() << "Decision hub node " << decisionHubName << " added @ " << decisionHubNode << endmsg;
684  }
685 
686  parentNode->addDaughterNode( decisionHubNode );
687  decisionHubNode->addParentNode( parentNode );
688  } else {
689  sc = StatusCode::FAILURE;
690  error() << "Decision hub node " << parentName << ", requested to be parent, is not registered."
691  << endmsg;
692  }
693 
694  return sc;
695  }
696 
697  //---------------------------------------------------------------------------
698  void PrecedenceRulesGraph::addHeadNode( const std::string& headName, bool modeConcurrent, bool modePromptDecision, bool modeOR, bool allPass)
699  {
700 
701  auto itH = m_decisionNameToDecisionHubMap.find( headName );
702  if ( itH != m_decisionNameToDecisionHubMap.end() ) {
703  m_headNode = itH->second;
704  } else {
705  m_headNode = new concurrency::DecisionNode( *this, m_nodeCounter, headName, modeConcurrent, modePromptDecision, modeOR, allPass );
706  ++m_nodeCounter;
707  m_decisionNameToDecisionHubMap[headName] = m_headNode;
708  }
709  }
710 
711  //---------------------------------------------------------------------------
713  {
714  m_headNode->updateState( algo_states, node_decisions );
715  }
716 
717  //---------------------------------------------------------------------------
718  void PrecedenceRulesGraph::updateDecision( const std::string& algo_name, const int& slotNum,
719  AlgsExecutionStates& /*algo_states*/, std::vector<int>& /*node_decisions*/ ) const
720  {
721  //if (msgLevel(MSG::DEBUG))
722  // debug() << "(UPDATING)Setting decision of algorithm " << algo_name << " and propagating it upwards.." << endmsg;
723  //getAlgorithmNode( algo_name )->updateDecision( slotNum, algo_states, node_decisions );
724  auto& slot = (*m_eventSlots)[slotNum];
725  auto updater = DecisionUpdater(slot);
726  getAlgorithmNode( algo_name )->accept(updater);
727  }
728 
729  //---------------------------------------------------------------------------
731  {
732 
733  info() << "Starting ranking by data outputs .. " << endmsg;
734  for (auto& pair : m_algoNameToAlgoNodeMap) {
735  if (msgLevel(MSG::DEBUG))
736  debug() << " Ranking " << pair.first << "... " << endmsg;
737  pair.second->accept(ranker);
738  if (msgLevel(MSG::DEBUG))
739  debug() << " ... rank of " << pair.first << ": " << pair.second->getRank() << endmsg;
740  }
741  }
742 
743  //---------------------------------------------------------------------------
745  {
746 
748 
749  for (auto node : m_algoNameToAlgoInputsMap) {
750  DataObjIDColl collection = (node.second);
751  if (collection.empty())
752  result.push_back(getAlgorithmNode(node.first));
753  }
754 
755  return result;
756  }
757 
759  std::ostringstream ost;
760  dumpControlFlow(ost,m_headNode,0);
761  return ost.str();
762  }
763 
765  ControlFlowNode* node,
766  const int& indent) const {
767  ost << std::string(indent*2, ' ');
768  DecisionNode *dn = dynamic_cast<DecisionNode*> (node);
769  AlgorithmNode *an = dynamic_cast<AlgorithmNode*> (node);
770  if ( dn != 0 ) {
771  if (node != m_headNode) {
772  ost << node->getNodeName() << " [Seq] ";
773  ost << ( (dn->m_modeConcurrent) ? " [Concurrent] " : " [Sequential] " );
774  ost << ( (dn->m_modePromptDecision) ? " [Prompt] " : "" );
775  ost << ( (dn->m_modeOR) ? " [OR] " : "" );
776  ost << ( (dn->m_allPass) ? " [PASS] " : "" );
777  ost << "\n";
778  }
779  const std::vector<ControlFlowNode*>& dth = dn->getDaughters();
781  itr != dth.end(); ++itr) {
782  dumpControlFlow(ost,*itr,indent+1);
783  }
784  } else if (an != 0) {
785  ost << node->getNodeName() << " [Alg] ";
786  if (an != 0) {
787  auto ar = an->getAlgorithmRepresentatives();
788  ost << " [n= " << ar.at(0)->cardinality() << "]";
789  ost << ( (! ar.at(0)->isClonable()) ? " [unclonable] " : "" );
790  }
791  ost << "\n";
792  }
793 
794  }
795 
796  //---------------------------------------------------------------------------
798  {
799 
800  const char idt[] = " ";
801  std::ostringstream ost;
802 
803  ost << "\n" << idt << "====================================\n";
804  ost << idt << "Data origins and destinations:\n";
805  ost << idt << "====================================\n";
806 
807  for ( auto& pair : m_dataPathToDataNodeMap ) {
808 
809  for ( auto algoNode : pair.second->getProducers() ) ost << idt << " " << algoNode->getNodeName() << "\n";
810 
811  ost << idt << " V\n";
812  ost << idt << " o " << pair.first << "\n";
813  ost << idt << " V\n";
814 
815  for ( auto algoNode : pair.second->getConsumers() ) ost << idt << " " << algoNode->getNodeName() << "\n";
816 
817  ost << idt << "====================================\n";
818  }
819 
820  return ost.str();
821  }
822 
823  //---------------------------------------------------------------------------
824 
826  {
827  std::ofstream myfile;
828  myfile.open( "ExecutionPlan.graphml", std::ios::app );
829 
830  boost::dynamic_properties dp;
831  dp.property( "name", boost::get( &boost::AlgoNodeStruct::m_name, m_ExecPlan ) );
832  dp.property( "index", boost::get( &boost::AlgoNodeStruct::m_index, m_ExecPlan ) );
833  dp.property( "rank", boost::get( &boost::AlgoNodeStruct::m_rank, m_ExecPlan ) );
834  dp.property( "runtime", boost::get( &boost::AlgoNodeStruct::m_runtime, m_ExecPlan ) );
835 
836  boost::write_graphml( myfile, m_ExecPlan, dp );
837 
838  myfile.close();
839  }
840 
842  {
843 
844  boost::AlgoVertex source;
845  float runtime( 0. );
846  if ( u == nullptr ) {
847  auto itT = m_exec_plan_map.find( "ENTRY" );
848  if ( itT != m_exec_plan_map.end() ) {
849  source = itT->second;
850  } else {
851  source = boost::add_vertex( boost::AlgoNodeStruct( "ENTRY", -999, -999, 0 ), m_ExecPlan );
852  m_exec_plan_map["ENTRY"] = source;
853  }
854  } else {
855  auto itS = m_exec_plan_map.find( u->getNodeName() );
856  if ( itS != m_exec_plan_map.end() ) {
857  source = itS->second;
858  } else {
859  auto alg = dynamic_cast<Algorithm*>( u->getAlgorithmRepresentatives()[0] );
860  if ( alg == 0 ) {
861  fatal() << "could not convert IAlgorithm to Algorithm!" << endmsg;
862  } else {
863  try {
864  const Gaudi::Details::PropertyBase& p = alg->getProperty( "AvgRuntime" );
865  runtime = std::stof( p.toString() );
866  } catch(...) {
867  if (msgLevel(MSG::DEBUG))
868  debug() << "no AvgRuntime for " << alg->name() << endmsg;
869  runtime = 1.;
870  }
871  }
872  source = boost::add_vertex( boost::AlgoNodeStruct( u->getNodeName(), u->getAlgoIndex(), u->getRank(), runtime ),
873  m_ExecPlan );
874  m_exec_plan_map[u->getNodeName()] = source;
875  }
876  }
877 
878  boost::AlgoVertex target;
879  auto itP = m_exec_plan_map.find( v->getNodeName() );
880  if ( itP != m_exec_plan_map.end() ) {
881  target = itP->second;
882  } else {
883  auto alg = dynamic_cast<Algorithm*>( v->getAlgorithmRepresentatives()[0] );
884  if ( alg == 0 ) {
885  fatal() << "could not convert IAlgorithm to Algorithm!" << endmsg;
886  } else {
887  try {
888  const Gaudi::Details::PropertyBase& p = alg->getProperty( "AvgRuntime" );
889  runtime = std::stof( p.toString() );
890  } catch(...) {
891  if (msgLevel(MSG::DEBUG))
892  debug() << "no AvgRuntime for " << alg->name() << endmsg;
893  runtime = 1.;
894  }
895  }
896  target = boost::add_vertex( boost::AlgoNodeStruct( v->getNodeName(), v->getAlgoIndex(), v->getRank(), runtime ),
897  m_ExecPlan );
898  m_exec_plan_map[v->getNodeName()] = target;
899  }
900 
901  if (msgLevel(MSG::DEBUG))
902  debug() << "Edge added to execution plan" << endmsg;
903  boost::add_edge(source, target, m_ExecPlan);
904  }
905 
906 } // namespace
void dumpExecutionPlan()
dump to file encountered execution plan
bool promoteToControlReadyState(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
XXX: CF tests.
const unsigned int & getAlgoIndex() const
XXX: CF tests.
T empty(T...args)
T open(T...args)
StatusCode addAlgorithmNode(Algorithm *daughterAlgo, const std::string &parentName, bool inverted, bool allPass)
Add algorithm node.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:750
void addDaughterNode(ControlFlowNode *node)
Add a daughter node.
bool dataDependenciesSatisfied(const int &slotNum) const
Method to check whether the Algorithm has its all data dependency satisfied.
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
virtual bool visit(DecisionNode &)
Definition: IGraphVisitor.h:17
T stof(T...args)
T endl(T...args)
StatusCode addDecisionHubNode(Algorithm *daughterAlgo, const std::string &parentName, bool modeConcurrent, bool modePromptDecision, bool modeOR, bool allPass)
Add a node, which aggregates decisions of direct daughter nodes.
const std::vector< IAlgorithm * > & getAlgorithmRepresentatives() const
get Algorithm representatives
void addHeadNode(const std::string &headName, bool modeConcurrent, bool modePromptDecision, bool modeOR, bool allPass)
Add a node, which has no parents.
void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const override
XXX: CF tests.
std::string stateToString(const int &stateId) const
Translation between state id and name.
T end(T...args)
void addEdgeToExecutionPlan(const AlgorithmNode *u, const AlgorithmNode *v)
set cause-effect connection between two algorithms in the execution plan
StatusCode addDataNode(const DataObjID &dataPath)
Add DataNode that represents DataObject.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
void rankAlgorithms(IGraphVisitor &ranker) const
Rank Algorithm nodes by the number of data outputs.
bool m_allPass
Whether always passing regardless of daughter results.
virtual std::string toString() const =0
value -> string
virtual bool visitEnter(DecisionNode &) const
Definition: IGraphVisitor.h:16
STL class.
bool m_modeOR
Whether acting as "and" (false) or "or" node (true)
void addInputDataNode(DataNode *node)
Associate an AlgorithmNode, which is a data consumer of this one.
bool accept(IGraphVisitor &visitor) override
T push_back(T...args)
STL class.
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
A method to update algorithm node decision, and propagate it upwards.
const float & getRank() const
Get Algorithm rank.
void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const override
XXX: CF tests.
std::vector< EventSlot > * m_eventSlots
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
PrecedenceRulesGraph * m_graph
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
const DataObjIDColl & inputDataObjs() const override
std::string dumpDataFlow() const
Print out all data origins and destinations, as reflected in the EF graph.
T close(T...args)
const std::vector< ControlFlowNode * > & getDaughters() const
graph_traits< ExecPlan >::vertex_descriptor AlgoVertex
void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map) override
Initialize.
PropertyBase base class allowing PropertyBase* collections to be "homogeneous".
Definition: Property.h:32
const std::vector< AlgorithmNode * > getDataIndependentNodes() const
DataNode * getDataNode(const DataObjID &dataPath) const
Get DataNode by DataObject path using graph index.
bool m_modePromptDecision
Whether to evaluate the hub decision ASA its child decisions allow to do that.
StatusCode buildAugmentedDataDependenciesRealm()
Build data dependency realm WITH data object nodes participating.
AlgsExecutionStates & getAlgoStates(const int &slotNum) const
void addOutputDataNode(DataNode *node)
Associate an AlgorithmNode, which is a data supplier for this one.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find(T...args)
StatusCode initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize graph.
std::vector< InputHandle_t< In > > m_inputs
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
bool isIOBound() const
Definition: Algorithm.h:474
T begin(T...args)
void registerIODataObjects(const Algorithm *algo)
Register algorithm in the Data Dependency index.
bool m_modeConcurrent
Whether all daughters will be evaluated concurrently or sequentially.
int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
Method to set algos to CONTROLREADY, if possible.
void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const override
Print a string representing the control flow state.
void updateEventState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const
XXX CF tests. Is needed for older CF implementation.
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
bool accept(IGraphVisitor &visitor) override
int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
Method to set algos to CONTROLREADY, if possible.
bool promoteToControlReadyState(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
XXX: CF tests. Method to set algos to CONTROLREADY, if possible.
const std::string & getNodeName() const
void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const override
Print a string representing the control flow state.
StatusCode buildDataDependenciesRealm()
Build data dependency realm WITHOUT data object nodes: just interconnect algorithm nodes directly...
void ignore() const
Definition: StatusCode.h:106
std::string dumpControlFlow() const
Print out control flow of Algorithms and Sequences.
~DecisionNode() override
Destructor.
State
Execution states of the algorithms.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map) override
Initialize.
bool promoteToDataReadyState(const int &slotNum, const AlgorithmNode *requestor=nullptr) const
static std::map< State, std::string > stateNames
void addConsumerNode(AlgorithmNode *node)
Associate an AlgorithmNode, which is a data consumer of this one.
StatusCode updateState(unsigned int iAlgo, State newState)