All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
PrecedenceRulesGraph.cpp
Go to the documentation of this file.
1 #include "PrecedenceRulesGraph.h"
2 #include "PRGraphVisitors.h"
3 
5 
6 namespace concurrency
7 {
8 
9  //---------------------------------------------------------------------------
10  std::string ControlFlowNode::stateToString( const int& stateId ) const
11  {
12 
13  if ( 0 == stateId )
14  return "FALSE";
15  else if ( 1 == stateId )
16  return "TRUE";
17  else
18  return "UNDEFINED";
19  }
20 
21  //---------------------------------------------------------------------------
23  {
24 
25  for ( auto node : m_children ) delete node;
26  }
27 
28  //---------------------------------------------------------------------------
30  {
31 
32  for ( auto daughter : m_children ) daughter->initialize( algname_index_map );
33  }
34 
35  //---------------------------------------------------------------------------
37  {
38 
39  if ( std::find( m_parents.begin(), m_parents.end(), node ) == m_parents.end() ) m_parents.push_back( node );
40  }
41 
42  //--------------------------------------------------------------------------
44  {
45 
46  if ( std::find( m_children.begin(), m_children.end(), node ) == m_children.end() ) m_children.push_back( node );
47  }
48 
49  //---------------------------------------------------------------------------
51  const std::vector<int>& node_decisions, const unsigned int& recursionLevel ) const
52  {
53 
54  output << std::string( recursionLevel, ' ' ) << m_nodeName << " (" << m_nodeIndex << ")"
55  << ", w/ decision: " << stateToString( node_decisions[m_nodeIndex] ) << "(" << node_decisions[m_nodeIndex]
56  << ")" << std::endl;
57  for ( auto daughter : m_children ) {
58  daughter->printState( output, states, node_decisions, recursionLevel + 2 );
59  }
60  }
61 
62  //---------------------------------------------------------------------------
63  int DecisionNode::updateState( AlgsExecutionStates& states, std::vector<int>& node_decisions ) const
64  {
65  // check whether we already had a result earlier
66  // if (-1 != node_decisions[m_nodeIndex] ) { return node_decisions[m_nodeIndex]; }
67  int decision = ( ( m_allPass && m_modePromptDecision ) ? 1 : -1 );
68  bool hasUndecidedChild = false;
69  for ( auto daughter : m_children ) {
70  if ( m_modePromptDecision && ( -1 != decision || hasUndecidedChild ) ) {
71  node_decisions[m_nodeIndex] = decision;
72  return decision;
73  } // if prompt decision, return once result is known already or we can't fully evaluate right now because one daugther
74  // decision is missing still
75  auto res = daughter->updateState( states, node_decisions );
76  if ( -1 == res ) {
77  hasUndecidedChild = true;
78  } else if ( false == m_modeOR && res == 0 ) {
79  decision = 0;
80  } // "and"-mode (once first result false, the overall decision is false)
81  else if ( true == m_modeOR && res == 1 ) {
82  decision = 1;
83  } // "or"-mode (once first result true, the overall decision is true)
84  }
85  // what to do with yet undefined answers depends on whether AND or OR mode applies
86  if ( !hasUndecidedChild && -1 == decision ) {
87  // OR mode: all results known, and none true -> reject
88  if ( true == m_modeOR ) {
89  decision = 0;
90  }
91  // AND mode: all results known, and no false -> accept
92  else {
93  decision = 1;
94  }
95  }
96  // in all other cases I stay with previous decisions
97  node_decisions[m_nodeIndex] = decision;
98  if ( m_allPass ) decision = 1;
99  return decision;
100  }
101 
102  //---------------------------------------------------------------------------
103  void DecisionNode::updateDecision( const int& slotNum, AlgsExecutionStates& states, std::vector<int>& node_decisions,
104  const AlgorithmNode* requestor ) const
105  {
106 
107  int decision = ( ( m_allPass && m_modePromptDecision ) ? 1 : -1 );
108  bool keepGoing = true;
109  bool hasUndecidedChild = false;
110  // std::cout << "++++++++++++++++++++BEGIN(UPDATING)++++++++++++++++++++" << std::endl;
111  // std::cout << "UPDATING DAUGHTERS of DECISION NODE: " << m_nodeName << std::endl;
112 
113  for ( auto daughter : m_children ) {
114  // if prompt decision, return once result is known already or we can't fully evaluate
115  // right now because one daughter decision is missing still
116  // std::cout << "----UPDATING DAUGHTER: " << daughter->getNodeName() << std::endl;
117  if ( m_modePromptDecision && !keepGoing ) {
118  node_decisions[m_nodeIndex] = decision;
119  // std::cout << "STOPPING ITERATION OVER (UPDATING) DECISION NODE CHILDREN: " << m_nodeName << std::endl;
120  break;
121  // return;
122  }
123 
124  // modified
125  int& res = node_decisions[daughter->getNodeIndex()];
126  if ( -1 == res ) {
127  hasUndecidedChild = true;
128  if ( typeid( *daughter ) != typeid( concurrency::DecisionNode ) ) {
129  auto algod = (AlgorithmNode*)daughter;
130  algod->promoteToControlReadyState( slotNum, states, node_decisions );
131  bool result = algod->promoteToDataReadyState( slotNum, requestor );
132  if ( result ) keepGoing = false;
133  } else {
134  daughter->updateDecision( slotNum, states, node_decisions, requestor );
135  }
136 
137  // "and"-mode (once first result false, the overall decision is false)
138  } else if ( false == m_modeOR && res == 0 ) {
139  decision = 0;
140  keepGoing = false;
141  // "or"-mode (once first result true, the overall decision is true)
142  } else if ( true == m_modeOR && res == 1 ) {
143  decision = 1;
144  keepGoing = false;
145  }
146  }
147 
148  // what to do with yet undefined answers depends on whether AND or OR mode applies
149  if ( !hasUndecidedChild && -1 == decision ) {
150  // OR mode: all results known, and none true -> reject
151  if ( true == m_modeOR ) {
152  decision = 0;
153  // AND mode: all results known, and no false -> accept
154  } else {
155  decision = 1;
156  }
157  }
158 
159  // in all other cases I stay with previous decisions
160  node_decisions[m_nodeIndex] = decision;
161 
162  // propagate decision upwards through the decision graph
163  if ( -1 != decision )
164  for ( auto p : m_parents ) p->updateDecision( slotNum, states, node_decisions, requestor );
165 
166  // std::cout << "++++++++++++++++++++END(UPDATING)++++++++++++++++++++" << std::endl;
167  }
168 
169  //---------------------------------------------------------------------------
171  std::vector<int>& node_decisions ) const
172  {
173  // std::cout << "REACHED DECISNODE " << m_nodeName << std::endl;
174  if ( -1 != node_decisions[m_nodeIndex] ) {
175  return true;
176  }
177 
178  for ( auto daughter : m_children ) {
179  auto res = node_decisions[daughter->getNodeIndex()];
180  if ( -1 == res ) {
181  daughter->promoteToControlReadyState( slotNum, states, node_decisions );
182  if ( m_modePromptDecision ) return true;
183  } else if ( m_modePromptDecision ) {
184  if ( ( false == m_modeOR && res == 0 ) || ( true == m_modeOR && res == 1 ) ) return true;
185  }
186  }
187 
188  return true;
189  }
190 
191  //---------------------------------------------------------------------------
193  {
194 
195  if ( visitor.visitEnter( *this ) ) {
196  // try to aggregate a decision
197  bool result = visitor.visit( *this );
198 
199  // if a decision was made for this node, propagate the result upwards
200  if ( result ) {
201  for ( auto parent : m_parents ) {
202  parent->accept( visitor );
203  }
204  return false;
205  }
206 
207  // if no decision can be made yet, request further information downwards
208  for ( auto child : m_children ) {
209  bool result = child->accept( visitor );
210  if (!m_modeConcurrent)
211  if ( result ) break; //stop on first unresolved child if its decision hub is sequential
212  }
213 
214  return true; // visitor was accepted to try to aggregate the node's decision
215  }
216 
217  return false; // visitor was rejected (since the decision node has an aggregated decision already)
218  }
219 
220  //---------------------------------------------------------------------------
222  {
223 
224  for ( auto node : m_outputs ) {
225  delete node;
226  }
227  }
228 
229  //---------------------------------------------------------------------------
231  {
232 
233  m_algoIndex = algname_index_map.at( m_algoName );
234  }
235 
236  //---------------------------------------------------------------------------
237  bool AlgorithmNode::promoteToControlReadyState( const int& /*slotNum*/, AlgsExecutionStates& states,
238  std::vector<int>& /*node_decisions*/ ) const
239  {
240 
241  auto& state = states[m_algoIndex];
242  bool result = false;
243 
244  if ( State::INITIAL == state ) {
245  states.updateState( m_algoIndex, State::CONTROLREADY ).ignore();
246  // std::cout << "----> UPDATING ALGORITHM to CONTROLREADY: " << m_algoName << std::endl;
247  result = true;
248  } else if ( State::CONTROLREADY == state ) {
249  result = true;
250  }
251 
252  return result;
253  }
254 
255  //---------------------------------------------------------------------------
256  bool AlgorithmNode::promoteToDataReadyState( const int& slotNum, const AlgorithmNode* /*requestor*/ ) const
257  {
258 
259  auto& states = m_graph->getAlgoStates( slotNum );
260  auto& state = states[m_algoIndex];
261  bool result = false;
262 
263  if ( State::CONTROLREADY == state ) {
264  if ( dataDependenciesSatisfied( slotNum ) ) {
265  // std::cout << "----> UPDATING ALGORITHM to DATAREADY: " << m_algoName << std::endl;
266  states.updateState( m_algoIndex, State::DATAREADY ).ignore();
267  result = true;
268 
269  // m_graph->addEdgeToExecutionPlan(requestor, this);
270 
271  /*
272  auto xtime = std::chrono::high_resolution_clock::now();
273  std::stringstream s;
274  s << getNodeName() << ", "
275  << (xtime-m_graph->getInitTime()).count() << "\n";
276  std::ofstream myfile;
277  myfile.open("DRTiming.csv", std::ios::app);
278  myfile << s.str();
279  myfile.close();
280  */
281  }
282  } else if ( State::DATAREADY == state ) {
283  result = true;
284  } else if ( State::SCHEDULED == state ) {
285  result = true;
286  }
287 
288  return result;
289  }
290 
291  //---------------------------------------------------------------------------
292  bool AlgorithmNode::dataDependenciesSatisfied( const int& slotNum ) const
293  {
294 
295  bool result = true; //return true if an algorithm has no data inputs
296  auto& states = m_graph->getAlgoStates( slotNum );
297 
298  for ( auto dataNode : m_inputs ) {
299  // return false if the input has no producers at all (normally this case must be
300  // forbidden, and must be invalidated at configuration time)
301  result = false;
302  for ( auto algoNode : dataNode->getProducers() )
303  if ( State::EVTACCEPTED == states[algoNode->getAlgoIndex()] ) {
304  result = true;
305  break; // skip checking other producers if one was found to be executed
306  }
307 
308  if (!result) break; // skip checking other inputs if this input was not produced yet
309  }
310 
311  return result;
312  }
313 
314  //---------------------------------------------------------------------------
316  {
317 
318  bool result = true;
319  for ( auto dataNode : m_inputs ) {
320 
321  result = false;
322  for ( auto algoNode : dataNode->getProducers() )
323  if ( State::EVTACCEPTED == states[algoNode->getAlgoIndex()] ) {
324  result = true;
325  break;
326  }
327 
328  if ( !result ) break;
329  }
330 
331  return result;
332  }
333 
334  //---------------------------------------------------------------------------
336  const std::vector<int>& node_decisions, const unsigned int& recursionLevel ) const
337  {
338  output << std::string( recursionLevel, ' ' ) << m_nodeName << " (" << m_nodeIndex << ")"
339  << ", w/ decision: " << stateToString( node_decisions[m_nodeIndex] ) << "(" << node_decisions[m_nodeIndex]
340  << ")"
341  << ", in state: " << AlgsExecutionStates::stateNames[states[m_algoIndex]] << std::endl;
342  }
343 
344  //---------------------------------------------------------------------------
346  {
347  // check whether we already had a result earlier
348  // if (-1 != node_decisions[m_nodeIndex] ) { return node_decisions[m_nodeIndex]; }
349  // since we reached this point in the control flow, this algorithm is supposed to run
350  // if it hasn't already
351  const State& state = states[m_algoIndex];
352  unsigned int decision = -1;
353  if ( State::INITIAL == state ) {
354  states.updateState( m_algoIndex, State::CONTROLREADY ).ignore();
355  }
356  // now derive the proper result to pass back
357  if ( true == m_allPass ) {
358  decision = 1;
359  } else if ( State::EVTACCEPTED == state ) {
360  decision = !m_inverted;
361  } else if ( State::EVTREJECTED == state ) {
362  decision = m_inverted;
363  } else {
364  decision = -1; // result not known yet
365  }
366  node_decisions[m_nodeIndex] = decision;
367  return decision;
368  }
369 
370  //---------------------------------------------------------------------------
371  void AlgorithmNode::updateDecision( const int& slotNum, AlgsExecutionStates& states, std::vector<int>& node_decisions,
372  const AlgorithmNode* requestor ) const
373  {
374 
375  const State& state = states[m_algoIndex];
376  int decision = -1;
377  requestor = this;
378 
379  // now derive the proper result to pass back
380  if ( true == m_allPass ) {
381  decision = 1;
382  } else if ( State::EVTACCEPTED == state ) {
383  decision = !m_inverted;
384  } else if ( State::EVTREJECTED == state ) {
385  decision = m_inverted;
386  } else {
387  decision = -1; // result not known yet
388  }
389 
390  node_decisions[m_nodeIndex] = decision;
391 
392  if ( -1 != decision ) {
393  for ( auto output : m_outputs )
394  for ( auto consumer : output->getConsumers() ) consumer->promoteToDataReadyState( slotNum, requestor );
395 
396  auto vis = concurrency::Supervisor( slotNum );
397  for ( auto p : m_parents ) {
398  //p->updateDecision( slotNum, states, node_decisions, requestor );
399  p->accept(vis);
400  }
401 
402  }
403  }
404 
405  //---------------------------------------------------------------------------
407  {
408 
409  if ( visitor.visitEnter( *this ) ) {
410  visitor.visit( *this );
411  return true; // visitor was accepted to promote the algorithm
412  }
413 
414  return false; // visitor was rejected (since the algorithm already produced a decision)
415  }
416 
417  //---------------------------------------------------------------------------
419  {
420 
421  if ( std::find( m_parents.begin(), m_parents.end(), node ) == m_parents.end() ) m_parents.push_back( node );
422  }
423 
424  //---------------------------------------------------------------------------
426  {
427 
428  if ( std::find( m_outputs.begin(), m_outputs.end(), node ) == m_outputs.end() ) m_outputs.push_back( node );
429  }
430 
431  //---------------------------------------------------------------------------
433  {
434 
435  if ( std::find( m_inputs.begin(), m_inputs.end(), node ) == m_inputs.end() ) m_inputs.push_back( node );
436  }
437 
438  //---------------------------------------------------------------------------
440  {
441 
442  m_headNode->initialize( algname_index_map );
443  // StatusCode sc = buildDataDependenciesRealm();
444  StatusCode sc = buildAugmentedDataDependenciesRealm();
445 
446  if ( !sc.isSuccess() ) error() << "Could not build the data dependency realm." << endmsg;
447 
448  return sc;
449  }
450 
451  //---------------------------------------------------------------------------
453  std::vector<EventSlot>& eventSlots )
454  {
455 
456  m_eventSlots = &eventSlots;
457  m_headNode->initialize( algname_index_map );
458  // StatusCode sc = buildDataDependenciesRealm();
459  StatusCode sc = buildAugmentedDataDependenciesRealm();
460 
461  if ( !sc.isSuccess() ) error() << "Could not build the data dependency realm." << endmsg;
462 
463  if (msgLevel(MSG::DEBUG))
464  debug() << dumpDataFlow() << endmsg;
465 
466  return sc;
467  }
468 
469  //---------------------------------------------------------------------------
471  {
472 
473  const std::string& algoName = algo->name();
474 
475  DataObjIDColl inputObjs, outputObjs;
476  DHHVisitor avis( inputObjs, outputObjs );
477  algo->acceptDHVisitor( &avis );
478 
479  m_algoNameToAlgoInputsMap[algoName] = inputObjs;
480  m_algoNameToAlgoOutputsMap[algoName] = outputObjs;
481 
482  if (msgLevel(MSG::DEBUG)) {
483  debug() << "Inputs of " << algoName << ": ";
484  for (auto tag : inputObjs)
485  debug() << tag << " | ";
486  debug() << endmsg;
487 
488  debug() << "Outputs of " << algoName << ": ";
489  for (auto tag : outputObjs)
490  debug() << tag << " | ";
491  debug() << endmsg;
492  }
493  }
494 
495  //---------------------------------------------------------------------------
497  {
498 
499  StatusCode global_sc( StatusCode::SUCCESS );
500 
501  for ( auto algo : m_algoNameToAlgoNodeMap ) {
502 
503  auto targetNode = m_algoNameToAlgoNodeMap[algo.first];
504 
505  // Find producers for all the inputs of the target node
506  auto& targetInCollection = m_algoNameToAlgoInputsMap[algo.first];
507  for (auto inputTag : targetInCollection) {
508  for (auto producer : m_algoNameToAlgoOutputsMap) {
509  auto& outputs = m_algoNameToAlgoOutputsMap[producer.first];
510  for (auto outputTag : outputs) {
511  if (inputTag == outputTag) {
512  auto& known_producers = targetNode->getSupplierNodes();
513  auto valid_producer = m_algoNameToAlgoNodeMap[producer.first];
514  auto& known_consumers = valid_producer->getConsumerNodes();
515  if ( std::find( known_producers.begin(), known_producers.end(), valid_producer ) ==
516  known_producers.end() )
517  targetNode->addSupplierNode( valid_producer );
518  if ( std::find( known_consumers.begin(), known_consumers.end(), targetNode ) == known_consumers.end() )
519  valid_producer->addConsumerNode( targetNode );
520  }
521  }
522  }
523  }
524 
525  // Find consumers for all the outputs of the target node
526  auto& targetOutCollection = m_algoNameToAlgoOutputsMap[algo.first];
527  for (auto outputTag : targetOutCollection) {
528  for (auto consumer : m_algoNameToAlgoInputsMap) {
529  auto& inputs = m_algoNameToAlgoInputsMap[consumer.first];
530  for (auto inputTag : inputs) {
531  if (inputTag == outputTag) {
532  auto& known_consumers = targetNode->getConsumerNodes();
533  auto valid_consumer = m_algoNameToAlgoNodeMap[consumer.first];
534  auto& known_producers = valid_consumer->getSupplierNodes();
535  if ( std::find( known_producers.begin(), known_producers.end(), targetNode ) == known_producers.end() )
536  valid_consumer->addSupplierNode( targetNode );
537  if ( std::find( known_consumers.begin(), known_consumers.end(), valid_consumer ) ==
538  known_consumers.end() )
539  targetNode->addConsumerNode( valid_consumer );
540  }
541  }
542  }
543  }
544  }
545  return global_sc;
546  }
547 
548  //---------------------------------------------------------------------------
550  {
551 
552  StatusCode global_sc( StatusCode::SUCCESS, true );
553 
554  // Create the DataObjects (DO) realm (represented by DataNodes in the graph),
555  // connected to DO producers (AlgorithmNodes)
556  for (auto algo : m_algoNameToAlgoNodeMap) {
557 
558  auto& outCollection = m_algoNameToAlgoOutputsMap[algo.first];
559  for (auto outputTag : outCollection) {
560  const auto sc = addDataNode(outputTag);
561  if (!sc.isSuccess()) {
562  error() << "Extra producer (" << algo.first << ") for DataObject @ "
563  << outputTag
564  << " has been detected: this is not allowed." << endmsg;
565  global_sc = sc;
566  }
567  auto dataNode = getDataNode(outputTag);
568  dataNode->addProducerNode(algo.second);
569  algo.second->addOutputDataNode(dataNode);
570  }
571  }
572 
573  // Connect previously created DO realm to DO consumers (AlgorithmNodes)
574  for ( auto algo : m_algoNameToAlgoNodeMap ) {
575  auto& inCollection = m_algoNameToAlgoInputsMap[algo.first];
576  for (auto inputTag : inCollection) {
577  DataNode* dataNode = nullptr;
578  auto primaryPath = inputTag;
579  auto itP = m_dataPathToDataNodeMap.find(primaryPath);
580  if (itP != m_dataPathToDataNodeMap.end()) {
581  dataNode = getDataNode(primaryPath);
582  //if (!inCollection[inputTag].alternativeDataProductNames().empty())
583  // warning() << "Dropping all alternative data dependencies in the graph, but '" << primaryPath
584  // << "', for algorithm " << algo.first << endmsg;
585  //} else {
586  // for (auto alterPath : inCollection[inputTag].alternativeDataProductNames()) {
587  // auto itAP = m_dataPathToDataNodeMap.find(alterPath);
588  // if (itAP != m_dataPathToDataNodeMap.end()) {
589  // dataNode = getDataNode(alterPath);
590  // warning() << "Dropping all alternative data dependencies in the graph, but '" << alterPath
591  // << "', for algorithm " << algo.first << endmsg;
592  // break;
593  // }
594  //}
595  }
596 
597  if (dataNode) {
598  dataNode->addConsumerNode(algo.second);
599  algo.second->addInputDataNode(dataNode);
600  }
601  }
602  }
603 
604  return global_sc;
605  }
606 
607  //---------------------------------------------------------------------------
608  StatusCode PrecedenceRulesGraph::addAlgorithmNode( Algorithm* algo, const std::string& parentName, bool inverted,
609  bool allPass )
610  {
611 
613 
614  auto& algoName = algo->name();
615 
616  auto itP = m_decisionNameToDecisionHubMap.find( parentName );
617  concurrency::DecisionNode* parentNode;
618  if ( itP != m_decisionNameToDecisionHubMap.end() ) {
619  parentNode = itP->second;
620  auto itA = m_algoNameToAlgoNodeMap.find( algoName );
621  concurrency::AlgorithmNode* algoNode;
622  if ( itA != m_algoNameToAlgoNodeMap.end() ) {
623  algoNode = itA->second;
624  } else {
625  algoNode = new concurrency::AlgorithmNode( *this, m_nodeCounter, algoName, inverted, allPass, algo->isIOBound() );
626  ++m_nodeCounter;
627  m_algoNameToAlgoNodeMap[algoName] = algoNode;
628  if (msgLevel(MSG::DEBUG))
629  debug() << "AlgoNode " << algoName << " added @ " << algoNode << endmsg;
630  registerIODataObjects(algo);
631  }
632 
633  parentNode->addDaughterNode( algoNode );
634  algoNode->addParentNode( parentNode );
635  } else {
636  sc = StatusCode::FAILURE;
637  error() << "Decision hub node " << parentName << ", requested to be parent, is not registered."
638  << endmsg;
639  }
640 
641  return sc;
642  }
643 
644  //---------------------------------------------------------------------------
646  {
647 
648  return m_algoNameToAlgoNodeMap.at( algoName );
649  }
650 
651  //---------------------------------------------------------------------------
653  {
654 
655  StatusCode sc;
656 
657  auto itD = m_dataPathToDataNodeMap.find( dataPath );
658  concurrency::DataNode* dataNode;
659  if ( itD != m_dataPathToDataNodeMap.end() ) {
660  dataNode = itD->second;
661  sc = StatusCode::SUCCESS;
662  } else {
663  dataNode = new concurrency::DataNode( *this, dataPath );
664  m_dataPathToDataNodeMap[dataPath] = dataNode;
665  if (msgLevel(MSG::DEBUG))
666  debug() << " DataNode for " << dataPath << " added @ " << dataNode << endmsg;
667  sc = StatusCode::SUCCESS;
668  }
669 
670  return sc;
671  }
672 
673  //---------------------------------------------------------------------------
675  {
676 
677  return m_dataPathToDataNodeMap.at( dataPath );
678  }
679 
680  //---------------------------------------------------------------------------
682  bool modeConcurrent, bool modePromptDecision, bool modeOR, bool allPass)
683  {
684 
686 
687  auto& decisionHubName = decisionHubAlgo->name();
688 
689  auto itP = m_decisionNameToDecisionHubMap.find( parentName );
690  concurrency::DecisionNode* parentNode;
691  if ( itP != m_decisionNameToDecisionHubMap.end() ) {
692  parentNode = itP->second;
693  auto itA = m_decisionNameToDecisionHubMap.find( decisionHubName );
694  concurrency::DecisionNode* decisionHubNode;
695  if ( itA != m_decisionNameToDecisionHubMap.end() ) {
696  decisionHubNode = itA->second;
697  } else {
698  decisionHubNode =
699  new concurrency::DecisionNode( *this, m_nodeCounter, decisionHubName, modeConcurrent, modePromptDecision, modeOR, allPass);
700  ++m_nodeCounter;
701  m_decisionNameToDecisionHubMap[decisionHubName] = decisionHubNode;
702  if (msgLevel(MSG::DEBUG))
703  debug() << "Decision hub node " << decisionHubName << " added @ " << decisionHubNode << endmsg;
704  }
705 
706  parentNode->addDaughterNode( decisionHubNode );
707  decisionHubNode->addParentNode( parentNode );
708  } else {
709  sc = StatusCode::FAILURE;
710  error() << "Decision hub node " << parentName << ", requested to be parent, is not registered."
711  << endmsg;
712  }
713 
714  return sc;
715  }
716 
717  //---------------------------------------------------------------------------
718  void PrecedenceRulesGraph::addHeadNode( const std::string& headName, bool modeConcurrent, bool modePromptDecision, bool modeOR, bool allPass)
719  {
720 
721  auto itH = m_decisionNameToDecisionHubMap.find( headName );
722  if ( itH != m_decisionNameToDecisionHubMap.end() ) {
723  m_headNode = itH->second;
724  } else {
725  m_headNode = new concurrency::DecisionNode( *this, m_nodeCounter, headName, modeConcurrent, modePromptDecision, modeOR, allPass );
726  ++m_nodeCounter;
727  m_decisionNameToDecisionHubMap[headName] = m_headNode;
728  }
729  }
730 
731  //---------------------------------------------------------------------------
733  {
734  m_headNode->updateState( algo_states, node_decisions );
735  }
736 
737  //---------------------------------------------------------------------------
738  void PrecedenceRulesGraph::updateDecision( const std::string& algo_name, const int& slotNum,
739  AlgsExecutionStates& algo_states, std::vector<int>& node_decisions ) const
740  {
741  //if (msgLevel(MSG::DEBUG))
742  // debug() << "(UPDATING)Setting decision of algorithm " << algo_name << " and propagating it upwards.." << endmsg;
743  getAlgorithmNode( algo_name )->updateDecision( slotNum, algo_states, node_decisions );
744  }
745 
746  //---------------------------------------------------------------------------
748  {
749 
750  info() << "Starting ranking by data outputs .. " << endmsg;
751  for (auto& pair : m_algoNameToAlgoNodeMap) {
752  if (msgLevel(MSG::DEBUG))
753  debug() << " Ranking " << pair.first << "... " << endmsg;
754  pair.second->accept(ranker);
755  if (msgLevel(MSG::DEBUG))
756  debug() << " ... rank of " << pair.first << ": " << pair.second->getRank() << endmsg;
757  }
758  }
759 
760  //---------------------------------------------------------------------------
762  {
763 
765 
766  for (auto node : m_algoNameToAlgoInputsMap) {
767  DataObjIDColl collection = (node.second);
768  if (collection.empty())
769  result.push_back(getAlgorithmNode(node.first));
770  }
771 
772  return result;
773  }
774 
775  //---------------------------------------------------------------------------
777  {
778 
779  const char idt[] = " ";
780  std::ostringstream ost;
781 
782  ost << "\n" << idt << "====================================\n";
783  ost << idt << "Data origins and destinations:\n";
784  ost << idt << "====================================\n";
785 
786  for ( auto& pair : m_dataPathToDataNodeMap ) {
787 
788  for ( auto algoNode : pair.second->getProducers() ) ost << idt << " " << algoNode->getNodeName() << "\n";
789 
790  ost << idt << " V\n";
791  ost << idt << " o " << pair.first << "\n";
792  ost << idt << " V\n";
793 
794  for ( auto algoNode : pair.second->getConsumers() ) ost << idt << " " << algoNode->getNodeName() << "\n";
795 
796  ost << idt << "====================================\n";
797  }
798 
799  return ost.str();
800  }
801 
802  //---------------------------------------------------------------------------
803 
805  {
806  std::ofstream myfile;
807  myfile.open( "ExecutionPlan.graphml", std::ios::app );
808 
809  boost::dynamic_properties dp;
810  dp.property( "name", boost::get( &boost::AlgoNodeStruct::m_name, m_ExecPlan ) );
811  dp.property( "index", boost::get( &boost::AlgoNodeStruct::m_index, m_ExecPlan ) );
812  dp.property( "rank", boost::get( &boost::AlgoNodeStruct::m_rank, m_ExecPlan ) );
813  dp.property( "runtime", boost::get( &boost::AlgoNodeStruct::m_runtime, m_ExecPlan ) );
814 
815  boost::write_graphml( myfile, m_ExecPlan, dp );
816 
817  myfile.close();
818  }
819 
821  {
822 
823  boost::AlgoVertex source;
824  float runtime( 0. );
825  if ( u == nullptr ) {
826  auto itT = m_exec_plan_map.find( "ENTRY" );
827  if ( itT != m_exec_plan_map.end() ) {
828  source = itT->second;
829  } else {
830  source = boost::add_vertex( boost::AlgoNodeStruct( "ENTRY", -999, -999, 0 ), m_ExecPlan );
831  m_exec_plan_map["ENTRY"] = source;
832  }
833  } else {
834  auto itS = m_exec_plan_map.find( u->getNodeName() );
835  if ( itS != m_exec_plan_map.end() ) {
836  source = itS->second;
837  } else {
838  auto alg = dynamic_cast<Algorithm*>( u->getAlgorithmRepresentatives()[0] );
839  if ( alg == 0 ) {
840  fatal() << "could not convert IAlgorithm to Algorithm!" << endmsg;
841  } else {
842  try {
843  const Gaudi::Details::PropertyBase& p = alg->getProperty( "AvgRuntime" );
844  runtime = std::stof( p.toString() );
845  } catch(...) {
846  if (msgLevel(MSG::DEBUG))
847  debug() << "no AvgRuntime for " << alg->name() << endmsg;
848  runtime = 1.;
849  }
850  }
851  source = boost::add_vertex( boost::AlgoNodeStruct( u->getNodeName(), u->getAlgoIndex(), u->getRank(), runtime ),
852  m_ExecPlan );
853  m_exec_plan_map[u->getNodeName()] = source;
854  }
855  }
856 
857  boost::AlgoVertex target;
858  auto itP = m_exec_plan_map.find( v->getNodeName() );
859  if ( itP != m_exec_plan_map.end() ) {
860  target = itP->second;
861  } else {
862  auto alg = dynamic_cast<Algorithm*>( v->getAlgorithmRepresentatives()[0] );
863  if ( alg == 0 ) {
864  fatal() << "could not convert IAlgorithm to Algorithm!" << endmsg;
865  } else {
866  try {
867  const Gaudi::Details::PropertyBase& p = alg->getProperty( "AvgRuntime" );
868  runtime = std::stof( p.toString() );
869  } catch(...) {
870  if (msgLevel(MSG::DEBUG))
871  debug() << "no AvgRuntime for " << alg->name() << endmsg;
872  runtime = 1.;
873  }
874  }
875  target = boost::add_vertex( boost::AlgoNodeStruct( v->getNodeName(), v->getAlgoIndex(), v->getRank(), runtime ),
876  m_ExecPlan );
877  m_exec_plan_map[v->getNodeName()] = target;
878  }
879 
880  if (msgLevel(MSG::DEBUG))
881  debug() << "Edge added to execution plan" << endmsg;
882  boost::add_edge(source, target, m_ExecPlan);
883  }
884 
885 } // namespace
void dumpExecutionPlan()
dump to file encountered execution plan
bool promoteToControlReadyState(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
XXX: CF tests.
virtual bool visitEnter(DecisionNode &) const =0
const unsigned int & getAlgoIndex() const
XXX: CF tests.
T empty(T...args)
T open(T...args)
StatusCode addAlgorithmNode(Algorithm *daughterAlgo, const std::string &parentName, bool inverted, bool allPass)
Add algorithm node.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:715
void addDaughterNode(ControlFlowNode *node)
Add a daughter node.
virtual void acceptDHVisitor(IDataHandleVisitor *) const override
Definition: Algorithm.cpp:205
bool dataDependenciesSatisfied(const int &slotNum) const
Method to check whether the Algorithm has all of its data dependencies satisfied.
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
T stof(T...args)
T endl(T...args)
StatusCode addDecisionHubNode(Algorithm *daughterAlgo, const std::string &parentName, bool modeConcurrent, bool modePromptDecision, bool modeOR, bool allPass)
Add a node, which aggregates decisions of direct daughter nodes.
const std::vector< IAlgorithm * > & getAlgorithmRepresentatives() const
get Algorithm representatives
void addHeadNode(const std::string &headName, bool modeConcurrent, bool modePromptDecision, bool modeOR, bool allPass)
Add a node, which has no parents.
void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const override
XXX: CF tests.
std::string stateToString(const int &stateId) const
Translation between state id and name.
void addEdgeToExecutionPlan(const AlgorithmNode *u, const AlgorithmNode *v)
set cause-effect connection between two algorithms in the execution plan
StatusCode addDataNode(const DataObjID &dataPath)
Add DataNode that represents DataObject.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
void rankAlgorithms(IGraphVisitor &ranker) const
Rank Algorithm nodes by the number of data outputs.
virtual std::string toString() const =0
value -> string
STL class.
void addInputDataNode(DataNode *node)
Associate a DataNode, which is a data input of this one.
bool accept(IGraphVisitor &visitor) override
T push_back(T...args)
STL class.
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
A method to update algorithm node decision, and propagate it upwards.
const float & getRank() const
Get Algorithm rank.
void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const override
XXX: CF tests.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single event.
PrecedenceRulesGraph * m_graph
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
std::string dumpDataFlow() const
Print out all data origins and destinations, as reflected in the EF graph.
T close(T...args)
graph_traits< ExecPlan >::vertex_descriptor AlgoVertex
void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map) override
Initialize.
virtual bool visit(DecisionNode &)=0
PropertyBase base class allowing PropertyBase* collections to be "homogeneous".
Definition: Property.h:32
const std::vector< AlgorithmNode * > getDataIndependentNodes() const
DataNode * getDataNode(const DataObjID &dataPath) const
Get DataNode by DataObject path using graph index.
StatusCode buildAugmentedDataDependenciesRealm()
Build data dependency realm WITH data object nodes participating.
AlgsExecutionStates & getAlgoStates(const int &slotNum) const
void addOutputDataNode(DataNode *node)
Associate a DataNode, which is a data output of this one.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find(T...args)
StatusCode initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize graph.
std::vector< InputHandle_t< In > > m_inputs
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
bool isIOBound() const
Definition: Algorithm.h:465
void registerIODataObjects(const Algorithm *algo)
Register algorithm in the Data Dependency index.
int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
Method to set algos to CONTROLREADY, if possible.
void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const override
Print a string representing the control flow state.
void updateEventState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const
XXX CF tests. Is needed for older CF implementation.
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
bool accept(IGraphVisitor &visitor) override
int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
Method to set algos to CONTROLREADY, if possible.
bool promoteToControlReadyState(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
XXX: CF tests. Method to set algos to CONTROLREADY, if possible.
const std::string & getNodeName() const
void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const override
Print a string representing the control flow state.
StatusCode buildDataDependenciesRealm()
Build data dependency realm WITHOUT data object nodes: just interconnect algorithm nodes directly...
void ignore() const
Definition: StatusCode.h:106
~DecisionNode() override
Destructor.
State
Execution states of the algorithms.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map) override
Initialize.
bool promoteToDataReadyState(const int &slotNum, const AlgorithmNode *requestor=nullptr) const
static std::map< State, std::string > stateNames
void addConsumerNode(AlgorithmNode *node)
Associate an AlgorithmNode, which is a data consumer of this one.
StatusCode updateState(unsigned int iAlgo, State newState)