All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
ExecutionFlowGraph.cpp
Go to the documentation of this file.
1 #include "ExecutionFlowGraph.h"
3 
4 namespace concurrency
5 {
6 
7  //---------------------------------------------------------------------------
8  std::string ControlFlowNode::stateToString( const int& stateId ) const
9  {
10 
11  if ( 0 == stateId )
12  return "FALSE";
13  else if ( 1 == stateId )
14  return "TRUE";
15  else
16  return "UNDEFINED";
17  }
18 
19  //---------------------------------------------------------------------------
21  {
22 
23  for ( auto node : m_children ) delete node;
24  }
25 
26  //---------------------------------------------------------------------------
28  {
29 
30  for ( auto daughter : m_children ) daughter->initialize( algname_index_map );
31  }
32 
33  //---------------------------------------------------------------------------
35  {
36 
37  if ( std::find( m_parents.begin(), m_parents.end(), node ) == m_parents.end() ) m_parents.push_back( node );
38  }
39 
40  //--------------------------------------------------------------------------
42  {
43 
44  if ( std::find( m_children.begin(), m_children.end(), node ) == m_children.end() ) m_children.push_back( node );
45  }
46 
47  //---------------------------------------------------------------------------
49  const std::vector<int>& node_decisions, const unsigned int& recursionLevel ) const
50  {
51 
52  output << std::string( recursionLevel, ' ' ) << m_nodeName << " (" << m_nodeIndex << ")"
53  << ", w/ decision: " << stateToString( node_decisions[m_nodeIndex] ) << "(" << node_decisions[m_nodeIndex]
54  << ")" << std::endl;
55  for ( auto daughter : m_children ) {
56  daughter->printState( output, states, node_decisions, recursionLevel + 2 );
57  }
58  }
59 
60  //---------------------------------------------------------------------------
61  int DecisionNode::updateState( AlgsExecutionStates& states, std::vector<int>& node_decisions ) const
62  {
63  // check whether we already had a result earlier
64  // if (-1 != node_decisions[m_nodeIndex] ) { return node_decisions[m_nodeIndex]; }
65  int decision = ( ( m_allPass && m_isLazy ) ? 1 : -1 );
66  bool hasUndecidedChild = false;
67  for ( auto daughter : m_children ) {
68  if ( m_isLazy && ( -1 != decision || hasUndecidedChild ) ) {
69  node_decisions[m_nodeIndex] = decision;
70  return decision;
71  } // if lazy return once result is known already or we can't fully evaluate right now because one daugther
72  // decision is missing still
73  auto res = daughter->updateState( states, node_decisions );
74  if ( -1 == res ) {
75  hasUndecidedChild = true;
76  } else if ( false == m_modeOR && res == 0 ) {
77  decision = 0;
78  } // "and"-mode (once first result false, the overall decision is false)
79  else if ( true == m_modeOR && res == 1 ) {
80  decision = 1;
81  } // "or"-mode (once first result true, the overall decision is true)
82  }
83  // what to do with yet undefined answers depends on whether AND or OR mode applies
84  if ( !hasUndecidedChild && -1 == decision ) {
85  // OR mode: all results known, and none true -> reject
86  if ( true == m_modeOR ) {
87  decision = 0;
88  }
89  // AND mode: all results known, and no false -> accept
90  else {
91  decision = 1;
92  }
93  }
94  // in all other cases I stay with previous decisions
95  node_decisions[m_nodeIndex] = decision;
96  if ( m_allPass ) decision = 1;
97  return decision;
98  }
99 
100  //---------------------------------------------------------------------------
101  void DecisionNode::updateDecision( const int& slotNum, AlgsExecutionStates& states, std::vector<int>& node_decisions,
102  const AlgorithmNode* requestor ) const
103  {
104 
105  int decision = ( ( m_allPass && m_isLazy ) ? 1 : -1 );
106  bool keepGoing = true;
107  bool hasUndecidedChild = false;
108  // std::cout << "++++++++++++++++++++BEGIN(UPDATING)++++++++++++++++++++" << std::endl;
109  // std::cout << "UPDATING DAUGHTERS of DECISION NODE: " << m_nodeName << std::endl;
110 
111  for ( auto daughter : m_children ) {
112  // if lazy return once result is known already or we can't fully evaluate
113  // right now because one daughter decision is missing still
114  // std::cout << "----UPDATING DAUGHTER: " << daughter->getNodeName() << std::endl;
115  if ( m_isLazy && !keepGoing ) {
116  node_decisions[m_nodeIndex] = decision;
117  // std::cout << "STOPPING ITERATION OVER (UPDATING) DECISION NODE CHILDREN: " << m_nodeName << std::endl;
118  break;
119  // return;
120  }
121 
122  // modified
123  int& res = node_decisions[daughter->getNodeIndex()];
124  if ( -1 == res ) {
125  hasUndecidedChild = true;
126  if ( typeid( *daughter ) != typeid( concurrency::DecisionNode ) ) {
127  auto algod = (AlgorithmNode*)daughter;
128  algod->promoteToControlReadyState( slotNum, states, node_decisions );
129  bool result = algod->promoteToDataReadyState( slotNum, requestor );
130  if ( result ) keepGoing = false;
131  } else {
132  daughter->updateDecision( slotNum, states, node_decisions, requestor );
133  }
134 
135  // "and"-mode (once first result false, the overall decision is false)
136  } else if ( false == m_modeOR && res == 0 ) {
137  decision = 0;
138  keepGoing = false;
139  // "or"-mode (once first result true, the overall decision is true)
140  } else if ( true == m_modeOR && res == 1 ) {
141  decision = 1;
142  keepGoing = false;
143  }
144  }
145 
146  // what to do with yet undefined answers depends on whether AND or OR mode applies
147  if ( !hasUndecidedChild && -1 == decision ) {
148  // OR mode: all results known, and none true -> reject
149  if ( true == m_modeOR ) {
150  decision = 0;
151  // AND mode: all results known, and no false -> accept
152  } else {
153  decision = 1;
154  }
155  }
156 
157  // in all other cases I stay with previous decisions
158  node_decisions[m_nodeIndex] = decision;
159 
160  // propagate decision upwards through the decision graph
161  if ( -1 != decision )
162  for ( auto p : m_parents ) p->updateDecision( slotNum, states, node_decisions, requestor );
163 
164  // std::cout << "++++++++++++++++++++END(UPDATING)++++++++++++++++++++" << std::endl;
165  }
166 
167  //---------------------------------------------------------------------------
169  std::vector<int>& node_decisions ) const
170  {
171  // std::cout << "REACHED DECISNODE " << m_nodeName << std::endl;
172  if ( -1 != node_decisions[m_nodeIndex] ) {
173  return true;
174  }
175 
176  for ( auto daughter : m_children ) {
177  auto res = node_decisions[daughter->getNodeIndex()];
178  if ( -1 == res ) {
179  daughter->promoteToControlReadyState( slotNum, states, node_decisions );
180  if ( m_isLazy ) return true;
181  } else if ( m_isLazy ) {
182  if ( ( false == m_modeOR && res == 0 ) || ( true == m_modeOR && res == 1 ) ) return true;
183  }
184  }
185 
186  return true;
187  }
188 
189  //---------------------------------------------------------------------------
191  {
192 
193  if ( visitor.visitEnter( *this ) ) {
194  bool result = visitor.visit( *this );
195  if ( result ) return visitor.visitLeave( *this );
196 
197  for ( auto child : m_children ) {
198  bool keepGoing = child->accept( visitor );
199  if ( m_isLazy && !keepGoing ) return false; // break;
200  }
201  }
202 
203  return true;
204  }
205 
206  //---------------------------------------------------------------------------
208  {
209 
210  for ( auto node : m_outputs ) {
211  delete node;
212  }
213  }
214 
215  //---------------------------------------------------------------------------
217  {
218 
219  m_algoIndex = algname_index_map.at( m_algoName );
220  }
221 
222  //---------------------------------------------------------------------------
223  bool AlgorithmNode::promoteToControlReadyState( const int& /*slotNum*/, AlgsExecutionStates& states,
224  std::vector<int>& /*node_decisions*/ ) const
225  {
226 
227  auto& state = states[m_algoIndex];
228  bool result = false;
229 
230  if ( State::INITIAL == state ) {
231  states.updateState( m_algoIndex, State::CONTROLREADY ).ignore();
232  // std::cout << "----> UPDATING ALGORITHM to CONTROLREADY: " << m_algoName << std::endl;
233  result = true;
234  } else if ( State::CONTROLREADY == state ) {
235  result = true;
236  }
237 
238  return result;
239  }
240 
241  //---------------------------------------------------------------------------
242  bool AlgorithmNode::promoteToDataReadyState( const int& slotNum, const AlgorithmNode* /*requestor*/ ) const
243  {
244 
245  auto& states = m_graph->getAlgoStates( slotNum );
246  auto& state = states[m_algoIndex];
247  bool result = false;
248 
249  if ( State::CONTROLREADY == state ) {
250  if ( dataDependenciesSatisfied( slotNum ) ) {
251  // std::cout << "----> UPDATING ALGORITHM to DATAREADY: " << m_algoName << std::endl;
252  states.updateState( m_algoIndex, State::DATAREADY ).ignore();
253  result = true;
254 
255  // m_graph->addEdgeToExecutionPlan(requestor, this);
256 
257  /*
258  auto xtime = std::chrono::high_resolution_clock::now();
259  std::stringstream s;
260  s << getNodeName() << ", "
261  << (xtime-m_graph->getInitTime()).count() << "\n";
262  std::ofstream myfile;
263  myfile.open("DRTiming.csv", std::ios::app);
264  myfile << s.str();
265  myfile.close();
266  */
267  }
268  } else if ( State::DATAREADY == state ) {
269  result = true;
270  } else if ( State::SCHEDULED == state ) {
271  result = true;
272  }
273 
274  return result;
275  }
276 
277  //---------------------------------------------------------------------------
278  bool AlgorithmNode::dataDependenciesSatisfied( const int& slotNum ) const
279  {
280 
281  bool result = true; //return true if an algorithm has no data inputs
282  auto& states = m_graph->getAlgoStates( slotNum );
283 
284  for ( auto dataNode : m_inputs ) {
285  // return false if the input has no producers at all (normally this case must be
286  // forbidden, and must be invalidated at configuration time)
287  result = false;
288  for ( auto algoNode : dataNode->getProducers() )
289  if ( State::EVTACCEPTED == states[algoNode->getAlgoIndex()] ) {
290  result = true;
291  break; // skip checking other producers if one was found to be executed
292  }
293 
294  if (!result) break; // skip checking other inputs if this input was not produced yet
295  }
296 
297  return result;
298  }
299 
300  //---------------------------------------------------------------------------
302  {
303 
304  bool result = true;
305  for ( auto dataNode : m_inputs ) {
306 
307  result = false;
308  for ( auto algoNode : dataNode->getProducers() )
309  if ( State::EVTACCEPTED == states[algoNode->getAlgoIndex()] ) {
310  result = true;
311  break;
312  }
313 
314  if ( !result ) break;
315  }
316 
317  return result;
318  }
319 
320  //---------------------------------------------------------------------------
322  const std::vector<int>& node_decisions, const unsigned int& recursionLevel ) const
323  {
324  output << std::string( recursionLevel, ' ' ) << m_nodeName << " (" << m_nodeIndex << ")"
325  << ", w/ decision: " << stateToString( node_decisions[m_nodeIndex] ) << "(" << node_decisions[m_nodeIndex]
326  << ")"
327  << ", in state: " << AlgsExecutionStates::stateNames[states[m_algoIndex]] << std::endl;
328  }
329 
330  //---------------------------------------------------------------------------
332  {
333  // check whether we already had a result earlier
334  // if (-1 != node_decisions[m_nodeIndex] ) { return node_decisions[m_nodeIndex]; }
335  // since we reached this point in the control flow, this algorithm is supposed to run
336  // if it hasn't already
337  const State& state = states[m_algoIndex];
338  unsigned int decision = -1;
339  if ( State::INITIAL == state ) {
340  states.updateState( m_algoIndex, State::CONTROLREADY ).ignore();
341  }
342  // now derive the proper result to pass back
343  if ( true == m_allPass ) {
344  decision = 1;
345  } else if ( State::EVTACCEPTED == state ) {
346  decision = !m_inverted;
347  } else if ( State::EVTREJECTED == state ) {
348  decision = m_inverted;
349  } else {
350  decision = -1; // result not known yet
351  }
352  node_decisions[m_nodeIndex] = decision;
353  return decision;
354  }
355 
356  //---------------------------------------------------------------------------
357  void AlgorithmNode::updateDecision( const int& slotNum, AlgsExecutionStates& states, std::vector<int>& node_decisions,
358  const AlgorithmNode* requestor ) const
359  {
360 
361  const State& state = states[m_algoIndex];
362  int decision = -1;
363  requestor = this;
364 
365  // now derive the proper result to pass back
366  if ( true == m_allPass ) {
367  decision = 1;
368  } else if ( State::EVTACCEPTED == state ) {
369  decision = !m_inverted;
370  } else if ( State::EVTREJECTED == state ) {
371  decision = m_inverted;
372  } else {
373  decision = -1; // result not known yet
374  }
375 
376  node_decisions[m_nodeIndex] = decision;
377 
378  if ( -1 != decision ) {
379  for ( auto output : m_outputs )
380  for ( auto consumer : output->getConsumers() ) consumer->promoteToDataReadyState( slotNum, requestor );
381 
382  for ( auto p : m_parents ) p->updateDecision( slotNum, states, node_decisions, requestor );
383  }
384  }
385 
386  //---------------------------------------------------------------------------
388  {
389 
390  if ( visitor.visitEnter( *this ) ) {
391  bool result = visitor.visit( *this );
392  if ( result ) return false;
393  }
394 
395  return true;
396  }
397 
398  //---------------------------------------------------------------------------
400  {
401 
402  if ( std::find( m_parents.begin(), m_parents.end(), node ) == m_parents.end() ) m_parents.push_back( node );
403  }
404 
405  //---------------------------------------------------------------------------
407  {
408 
409  if ( std::find( m_outputs.begin(), m_outputs.end(), node ) == m_outputs.end() ) m_outputs.push_back( node );
410  }
411 
412  //---------------------------------------------------------------------------
414  {
415 
416  if ( std::find( m_inputs.begin(), m_inputs.end(), node ) == m_inputs.end() ) m_inputs.push_back( node );
417  }
418 
419  //---------------------------------------------------------------------------
421  {
422 
423  m_headNode->initialize( algname_index_map );
424  // StatusCode sc = buildDataDependenciesRealm();
425  StatusCode sc = buildAugmentedDataDependenciesRealm();
426 
427  if ( !sc.isSuccess() ) error() << "Could not build the data dependency realm." << endmsg;
428 
429  return sc;
430  }
431 
432  //---------------------------------------------------------------------------
434  std::vector<EventSlot>& eventSlots )
435  {
436 
437  m_eventSlots = &eventSlots;
438  m_headNode->initialize( algname_index_map );
439  // StatusCode sc = buildDataDependenciesRealm();
440  StatusCode sc = buildAugmentedDataDependenciesRealm();
441 
442  if ( !sc.isSuccess() ) error() << "Could not build the data dependency realm." << endmsg;
443 
444  if (msgLevel(MSG::DEBUG))
445  debug() << dumpDataFlow() << endmsg;
446 
447  return sc;
448  }
449 
450  //---------------------------------------------------------------------------
452  {
453 
454  const std::string& algoName = algo->name();
455 
456  DataObjIDColl inputObjs, outputObjs;
457  DHHVisitor avis( inputObjs, outputObjs );
458  algo->acceptDHVisitor( &avis );
459 
460  m_algoNameToAlgoInputsMap[algoName] = inputObjs;
461  m_algoNameToAlgoOutputsMap[algoName] = outputObjs;
462 
463  if (msgLevel(MSG::DEBUG)) {
464  debug() << "Inputs of " << algoName << ": ";
465  for (auto tag : inputObjs)
466  debug() << tag << " | ";
467  debug() << endmsg;
468 
469  debug() << "Outputs of " << algoName << ": ";
470  for (auto tag : outputObjs)
471  debug() << tag << " | ";
472  debug() << endmsg;
473  }
474  }
475 
476  //---------------------------------------------------------------------------
478  {
479 
480  StatusCode global_sc( StatusCode::SUCCESS );
481 
482  for ( auto algo : m_algoNameToAlgoNodeMap ) {
483 
484  auto targetNode = m_algoNameToAlgoNodeMap[algo.first];
485 
486  // Find producers for all the inputs of the target node
487  auto& targetInCollection = m_algoNameToAlgoInputsMap[algo.first];
488  for (auto inputTag : targetInCollection) {
489  for (auto producer : m_algoNameToAlgoOutputsMap) {
490  auto& outputs = m_algoNameToAlgoOutputsMap[producer.first];
491  for (auto outputTag : outputs) {
492  if (inputTag == outputTag) {
493  auto& known_producers = targetNode->getSupplierNodes();
494  auto valid_producer = m_algoNameToAlgoNodeMap[producer.first];
495  auto& known_consumers = valid_producer->getConsumerNodes();
496  if ( std::find( known_producers.begin(), known_producers.end(), valid_producer ) ==
497  known_producers.end() )
498  targetNode->addSupplierNode( valid_producer );
499  if ( std::find( known_consumers.begin(), known_consumers.end(), targetNode ) == known_consumers.end() )
500  valid_producer->addConsumerNode( targetNode );
501  }
502  }
503  }
504  }
505 
506  // Find consumers for all the outputs of the target node
507  auto& targetOutCollection = m_algoNameToAlgoOutputsMap[algo.first];
508  for (auto outputTag : targetOutCollection) {
509  for (auto consumer : m_algoNameToAlgoInputsMap) {
510  auto& inputs = m_algoNameToAlgoInputsMap[consumer.first];
511  for (auto inputTag : inputs) {
512  if (inputTag == outputTag) {
513  auto& known_consumers = targetNode->getConsumerNodes();
514  auto valid_consumer = m_algoNameToAlgoNodeMap[consumer.first];
515  auto& known_producers = valid_consumer->getSupplierNodes();
516  if ( std::find( known_producers.begin(), known_producers.end(), targetNode ) == known_producers.end() )
517  valid_consumer->addSupplierNode( targetNode );
518  if ( std::find( known_consumers.begin(), known_consumers.end(), valid_consumer ) ==
519  known_consumers.end() )
520  targetNode->addConsumerNode( valid_consumer );
521  }
522  }
523  }
524  }
525  }
526  return global_sc;
527  }
528 
529  //---------------------------------------------------------------------------
531  {
532 
533  StatusCode global_sc( StatusCode::SUCCESS, true );
534 
535  // Create the DataObjects (DO) realm (represented by DataNodes in the graph),
536  // connected to DO producers (AlgorithmNodes)
537  for (auto algo : m_algoNameToAlgoNodeMap) {
538 
539  auto& outCollection = m_algoNameToAlgoOutputsMap[algo.first];
540  for (auto outputTag : outCollection) {
541  const auto sc = addDataNode(outputTag);
542  if (!sc.isSuccess()) {
543  error() << "Extra producer (" << algo.first << ") for DataObject @ "
544  << outputTag
545  << " has been detected: this is not allowed." << endmsg;
546  global_sc = sc;
547  }
548  auto dataNode = getDataNode(outputTag);
549  dataNode->addProducerNode(algo.second);
550  algo.second->addOutputDataNode(dataNode);
551  }
552  }
553 
554  // Connect previously created DO realm to DO consumers (AlgorithmNodes)
555  for ( auto algo : m_algoNameToAlgoNodeMap ) {
556  auto& inCollection = m_algoNameToAlgoInputsMap[algo.first];
557  for (auto inputTag : inCollection) {
558  DataNode* dataNode = nullptr;
559  auto primaryPath = inputTag;
560  auto itP = m_dataPathToDataNodeMap.find(primaryPath);
561  if (itP != m_dataPathToDataNodeMap.end()) {
562  dataNode = getDataNode(primaryPath);
563  //if (!inCollection[inputTag].alternativeDataProductNames().empty())
564  // warning() << "Dropping all alternative data dependencies in the graph, but '" << primaryPath
565  // << "', for algorithm " << algo.first << endmsg;
566  //} else {
567  // for (auto alterPath : inCollection[inputTag].alternativeDataProductNames()) {
568  // auto itAP = m_dataPathToDataNodeMap.find(alterPath);
569  // if (itAP != m_dataPathToDataNodeMap.end()) {
570  // dataNode = getDataNode(alterPath);
571  // warning() << "Dropping all alternative data dependencies in the graph, but '" << alterPath
572  // << "', for algorithm " << algo.first << endmsg;
573  // break;
574  // }
575  //}
576  }
577 
578  if (dataNode) {
579  dataNode->addConsumerNode(algo.second);
580  algo.second->addInputDataNode(dataNode);
581  }
582  }
583  }
584 
585  return global_sc;
586  }
587 
588  //---------------------------------------------------------------------------
589  StatusCode ExecutionFlowGraph::addAlgorithmNode( Algorithm* algo, const std::string& parentName, bool inverted,
590  bool allPass )
591  {
592 
594 
595  auto& algoName = algo->name();
596 
597  auto itP = m_decisionNameToDecisionHubMap.find( parentName );
598  concurrency::DecisionNode* parentNode;
599  if ( itP != m_decisionNameToDecisionHubMap.end() ) {
600  parentNode = itP->second;
601  auto itA = m_algoNameToAlgoNodeMap.find( algoName );
602  concurrency::AlgorithmNode* algoNode;
603  if ( itA != m_algoNameToAlgoNodeMap.end() ) {
604  algoNode = itA->second;
605  } else {
606  algoNode = new concurrency::AlgorithmNode( *this, m_nodeCounter, algoName, inverted, allPass, algo->isIOBound() );
607  ++m_nodeCounter;
608  m_algoNameToAlgoNodeMap[algoName] = algoNode;
609  if (msgLevel(MSG::DEBUG))
610  debug() << "AlgoNode " << algoName << " added @ " << algoNode << endmsg;
611  registerIODataObjects(algo);
612  }
613 
614  parentNode->addDaughterNode( algoNode );
615  algoNode->addParentNode( parentNode );
616  } else {
617  sc = StatusCode::FAILURE;
618  error() << "DecisionHubNode " << parentName << ", meant to be used as parent, is not registered in the EFG."
619  << endmsg;
620  }
621 
622  return sc;
623  }
624 
625  //---------------------------------------------------------------------------
627  {
628 
629  return m_algoNameToAlgoNodeMap.at( algoName );
630  }
631 
632  //---------------------------------------------------------------------------
634  {
635 
636  StatusCode sc;
637 
638  auto itD = m_dataPathToDataNodeMap.find( dataPath );
639  concurrency::DataNode* dataNode;
640  if ( itD != m_dataPathToDataNodeMap.end() ) {
641  dataNode = itD->second;
642  sc = StatusCode::SUCCESS;
643  } else {
644  dataNode = new concurrency::DataNode( *this, dataPath );
645  m_dataPathToDataNodeMap[dataPath] = dataNode;
646  if (msgLevel(MSG::DEBUG))
647  debug() << " DataNode for " << dataPath << " added @ " << dataNode << endmsg;
648  sc = StatusCode::SUCCESS;
649  }
650 
651  return sc;
652  }
653 
654  //---------------------------------------------------------------------------
656  {
657 
658  return m_dataPathToDataNodeMap.at( dataPath );
659  }
660 
661  //---------------------------------------------------------------------------
663  bool modeOR, bool allPass, bool isLazy )
664  {
665 
667 
668  auto& decisionHubName = decisionHubAlgo->name();
669 
670  auto itP = m_decisionNameToDecisionHubMap.find( parentName );
671  concurrency::DecisionNode* parentNode;
672  if ( itP != m_decisionNameToDecisionHubMap.end() ) {
673  parentNode = itP->second;
674  auto itA = m_decisionNameToDecisionHubMap.find( decisionHubName );
675  concurrency::DecisionNode* decisionHubNode;
676  if ( itA != m_decisionNameToDecisionHubMap.end() ) {
677  decisionHubNode = itA->second;
678  } else {
679  decisionHubNode =
680  new concurrency::DecisionNode( *this, m_nodeCounter, decisionHubName, modeOR, allPass, isLazy );
681  ++m_nodeCounter;
682  m_decisionNameToDecisionHubMap[decisionHubName] = decisionHubNode;
683  if (msgLevel(MSG::DEBUG))
684  debug() << "DecisionHubNode " << decisionHubName << " added @ " << decisionHubNode << endmsg;
685  }
686 
687  parentNode->addDaughterNode( decisionHubNode );
688  decisionHubNode->addParentNode( parentNode );
689  } else {
690  sc = StatusCode::FAILURE;
691  error() << "DecisionHubNode " << parentName << ", meant to be used as parent, is not registered in the EFG."
692  << endmsg;
693  }
694 
695  return sc;
696  }
697 
698  //---------------------------------------------------------------------------
699  void ExecutionFlowGraph::addHeadNode( const std::string& headName, bool modeOR, bool allPass, bool isLazy )
700  {
701 
702  auto itH = m_decisionNameToDecisionHubMap.find( headName );
703  if ( itH != m_decisionNameToDecisionHubMap.end() ) {
704  m_headNode = itH->second;
705  } else {
706  m_headNode = new concurrency::DecisionNode( *this, m_nodeCounter, headName, modeOR, allPass, isLazy );
707  ++m_nodeCounter;
708  m_decisionNameToDecisionHubMap[headName] = m_headNode;
709  }
710  }
711 
712  //---------------------------------------------------------------------------
714  {
715  m_headNode->updateState( algo_states, node_decisions );
716  }
717 
718  //---------------------------------------------------------------------------
719  void ExecutionFlowGraph::updateDecision( const std::string& algo_name, const int& slotNum,
720  AlgsExecutionStates& algo_states, std::vector<int>& node_decisions ) const
721  {
722  //if (msgLevel(MSG::DEBUG))
723  // debug() << "(UPDATING)Setting decision of algorithm " << algo_name << " and propagating it upwards.." << endmsg;
724  getAlgorithmNode( algo_name )->updateDecision( slotNum, algo_states, node_decisions );
725  }
726 
727  //---------------------------------------------------------------------------
729  {
730 
731  info() << "Starting ranking by data outputs .. " << endmsg;
732  for (auto& pair : m_algoNameToAlgoNodeMap) {
733  if (msgLevel(MSG::DEBUG))
734  debug() << " Ranking " << pair.first << "... " << endmsg;
735  pair.second->accept(ranker);
736  if (msgLevel(MSG::DEBUG))
737  debug() << " ... rank of " << pair.first << ": " << pair.second->getRank() << endmsg;
738  }
739  }
740 
741  //---------------------------------------------------------------------------
743  {
744 
746 
747  for (auto node : m_algoNameToAlgoInputsMap) {
748  DataObjIDColl collection = (node.second);
749  if (collection.empty())
750  result.push_back(getAlgorithmNode(node.first));
751  }
752 
753  return result;
754  }
755 
756  //---------------------------------------------------------------------------
758  {
759 
760  const char idt[] = " ";
761  std::ostringstream ost;
762 
763  ost << "\n" << idt << "====================================\n";
764  ost << idt << "Data origins and destinations:\n";
765  ost << idt << "====================================\n";
766 
767  for ( auto& pair : m_dataPathToDataNodeMap ) {
768 
769  for ( auto algoNode : pair.second->getProducers() ) ost << idt << " " << algoNode->getNodeName() << "\n";
770 
771  ost << idt << " V\n";
772  ost << idt << " o " << pair.first << "\n";
773  ost << idt << " V\n";
774 
775  for ( auto algoNode : pair.second->getConsumers() ) ost << idt << " " << algoNode->getNodeName() << "\n";
776 
777  ost << idt << "====================================\n";
778  }
779 
780  return ost.str();
781  }
782 
783  //---------------------------------------------------------------------------
784 
786  {
787  std::ofstream myfile;
788  myfile.open( "ExecutionPlan.graphml", std::ios::app );
789 
790  boost::dynamic_properties dp;
791  dp.property( "name", boost::get( &boost::AlgoNodeStruct::m_name, m_ExecPlan ) );
792  dp.property( "index", boost::get( &boost::AlgoNodeStruct::m_index, m_ExecPlan ) );
793  dp.property( "rank", boost::get( &boost::AlgoNodeStruct::m_rank, m_ExecPlan ) );
794  dp.property( "runtime", boost::get( &boost::AlgoNodeStruct::m_runtime, m_ExecPlan ) );
795 
796  boost::write_graphml( myfile, m_ExecPlan, dp );
797 
798  myfile.close();
799  }
800 
802  {
803 
804  boost::AlgoVertex source;
805  float runtime( 0. );
806  if ( u == nullptr ) {
807  auto itT = m_exec_plan_map.find( "ENTRY" );
808  if ( itT != m_exec_plan_map.end() ) {
809  source = itT->second;
810  } else {
811  source = boost::add_vertex( boost::AlgoNodeStruct( "ENTRY", -999, -999, 0 ), m_ExecPlan );
812  m_exec_plan_map["ENTRY"] = source;
813  }
814  } else {
815  auto itS = m_exec_plan_map.find( u->getNodeName() );
816  if ( itS != m_exec_plan_map.end() ) {
817  source = itS->second;
818  } else {
819  auto alg = dynamic_cast<Algorithm*>( u->getAlgorithmRepresentatives()[0] );
820  if ( alg == 0 ) {
821  fatal() << "could not convert IAlgorithm to Algorithm!" << endmsg;
822  } else {
823  try {
824  const Gaudi::Details::PropertyBase& p = alg->getProperty( "AvgRuntime" );
825  runtime = std::stof( p.toString() );
826  } catch(...) {
827  if (msgLevel(MSG::DEBUG))
828  debug() << "no AvgRuntime for " << alg->name() << endmsg;
829  runtime = 1.;
830  }
831  }
832  source = boost::add_vertex( boost::AlgoNodeStruct( u->getNodeName(), u->getAlgoIndex(), u->getRank(), runtime ),
833  m_ExecPlan );
834  m_exec_plan_map[u->getNodeName()] = source;
835  }
836  }
837 
838  boost::AlgoVertex target;
839  auto itP = m_exec_plan_map.find( v->getNodeName() );
840  if ( itP != m_exec_plan_map.end() ) {
841  target = itP->second;
842  } else {
843  auto alg = dynamic_cast<Algorithm*>( v->getAlgorithmRepresentatives()[0] );
844  if ( alg == 0 ) {
845  fatal() << "could not convert IAlgorithm to Algorithm!" << endmsg;
846  } else {
847  try {
848  const Gaudi::Details::PropertyBase& p = alg->getProperty( "AvgRuntime" );
849  runtime = std::stof( p.toString() );
850  } catch(...) {
851  if (msgLevel(MSG::DEBUG))
852  debug() << "no AvgRuntime for " << alg->name() << endmsg;
853  runtime = 1.;
854  }
855  }
856  target = boost::add_vertex( boost::AlgoNodeStruct( v->getNodeName(), v->getAlgoIndex(), v->getRank(), runtime ),
857  m_ExecPlan );
858  m_exec_plan_map[v->getNodeName()] = target;
859  }
860 
861  if (msgLevel(MSG::DEBUG))
862  debug() << "Edge added to execution plan" << endmsg;
863  boost::add_edge(source, target, m_ExecPlan);
864  }
865 
866 } // namespace
std::string stateToString(const int &stateId) const
Translation between state id and name.
bool promoteToControlReadyState(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
XXX: CF tests.
virtual bool visitEnter(DecisionNode &) const =0
const std::vector< AlgorithmNode * > getDataIndependentNodes() const
const unsigned int & getAlgoIndex() const
XXX: CF tests.
T empty(T...args)
StatusCode initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize graph.
T open(T...args)
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:725
StatusCode buildAugmentedDataDependenciesRealm()
Build data dependency realm WITH data object nodes participating.
void addDaughterNode(ControlFlowNode *node)
Add a daughter node.
StatusCode addDecisionHubNode(Algorithm *daughterAlgo, const std::string &parentName, bool modeOR, bool allPass, bool isLazy)
Add a node, which aggregates decisions of direct daughter nodes.
virtual void acceptDHVisitor(IDataHandleVisitor *) const override
Definition: Algorithm.cpp:198
bool dataDependenciesSatisfied(const int &slotNum) const
Method to check whether the Algorithm has its all data dependency satisfied.
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
T stof(T...args)
T endl(T...args)
const std::vector< IAlgorithm * > & getAlgorithmRepresentatives() const
get Algorithm representatives
void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const override
XXX: CF tests.
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
A method to update algorithm node decision, and propagate it upwards.
std::string dumpDataFlow() const
Print out all data origins and destinations, as reflected in the EF graph.
ExecutionFlowGraph * m_graph
virtual std::string toString() const =0
value -> string
STL class.
DataNode * getDataNode(const DataObjID &dataPath) const
Get DataNode by DataObject path using graph index.
AlgsExecutionStates & getAlgoStates(const int &slotNum) const
void registerIODataObjects(const Algorithm *algo)
Register algorithm in the Data Dependency index.
void addInputDataNode(DataNode *node)
Associate an AlgorithmNode, which is a data consumer of this one.
bool accept(IGraphVisitor &visitor) override
T push_back(T...args)
STL class.
const float & getRank() const
Get Algorithm rank.
void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const override
XXX: CF tests.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T close(T...args)
graph_traits< ExecPlan >::vertex_descriptor AlgoVertex
void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map) override
Initialize.
virtual bool visit(DecisionNode &)=0
PropertyBase base class allowing PropertyBase* collections to be "homogeneous".
Definition: Property.h:32
StatusCode buildDataDependenciesRealm()
Build data dependency realm WITHOUT data object nodes: just interconnect algorithm nodes directly...
void addOutputDataNode(DataNode *node)
Associate an AlgorithmNode, which is a data supplier for this one.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find(T...args)
void rankAlgorithms(IGraphVisitor &ranker) const
Rank Algorithm nodes by the number of data outputs.
void addEdgeToExecutionPlan(const AlgorithmNode *u, const AlgorithmNode *v)
set cause-effect connection between two algorithms in the execution plan
StatusCode addAlgorithmNode(Algorithm *daughterAlgo, const std::string &parentName, bool inverted, bool allPass)
Add algorithm node.
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
bool isIOBound() const
Definition: Algorithm.h:489
virtual bool visitLeave(DecisionNode &) const =0
void dumpExecutionPlan()
dump to file encountered execution plan
int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
Method to set algos to CONTROLREADY, if possible.
void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const override
Print a string representing the control flow state.
void addHeadNode(const std::string &headName, bool modeOR, bool allPass, bool isLazy)
Add a node, which has no parents.
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
bool accept(IGraphVisitor &visitor) override
int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
Method to set algos to CONTROLREADY, if possible.
bool promoteToControlReadyState(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
XXX: CF tests. Method to set algos to CONTROLREADY, if possible.
const std::string & getNodeName() const
void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const override
Print a string representing the control flow state.
void ignore() const
Definition: StatusCode.h:106
~DecisionNode() override
Destructor.
State
Execution states of the algorithms.
void updateEventState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const
XXX CF tests. Is needed for older CF implementation.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map) override
Initialize.
bool promoteToDataReadyState(const int &slotNum, const AlgorithmNode *requestor=nullptr) const
static std::map< State, std::string > stateNames
StatusCode addDataNode(const DataObjID &dataPath)
Add DataNode that represents DataObject.
void addConsumerNode(AlgorithmNode *node)
Associate an AlgorithmNode, which is a data consumer of this one.
StatusCode updateState(unsigned int iAlgo, State newState)