All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
ExecutionFlowGraph.cpp
Go to the documentation of this file.
1 #include "ExecutionFlowGraph.h"
2 
3 namespace concurrency {
4 
5  //---------------------------------------------------------------------------
6  std::string ControlFlowNode::stateToString(const int& stateId) const {
7 
8  if (0 == stateId ) return "FALSE";
9  else if (1 == stateId ) return "TRUE";
10  else return "UNDEFINED";
11  }
12 
13  //---------------------------------------------------------------------------
15 
16  for (auto node : m_children)
17  delete node;
18  }
19 
20  //---------------------------------------------------------------------------
21  void DecisionNode::initialize(const std::unordered_map<std::string,unsigned int>& algname_index_map) {
22 
23  for (auto daughter : m_children)
24  daughter->initialize(algname_index_map);
25  }
26 
27  //---------------------------------------------------------------------------
29 
30  if (std::find(m_parents.begin(), m_parents.end(), node) == m_parents.end())
31  m_parents.push_back(node);
32  }
33 
34  //--------------------------------------------------------------------------
36 
37  if (std::find(m_children.begin(), m_children.end(), node) == m_children.end())
38  m_children.push_back(node);
39  }
40 
41  //---------------------------------------------------------------------------
42  void DecisionNode::printState(std::stringstream& output,
43  AlgsExecutionStates& states,
44  const std::vector<int>& node_decisions,
45  const unsigned int& recursionLevel) const {
46 
47  output << std::string(recursionLevel, ' ') << m_nodeName << " (" << m_nodeIndex << ")" << ", w/ decision: "
48  << stateToString(node_decisions[m_nodeIndex]) << "(" << node_decisions[m_nodeIndex] <<")" << std::endl;
49  for (auto daughter : m_children ) {
50  daughter->printState(output,states,node_decisions,recursionLevel+2);
51  }
52  }
53 
54  //---------------------------------------------------------------------------
56  std::vector<int>& node_decisions) const {
57  // check whether we already had a result earlier
58  // if (-1 != node_decisions[m_nodeIndex] ) { return node_decisions[m_nodeIndex]; }
59  int decision = ((m_allPass && m_isLazy) ? 1 : -1);
60  bool hasUndecidedChild = false;
61  for (auto daughter : m_children){
62  if (m_isLazy && (-1 !=decision || hasUndecidedChild ) ) {
63  node_decisions[m_nodeIndex] = decision;
64  return decision;} // if lazy return once result is known already or we can't fully evaluate right now because one daugther decision is missing still
65  auto res = daughter->updateState(states, node_decisions);
66  if ( -1 == res) {hasUndecidedChild = true;}
67  else if ( false == m_modeOR && res == 0 ){decision = 0;} // "and"-mode (once first result false, the overall decision is false)
68  else if ( true == m_modeOR && res == 1){decision = 1;} // "or"-mode (once first result true, the overall decision is true)
69  }
70  // what to do with yet undefined answers depends on whether AND or OR mode applies
71  if (!hasUndecidedChild && -1 == decision ) {
72  // OR mode: all results known, and none true -> reject
73  if ( true == m_modeOR){ decision = 0; }
74  // AND mode: all results known, and no false -> accept
75  else { decision = 1; }
76  }
77  // in all other cases I stay with previous decisions
78  node_decisions[m_nodeIndex] = decision;
79  if (m_allPass) decision = 1;
80  return decision;
81  }
82 
83  //---------------------------------------------------------------------------
84  void DecisionNode::updateDecision(const int& slotNum,
85  AlgsExecutionStates& states,
86  std::vector<int>& node_decisions,
87  const AlgorithmNode* requestor) const {
88 
89  int decision = ((m_allPass && m_isLazy) ? 1 : -1);
90  bool keepGoing = true;
91  bool hasUndecidedChild = false;
92  //std::cout << "++++++++++++++++++++BEGIN(UPDATING)++++++++++++++++++++" << std::endl;
93  //std::cout << "UPDATING DAUGHTERS of DECISION NODE: " << m_nodeName << std::endl;
94 
95  for (auto daughter : m_children){
96  // if lazy return once result is known already or we can't fully evaluate
97  // right now because one daughter decision is missing still
98  //std::cout << "----UPDATING DAUGHTER: " << daughter->getNodeName() << std::endl;
99  if (m_isLazy && !keepGoing) {
100  node_decisions[m_nodeIndex] = decision;
101  //std::cout << "STOPPING ITERATION OVER (UPDATING) DECISION NODE CHILDREN: " << m_nodeName << std::endl;
102  break;
103  //return;
104  }
105 
106  // modified
107  int& res = node_decisions[daughter->getNodeIndex()];
108  if (-1 == res) {
109  hasUndecidedChild = true;
110  if (typeid(*daughter) != typeid(concurrency::DecisionNode)) {
111  auto algod = (AlgorithmNode*) daughter;
112  algod->promoteToControlReadyState(slotNum,states,node_decisions);
113  bool result = algod->promoteToDataReadyState(slotNum, requestor);
114  if (result)
115  keepGoing = false;
116  } else {
117  daughter->updateDecision(slotNum, states, node_decisions, requestor);
118  }
119 
120  // "and"-mode (once first result false, the overall decision is false)
121  } else if (false == m_modeOR && res == 0) {
122  decision = 0;
123  keepGoing = false;
124  // "or"-mode (once first result true, the overall decision is true)
125  } else if (true == m_modeOR && res == 1) {
126  decision = 1;
127  keepGoing = false;
128  }
129  }
130 
131  // what to do with yet undefined answers depends on whether AND or OR mode applies
132  if (!hasUndecidedChild && -1 == decision ) {
133  // OR mode: all results known, and none true -> reject
134  if ( true == m_modeOR) {
135  decision = 0;
136  // AND mode: all results known, and no false -> accept
137  } else {
138  decision = 1;
139  }
140  }
141 
142  // in all other cases I stay with previous decisions
143  node_decisions[m_nodeIndex] = decision;
144 
145  // propagate decision upwards through the decision graph
146  if (-1 != decision)
147  for (auto p : m_parents)
148  p->updateDecision(slotNum, states, node_decisions, requestor);
149 
150  //std::cout << "++++++++++++++++++++END(UPDATING)++++++++++++++++++++" << std::endl;
151  }
152 
153  //---------------------------------------------------------------------------
155  AlgsExecutionStates& states,
156  std::vector<int>& node_decisions) const {
157  //std::cout << "REACHED DECISNODE " << m_nodeName << std::endl;
158  if (-1 != node_decisions[m_nodeIndex]) {
159  return true;
160  }
161 
162  for (auto daughter : m_children ) {
163  auto res = node_decisions[daughter->getNodeIndex()];
164  if (-1 == res) {
165  daughter->promoteToControlReadyState(slotNum, states, node_decisions);
166  if (m_isLazy) return true;
167  } else if (m_isLazy) {
168  if ((false == m_modeOR && res == 0) || (true == m_modeOR && res == 1)) return true;
169  }
170  }
171 
172  return true;
173  }
174 
175  //---------------------------------------------------------------------------
177 
178  if (visitor.visitEnter(*this)) {
179  bool result = visitor.visit(*this);
180  if (result)
181  return visitor.visitLeave(*this);
182 
183  for (auto child : m_children) {
184  bool keepGoing = child->accept(visitor);
185  if (m_isLazy && !keepGoing)
186  return false; //break;
187  }
188  }
189 
190  return true;
191  }
192 
193  //---------------------------------------------------------------------------
195 
196  for (auto node : m_outputs) {
197  delete node;
198  }
199  }
200 
201  //---------------------------------------------------------------------------
202  void AlgorithmNode::initialize(const std::unordered_map<std::string,unsigned int>& algname_index_map) {
203 
204  m_algoIndex = algname_index_map.at(m_algoName);
205  }
206 
207  //---------------------------------------------------------------------------
208  bool AlgorithmNode::promoteToControlReadyState(const int& /*slotNum*/,
209  AlgsExecutionStates& states,
210  std::vector<int>& /*node_decisions*/) const {
211 
212  auto& state = states[m_algoIndex];
213  bool result = false;
214 
215  if (State::INITIAL == state) {
216  states.updateState(m_algoIndex, State::CONTROLREADY);
217  //std::cout << "----> UPDATING ALGORITHM to CONTROLREADY: " << m_algoName << std::endl;
218  result = true;
219  } else if (State::CONTROLREADY == state) {
220  result = true;
221  }
222 
223  return result;
224  }
225 
226  //---------------------------------------------------------------------------
227  bool AlgorithmNode::promoteToDataReadyState(const int& slotNum, const AlgorithmNode* /*requestor*/) const {
228 
229  auto& states = m_graph->getAlgoStates(slotNum);
230  auto& state = states[m_algoIndex];
231  bool result = false;
232 
233  if (State::CONTROLREADY == state) {
234  if (dataDependenciesSatisfied(slotNum)) {
235  //std::cout << "----> UPDATING ALGORITHM to DATAREADY: " << m_algoName << std::endl;
236  states.updateState(m_algoIndex, State::DATAREADY);
237  result = true;
238 
239  //m_graph->addEdgeToExecutionPlan(requestor, this);
240 
241  /*
242  auto xtime = std::chrono::high_resolution_clock::now();
243  std::stringstream s;
244  s << getNodeName() << ", "
245  << (xtime-m_graph->getInitTime()).count() << "\n";
246  std::ofstream myfile;
247  myfile.open("DRTiming.csv", std::ios::app);
248  myfile << s.str();
249  myfile.close();
250  */
251  }
252  } else if (State::DATAREADY == state) {
253  result = true;
254  } else if (State::SCHEDULED == state) {
255  result = true;
256  }
257 
258  return result;
259  }
260 
261  //---------------------------------------------------------------------------
262  bool AlgorithmNode::dataDependenciesSatisfied(const int& slotNum) const {
263 
264  bool result = true;
265  auto& states = m_graph->getAlgoStates(slotNum);
266 
267  for (auto dataNode : m_inputs) {
268  result = false;
269  for (auto algoNode : dataNode->getProducers())
270  if (State::EVTACCEPTED == states[algoNode->getAlgoIndex()]) {
271  result = true;
272  break;
273  }
274  if (!result) break;
275  }
276 
277  return result;
278  }
279 
280  //---------------------------------------------------------------------------
282 
283  bool result = true;
284  for (auto dataNode : m_inputs) {
285 
286  result = false;
287  for (auto algoNode : dataNode->getProducers())
288  if (State::EVTACCEPTED == states[algoNode->getAlgoIndex()]) {
289  result = true;
290  break;
291  }
292 
293  if (!result) break;
294  }
295 
296  return result;
297  }
298 
299  //---------------------------------------------------------------------------
300  void AlgorithmNode::printState(std::stringstream& output,
301  AlgsExecutionStates& states,
302  const std::vector<int>& node_decisions,
303  const unsigned int& recursionLevel) const {
304  output << std::string(recursionLevel, ' ') << m_nodeName << " (" << m_nodeIndex << ")" << ", w/ decision: "
305  << stateToString(node_decisions[m_nodeIndex]) << "(" << node_decisions[m_nodeIndex] << ")"
306  << ", in state: " << states[m_algoIndex] << std::endl;
307  }
308 
309  //---------------------------------------------------------------------------
311  std::vector<int>& node_decisions) const {
312  // check whether we already had a result earlier
313  // if (-1 != node_decisions[m_nodeIndex] ) { return node_decisions[m_nodeIndex]; }
314  // since we reached this point in the control flow, this algorithm is supposed to run
315  // if it hasn't already
316  const State& state = states[m_algoIndex];
317  unsigned int decision = -1;
318  if (State::INITIAL == state) {states.updateState(m_algoIndex, State::CONTROLREADY);}
319  // now derive the proper result to pass back
320  if (true == m_allPass) {
321  decision = 1;
322  } else if (State::EVTACCEPTED == state) {
323  decision = !m_inverted;
324  } else if (State::EVTREJECTED == state) {
325  decision = m_inverted;
326  } else {
327  decision = -1; // result not known yet
328  }
329  node_decisions[m_nodeIndex] = decision;
330  return decision;
331  }
332 
333  //---------------------------------------------------------------------------
334  void AlgorithmNode::updateDecision(const int& slotNum,
335  AlgsExecutionStates& states,
336  std::vector<int>& node_decisions,
337  const AlgorithmNode* requestor) const {
338 
339  const State& state = states[m_algoIndex];
340  int decision = -1;
341  requestor = this;
342 
343  // now derive the proper result to pass back
344  if (true == m_allPass) {
345  decision = 1;
346  } else if (State::EVTACCEPTED == state) {
347  decision = !m_inverted;
348  } else if (State::EVTREJECTED == state) {
349  decision = m_inverted;
350  } else {
351  decision = -1; // result not known yet
352  }
353 
354  node_decisions[m_nodeIndex] = decision;
355 
356  if (-1 != decision) {
357  for (auto output : m_outputs)
358  for (auto consumer : output->getConsumers())
359  consumer->promoteToDataReadyState(slotNum, requestor);
360 
361  for (auto p : m_parents)
362  p->updateDecision(slotNum, states, node_decisions, requestor);
363  }
364  }
365 
366  //---------------------------------------------------------------------------
368 
369  if (visitor.visitEnter(*this)) {
370  bool result = visitor.visit(*this);
371  if (result) return false;
372  }
373 
374  return true;
375 
376  }
377 
378  //---------------------------------------------------------------------------
380 
381  if (std::find(m_parents.begin(), m_parents.end(), node) == m_parents.end())
382  m_parents.push_back(node);
383  }
384 
385  //---------------------------------------------------------------------------
387 
388  if (std::find(m_outputs.begin(),m_outputs.end(),node) == m_outputs.end())
389  m_outputs.push_back(node);
390  }
391 
392  //---------------------------------------------------------------------------
394 
395  if (std::find(m_inputs.begin(),m_inputs.end(),node) == m_inputs.end())
396  m_inputs.push_back(node);
397  }
398 
399  //---------------------------------------------------------------------------
// Initialize the graph: forwards the algorithm-name -> index map to the head
// node, then logs an error if the status held in `sc` is not a success, and
// returns that status.
// NOTE(review): `sc` is read below but its declaration is not visible in this
// listing (the listing jumps from line 403 to line 405). Presumably the missing
// line builds the data dependency realm and stores the resulting StatusCode -
// confirm against the real source file.
400  StatusCode ExecutionFlowGraph::initialize(const std::unordered_map<std::string,unsigned int>& algname_index_map){
401 
402  m_headNode->initialize(algname_index_map);
403  //StatusCode sc = buildDataDependenciesRealm();
405 
// A failure is only reported here; it is not converted or swallowed.
406  if (!sc.isSuccess())
407  error() << "Could not build the data dependency realm." << endmsg;
408 
409  return sc;
410  }
411 
412  //---------------------------------------------------------------------------
// Initialize the graph with access to the event slots: stores the address of
// eventSlots in m_eventSlots, forwards the algorithm-name -> index map to the
// head node, logs an error if the status held in `sc` is not a success, dumps
// the data flow when the message level is DEBUG, and returns that status.
// NOTE(review): `sc` is read below but its declaration is not visible in this
// listing (the listing jumps from line 418 to line 420). Presumably the missing
// line builds the data dependency realm and stores the resulting StatusCode -
// confirm against the real source file.
413  StatusCode ExecutionFlowGraph::initialize(const std::unordered_map<std::string,unsigned int>& algname_index_map,
414  std::vector<EventSlot>& eventSlots){
415 
// Only the pointer is kept; the vector itself stays owned by the caller.
416  m_eventSlots = &eventSlots;
417  m_headNode->initialize(algname_index_map);
418  //StatusCode sc = buildDataDependenciesRealm();
420 
421  if (!sc.isSuccess())
422  error() << "Could not build the data dependency realm." << endmsg;
423 
424  if (msgLevel(MSG::DEBUG)) {
425  dumpDataFlow();
426  }
427 
428  return sc;
429  }
430 
431  //---------------------------------------------------------------------------
433 
434  const std::string& algoName = algo->name();
435 
436  const DataObjectDescriptorCollection& inputDOCollection = algo->inputDataObjects();
437  m_algoNameToAlgoInputsMap[algoName] = &inputDOCollection;
438 
439  debug() << "Inputs of " << algoName << ": ";
440  for (auto tag : inputDOCollection) {
441  if (inputDOCollection[tag].isValid())
442  debug() << inputDOCollection[tag].dataProductName() << " | ";
443  }
444  debug() << endmsg;
445 
446  const DataObjectDescriptorCollection& outputDOCollection = algo->outputDataObjects();
447  m_algoNameToAlgoOutputsMap[algoName] = &outputDOCollection;
448 
449  debug() << "Outputs of " << algoName << ": ";
450  for (auto tag : outputDOCollection) {
451  if (outputDOCollection[tag].isValid())
452  debug() << outputDOCollection[tag].dataProductName() << " | ";
453  }
454  debug() << endmsg;
455  }
456 
457  //---------------------------------------------------------------------------
459 
460  StatusCode global_sc(StatusCode::SUCCESS);
461 
462  for (auto algo : m_algoNameToAlgoNodeMap) {
463 
464  auto targetNode = m_algoNameToAlgoNodeMap[algo.first];
465 
466  // Find producers for all the inputs of the target node
467  auto& targetInCollection = *m_algoNameToAlgoInputsMap[algo.first];
468  for (auto inputTag : targetInCollection) {
469  auto& input2Match = targetInCollection[inputTag].dataProductName();
470  for (auto producer : m_algoNameToAlgoOutputsMap) {
471  auto& outputs = *m_algoNameToAlgoOutputsMap[producer.first];
472  for (auto outputTag : outputs) {
473  if (outputs[outputTag].isValid() && outputs[outputTag].dataProductName() == input2Match) {
474  auto& known_producers = targetNode->getSupplierNodes();
475  auto valid_producer = m_algoNameToAlgoNodeMap[producer.first];
476  auto& known_consumers = valid_producer->getConsumerNodes();
477  if (std::find(known_producers.begin(),known_producers.end(),valid_producer) == known_producers.end())
478  targetNode->addSupplierNode(valid_producer);
479  if (std::find(known_consumers.begin(),known_consumers.end(),targetNode) == known_consumers.end())
480  valid_producer->addConsumerNode(targetNode);
481  }
482  }
483  }
484  }
485 
486  // Find consumers for all the outputs of the target node
487  auto& targetOutCollection = *m_algoNameToAlgoOutputsMap[algo.first];
488  for (auto outputTag : targetOutCollection) {
489  auto& output2Match = targetOutCollection[outputTag].dataProductName();
490  for (auto consumer : m_algoNameToAlgoInputsMap) {
491  auto& inputs = *m_algoNameToAlgoInputsMap[consumer.first];
492  for (auto inputTag : inputs) {
493  if (inputs[inputTag].isValid() && inputs[inputTag].dataProductName() == output2Match) {
494  auto& known_consumers = targetNode->getConsumerNodes();
495  auto valid_consumer = m_algoNameToAlgoNodeMap[consumer.first];
496  auto& known_producers = valid_consumer->getSupplierNodes();
497  if (std::find(known_producers.begin(),known_producers.end(),targetNode) == known_producers.end())
498  valid_consumer->addSupplierNode(targetNode);
499  if (std::find(known_consumers.begin(),known_consumers.end(),valid_consumer) == known_consumers.end())
500  targetNode->addConsumerNode(valid_consumer);
501  }
502  }
503  }
504  }
505 
506  }
507  return global_sc;
508  }
509 
510  //---------------------------------------------------------------------------
512 
513  StatusCode global_sc(StatusCode::SUCCESS);
514 
515  // Create the DataObjects (DO) realm (represented by DataNodes in the graph), connected to DO producers (AlgorithmNodes)
516  for (auto algo : m_algoNameToAlgoNodeMap) {
517 
518  StatusCode sc;
519  auto& outCollection = *m_algoNameToAlgoOutputsMap[algo.first];
520  for (auto outputTag : outCollection) {
521  if (outCollection[outputTag].isValid()) {
522  auto& output = outCollection[outputTag].dataProductName();
523  sc = addDataNode(output);
524  if (!sc.isSuccess()) {
525  error() << "Extra producer (" << algo.first << ") for DataObject @ " << output
526  << " has been detected: this is not allowed." << endmsg;
527  global_sc = StatusCode::FAILURE;
528  }
529  auto dataNode = getDataNode(output);
530  dataNode->addProducerNode(algo.second);
531  algo.second->addOutputDataNode(dataNode);
532  }
533  }
534  }
535 
536  // Connect previously created DO realm to DO consumers (AlgorithmNodes)
537  for (auto algo : m_algoNameToAlgoNodeMap) {
538  auto& inCollection = *m_algoNameToAlgoInputsMap[algo.first];
539  for (auto inputTag : inCollection) {
540  if (inCollection[inputTag].isValid()) {
541  DataNode* dataNode = nullptr;
542  auto& primaryPath = inCollection[inputTag].dataProductName();
543  auto itP = m_dataPathToDataNodeMap.find(primaryPath);
544  if (itP != m_dataPathToDataNodeMap.end()) {
545  dataNode = getDataNode(primaryPath);
546  if (!inCollection[inputTag].alternativeDataProductNames().empty())
547  warning() << "Dropping all alternative data dependencies in the graph, but '" << primaryPath
548  << "', for algorithm " << algo.first << endmsg;
549  } else {
550  for (auto alterPath : inCollection[inputTag].alternativeDataProductNames()) {
551  auto itAP = m_dataPathToDataNodeMap.find(alterPath);
552  if (itAP != m_dataPathToDataNodeMap.end()) {
553  dataNode = getDataNode(alterPath);
554  warning() << "Dropping all alternative data dependencies in the graph, but '" << alterPath
555  << "', for algorithm " << algo.first << endmsg;
556  break;
557  }
558  }
559  }
560  if (dataNode) {
561  dataNode->addConsumerNode(algo.second);
562  algo.second->addInputDataNode(dataNode);
563  }
564 
565  }
566  }
567  }
568 
569  return global_sc;
570  }
571 
572  //---------------------------------------------------------------------------
// Attach an algorithm node to an already registered decision hub.
//   algo        algorithm to represent; its name() is the lookup key
//   parentName  name of the decision hub that becomes the parent
//   inverted    passed to the AlgorithmNode constructor for new nodes
//   allPass     passed to the AlgorithmNode constructor for new nodes
// If no node exists yet for the algorithm, one is created, registered by name,
// and the algorithm's I/O data objects are registered. An existing node is
// reused as is (inverted/allPass are then ignored). The node and the parent are
// linked in both directions. If the parent hub is unknown, an error is logged
// and `sc` is set to StatusCode::FAILURE.
// NOTE(review): the declaration of `sc` is not visible in this listing (line
// 575 is absent); presumably it is initialised to a success status - confirm
// against the real source file.
573  StatusCode ExecutionFlowGraph::addAlgorithmNode(Algorithm* algo, const std::string& parentName, bool inverted, bool allPass) {
574 
576 
577  auto& algoName = algo->name();
578 
579  auto itP = m_decisionNameToDecisionHubMap.find(parentName);
580  concurrency::DecisionNode* parentNode;
581  if ( itP != m_decisionNameToDecisionHubMap.end()) {
582  parentNode = itP->second;
583  auto itA = m_algoNameToAlgoNodeMap.find(algoName);
584  concurrency::AlgorithmNode* algoNode;
585  if ( itA != m_algoNameToAlgoNodeMap.end()) {
// Same algorithm added under another parent: reuse the existing node.
586  algoNode = itA->second;
587  } else {
// The current node counter value is handed to the new node, then advanced.
588  algoNode = new concurrency::AlgorithmNode(*this,m_nodeCounter,algoName,inverted,allPass);
589  ++m_nodeCounter;
590  m_algoNameToAlgoNodeMap[algoName] = algoNode;
591  debug() << "AlgoNode " << algoName << " added @ " << algoNode << endmsg;
592  registerIODataObjects(algo);
593  }
594 
595  parentNode->addDaughterNode(algoNode);
596  algoNode->addParentNode(parentNode);
597  } else {
598  sc = StatusCode::FAILURE;
599  error() << "DecisionHubNode " << parentName << ", meant to be used as parent, is not registered in the EFG." << endmsg;
600  }
601 
602  return sc;
603  }
604 
605  //---------------------------------------------------------------------------
606  AlgorithmNode* ExecutionFlowGraph::getAlgorithmNode(const std::string& algoName) const {
607 
608  return m_algoNameToAlgoNodeMap.at(algoName);
609  }
610 
611  //---------------------------------------------------------------------------
612  StatusCode ExecutionFlowGraph::addDataNode(const std::string& dataPath) {
613 
614  StatusCode sc;
615 
616  auto itD = m_dataPathToDataNodeMap.find(dataPath);
617  concurrency::DataNode* dataNode;
618  if ( itD != m_dataPathToDataNodeMap.end()) {
619  dataNode = itD->second;
620  //sc = StatusCode::FAILURE;
621  sc = StatusCode::SUCCESS;
622  } else {
623  dataNode = new concurrency::DataNode(*this,dataPath);
624  m_dataPathToDataNodeMap[dataPath] = dataNode;
625  debug() << " DataNode for " << dataPath << " added @ " << dataNode << endmsg;
626  sc = StatusCode::SUCCESS;
627  }
628 
629  return sc;
630  }
631 
632  //---------------------------------------------------------------------------
633  DataNode* ExecutionFlowGraph::getDataNode(const std::string& dataPath) const {
634 
635  return m_dataPathToDataNodeMap.at(dataPath);
636  }
637 
638  //---------------------------------------------------------------------------
// Attach a decision hub node to an already registered parent decision hub.
//   decisionHubAlgo  algorithm whose name() identifies the hub
//   parentName       name of the decision hub that becomes the parent
//   modeOR, allPass, isLazy  passed to the DecisionNode constructor for new hubs
// If no hub exists yet under that name, one is created and registered; an
// existing hub is reused as is (the three flags are then ignored). Hub and
// parent are linked in both directions. If the parent hub is unknown, an error
// is logged and `sc` is set to StatusCode::FAILURE.
// NOTE(review): the declaration of `sc` is not visible in this listing (line
// 641 is absent); presumably it is initialised to a success status - confirm
// against the real source file.
639  StatusCode ExecutionFlowGraph::addDecisionHubNode(Algorithm* decisionHubAlgo, const std::string& parentName, bool modeOR, bool allPass, bool isLazy) {
640 
642 
643  auto& decisionHubName = decisionHubAlgo->name();
644 
645  auto itP = m_decisionNameToDecisionHubMap.find(parentName);
646  concurrency::DecisionNode* parentNode;
647  if ( itP != m_decisionNameToDecisionHubMap.end()) {
648  parentNode = itP->second;
649  auto itA = m_decisionNameToDecisionHubMap.find(decisionHubName);
650  concurrency::DecisionNode* decisionHubNode;
651  if ( itA != m_decisionNameToDecisionHubMap.end()) {
// Hub already registered: reuse it and only add the new parent link below.
652  decisionHubNode = itA->second;
653  } else {
// The current node counter value is handed to the new hub, then advanced.
654  decisionHubNode = new concurrency::DecisionNode(*this,m_nodeCounter,decisionHubName,modeOR,allPass,isLazy);
655  ++m_nodeCounter;
656  m_decisionNameToDecisionHubMap[decisionHubName] = decisionHubNode;
657  debug() << "DecisionHubNode " << decisionHubName << " added @ " << decisionHubNode << endmsg;
658  }
659 
660  parentNode->addDaughterNode(decisionHubNode);
661  decisionHubNode->addParentNode(parentNode);
662  } else {
663  sc = StatusCode::FAILURE;
664  error() << "DecisionHubNode " << parentName << ", meant to be used as parent, is not registered in the EFG." << endmsg;
665  }
666 
667  return sc;
668  }
669 
670  //---------------------------------------------------------------------------
// Set the head node of the control flow graph.
//   headName                 name under which the head hub is looked up
//   modeOR, allPass, isLazy  passed to the DecisionNode constructor when a new
//                            head has to be created
// A decision hub already registered under headName is reused as head;
// otherwise a new DecisionNode is created with the current node counter value
// and the counter is advanced.
// NOTE(review): listing line 679 is absent from this view. Presumably it
// registers the new head in m_decisionNameToDecisionHubMap (the other add*
// methods look their parent up there) - confirm against the real source file.
671  void ExecutionFlowGraph::addHeadNode(const std::string& headName, bool modeOR, bool allPass, bool isLazy) {
672 
673  auto itH = m_decisionNameToDecisionHubMap.find(headName);
674  if ( itH != m_decisionNameToDecisionHubMap.end()) {
675  m_headNode = itH->second;
676  } else {
677  m_headNode = new concurrency::DecisionNode(*this,m_nodeCounter,headName,modeOR,allPass,isLazy);
678  ++m_nodeCounter;
680  }
681 
682  }
683 
684  //---------------------------------------------------------------------------
686  std::vector<int>& node_decisions) const {
687  m_headNode->updateState(algo_states, node_decisions);
688  }
689 
690  //---------------------------------------------------------------------------
691  void ExecutionFlowGraph::updateDecision(const std::string& algo_name,
692  const int& slotNum,
693  AlgsExecutionStates& algo_states,
694  std::vector<int>& node_decisions) const {
695  //debug() << "(UPDATING)Setting decision of algorithm " << algo_name << " and propagating it upwards.." << endmsg;
696  getAlgorithmNode(algo_name)->updateDecision(slotNum, algo_states, node_decisions);
697  }
698 
699  //---------------------------------------------------------------------------
701 
702  info() << "Starting ranking by data outputs .. " << endmsg;
703  for (auto& pair : m_algoNameToAlgoNodeMap) {
704  pair.second->accept(ranker);
705  debug() << " Rank of " << pair.first << ": " << pair.second->getRank() << endmsg;
706  }
707  }
708 
709  //---------------------------------------------------------------------------
710  const std::vector<AlgorithmNode*> ExecutionFlowGraph::getDataIndependentNodes() const {
711 
712  std::vector<AlgorithmNode*> result;
713 
714  for (auto node : m_algoNameToAlgoInputsMap) {
715  const DataObjectDescriptorCollection& collection = *(node.second);
716  for (auto tag : collection)
717  if (collection[tag].isValid()) {
718  result.push_back(getAlgorithmNode(node.first));
719  break;
720  }
721  }
722 
723  return result;
724  }
725 
726  //---------------------------------------------------------------------------
728 
729  debug() << "====================================" << endmsg;
730  debug() << "Data origins and destinations:" << endmsg;
731  debug() << "====================================" << endmsg;
732 
733  for (auto& pair : m_dataPathToDataNodeMap) {
734 
735  for (auto algoNode : pair.second->getProducers())
736  debug() << " " << algoNode->getNodeName() << endmsg;
737 
738  debug() << " V" << endmsg;
739  debug() << " o " << pair.first << endmsg;
740  debug() << " V" << endmsg;
741 
742  for (auto algoNode : pair.second->getConsumers())
743  debug() << " " << algoNode->getNodeName() << endmsg;
744 
745  debug() << " ====================================" << endmsg;
746  }
747  }
748 
750  std::ofstream myfile;
751  myfile.open("ExecutionPlan.graphml", std::ios::app);
752 
753  boost::dynamic_properties dp;
754  dp.property("name", boost::get(&boost::AlgoNodeStruct::m_name, m_ExecPlan));
755  dp.property("index", boost::get(&boost::AlgoNodeStruct::m_index, m_ExecPlan));
756  dp.property("rank", boost::get(&boost::AlgoNodeStruct::m_rank, m_ExecPlan));
757  dp.property("runtime", boost::get(&boost::AlgoNodeStruct::m_runtime, m_ExecPlan));
758 
759  boost::write_graphml(myfile, m_ExecPlan, dp);
760 
761  myfile.close();
762  }
763 
765 
766  boost::AlgoVertex source;
767  if (u == nullptr) {
768  auto itT = m_exec_plan_map.find("ENTRY");
769  if ( itT != m_exec_plan_map.end()) {
770  source = itT->second;
771  } else {
772  source = boost::add_vertex(boost::AlgoNodeStruct("ENTRY",-999,-999, 0), m_ExecPlan);
773  m_exec_plan_map["ENTRY"] = source;
774  }
775  } else {
776  auto itS = m_exec_plan_map.find(u->getNodeName());
777  if ( itS != m_exec_plan_map.end()) {
778  source = itS->second;
779  } else {
780  auto cruncher = dynamic_cast<CPUCruncher*> ( u->getAlgorithmRepresentatives()[0] );
781  if (!cruncher) fatal() << "Conversion from IAlgorithm to CPUCruncher failed" << endmsg;
782  source = boost::add_vertex(boost::AlgoNodeStruct(u->getNodeName(),u->getAlgoIndex(),u->getRank(),cruncher->get_runtime()), m_ExecPlan);
783  m_exec_plan_map[u->getNodeName()] = source;
784  }
785  }
786 
787  boost::AlgoVertex target;
788  auto itP = m_exec_plan_map.find(v->getNodeName());
789  if ( itP != m_exec_plan_map.end()) {
790  target = itP->second;
791  } else {
792  auto cruncher = dynamic_cast<CPUCruncher*> ( v->getAlgorithmRepresentatives()[0] );
793  if (!cruncher) fatal() << "Conversion from IAlgorithm to CPUCruncher failed" << endmsg;
794  target = boost::add_vertex(boost::AlgoNodeStruct(v->getNodeName(),v->getAlgoIndex(),v->getRank(),cruncher->get_runtime()), m_ExecPlan);
795  m_exec_plan_map[v->getNodeName()] = target;
796  }
797 
798  debug() << "Edge added to execution plan" << endmsg;
799  boost::add_edge(source, target, m_ExecPlan);
800  }
801 
802 } // namespace
void dumpDataFlow() const
Print out all data origins and destinations, as reflected in the EF graph.
bool m_isLazy
Whether to evaluate lazily - i.e. whether to stop once result known.
std::string stateToString(const int &stateId) const
Translation between state id and name.
std::vector< DataNode * > m_outputs
Vectors, used in augmented data dependencies realm Outputs of the algorithm, represented as DataNode'...
const std::vector< AlgorithmNode * > getDataIndependentNodes() const
const unsigned int & getAlgoIndex() const
XXX: CF tests.
virtual bool promoteToControlReadyState(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
XXX: CF tests.
virtual bool visitLeave(DecisionNode &) const =0
std::vector< DecisionNode * > m_parents
XXX: CF tests. All direct parent nodes in the tree.
StatusCode initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize graph.
unsigned int m_algoIndex
The index of the algorithm.
unsigned int m_nodeCounter
Total number of nodes in the graph.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
const DataObjectDescriptorCollection & outputDataObjects() const override
Definition: Algorithm.h:700
boost::ExecPlan m_ExecPlan
temporary items to experiment with execution planning
StatusCode buildAugmentedDataDependenciesRealm()
Build data dependency realm WITH data object nodes participating.
void addDaughterNode(ControlFlowNode *node)
Add a daughter node.
StatusCode addDecisionHubNode(Algorithm *daughterAlgo, const std::string &parentName, bool modeOR, bool allPass, bool isLazy)
Add a node, which aggregates decisions of direct daughter nodes.
virtual int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const
Method to set algos to CONTROLREADY, if possible.
virtual bool accept(IGraphVisitor &visitor)
std::string m_algoName
The name of the algorithm.
bool dataDependenciesSatisfied(const int &slotNum) const
Method to check whether the Algorithm has all of its data dependencies satisfied.
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
std::vector< ControlFlowNode * > m_children
All direct daughter nodes in the tree.
DataNodesMap m_dataPathToDataNodeMap
Index: map of data path to DataNode.
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
const std::vector< IAlgorithm * > & getAlgorithmRepresentatives() const
get Algorithm representatives
std::vector< DecisionNode * > m_parents
XXX: CF tests.
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
A method to update algorithm node decision, and propagate it upwards.
std::map< std::string, boost::AlgoVertex > m_exec_plan_map
bool m_inverted
Whether the selection result is negated or not.
bool m_allPass
Whether always passing regardless of daughter results.
ExecutionFlowGraph * m_graph
virtual void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize.
AlgoNodesMap m_algoNameToAlgoNodeMap
Index: map of algorithm's name to AlgorithmNode.
AlgsExecutionStates & getAlgoStates(const int &slotNum) const
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:919
void registerIODataObjects(const Algorithm *algo)
Register algorithm in the Data Dependency index.
bool m_modeOR
Whether acting as "and" (false) or "or" node (true)
void addInputDataNode(DataNode *node)
Associate an AlgorithmNode, which is a data consumer of this one.
const float & getRank() const
Get Algorithm rank.
DecisionNode * m_headNode
the head node of the control flow graph; may want to have multiple ones once supporting trigger paths...
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single event.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
virtual bool accept(IGraphVisitor &visitor)
graph_traits< ExecPlan >::vertex_descriptor AlgoVertex
virtual void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print a string representing the control flow state.
virtual bool visit(DecisionNode &)=0
virtual ~DecisionNode()
Destructor.
StatusCode buildDataDependenciesRealm()
Build data dependency realm WITHOUT data object nodes: just interconnect algorithm nodes directly...
DecisionHubsMap m_decisionNameToDecisionHubMap
Index: map of decision's name to DecisionHub.
virtual void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize.
void addOutputDataNode(DataNode *node)
Associate an AlgorithmNode, which is a data supplier for this one.
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:77
void rankAlgorithms(IGraphVisitor &ranker) const
Rank Algorithm nodes by the number of data outputs.
virtual void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const
XXX: CF tests.
void addEdgeToExecutionPlan(const AlgorithmNode *u, const AlgorithmNode *v)
set cause-effect connection between two algorithms in the execution plan
StatusCode addAlgorithmNode(Algorithm *daughterAlgo, const std::string &parentName, bool inverted, bool allPass)
Add algorithm node.
std::vector< DataNode * > m_inputs
Inputs of the algorithm, represented as DataNode's.
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
StatusCode addDataNode(const std::string &dataPath)
Add DataNode that represents DataObject.
DataNode * getDataNode(const std::string &dataPath) const
Get DataNode by DataObject path using graph index.
virtual int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const
Method to set algos to CONTROLREADY, if possible.
void dumpExecutionPlan()
Dump the encountered execution plan to a file.
std::vector< EventSlot > * m_eventSlots
void addHeadNode(const std::string &headName, bool modeOR, bool allPass, bool isLazy)
Add a node, which has no parents.
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
virtual void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print a string representing the control flow state.
virtual void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const
XXX: CF tests.
AlgoInputsMap m_algoNameToAlgoInputsMap
Indexes: maps of algorithm's name to algorithm's inputs/outputs.
const std::string & getNodeName() const
__attribute__((deprecated)) const std const DataObjectDescriptorCollection & inputDataObjects() const override
Return the handles declared in the algorithm.
Definition: Algorithm.h:697
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
State
Execution states of the algorithms.
virtual bool visitEnter(DecisionNode &) const =0
void updateEventState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const
XXX CF tests. Is needed for older CF implementation.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
bool m_allPass
Whether the selection result is relevant or always "pass".
bool promoteToDataReadyState(const int &slotNum, const AlgorithmNode *requestor=nullptr) const
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
void addConsumerNode(AlgorithmNode *node)
Associate an AlgorithmNode, which is a data consumer of this one.
virtual bool promoteToControlReadyState(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
XXX: CF tests. Method to set algos to CONTROLREADY, if possible.
StatusCode updateState(unsigned int iAlgo, State newState)