ExecutionFlowGraph.cpp
Go to the documentation of this file.
1 #include "ExecutionFlowGraph.h"
3 
4 namespace concurrency {
5 
6  //---------------------------------------------------------------------------
7  std::string ControlFlowNode::stateToString(const int& stateId) const {
8 
9  if (0 == stateId ) return "FALSE";
10  else if (1 == stateId ) return "TRUE";
11  else return "UNDEFINED";
12  }
13 
14  //---------------------------------------------------------------------------
16 
17  for (auto node : m_children)
18  delete node;
19  }
20 
21  //---------------------------------------------------------------------------
23 
24  for (auto daughter : m_children)
25  daughter->initialize(algname_index_map);
26  }
27 
28  //---------------------------------------------------------------------------
30 
31  if (std::find(m_parents.begin(), m_parents.end(), node) == m_parents.end())
32  m_parents.push_back(node);
33  }
34 
35  //--------------------------------------------------------------------------
37 
38  if (std::find(m_children.begin(), m_children.end(), node) == m_children.end())
39  m_children.push_back(node);
40  }
41 
42  //---------------------------------------------------------------------------
44  AlgsExecutionStates& states,
45  const std::vector<int>& node_decisions,
46  const unsigned int& recursionLevel) const {
47 
48  output << std::string(recursionLevel, ' ') << m_nodeName << " (" << m_nodeIndex << ")" << ", w/ decision: "
49  << stateToString(node_decisions[m_nodeIndex]) << "(" << node_decisions[m_nodeIndex] <<")" << std::endl;
50  for (auto daughter : m_children ) {
51  daughter->printState(output,states,node_decisions,recursionLevel+2);
52  }
53  }
54 
55  //---------------------------------------------------------------------------
57  std::vector<int>& node_decisions) const {
58  // check whether we already had a result earlier
59  // if (-1 != node_decisions[m_nodeIndex] ) { return node_decisions[m_nodeIndex]; }
60  int decision = ((m_allPass && m_isLazy) ? 1 : -1);
61  bool hasUndecidedChild = false;
62  for (auto daughter : m_children){
63  if (m_isLazy && (-1 !=decision || hasUndecidedChild ) ) {
64  node_decisions[m_nodeIndex] = decision;
65  return decision;} // if lazy return once result is known already or we can't fully evaluate right now because one daugther decision is missing still
66  auto res = daughter->updateState(states, node_decisions);
67  if ( -1 == res) {hasUndecidedChild = true;}
68  else if ( false == m_modeOR && res == 0 ){decision = 0;} // "and"-mode (once first result false, the overall decision is false)
69  else if ( true == m_modeOR && res == 1){decision = 1;} // "or"-mode (once first result true, the overall decision is true)
70  }
71  // what to do with yet undefined answers depends on whether AND or OR mode applies
72  if (!hasUndecidedChild && -1 == decision ) {
73  // OR mode: all results known, and none true -> reject
74  if ( true == m_modeOR){ decision = 0; }
75  // AND mode: all results known, and no false -> accept
76  else { decision = 1; }
77  }
78  // in all other cases I stay with previous decisions
79  node_decisions[m_nodeIndex] = decision;
80  if (m_allPass) decision = 1;
81  return decision;
82  }
83 
84  //---------------------------------------------------------------------------
85  void DecisionNode::updateDecision(const int& slotNum,
86  AlgsExecutionStates& states,
87  std::vector<int>& node_decisions,
88  const AlgorithmNode* requestor) const {
89 
90  int decision = ((m_allPass && m_isLazy) ? 1 : -1);
91  bool keepGoing = true;
92  bool hasUndecidedChild = false;
93  //std::cout << "++++++++++++++++++++BEGIN(UPDATING)++++++++++++++++++++" << std::endl;
94  //std::cout << "UPDATING DAUGHTERS of DECISION NODE: " << m_nodeName << std::endl;
95 
96  for (auto daughter : m_children){
97  // if lazy return once result is known already or we can't fully evaluate
98  // right now because one daughter decision is missing still
99  //std::cout << "----UPDATING DAUGHTER: " << daughter->getNodeName() << std::endl;
100  if (m_isLazy && !keepGoing) {
101  node_decisions[m_nodeIndex] = decision;
102  //std::cout << "STOPPING ITERATION OVER (UPDATING) DECISION NODE CHILDREN: " << m_nodeName << std::endl;
103  break;
104  //return;
105  }
106 
107  // modified
108  int& res = node_decisions[daughter->getNodeIndex()];
109  if (-1 == res) {
110  hasUndecidedChild = true;
111  if (typeid(*daughter) != typeid(concurrency::DecisionNode)) {
112  auto algod = (AlgorithmNode*) daughter;
113  algod->promoteToControlReadyState(slotNum,states,node_decisions);
114  bool result = algod->promoteToDataReadyState(slotNum, requestor);
115  if (result)
116  keepGoing = false;
117  } else {
118  daughter->updateDecision(slotNum, states, node_decisions, requestor);
119  }
120 
121  // "and"-mode (once first result false, the overall decision is false)
122  } else if (false == m_modeOR && res == 0) {
123  decision = 0;
124  keepGoing = false;
125  // "or"-mode (once first result true, the overall decision is true)
126  } else if (true == m_modeOR && res == 1) {
127  decision = 1;
128  keepGoing = false;
129  }
130  }
131 
132  // what to do with yet undefined answers depends on whether AND or OR mode applies
133  if (!hasUndecidedChild && -1 == decision ) {
134  // OR mode: all results known, and none true -> reject
135  if ( true == m_modeOR) {
136  decision = 0;
137  // AND mode: all results known, and no false -> accept
138  } else {
139  decision = 1;
140  }
141  }
142 
143  // in all other cases I stay with previous decisions
144  node_decisions[m_nodeIndex] = decision;
145 
146  // propagate decision upwards through the decision graph
147  if (-1 != decision)
148  for (auto p : m_parents)
149  p->updateDecision(slotNum, states, node_decisions, requestor);
150 
151  //std::cout << "++++++++++++++++++++END(UPDATING)++++++++++++++++++++" << std::endl;
152  }
153 
154  //---------------------------------------------------------------------------
156  AlgsExecutionStates& states,
157  std::vector<int>& node_decisions) const {
158  //std::cout << "REACHED DECISNODE " << m_nodeName << std::endl;
159  if (-1 != node_decisions[m_nodeIndex]) {
160  return true;
161  }
162 
163  for (auto daughter : m_children ) {
164  auto res = node_decisions[daughter->getNodeIndex()];
165  if (-1 == res) {
166  daughter->promoteToControlReadyState(slotNum, states, node_decisions);
167  if (m_isLazy) return true;
168  } else if (m_isLazy) {
169  if ((false == m_modeOR && res == 0) || (true == m_modeOR && res == 1)) return true;
170  }
171  }
172 
173  return true;
174  }
175 
176  //---------------------------------------------------------------------------
178 
179  if (visitor.visitEnter(*this)) {
180  bool result = visitor.visit(*this);
181  if (result)
182  return visitor.visitLeave(*this);
183 
184  for (auto child : m_children) {
185  bool keepGoing = child->accept(visitor);
186  if (m_isLazy && !keepGoing)
187  return false; //break;
188  }
189  }
190 
191  return true;
192  }
193 
194  //---------------------------------------------------------------------------
196 
197  for (auto node : m_outputs) {
198  delete node;
199  }
200  }
201 
202  //---------------------------------------------------------------------------
204 
205  m_algoIndex = algname_index_map.at(m_algoName);
206  }
207 
208  //---------------------------------------------------------------------------
209  bool AlgorithmNode::promoteToControlReadyState(const int& /*slotNum*/,
210  AlgsExecutionStates& states,
211  std::vector<int>& /*node_decisions*/) const {
212 
213  auto& state = states[m_algoIndex];
214  bool result = false;
215 
216  if (State::INITIAL == state) {
217  states.updateState(m_algoIndex, State::CONTROLREADY).ignore();
218  //std::cout << "----> UPDATING ALGORITHM to CONTROLREADY: " << m_algoName << std::endl;
219  result = true;
220  } else if (State::CONTROLREADY == state) {
221  result = true;
222  }
223 
224  return result;
225  }
226 
227  //---------------------------------------------------------------------------
228  bool AlgorithmNode::promoteToDataReadyState(const int& slotNum,
229  const AlgorithmNode* /*requestor*/) const {
230 
231  auto& states = m_graph->getAlgoStates(slotNum);
232  auto& state = states[m_algoIndex];
233  bool result = false;
234 
235  if (State::CONTROLREADY == state) {
236  if (dataDependenciesSatisfied(slotNum)) {
237  //std::cout << "----> UPDATING ALGORITHM to DATAREADY: " << m_algoName << std::endl;
238  states.updateState(m_algoIndex, State::DATAREADY).ignore();
239  result = true;
240 
241  //m_graph->addEdgeToExecutionPlan(requestor, this);
242 
243  /*
244  auto xtime = std::chrono::high_resolution_clock::now();
245  std::stringstream s;
246  s << getNodeName() << ", "
247  << (xtime-m_graph->getInitTime()).count() << "\n";
248  std::ofstream myfile;
249  myfile.open("DRTiming.csv", std::ios::app);
250  myfile << s.str();
251  myfile.close();
252  */
253  }
254  } else if (State::DATAREADY == state) {
255  result = true;
256  } else if (State::SCHEDULED == state) {
257  result = true;
258  }
259 
260  return result;
261  }
262 
263  //---------------------------------------------------------------------------
264  bool AlgorithmNode::dataDependenciesSatisfied(const int& slotNum) const {
265 
266  bool result = true;
267  auto& states = m_graph->getAlgoStates(slotNum);
268 
269  for (auto dataNode : m_inputs) {
270  result = false;
271  for (auto algoNode : dataNode->getProducers())
272  if (State::EVTACCEPTED == states[algoNode->getAlgoIndex()]) {
273  result = true;
274  break;
275  }
276  if (!result) break;
277  }
278 
279  return result;
280  }
281 
282  //---------------------------------------------------------------------------
284 
285  bool result = true;
286  for (auto dataNode : m_inputs) {
287 
288  result = false;
289  for (auto algoNode : dataNode->getProducers())
290  if (State::EVTACCEPTED == states[algoNode->getAlgoIndex()]) {
291  result = true;
292  break;
293  }
294 
295  if (!result) break;
296  }
297 
298  return result;
299  }
300 
301  //---------------------------------------------------------------------------
303  AlgsExecutionStates& states,
304  const std::vector<int>& node_decisions,
305  const unsigned int& recursionLevel) const {
306  output << std::string(recursionLevel, ' ') << m_nodeName
307  << " (" << m_nodeIndex << ")" << ", w/ decision: "
308  << stateToString(node_decisions[m_nodeIndex])
309  << "(" << node_decisions[m_nodeIndex] << ")"
310  << ", in state: " << AlgsExecutionStates::stateNames[states[m_algoIndex]]
311  << std::endl;
312  }
313 
314  //---------------------------------------------------------------------------
316  std::vector<int>& node_decisions) const {
317  // check whether we already had a result earlier
318  // if (-1 != node_decisions[m_nodeIndex] ) { return node_decisions[m_nodeIndex]; }
319  // since we reached this point in the control flow, this algorithm is supposed to run
320  // if it hasn't already
321  const State& state = states[m_algoIndex];
322  unsigned int decision = -1;
323  if (State::INITIAL == state) {states.updateState(m_algoIndex, State::CONTROLREADY).ignore();}
324  // now derive the proper result to pass back
325  if (true == m_allPass) {
326  decision = 1;
327  } else if (State::EVTACCEPTED == state) {
328  decision = !m_inverted;
329  } else if (State::EVTREJECTED == state) {
330  decision = m_inverted;
331  } else {
332  decision = -1; // result not known yet
333  }
334  node_decisions[m_nodeIndex] = decision;
335  return decision;
336  }
337 
338  //---------------------------------------------------------------------------
339  void AlgorithmNode::updateDecision(const int& slotNum,
340  AlgsExecutionStates& states,
341  std::vector<int>& node_decisions,
342  const AlgorithmNode* requestor) const {
343 
344  const State& state = states[m_algoIndex];
345  int decision = -1;
346  requestor = this;
347 
348  // now derive the proper result to pass back
349  if (true == m_allPass) {
350  decision = 1;
351  } else if (State::EVTACCEPTED == state) {
352  decision = !m_inverted;
353  } else if (State::EVTREJECTED == state) {
354  decision = m_inverted;
355  } else {
356  decision = -1; // result not known yet
357  }
358 
359  node_decisions[m_nodeIndex] = decision;
360 
361  if (-1 != decision) {
362  for (auto output : m_outputs)
363  for (auto consumer : output->getConsumers())
364  consumer->promoteToDataReadyState(slotNum, requestor);
365 
366  for (auto p : m_parents)
367  p->updateDecision(slotNum, states, node_decisions, requestor);
368  }
369  }
370 
371  //---------------------------------------------------------------------------
373 
374  if (visitor.visitEnter(*this)) {
375  bool result = visitor.visit(*this);
376  if (result) return false;
377  }
378 
379  return true;
380 
381  }
382 
383  //---------------------------------------------------------------------------
385 
386  if (std::find(m_parents.begin(), m_parents.end(), node) == m_parents.end())
387  m_parents.push_back(node);
388  }
389 
390  //---------------------------------------------------------------------------
392 
393  if (std::find(m_outputs.begin(),m_outputs.end(),node) == m_outputs.end())
394  m_outputs.push_back(node);
395  }
396 
397  //---------------------------------------------------------------------------
399 
400  if (std::find(m_inputs.begin(),m_inputs.end(),node) == m_inputs.end())
401  m_inputs.push_back(node);
402  }
403 
404  //---------------------------------------------------------------------------
406 
407  m_headNode->initialize(algname_index_map);
408  //StatusCode sc = buildDataDependenciesRealm();
410 
411  if (!sc.isSuccess())
412  error() << "Could not build the data dependency realm." << endmsg;
413 
414  return sc;
415  }
416 
417  //---------------------------------------------------------------------------
419  std::vector<EventSlot>& eventSlots){
420 
421  m_eventSlots = &eventSlots;
422  m_headNode->initialize(algname_index_map);
423  //StatusCode sc = buildDataDependenciesRealm();
425 
426  if (!sc.isSuccess())
427  error() << "Could not build the data dependency realm." << endmsg;
428 
429  debug() << dumpDataFlow() << endmsg;
430 
431  return sc;
432  }
433 
434  //---------------------------------------------------------------------------
436 
437  const std::string& algoName = algo->name();
438 
439 
440  DataObjIDColl inputObjs, outputObjs;
441  DHHVisitor avis(inputObjs, outputObjs);
442  algo->acceptDHVisitor(&avis);
443 
444  m_algoNameToAlgoInputsMap[algoName] = inputObjs;
445 
446  debug() << "Inputs of " << algoName << ": ";
447  for (auto tag : inputObjs) {
448  debug() << tag << " | ";
449  }
450  debug() << endmsg;
451 
452  m_algoNameToAlgoOutputsMap[algoName] = outputObjs;
453 
454  debug() << "Outputs of " << algoName << ": ";
455  for (auto tag : outputObjs) {
456  debug() << tag << " | ";
457  }
458  debug() << endmsg;
459  }
460 
461  //---------------------------------------------------------------------------
463 
464  StatusCode global_sc(StatusCode::SUCCESS);
465 
466  for (auto algo : m_algoNameToAlgoNodeMap) {
467 
468  auto targetNode = m_algoNameToAlgoNodeMap[algo.first];
469 
470  // Find producers for all the inputs of the target node
471  auto& targetInCollection = m_algoNameToAlgoInputsMap[algo.first];
472  for (auto inputTag : targetInCollection) {
473  // auto& input2Match = targetInCollection[inputTag].dataProductName();
474  for (auto producer : m_algoNameToAlgoOutputsMap) {
475  auto& outputs = m_algoNameToAlgoOutputsMap[producer.first];
476  for (auto outputTag : outputs) {
477  // if (outputs[outputTag].isValid() && outputs[outputTag].dataProductName() == input2Match) {
478  if (inputTag == outputTag) {
479  auto& known_producers = targetNode->getSupplierNodes();
480  auto valid_producer = m_algoNameToAlgoNodeMap[producer.first];
481  auto& known_consumers = valid_producer->getConsumerNodes();
482  if (std::find(known_producers.begin(),known_producers.end(),valid_producer) == known_producers.end())
483  targetNode->addSupplierNode(valid_producer);
484  if (std::find(known_consumers.begin(),known_consumers.end(),targetNode) == known_consumers.end())
485  valid_producer->addConsumerNode(targetNode);
486  }
487  }
488  }
489  }
490 
491  // Find consumers for all the outputs of the target node
492  auto& targetOutCollection = m_algoNameToAlgoOutputsMap[algo.first];
493  for (auto outputTag : targetOutCollection) {
494  // auto& output2Match = targetOutCollection[outputTag].dataProductName();
495  for (auto consumer : m_algoNameToAlgoInputsMap) {
496  auto& inputs = m_algoNameToAlgoInputsMap[consumer.first];
497  for (auto inputTag : inputs) {
498  // if (inputs[inputTag].isValid() && inputs[inputTag].dataProductName() == output2Match) {
499  if (inputTag == outputTag) {
500  auto& known_consumers = targetNode->getConsumerNodes();
501  auto valid_consumer = m_algoNameToAlgoNodeMap[consumer.first];
502  auto& known_producers = valid_consumer->getSupplierNodes();
503  if (std::find(known_producers.begin(),known_producers.end(),targetNode) == known_producers.end())
504  valid_consumer->addSupplierNode(targetNode);
505  if (std::find(known_consumers.begin(),known_consumers.end(),valid_consumer) == known_consumers.end())
506  targetNode->addConsumerNode(valid_consumer);
507  }
508  }
509  }
510  }
511 
512  }
513  return global_sc;
514  }
515 
516  //---------------------------------------------------------------------------
518 
519  StatusCode global_sc(StatusCode::SUCCESS);
520 
521  // Create the DataObjects (DO) realm (represented by DataNodes in the graph), connected to DO producers (AlgorithmNodes)
522  for (auto algo : m_algoNameToAlgoNodeMap) {
523 
524  StatusCode sc;
525  auto& outCollection = m_algoNameToAlgoOutputsMap[algo.first];
526  for (auto outputTag : outCollection) {
527  // if (outCollection[outputTag].isValid()) {
528  // auto& output = outCollection[outputTag].dataProductName();
529  sc = addDataNode(outputTag);
530  if (!sc.isSuccess()) {
531  error() << "Extra producer (" << algo.first << ") for DataObject @ "
532  << outputTag
533  << " has been detected: this is not allowed." << endmsg;
534  global_sc = StatusCode::FAILURE;
535  }
536  auto dataNode = getDataNode(outputTag);
537  dataNode->addProducerNode(algo.second);
538  algo.second->addOutputDataNode(dataNode);
539  // }
540  }
541  }
542 
543  // Connect previously created DO realm to DO consumers (AlgorithmNodes)
544  for (auto algo : m_algoNameToAlgoNodeMap) {
545  auto& inCollection = m_algoNameToAlgoInputsMap[algo.first];
546  for (auto inputTag : inCollection) {
547  // if (inCollection[inputTag].isValid()) {
548  DataNode* dataNode = nullptr;
549  // auto& primaryPath = inCollection[inputTag].dataProductName();
550  auto primaryPath = inputTag;
551  auto itP = m_dataPathToDataNodeMap.find(primaryPath);
552  if (itP != m_dataPathToDataNodeMap.end()) {
553  dataNode = getDataNode(primaryPath);
554  // if (!inCollection[inputTag].alternativeDataProductNames().empty())
555  // warning() << "Dropping all alternative data dependencies in the graph, but '" << primaryPath
556  // << "', for algorithm " << algo.first << endmsg;
557  // } else {
558  // for (auto alterPath : inCollection[inputTag].alternativeDataProductNames()) {
559  // auto itAP = m_dataPathToDataNodeMap.find(alterPath);
560  // if (itAP != m_dataPathToDataNodeMap.end()) {
561  // dataNode = getDataNode(alterPath);
562  // warning() << "Dropping all alternative data dependencies in the graph, but '" << alterPath
563  // << "', for algorithm " << algo.first << endmsg;
564  // break;
565  // }
566  // }
567  }
568  if (dataNode) {
569  dataNode->addConsumerNode(algo.second);
570  algo.second->addInputDataNode(dataNode);
571  }
572 
573  // }
574  }
575  }
576 
577  return global_sc;
578  }
579 
580  //---------------------------------------------------------------------------
581  StatusCode ExecutionFlowGraph::addAlgorithmNode(Algorithm* algo, const std::string& parentName, bool inverted, bool allPass) {
582 
584 
585  auto& algoName = algo->name();
586 
587  auto itP = m_decisionNameToDecisionHubMap.find(parentName);
588  concurrency::DecisionNode* parentNode;
589  if ( itP != m_decisionNameToDecisionHubMap.end()) {
590  parentNode = itP->second;
591  auto itA = m_algoNameToAlgoNodeMap.find(algoName);
592  concurrency::AlgorithmNode* algoNode;
593  if ( itA != m_algoNameToAlgoNodeMap.end()) {
594  algoNode = itA->second;
595  } else {
596  algoNode = new concurrency::AlgorithmNode(*this,m_nodeCounter,algoName,inverted,allPass);
597  ++m_nodeCounter;
598  m_algoNameToAlgoNodeMap[algoName] = algoNode;
599  debug() << "AlgoNode " << algoName << " added @ " << algoNode << endmsg;
600  registerIODataObjects(algo);
601  }
602 
603  parentNode->addDaughterNode(algoNode);
604  algoNode->addParentNode(parentNode);
605  } else {
606  sc = StatusCode::FAILURE;
607  error() << "DecisionHubNode " << parentName << ", meant to be used as parent, is not registered in the EFG." << endmsg;
608  }
609 
610  return sc;
611  }
612 
613  //---------------------------------------------------------------------------
615 
616  return m_algoNameToAlgoNodeMap.at(algoName);
617  }
618 
619  //---------------------------------------------------------------------------
621 
622  StatusCode sc;
623 
624  auto itD = m_dataPathToDataNodeMap.find(dataPath);
625  concurrency::DataNode* dataNode;
626  if ( itD != m_dataPathToDataNodeMap.end()) {
627  dataNode = itD->second;
628  //sc = StatusCode::FAILURE;
629  sc = StatusCode::SUCCESS;
630  } else {
631  dataNode = new concurrency::DataNode(*this,dataPath);
632  m_dataPathToDataNodeMap[dataPath] = dataNode;
633  debug() << " DataNode for " << dataPath << " added @ " << dataNode << endmsg;
634  sc = StatusCode::SUCCESS;
635  }
636 
637  return sc;
638  }
639 
640  //---------------------------------------------------------------------------
642 
643  return m_dataPathToDataNodeMap.at(dataPath);
644  }
645 
646  //---------------------------------------------------------------------------
647  StatusCode ExecutionFlowGraph::addDecisionHubNode(Algorithm* decisionHubAlgo, const std::string& parentName, bool modeOR, bool allPass, bool isLazy) {
648 
650 
651  auto& decisionHubName = decisionHubAlgo->name();
652 
653  auto itP = m_decisionNameToDecisionHubMap.find(parentName);
654  concurrency::DecisionNode* parentNode;
655  if ( itP != m_decisionNameToDecisionHubMap.end()) {
656  parentNode = itP->second;
657  auto itA = m_decisionNameToDecisionHubMap.find(decisionHubName);
658  concurrency::DecisionNode* decisionHubNode;
659  if ( itA != m_decisionNameToDecisionHubMap.end()) {
660  decisionHubNode = itA->second;
661  } else {
662  decisionHubNode = new concurrency::DecisionNode(*this,m_nodeCounter,decisionHubName,modeOR,allPass,isLazy);
663  ++m_nodeCounter;
664  m_decisionNameToDecisionHubMap[decisionHubName] = decisionHubNode;
665  debug() << "DecisionHubNode " << decisionHubName << " added @ " << decisionHubNode << endmsg;
666  }
667 
668  parentNode->addDaughterNode(decisionHubNode);
669  decisionHubNode->addParentNode(parentNode);
670  } else {
671  sc = StatusCode::FAILURE;
672  error() << "DecisionHubNode " << parentName << ", meant to be used as parent, is not registered in the EFG." << endmsg;
673  }
674 
675  return sc;
676  }
677 
678  //---------------------------------------------------------------------------
679  void ExecutionFlowGraph::addHeadNode(const std::string& headName, bool modeOR, bool allPass, bool isLazy) {
680 
681  auto itH = m_decisionNameToDecisionHubMap.find(headName);
682  if ( itH != m_decisionNameToDecisionHubMap.end()) {
683  m_headNode = itH->second;
684  } else {
685  m_headNode = new concurrency::DecisionNode(*this,m_nodeCounter,headName,modeOR,allPass,isLazy);
686  ++m_nodeCounter;
688  }
689 
690  }
691 
692  //---------------------------------------------------------------------------
694  std::vector<int>& node_decisions) const {
695  m_headNode->updateState(algo_states, node_decisions);
696  }
697 
698  //---------------------------------------------------------------------------
700  const int& slotNum,
701  AlgsExecutionStates& algo_states,
702  std::vector<int>& node_decisions) const {
703  //debug() << "(UPDATING)Setting decision of algorithm " << algo_name << " and propagating it upwards.." << endmsg;
704  getAlgorithmNode(algo_name)->updateDecision(slotNum, algo_states, node_decisions);
705  }
706 
707  //---------------------------------------------------------------------------
709 
710  info() << "Starting ranking by data outputs .. " << endmsg;
711  for (auto& pair : m_algoNameToAlgoNodeMap) {
712  pair.second->accept(ranker);
713  debug() << " Rank of " << pair.first << ": " << pair.second->getRank() << endmsg;
714  }
715  }
716 
717  //---------------------------------------------------------------------------
719 
721 
722  for (auto node : m_algoNameToAlgoInputsMap) {
723  DataObjIDColl collection = (node.second);
724  for (auto tag : collection)
725  // if (collection[tag].isValid()) {
726  result.push_back(getAlgorithmNode(node.first));
727  break;
728  // }
729  }
730 
731  return result;
732  }
733 
734  //---------------------------------------------------------------------------
736 
737  const char idt[] = " ";
738  std::ostringstream ost;
739 
740 
741  ost << "\n" << idt << "====================================\n";
742  ost << idt << "Data origins and destinations:\n";
743  ost << idt << "====================================\n";
744 
745  for (auto& pair : m_dataPathToDataNodeMap) {
746 
747  for (auto algoNode : pair.second->getProducers())
748  ost << idt << " " << algoNode->getNodeName() << "\n";
749 
750  ost << idt << " V\n";
751  ost << idt << " o " << pair.first << "\n";
752  ost << idt << " V\n";
753 
754  for (auto algoNode : pair.second->getConsumers())
755  ost << idt << " " << algoNode->getNodeName() << "\n";
756 
757  ost << idt << "====================================\n";
758  }
759 
760  return ost.str();
761  }
762 
763  //---------------------------------------------------------------------------
764 
766  std::ofstream myfile;
767  myfile.open("ExecutionPlan.graphml", std::ios::app);
768 
769  boost::dynamic_properties dp;
770  dp.property("name", boost::get(&boost::AlgoNodeStruct::m_name, m_ExecPlan));
771  dp.property("index", boost::get(&boost::AlgoNodeStruct::m_index, m_ExecPlan));
772  dp.property("rank", boost::get(&boost::AlgoNodeStruct::m_rank, m_ExecPlan));
773  dp.property("runtime", boost::get(&boost::AlgoNodeStruct::m_runtime, m_ExecPlan));
774 
775  boost::write_graphml(myfile, m_ExecPlan, dp);
776 
777  myfile.close();
778  }
779 
781 
782  boost::AlgoVertex source;
783  float runtime(0.);
784  if (u == nullptr) {
785  auto itT = m_exec_plan_map.find("ENTRY");
786  if ( itT != m_exec_plan_map.end()) {
787  source = itT->second;
788  } else {
789  source = boost::add_vertex(boost::AlgoNodeStruct("ENTRY",-999,-999, 0),
790  m_ExecPlan);
791  m_exec_plan_map["ENTRY"] = source;
792  }
793  } else {
794  auto itS = m_exec_plan_map.find(u->getNodeName());
795  if ( itS != m_exec_plan_map.end()) {
796  source = itS->second;
797  } else {
798  auto alg = dynamic_cast<Algorithm*> ( u->getAlgorithmRepresentatives()[0] );
799  if (alg == 0) {
800  fatal() << "could not convert IAlgorithm to Algorithm!" << endmsg;
801  } else {
802  try {
803  const Property& p = alg->getProperty( "AvgRuntime" );
804  runtime = std::stof( p.toString() );
805  } catch(...) {
806  debug() << "no AvgRuntime for " << alg->name() << endmsg;
807  runtime = 1.;
808  }
809  }
810  source = boost::add_vertex(boost::AlgoNodeStruct(u->getNodeName(),
811  u->getAlgoIndex(),
812  u->getRank(),runtime),
813  m_ExecPlan);
814  m_exec_plan_map[u->getNodeName()] = source;
815  }
816  }
817 
818  boost::AlgoVertex target;
819  auto itP = m_exec_plan_map.find(v->getNodeName());
820  if ( itP != m_exec_plan_map.end()) {
821  target = itP->second;
822  } else {
823  auto alg = dynamic_cast<Algorithm*> ( v->getAlgorithmRepresentatives()[0] );
824  if (alg == 0) {
825  fatal() << "could not convert IAlgorithm to Algorithm!" << endmsg;
826  } else {
827  try {
828  const Property& p = alg->getProperty( "AvgRuntime" );
829  runtime = std::stof( p.toString() );
830  } catch(...) {
831  debug() << "no AvgRuntime for " << alg->name() << endmsg;
832  runtime = 1.;
833  }
834  }
835  target = boost::add_vertex(boost::AlgoNodeStruct(v->getNodeName(),
836  v->getAlgoIndex(),
837  v->getRank(),runtime),
838  m_ExecPlan);
839  m_exec_plan_map[v->getNodeName()] = target;
840  }
841 
842  debug() << "Edge added to execution plan" << endmsg;
843  boost::add_edge(source, target, m_ExecPlan);
844  }
845 
846 } // namespace
bool m_isLazy
Whether to evaluate lazily - i.e. whether to stop once result known.
std::string stateToString(const int &stateId) const
Translation between state id and name.
std::vector< DataNode * > m_outputs
Vectors, used in augmented data dependencies realm Outputs of the algorithm, represented as DataNode'...
const std::vector< AlgorithmNode * > getDataIndependentNodes() const
const unsigned int & getAlgoIndex() const
XXX: CF tests.
virtual bool promoteToControlReadyState(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
XXX: CF tests.
virtual bool visitLeave(DecisionNode &) const =0
std::vector< DecisionNode * > m_parents
XXX: CF tests. All direct parent nodes in the tree.
StatusCode initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize graph.
unsigned int m_algoIndex
The index of the algorithm.
unsigned int m_nodeCounter
Total number of nodes in the graph.
T open(T...args)
boost::ExecPlan m_ExecPlan
temporary items to experiment with execution planning
StatusCode buildAugmentedDataDependenciesRealm()
Build data dependency realm WITH data object nodes participating.
void addDaughterNode(ControlFlowNode *node)
Add a daughter node.
StatusCode addDecisionHubNode(Algorithm *daughterAlgo, const std::string &parentName, bool modeOR, bool allPass, bool isLazy)
Add a node, which aggregates decisions of direct daughter nodes.
virtual std::string toString() const =0
value -> string
virtual int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const
Method to set algos to CONTROLREADY, if possible.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
virtual bool accept(IGraphVisitor &visitor)
std::string m_algoName
The name of the algorithm.
bool dataDependenciesSatisfied(const int &slotNum) const
Method to check whether the Algorithm has its all data dependency satisfied.
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
std::vector< ControlFlowNode * > m_children
All direct daughter nodes in the tree.
DataNodesMap m_dataPathToDataNodeMap
Index: map of data path to DataNode.
T stof(T...args)
T endl(T...args)
const std::vector< IAlgorithm * > & getAlgorithmRepresentatives() const
get Algorithm representatives
std::vector< DecisionNode * > m_parents
XXX: CF tests.
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
A method to update algorithm node decision, and propagate it upwards.
std::map< std::string, boost::AlgoVertex > m_exec_plan_map
bool m_inverted
Whether the selection result is negated or not.
bool m_allPass
Whether always passing regardless of daughter results.
std::string dumpDataFlow() const
Print out all data origins and destinations, as reflected in the EF graph.
ExecutionFlowGraph * m_graph
virtual void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize.
AlgoNodesMap m_algoNameToAlgoNodeMap
Index: map of algorithm's name to AlgorithmNode.
STL class.
DataNode * getDataNode(const DataObjID &dataPath) const
Get DataNode by DataObject path using graph index.
AlgsExecutionStates & getAlgoStates(const int &slotNum) const
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:820
void registerIODataObjects(const Algorithm *algo)
Register algorithm in the Data Dependency index.
bool m_modeOR
Whether acting as "and" (false) or "or" node (true)
void addInputDataNode(DataNode *node)
Associate an AlgorithmNode, which is a data consumer of this one.
T push_back(T...args)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
STL class.
const float & getRank() const
Get Algorithm rank.
DecisionNode * m_headNode
the head node of the control flow graph; may want to have multiple ones once supporting trigger paths...
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
virtual bool accept(IGraphVisitor &visitor)
T close(T...args)
graph_traits< ExecPlan >::vertex_descriptor AlgoVertex
virtual void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print a string representing the control flow state.
virtual bool visit(DecisionNode &)=0
virtual ~DecisionNode()
Destructor.
StatusCode buildDataDependenciesRealm()
Build data dependency realm WITHOUT data object nodes: just interconnect algorithm nodes directly...
DecisionHubsMap m_decisionNameToDecisionHubMap
Index: map of decision's name to DecisionHub.
virtual void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize.
void addOutputDataNode(DataNode *node)
Associate an AlgorithmNode, which is a data supplier for this one.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:74
T find(T...args)
void rankAlgorithms(IGraphVisitor &ranker) const
Rank Algorithm nodes by the number of data outputs.
virtual void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const
XXX: CF tests.
Property base class allowing Property* collections to be "homogeneous".
Definition: Property.h:38
void addEdgeToExecutionPlan(const AlgorithmNode *u, const AlgorithmNode *v)
set cause-effect connection between two algorithms in the execution plan
StatusCode addAlgorithmNode(Algorithm *daughterAlgo, const std::string &parentName, bool inverted, bool allPass)
Add algorithm node.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
std::vector< DataNode * > m_inputs
Inputs of the algorithm, represented as DataNode's.
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
virtual void acceptDHVisitor(IDataHandleVisitor *) const override
Definition: Algorithm.cpp:226
virtual int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const
Method to set algos to CONTROLREADY, if possible.
void dumpExecutionPlan()
dump to file encountered execution plan
std::vector< EventSlot > * m_eventSlots
void addHeadNode(const std::string &headName, bool modeOR, bool allPass, bool isLazy)
Add a node, which has no parents.
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
virtual void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print a string representing the control flow state.
virtual void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const
XXX: CF tests.
AlgoInputsMap m_algoNameToAlgoInputsMap
Indexes: maps of algorithm's name to algorithm's inputs/outputs.
const std::string & getNodeName() const
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
void ignore() const
Definition: StatusCode.h:108
State
Execution states of the algorithms.
virtual bool visitEnter(DecisionNode &) const =0
void updateEventState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const
XXX CF tests. Is needed for older CF implementation.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
bool m_allPass
Whether the selection result is relevant or always "pass".
bool promoteToDataReadyState(const int &slotNum, const AlgorithmNode *requestor=nullptr) const
static std::map< State, std::string > stateNames
StatusCode addDataNode(const DataObjID &dataPath)
Add DataNode that represents DataObject.
void addConsumerNode(AlgorithmNode *node)
Associate an AlgorithmNode, which is a data consumer of this one.
virtual bool promoteToControlReadyState(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
XXX: CF tests. Method to set algos to CONTROLREADY, if possible.
StatusCode updateState(unsigned int iAlgo, State newState)