concurrency::ExecutionFlowGraph Class Reference

#include <src/ExecutionFlowGraph.h>


Public Member Functions

 ExecutionFlowGraph (const std::string &name, SmartIF< ISvcLocator > svc)
 Constructor. More...
 
 ~ExecutionFlowGraph () override
 Destructor. More...
 
StatusCode initialize (const std::unordered_map< std::string, unsigned int > &algname_index_map)
 Initialize graph. More...
 
StatusCode initialize (const std::unordered_map< std::string, unsigned int > &algname_index_map, std::vector< EventSlot > &eventSlots)
 
void registerIODataObjects (const Algorithm *algo)
 Register algorithm in the Data Dependency index. More...
 
StatusCode buildDataDependenciesRealm ()
 Build data dependency realm WITHOUT data object nodes: just interconnect algorithm nodes directly. More...
 
StatusCode buildAugmentedDataDependenciesRealm ()
 Build data dependency realm WITH data object nodes participating. More...
 
void addHeadNode (const std::string &headName, bool modeOR, bool allPass, bool isLazy)
 Add a node, which has no parents. More...
 
StatusCode addAlgorithmNode (Algorithm *daughterAlgo, const std::string &parentName, bool inverted, bool allPass)
 Add algorithm node. More...
 
template<class T >
void attachAlgorithmsToNodes (const std::string &algo_name, const T &container)
 Attach pointers to real Algorithms (and their clones) to Algorithm nodes of the graph. More...
 
AlgorithmNode * getAlgorithmNode (const std::string &algoName) const
 Get the AlgorithmNode by algorithm name using graph index. More...
 
StatusCode addDataNode (const std::string &dataPath)
 Add DataNode that represents DataObject. More...
 
DataNode * getDataNode (const std::string &dataPath) const
 Get DataNode by DataObject path using graph index. More...
 
StatusCode addDecisionHubNode (Algorithm *daughterAlgo, const std::string &parentName, bool modeOR, bool allPass, bool isLazy)
 Add a node, which aggregates decisions of direct daughter nodes. More...
 
unsigned int getControlFlowNodeCounter () const
 Get total number of graph nodes. More...
 
void updateEventState (AlgsExecutionStates &states, std::vector< int > &node_decisions) const
 XXX CF tests. Is needed for older CF implementation. More...
 
void updateDecision (const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
 A method to update algorithm node decision, and propagate it upwards. More...
 
void rankAlgorithms (IGraphVisitor &ranker) const
 Rank Algorithm nodes by the number of data outputs. More...
 
void printState (std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
 Print a string representing the control flow state. More...
 
const std::vector< AlgorithmNode * > getDataIndependentNodes () const
 
const std::string & name () const override
 Retrieve name of the service. More...
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator. More...
 
const std::chrono::system_clock::time_point getInitTime () const
 
AlgsExecutionStates & getAlgoStates (const int &slotNum) const
 
std::vector< int > & getNodeDecisions (const int &slotNum) const
 
void dumpDataFlow () const
 Print out all data origins and destinations, as reflected in the EF graph. More...
 
void dumpExecutionPlan ()
 Dump the encountered execution plan to a file. More...
 
void addEdgeToExecutionPlan (const AlgorithmNode *u, const AlgorithmNode *v)
 Set a cause-effect connection between two algorithms in the execution plan. More...
 
- Public Member Functions inherited from CommonMessaging< IExecutionFlowGraph >
 ~CommonMessaging () override=default
 Virtual destructor. More...
 
SmartIF< IMessageSvc > & msgSvc () const
 The standard message service. More...
 
MsgStream & msgStream () const
 Return an uninitialized MsgStream. More...
 
MsgStream & msgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts. More...
 
MsgStream & always () const
 shortcut for the method msgStream(MSG::ALWAYS) More...
 
MsgStream & fatal () const
 shortcut for the method msgStream(MSG::FATAL) More...
 
MsgStream & err () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStream & error () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStream & warning () const
 shortcut for the method msgStream(MSG::WARNING) More...
 
MsgStream & info () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MsgStream & debug () const
 shortcut for the method msgStream(MSG::DEBUG) More...
 
MsgStream & verbose () const
 shortcut for the method msgStream(MSG::VERBOSE) More...
 
MsgStream & msg () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MSG::Level msgLevel () const
 get the output level from the embedded MsgStream More...
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream More...
 

Private Attributes

friend ExecutionFlowManager
 
DecisionNode * m_headNode
 the head node of the control flow graph; may want to have multiple ones once supporting trigger paths More...
 
AlgoNodesMap m_algoNameToAlgoNodeMap
 Index: map of algorithm's name to AlgorithmNode. More...
 
DecisionHubsMap m_decisionNameToDecisionHubMap
 Index: map of decision's name to DecisionHub. More...
 
DataNodesMap m_dataPathToDataNodeMap
 Index: map of data path to DataNode. More...
 
AlgoInputsMap m_algoNameToAlgoInputsMap
 Indexes: maps of algorithm's name to algorithm's inputs/outputs. More...
 
AlgoOutputsMap m_algoNameToAlgoOutputsMap
 
unsigned int m_nodeCounter
 Total number of nodes in the graph. More...
 
SmartIF< ISvcLocator > m_svcLocator
 Service locator (needed to access the MessageSvc) More...
 
const std::string m_name
 
const std::chrono::system_clock::time_point m_initTime
 
std::vector< EventSlot > * m_eventSlots
 
boost::ExecPlan m_ExecPlan
 temporary items to experiment with execution planning More...
 
std::map< std::string, boost::AlgoVertex > m_exec_plan_map
 

Additional Inherited Members

- Public Types inherited from CommonMessaging< IExecutionFlowGraph >
using base_class = CommonMessaging
 
- Protected Member Functions inherited from CommonMessaging< IExecutionFlowGraph >
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream. More...
 
- Protected Attributes inherited from CommonMessaging< IExecutionFlowGraph >
SmartIF< IMessageSvc > m_msgsvc
 Pointer to the message service. More...
 
std::unique_ptr< MsgStream > m_msgStream
 The predefined message stream. More...
 
bool m_streamWithService
 Flag to create a new MsgStream if it was created without the message service. More...
 

Detailed Description

Definition at line 270 of file ExecutionFlowGraph.h.

Constructor & Destructor Documentation

concurrency::ExecutionFlowGraph::ExecutionFlowGraph ( const std::string &  name,
SmartIF< ISvcLocator > svc 
)
inline

Constructor.

Definition at line 274 of file ExecutionFlowGraph.h.

274  :
275  m_headNode(0), m_nodeCounter(0), m_svcLocator(svc), m_name(name), m_initTime(std::chrono::high_resolution_clock::now()),
276  m_eventSlots(nullptr) {}
concurrency::ExecutionFlowGraph::~ExecutionFlowGraph ( )
inline override

Destructor.

Definition at line 278 of file ExecutionFlowGraph.h.

278  {
279  if (m_headNode != 0) delete m_headNode;
280  }

Member Function Documentation

StatusCode concurrency::ExecutionFlowGraph::addAlgorithmNode ( Algorithm * daughterAlgo,
const std::string &  parentName,
bool  inverted,
bool  allPass 
)

Add algorithm node.

Definition at line 573 of file ExecutionFlowGraph.cpp.

573  {
574 
575  StatusCode sc(StatusCode::SUCCESS);
576 
577  auto& algoName = algo->name();
578 
579  auto itP = m_decisionNameToDecisionHubMap.find(parentName);
580  concurrency::DecisionNode* parentNode;
581  if ( itP != m_decisionNameToDecisionHubMap.end()) {
582  parentNode = itP->second;
583  auto itA = m_algoNameToAlgoNodeMap.find(algoName);
584  concurrency::AlgorithmNode* algoNode;
585  if ( itA != m_algoNameToAlgoNodeMap.end()) {
586  algoNode = itA->second;
587  } else {
588  algoNode = new concurrency::AlgorithmNode(*this,m_nodeCounter,algoName,inverted,allPass);
589  ++m_nodeCounter;
590  m_algoNameToAlgoNodeMap[algoName] = algoNode;
591  debug() << "AlgoNode " << algoName << " added @ " << algoNode << endmsg;
592  registerIODataObjects(algo);
593  }
594 
595  parentNode->addDaughterNode(algoNode);
596  algoNode->addParentNode(parentNode);
597  } else {
598  sc = StatusCode::FAILURE;
599  error() << "DecisionHubNode " << parentName << ", meant to be used as parent, is not registered in the EFG." << endmsg;
600  }
601 
602  return sc;
603  }
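
The listing above follows a look-up-or-create pattern: the parent hub must already be registered, the algorithm node is created only the first time its name is seen, and the two are then linked in both directions. A minimal, self-contained sketch of that pattern is given below. `GraphSketch`, `HubSketch`, `AlgoSketch` and all their members are hypothetical stand-ins rather than Gaudi classes, and a plain `bool` replaces `StatusCode`.

```cpp
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-ins for DecisionNode / AlgorithmNode (not the Gaudi types).
struct AlgoSketch;
struct HubSketch {
  unsigned int index;
  std::string name;
  std::vector<AlgoSketch*> daughters;
};
struct AlgoSketch {
  unsigned int index;
  std::string name;
  std::vector<HubSketch*> parents;
};

class GraphSketch {
public:
  // Register a decision hub once; every new node consumes one counter value.
  void addHub(const std::string& name) {
    if (m_hubs.find(name) == m_hubs.end())
      m_hubs[name].reset(new HubSketch{m_nodeCounter++, name, {}});
  }

  // Returns false when the parent hub is unknown (the FAILURE branch above).
  bool addAlgorithm(const std::string& algoName, const std::string& parentName) {
    auto itP = m_hubs.find(parentName);
    if (itP == m_hubs.end()) return false;

    auto itA = m_algos.find(algoName);
    if (itA == m_algos.end()) {
      std::unique_ptr<AlgoSketch> node(new AlgoSketch{m_nodeCounter++, algoName, {}});
      itA = m_algos.emplace(algoName, std::move(node)).first;
    }
    // Link both directions, whether the node is new or reused.
    itP->second->daughters.push_back(itA->second.get());
    itA->second->parents.push_back(itP->second.get());
    return true;
  }

  unsigned int nodeCounter() const { return m_nodeCounter; }
  const AlgoSketch* algo(const std::string& name) const { return m_algos.at(name).get(); }

private:
  std::unordered_map<std::string, std::unique_ptr<HubSketch>> m_hubs;
  std::unordered_map<std::string, std::unique_ptr<AlgoSketch>> m_algos;
  unsigned int m_nodeCounter = 0;
};
```

In this sketch, adding the same algorithm under a second hub reuses the existing node and only appends another parent link, so the node counter does not advance.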
StatusCode concurrency::ExecutionFlowGraph::addDataNode ( const std::string &  dataPath)

Add DataNode that represents DataObject.

Definition at line 612 of file ExecutionFlowGraph.cpp.

612  {
613 
614  StatusCode sc;
615 
616  auto itD = m_dataPathToDataNodeMap.find(dataPath);
617  concurrency::DataNode* dataNode;
618  if ( itD != m_dataPathToDataNodeMap.end()) {
619  dataNode = itD->second;
620  //sc = StatusCode::FAILURE;
621  sc = StatusCode::SUCCESS;
622  } else {
623  dataNode = new concurrency::DataNode(*this,dataPath);
624  m_dataPathToDataNodeMap[dataPath] = dataNode;
625  debug() << " DataNode for " << dataPath << " added @ " << dataNode << endmsg;
626  sc = StatusCode::SUCCESS;
627  }
628 
629  return sc;
630  }
StatusCode concurrency::ExecutionFlowGraph::addDecisionHubNode ( Algorithm * daughterAlgo,
const std::string &  parentName,
bool  modeOR,
bool  allPass,
bool  isLazy 
)

Add a node, which aggregates decisions of direct daughter nodes.

Definition at line 639 of file ExecutionFlowGraph.cpp.

639  {
640 
641  StatusCode sc(StatusCode::SUCCESS);
642 
643  auto& decisionHubName = decisionHubAlgo->name();
644 
645  auto itP = m_decisionNameToDecisionHubMap.find(parentName);
646  concurrency::DecisionNode* parentNode;
647  if ( itP != m_decisionNameToDecisionHubMap.end()) {
648  parentNode = itP->second;
649  auto itA = m_decisionNameToDecisionHubMap.find(decisionHubName);
650  concurrency::DecisionNode* decisionHubNode;
651  if ( itA != m_decisionNameToDecisionHubMap.end()) {
652  decisionHubNode = itA->second;
653  } else {
654  decisionHubNode = new concurrency::DecisionNode(*this,m_nodeCounter,decisionHubName,modeOR,allPass,isLazy);
655  ++m_nodeCounter;
656  m_decisionNameToDecisionHubMap[decisionHubName] = decisionHubNode;
657  debug() << "DecisionHubNode " << decisionHubName << " added @ " << decisionHubNode << endmsg;
658  }
659 
660  parentNode->addDaughterNode(decisionHubNode);
661  decisionHubNode->addParentNode(parentNode);
662  } else {
663  sc = StatusCode::FAILURE;
664  error() << "DecisionHubNode " << parentName << ", meant to be used as parent, is not registered in the EFG." << endmsg;
665  }
666 
667  return sc;
668  }
void concurrency::ExecutionFlowGraph::addEdgeToExecutionPlan ( const AlgorithmNode * u,
const AlgorithmNode * v 
)

Set a cause-effect connection between two algorithms in the execution plan.

Definition at line 764 of file ExecutionFlowGraph.cpp.

764  {
765 
766  boost::AlgoVertex source;
767  if (u == nullptr) {
768  auto itT = m_exec_plan_map.find("ENTRY");
769  if ( itT != m_exec_plan_map.end()) {
770  source = itT->second;
771  } else {
772  source = boost::add_vertex(boost::AlgoNodeStruct("ENTRY",-999,-999, 0), m_ExecPlan);
773  m_exec_plan_map["ENTRY"] = source;
774  }
775  } else {
776  auto itS = m_exec_plan_map.find(u->getNodeName());
777  if ( itS != m_exec_plan_map.end()) {
778  source = itS->second;
779  } else {
780  auto cruncher = dynamic_cast<CPUCruncher*> ( u->getAlgorithmRepresentatives()[0] );
781  if (!cruncher) fatal() << "Conversion from IAlgorithm to CPUCruncher failed" << endmsg;
782  source = boost::add_vertex(boost::AlgoNodeStruct(u->getNodeName(),u->getAlgoIndex(),u->getRank(),cruncher->get_runtime()), m_ExecPlan);
783  m_exec_plan_map[u->getNodeName()] = source;
784  }
785  }
786 
787  boost::AlgoVertex target;
788  auto itP = m_exec_plan_map.find(v->getNodeName());
789  if ( itP != m_exec_plan_map.end()) {
790  target = itP->second;
791  } else {
792  auto cruncher = dynamic_cast<CPUCruncher*> ( v->getAlgorithmRepresentatives()[0] );
793  if (!cruncher) fatal() << "Conversion from IAlgorithm to CPUCruncher failed" << endmsg;
794  target = boost::add_vertex(boost::AlgoNodeStruct(v->getNodeName(),v->getAlgoIndex(),v->getRank(),cruncher->get_runtime()), m_ExecPlan);
795  m_exec_plan_map[v->getNodeName()] = target;
796  }
797 
798  debug() << "Edge added to execution plan" << endmsg;
799  boost::add_edge(source, target, m_ExecPlan);
800  }
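
The vertex handling in the listing above can be reduced to one rule: look a vertex up by name, create it if it is missing, and treat a null source as the synthetic "ENTRY" vertex. The sketch below illustrates that rule without Boost. `PlanSketch` and its members are hypothetical, the source is identified by a string pointer instead of an `AlgorithmNode`, and the per-vertex properties (index, rank, runtime) are left out.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, Boost-free stand-in for the execution plan graph.
class PlanSketch {
public:
  // Pass nullptr as `from` to connect `to` to the synthetic "ENTRY" vertex.
  void addEdge(const std::string* from, const std::string& to) {
    const std::size_t source = vertexFor(from ? *from : std::string("ENTRY"));
    const std::size_t target = vertexFor(to);
    m_edges.emplace_back(source, target);
  }

  std::size_t vertexCount() const { return m_names.size(); }
  std::size_t edgeCount() const { return m_edges.size(); }
  const std::string& vertexName(std::size_t v) const { return m_names.at(v); }
  const std::pair<std::size_t, std::size_t>& edge(std::size_t i) const { return m_edges.at(i); }

private:
  // Return the vertex registered under `name`, creating it on first use.
  std::size_t vertexFor(const std::string& name) {
    auto it = m_index.find(name);
    if (it != m_index.end()) return it->second;
    m_names.push_back(name);
    return m_index[name] = m_names.size() - 1;
  }

  std::map<std::string, std::size_t> m_index;                // name -> vertex id
  std::vector<std::string> m_names;                          // vertex id -> name
  std::vector<std::pair<std::size_t, std::size_t>> m_edges;  // (source, target)
};
```

Because vertices are keyed by name, repeated calls for the same pair of algorithms add parallel edges but never duplicate vertices.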
void concurrency::ExecutionFlowGraph::addHeadNode ( const std::string &  headName,
bool  modeOR,
bool  allPass,
bool  isLazy 
)

Add a node, which has no parents.

Definition at line 671 of file ExecutionFlowGraph.cpp.

671  {
672 
673  auto itH = m_decisionNameToDecisionHubMap.find(headName);
674  if ( itH != m_decisionNameToDecisionHubMap.end()) {
675  m_headNode = itH->second;
676  } else {
677  m_headNode = new concurrency::DecisionNode(*this,m_nodeCounter,headName,modeOR,allPass,isLazy);
678  ++m_nodeCounter;
680  }
681 
682  }
template<class T >
void concurrency::ExecutionFlowGraph::attachAlgorithmsToNodes ( const std::string &  algo_name,
const T &  container 
)
inline

Attach pointers to real Algorithms (and their clones) to Algorithm nodes of the graph.

Definition at line 297 of file ExecutionFlowGraph.h.

297  {
298  auto node = getAlgorithmNode(algo_name);
299  for (auto ialgoIt = container.unsafe_begin(); ialgoIt != container.unsafe_end(); ++ialgoIt)
300  node->attachAlgorithm(*ialgoIt);
301  }
StatusCode concurrency::ExecutionFlowGraph::buildAugmentedDataDependenciesRealm ( )

Build data dependency realm WITH data object nodes participating.

Definition at line 511 of file ExecutionFlowGraph.cpp.

511  {
512 
513  StatusCode global_sc(StatusCode::SUCCESS);
514 
515  // Create the DataObjects (DO) realm (represented by DataNodes in the graph), connected to DO producers (AlgorithmNodes)
516  for (auto algo : m_algoNameToAlgoNodeMap) {
517 
518  StatusCode sc;
519  auto& outCollection = *m_algoNameToAlgoOutputsMap[algo.first];
520  for (auto outputTag : outCollection) {
521  if (outCollection[outputTag].isValid()) {
522  auto& output = outCollection[outputTag].dataProductName();
523  sc = addDataNode(output);
524  if (!sc.isSuccess()) {
525  error() << "Extra producer (" << algo.first << ") for DataObject @ " << output
526  << " has been detected: this is not allowed." << endmsg;
527  global_sc = StatusCode::FAILURE;
528  }
529  auto dataNode = getDataNode(output);
530  dataNode->addProducerNode(algo.second);
531  algo.second->addOutputDataNode(dataNode);
532  }
533  }
534  }
535 
536  // Connect previously created DO realm to DO consumers (AlgorithmNodes)
537  for (auto algo : m_algoNameToAlgoNodeMap) {
538  auto& inCollection = *m_algoNameToAlgoInputsMap[algo.first];
539  for (auto inputTag : inCollection) {
540  if (inCollection[inputTag].isValid()) {
541  DataNode* dataNode = nullptr;
542  auto& primaryPath = inCollection[inputTag].dataProductName();
543  auto itP = m_dataPathToDataNodeMap.find(primaryPath);
544  if (itP != m_dataPathToDataNodeMap.end()) {
545  dataNode = getDataNode(primaryPath);
546  if (!inCollection[inputTag].alternativeDataProductNames().empty())
547  warning() << "Dropping all alternative data dependencies in the graph, but '" << primaryPath
548  << "', for algorithm " << algo.first << endmsg;
549  } else {
550  for (auto alterPath : inCollection[inputTag].alternativeDataProductNames()) {
551  auto itAP = m_dataPathToDataNodeMap.find(alterPath);
552  if (itAP != m_dataPathToDataNodeMap.end()) {
553  dataNode = getDataNode(alterPath);
554  warning() << "Dropping all alternative data dependencies in the graph, but '" << alterPath
555  << "', for algorithm " << algo.first << endmsg;
556  break;
557  }
558  }
559  }
560  if (dataNode) {
561  dataNode->addConsumerNode(algo.second);
562  algo.second->addInputDataNode(dataNode);
563  }
564 
565  }
566  }
567  }
568 
569  return global_sc;
570  }
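
The two passes of the listing above can be sketched in isolation. The first pass creates one data node per produced path and links every producer to it. The second pass links each consumer through its primary path or, failing that, through the first alternative path that some algorithm produces. All type and function names below (`InputSketch`, `AlgoIOSketch`, `DataNodeSketch`, `buildAugmentedRealm`) are hypothetical, algorithms are identified by name only, and the `isValid()` checks and warnings of the original are omitted.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical, simplified description of one algorithm's data dependencies.
struct InputSketch {
  std::string primary;
  std::vector<std::string> alternatives;
};
struct AlgoIOSketch {
  std::vector<std::string> outputs;
  std::vector<InputSketch> inputs;
};
// One node per data path, remembering who produces and who consumes it.
struct DataNodeSketch {
  std::vector<std::string> producers;
  std::vector<std::string> consumers;
};

using DataRealm = std::map<std::string, DataNodeSketch>;

DataRealm buildAugmentedRealm(const std::map<std::string, AlgoIOSketch>& algos) {
  DataRealm realm;

  // Pass 1: create data nodes and connect them to their producers.
  for (const auto& algo : algos)
    for (const auto& path : algo.second.outputs)
      realm[path].producers.push_back(algo.first);

  // Pass 2: connect consumers via the primary path, else the first
  // alternative path for which a data node exists.
  for (const auto& algo : algos) {
    for (const auto& input : algo.second.inputs) {
      auto it = realm.find(input.primary);
      if (it == realm.end()) {
        for (const auto& alt : input.alternatives) {
          it = realm.find(alt);
          if (it != realm.end()) break;
        }
      }
      if (it != realm.end()) it->second.consumers.push_back(algo.first);
    }
  }
  return realm;
}
```

As in the listing, an input whose primary and alternative paths are all unproduced is simply left unconnected, and a path with several producers ends up with several producer links on the same data node.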
StatusCode concurrency::ExecutionFlowGraph::buildDataDependenciesRealm ( )

Build data dependency realm WITHOUT data object nodes: just interconnect algorithm nodes directly.

Definition at line 458 of file ExecutionFlowGraph.cpp.

458  {
459 
460  StatusCode global_sc(StatusCode::SUCCESS);
461 
462  for (auto algo : m_algoNameToAlgoNodeMap) {
463 
464  auto targetNode = m_algoNameToAlgoNodeMap[algo.first];
465 
466  // Find producers for all the inputs of the target node
467  auto& targetInCollection = *m_algoNameToAlgoInputsMap[algo.first];
468  for (auto inputTag : targetInCollection) {
469  auto& input2Match = targetInCollection[inputTag].dataProductName();
470  for (auto producer : m_algoNameToAlgoOutputsMap) {
471  auto& outputs = *m_algoNameToAlgoOutputsMap[producer.first];
472  for (auto outputTag : outputs) {
473  if (outputs[outputTag].isValid() && outputs[outputTag].dataProductName() == input2Match) {
474  auto& known_producers = targetNode->getSupplierNodes();
475  auto valid_producer = m_algoNameToAlgoNodeMap[producer.first];
476  auto& known_consumers = valid_producer->getConsumerNodes();
477  if (std::find(known_producers.begin(),known_producers.end(),valid_producer) == known_producers.end())
478  targetNode->addSupplierNode(valid_producer);
479  if (std::find(known_consumers.begin(),known_consumers.end(),targetNode) == known_consumers.end())
480  valid_producer->addConsumerNode(targetNode);
481  }
482  }
483  }
484  }
485 
486  // Find consumers for all the outputs of the target node
487  auto& targetOutCollection = *m_algoNameToAlgoOutputsMap[algo.first];
488  for (auto outputTag : targetOutCollection) {
489  auto& output2Match = targetOutCollection[outputTag].dataProductName();
490  for (auto consumer : m_algoNameToAlgoInputsMap) {
491  auto& inputs = *m_algoNameToAlgoInputsMap[consumer.first];
492  for (auto inputTag : inputs) {
493  if (inputs[inputTag].isValid() && inputs[inputTag].dataProductName() == output2Match) {
494  auto& known_consumers = targetNode->getConsumerNodes();
495  auto valid_consumer = m_algoNameToAlgoNodeMap[consumer.first];
496  auto& known_producers = valid_consumer->getSupplierNodes();
497  if (std::find(known_producers.begin(),known_producers.end(),targetNode) == known_producers.end())
498  valid_consumer->addSupplierNode(targetNode);
499  if (std::find(known_consumers.begin(),known_consumers.end(),valid_consumer) == known_consumers.end())
500  targetNode->addConsumerNode(valid_consumer);
501  }
502  }
503  }
504  }
505 
506  }
507  return global_sc;
508  }
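
Without data nodes, the same dependency information is stored as direct supplier/consumer links between algorithms. The sketch below shows the matching step under simplifying assumptions: algorithms are identified by name, `AlgoPortsSketch`, `AlgoLinksSketch`, `addUnique` and `linkAlgorithms` are hypothetical names, and the `isValid()` checks are omitted. It uses a single pass over the inputs, because each match already records both ends of the link; the listing above additionally walks the outputs, which yields the same set of links.

```cpp
#include <algorithm>
#include <map>
#include <string>
#include <vector>

// Hypothetical per-algorithm data: declared inputs/outputs and resolved links.
struct AlgoPortsSketch {
  std::vector<std::string> inputs;
  std::vector<std::string> outputs;
};
struct AlgoLinksSketch {
  std::vector<std::string> suppliers;
  std::vector<std::string> consumers;
};

// Append `s` only if it is not already present (mirrors the std::find guards).
static void addUnique(std::vector<std::string>& v, const std::string& s) {
  if (std::find(v.begin(), v.end(), s) == v.end()) v.push_back(s);
}

std::map<std::string, AlgoLinksSketch>
linkAlgorithms(const std::map<std::string, AlgoPortsSketch>& algos) {
  std::map<std::string, AlgoLinksSketch> links;
  for (const auto& target : algos) {
    links[target.first];  // every algorithm gets an entry, even if isolated
    for (const auto& input : target.second.inputs)
      for (const auto& producer : algos)
        for (const auto& output : producer.second.outputs)
          if (output == input) {
            addUnique(links[target.first].suppliers, producer.first);
            addUnique(links[producer.first].consumers, target.first);
          }
  }
  return links;
}
```

The nested loops compare every input against every output of every algorithm, so the cost grows with the product of the two counts; that is acceptable for a one-off initialization step.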
void concurrency::ExecutionFlowGraph::dumpDataFlow ( ) const

Print out all data origins and destinations, as reflected in the EF graph.

Definition at line 727 of file ExecutionFlowGraph.cpp.

727  {
728 
729  debug() << "====================================" << endmsg;
730  debug() << "Data origins and destinations:" << endmsg;
731  debug() << "====================================" << endmsg;
732 
733  for (auto& pair : m_dataPathToDataNodeMap) {
734 
735  for (auto algoNode : pair.second->getProducers())
736  debug() << " " << algoNode->getNodeName() << endmsg;
737 
738  debug() << " V" << endmsg;
739  debug() << " o " << pair.first << endmsg;
740  debug() << " V" << endmsg;
741 
742  for (auto algoNode : pair.second->getConsumers())
743  debug() << " " << algoNode->getNodeName() << endmsg;
744 
745  debug() << " ====================================" << endmsg;
746  }
747  }
void concurrency::ExecutionFlowGraph::dumpExecutionPlan ( )

Dump the encountered execution plan to a file.

Definition at line 749 of file ExecutionFlowGraph.cpp.

749  {
750  std::ofstream myfile;
751  myfile.open("ExecutionPlan.graphml", std::ios::app);
752 
753  boost::dynamic_properties dp;
754  dp.property("name", boost::get(&boost::AlgoNodeStruct::m_name, m_ExecPlan));
755  dp.property("index", boost::get(&boost::AlgoNodeStruct::m_index, m_ExecPlan));
756  dp.property("rank", boost::get(&boost::AlgoNodeStruct::m_rank, m_ExecPlan));
757  dp.property("runtime", boost::get(&boost::AlgoNodeStruct::m_runtime, m_ExecPlan));
758 
759  boost::write_graphml(myfile, m_ExecPlan, dp);
760 
761  myfile.close();
762  }
AlgorithmNode * concurrency::ExecutionFlowGraph::getAlgorithmNode ( const std::string &  algoName) const

Get the AlgorithmNode by algorithm name using graph index.

Definition at line 606 of file ExecutionFlowGraph.cpp.

606  {
607 
608  return m_algoNameToAlgoNodeMap.at(algoName);
609  }
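
Because the lookup goes through `at()`, an unknown name is reported by an exception rather than a null pointer, assuming `AlgoNodesMap` is a standard associative container such as `std::map` or `std::unordered_map` (the page does not show its definition). The sketch below contrasts that behaviour with a non-throwing lookup. `NodeSketch`, `NodeIndexSketch`, `getNode` and `findNode` are hypothetical names.

```cpp
#include <stdexcept>
#include <string>
#include <unordered_map>

// Hypothetical node type and name index (assumed to be an unordered_map).
struct NodeSketch {
  std::string name;
};
using NodeIndexSketch = std::unordered_map<std::string, NodeSketch*>;

// Throwing lookup, like the listing: at() raises std::out_of_range
// when `name` is not in the index.
NodeSketch* getNode(const NodeIndexSketch& index, const std::string& name) {
  return index.at(name);
}

// Non-throwing alternative: returns nullptr for unknown names.
NodeSketch* findNode(const NodeIndexSketch& index, const std::string& name) {
  auto it = index.find(name);
  return it != index.end() ? it->second : nullptr;
}
```

Callers that cannot guarantee the name is registered would need either a `try`/`catch` around the throwing form or a null check after the non-throwing one.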
AlgsExecutionStates& concurrency::ExecutionFlowGraph::getAlgoStates ( const int &  slotNum) const
inline

Definition at line 336 of file ExecutionFlowGraph.h.

336 {return m_eventSlots->at(slotNum).algsStates;};
unsigned int concurrency::ExecutionFlowGraph::getControlFlowNodeCounter ( ) const
inline

Get total number of graph nodes.

Definition at line 311 of file ExecutionFlowGraph.h.

311 {return m_nodeCounter;}
const std::vector< AlgorithmNode * > concurrency::ExecutionFlowGraph::getDataIndependentNodes ( ) const

Definition at line 710 of file ExecutionFlowGraph.cpp.

710  {
711 
712  std::vector<AlgorithmNode*> result;
713 
714  for (auto node : m_algoNameToAlgoInputsMap) {
715  const DataObjectDescriptorCollection& collection = *(node.second);
716  for (auto tag : collection)
717  if (collection[tag].isValid()) {
718  result.push_back(getAlgorithmNode(node.first));
719  break;
720  }
721  }
722 
723  return result;
724  }
DataNode * concurrency::ExecutionFlowGraph::getDataNode ( const std::string &  dataPath) const

Get DataNode by DataObject path using graph index.

Definition at line 633 of file ExecutionFlowGraph.cpp.

633  {
634 
635  return m_dataPathToDataNodeMap.at(dataPath);
636  }
const std::chrono::system_clock::time_point concurrency::ExecutionFlowGraph::getInitTime ( ) const
inline

Definition at line 334 of file ExecutionFlowGraph.h.

334 {return m_initTime;};
std::vector<int>& concurrency::ExecutionFlowGraph::getNodeDecisions ( const int &  slotNum) const
inline

Definition at line 338 of file ExecutionFlowGraph.h.

338 {return m_eventSlots->at(slotNum).controlFlowState;}
StatusCode concurrency::ExecutionFlowGraph::initialize ( const std::unordered_map< std::string, unsigned int > &  algname_index_map)

Initialize graph.

Definition at line 400 of file ExecutionFlowGraph.cpp.

400  {
401 
402  m_headNode->initialize(algname_index_map);
403  //StatusCode sc = buildDataDependenciesRealm();
404  StatusCode sc = buildAugmentedDataDependenciesRealm();
405 
406  if (!sc.isSuccess())
407  error() << "Could not build the data dependency realm." << endmsg;
408 
409  return sc;
410  }
StatusCode concurrency::ExecutionFlowGraph::initialize ( const std::unordered_map< std::string, unsigned int > &  algname_index_map,
std::vector< EventSlot > &  eventSlots 
)

Definition at line 413 of file ExecutionFlowGraph.cpp.

414  {
415 
416  m_eventSlots = &eventSlots;
417  m_headNode->initialize(algname_index_map);
418  //StatusCode sc = buildDataDependenciesRealm();
419  StatusCode sc = buildAugmentedDataDependenciesRealm();
420 
421  if (!sc.isSuccess())
422  error() << "Could not build the data dependency realm." << endmsg;
423 
424  if (msgLevel(MSG::DEBUG)) {
425  dumpDataFlow();
426  }
427 
428  return sc;
429  }
const std::string& concurrency::ExecutionFlowGraph::name ( ) const
inline override

Retrieve name of the service.

Definition at line 330 of file ExecutionFlowGraph.h.

{return m_name;}
void concurrency::ExecutionFlowGraph::printState ( std::stringstream &  output,
AlgsExecutionStates &  states,
const std::vector< int > &  node_decisions,
const unsigned int &  recursionLevel 
) const
inline

Print a string representing the control flow state.

Definition at line 323 of file ExecutionFlowGraph.h.

{m_headNode->printState(output,states,node_decisions,recursionLevel);}
virtual void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print a string representing the control flow state.
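The graph-level `printState` only forwards to the head node, which receives a `recursionLevel` argument. One plausible reading, sketched below, is a depth-first dump in which the level controls indentation. `PrintNodeSketch` and its output format are hypothetical; the real `DecisionNode::printState` is not shown in this section.

```cpp
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical node type; decisions are looked up by node index.
struct PrintNodeSketch {
  std::string name;
  std::size_t index = 0;
  std::vector<const PrintNodeSketch*> children;

  // Write this node, then recurse into the children one level deeper.
  void printState(std::stringstream& output,
                  const std::vector<int>& decisions,
                  unsigned int level) const {
    output << std::string(2 * level, ' ')   // two spaces per recursion level
           << name << " (decision: " << decisions[index] << ")\n";
    for (const PrintNodeSketch* child : children)
      child->printState(output, decisions, level + 1);
  }
};
```

Calling `printState` on the head node with level 0 yields one line per node, with daughters indented under their parent.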
void concurrency::ExecutionFlowGraph::rankAlgorithms ( IGraphVisitor &  ranker) const

Rank Algorithm nodes by the number of data outputs.

Definition at line 700 of file ExecutionFlowGraph.cpp.

{

  info() << "Starting ranking by data outputs .. " << endmsg;
  for (auto& pair : m_algoNameToAlgoNodeMap) {
    pair.second->accept(ranker);
    debug() << " Rank of " << pair.first << ": " << pair.second->getRank() << endmsg;
  }
}
AlgoNodesMap m_algoNameToAlgoNodeMap
Index: map of algorithm's name to AlgorithmNode.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
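`rankAlgorithms` hands one visitor to every indexed algorithm node through `accept`. The sketch below shows that visitor loop with a ranker that assigns each node the number of data outputs it declares. `VisitorSketch`, `AlgoNodeSketch` and `RankByOutputs` are hypothetical names, and the ranking rule is an assumption based on the one-line description above; the real `IGraphVisitor` interface is not shown in this section.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

struct AlgoNodeSketch;  // hypothetical stand-in for AlgorithmNode

// Hypothetical visitor interface (the real IGraphVisitor may differ).
struct VisitorSketch {
  virtual ~VisitorSketch() = default;
  virtual void visit(AlgoNodeSketch& node) = 0;
};

struct AlgoNodeSketch {
  std::vector<std::string> outputs;  // data products this node writes
  std::size_t rank = 0;
  void accept(VisitorSketch& visitor) { visitor.visit(*this); }
};

// Assumed ranking rule: rank equals the number of declared outputs.
struct RankByOutputs : VisitorSketch {
  void visit(AlgoNodeSketch& node) override { node.rank = node.outputs.size(); }
};

// Same loop shape as rankAlgorithms(): every indexed node accepts the visitor.
inline void rankAll(std::map<std::string, AlgoNodeSketch>& index, VisitorSketch& ranker) {
  for (auto& pair : index) pair.second.accept(ranker);
}
```

Keeping the ranking rule in the visitor means a different ranking strategy can be swapped in without touching the graph or the node classes.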
void concurrency::ExecutionFlowGraph::registerIODataObjects ( const Algorithm *  algo)

Register algorithm in the Data Dependency index.

Definition at line 432 of file ExecutionFlowGraph.cpp.

{

  const std::string& algoName = algo->name();

  const DataObjectDescriptorCollection& inputDOCollection = algo->inputDataObjects();
  m_algoNameToAlgoInputsMap[algoName] = &inputDOCollection;

  debug() << "Inputs of " << algoName << ": ";
  for (auto tag : inputDOCollection) {
    if (inputDOCollection[tag].isValid())
      debug() << inputDOCollection[tag].dataProductName() << " | ";
  }
  debug() << endmsg;

  const DataObjectDescriptorCollection& outputDOCollection = algo->outputDataObjects();
  m_algoNameToAlgoOutputsMap[algoName] = &outputDOCollection;

  debug() << "Outputs of " << algoName << ": ";
  for (auto tag : outputDOCollection) {
    if (outputDOCollection[tag].isValid())
      debug() << outputDOCollection[tag].dataProductName() << " | ";
  }
  debug() << endmsg;
}
const DataObjectDescriptorCollection & outputDataObjects() const override
Definition: Algorithm.h:700
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:919
AlgoInputsMap m_algoNameToAlgoInputsMap
Indexes: maps of algorithm's name to algorithm's inputs/outputs.
const DataObjectDescriptorCollection & inputDataObjects() const override
Return the handles declared in the algorithm.
Definition: Algorithm.h:697
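`registerIODataObjects` fills two name-keyed indexes with pointers to each algorithm's input and output collections. The sketch below reproduces that indexing with plain string vectors and adds a lookup that finds the consumers of a producer's outputs, which is the kind of query a data dependency realm needs. `AlgoSketch`, `IoIndexSketch` and `consumersOf` are hypothetical; the lookup is an illustration, not the logic of `buildDataDependenciesRealm`.

```cpp
#include <algorithm>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for an algorithm that declares its data dependencies.
struct AlgoSketch {
  std::string name;
  std::vector<std::string> inputs;
  std::vector<std::string> outputs;
};

class IoIndexSketch {
public:
  // Store pointers to the algorithm's own collections, keyed by its name.
  // The algorithm must therefore outlive the index.
  void registerIO(const AlgoSketch& algo) {
    m_inputs[algo.name] = &algo.inputs;
    m_outputs[algo.name] = &algo.outputs;
  }

  // Names (in sorted order) of algorithms that read something `producer` writes.
  std::vector<std::string> consumersOf(const std::string& producer) const {
    std::vector<std::string> result;
    const auto out = m_outputs.find(producer);
    if (out == m_outputs.end()) return result;  // unknown producer
    const std::vector<std::string>& produced = *out->second;
    for (const auto& entry : m_inputs) {
      if (entry.first == producer) continue;    // skip self-dependencies
      for (const std::string& path : *entry.second) {
        if (std::find(produced.begin(), produced.end(), path) != produced.end()) {
          result.push_back(entry.first);
          break;                                // one shared path is enough
        }
      }
    }
    return result;
  }

private:
  std::map<std::string, const std::vector<std::string>*> m_inputs;
  std::map<std::string, const std::vector<std::string>*> m_outputs;
};
```

Storing pointers avoids copying the collections, at the cost of tying the index's validity to the lifetime of the registered algorithms.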
SmartIF<ISvcLocator>& concurrency::ExecutionFlowGraph::serviceLocator ( ) const
inline override

Retrieve pointer to service locator.

Definition at line 332 of file ExecutionFlowGraph.h.

{return m_svcLocator;}
SmartIF< ISvcLocator > m_svcLocator
Service locator (needed to access the MessageSvc)
void concurrency::ExecutionFlowGraph::updateDecision ( const std::string &  algo_name,
const int &  slotNum,
AlgsExecutionStates &  states,
std::vector< int > &  node_decisions 
) const

A method to update algorithm node decision, and propagate it upwards.

Definition at line 691 of file ExecutionFlowGraph.cpp.

{
  //debug() << "(UPDATING)Setting decision of algorithm " << algo_name << " and propagating it upwards.." << endmsg;
  getAlgorithmNode(algo_name)->updateDecision(slotNum, states, node_decisions);
}
virtual void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const
XXX: CF tests.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using graph index.
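`updateDecision` looks up the algorithm node by name and lets it propagate its decision upwards. The sketch below shows one way such propagation can work, assuming per-node decisions stored as `-1` (undecided), `0` (failed) or `1` (passed) and hubs that combine their daughters with AND or OR. `NodeSketch`, `evaluateHub` and `updateDecisionSketch` are hypothetical, and the combination rules are assumptions inferred from the `modeOR` parameter name, not the Gaudi implementation.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical node: decisions live in a vector indexed by node,
// with -1 = undecided, 0 = failed, 1 = passed.
struct NodeSketch {
  std::size_t index = 0;
  NodeSketch* parent = nullptr;
  std::vector<NodeSketch*> children;  // empty for algorithm (leaf) nodes
  bool modeOR = false;                // false: AND of the daughters
};

// Combine the daughters' decisions; returns -1 while still undecided.
inline int evaluateHub(const NodeSketch& hub, const std::vector<int>& decisions) {
  bool allDecided = true;
  for (const NodeSketch* child : hub.children) {
    const int d = decisions[child->index];
    if (d == -1) { allDecided = false; continue; }
    if (hub.modeOR && d == 1) return 1;   // OR: one pass is enough
    if (!hub.modeOR && d == 0) return 0;  // AND: one failure is enough
  }
  if (!allDecided) return -1;
  return hub.modeOR ? 0 : 1;  // OR with no pass fails; AND with no failure passes
}

// Record a leaf decision, then walk up while parents become decidable.
inline void updateDecisionSketch(NodeSketch& leaf, int decision, std::vector<int>& decisions) {
  decisions[leaf.index] = decision;
  for (NodeSketch* hub = leaf.parent; hub != nullptr; hub = hub->parent) {
    const int d = evaluateHub(*hub, decisions);
    if (d == -1) break;  // nothing more to propagate yet
    decisions[hub->index] = d;
  }
}
```

Starting from the leaf keeps each update proportional to the depth of the graph, since only the ancestors of the updated algorithm are re-evaluated.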
void concurrency::ExecutionFlowGraph::updateEventState ( AlgsExecutionStates &  states,
std::vector< int > &  node_decisions 
) const

XXX CF tests. Needed for the older control flow (CF) implementation.

Definition at line 685 of file ExecutionFlowGraph.cpp.

{
  m_headNode->updateState(states, node_decisions);
}
virtual int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const
Method to set algos to CONTROLREADY, if possible.

Member Data Documentation

friend concurrency::ExecutionFlowGraph::ExecutionFlowManager
private

Definition at line 271 of file ExecutionFlowGraph.h.

AlgoInputsMap concurrency::ExecutionFlowGraph::m_algoNameToAlgoInputsMap
private

Indexes: maps of algorithm's name to algorithm's inputs/outputs.

Definition at line 356 of file ExecutionFlowGraph.h.

AlgoNodesMap concurrency::ExecutionFlowGraph::m_algoNameToAlgoNodeMap
private

Index: map of algorithm's name to AlgorithmNode.

Definition at line 350 of file ExecutionFlowGraph.h.

AlgoOutputsMap concurrency::ExecutionFlowGraph::m_algoNameToAlgoOutputsMap
private

Definition at line 357 of file ExecutionFlowGraph.h.

DataNodesMap concurrency::ExecutionFlowGraph::m_dataPathToDataNodeMap
private

Index: map of data path to DataNode.

Definition at line 354 of file ExecutionFlowGraph.h.

DecisionHubsMap concurrency::ExecutionFlowGraph::m_decisionNameToDecisionHubMap
private

Index: map of decision's name to DecisionHub.

Definition at line 352 of file ExecutionFlowGraph.h.

std::vector<EventSlot>* concurrency::ExecutionFlowGraph::m_eventSlots
private

Definition at line 365 of file ExecutionFlowGraph.h.

std::map<std::string,boost::AlgoVertex> concurrency::ExecutionFlowGraph::m_exec_plan_map
private

Definition at line 368 of file ExecutionFlowGraph.h.

boost::ExecPlan concurrency::ExecutionFlowGraph::m_ExecPlan
private

temporary items to experiment with execution planning

Definition at line 367 of file ExecutionFlowGraph.h.

DecisionNode* concurrency::ExecutionFlowGraph::m_headNode
private

the head node of the control flow graph; may want to have multiple ones once supporting trigger paths

Definition at line 348 of file ExecutionFlowGraph.h.

const std::chrono::system_clock::time_point concurrency::ExecutionFlowGraph::m_initTime
private

Definition at line 363 of file ExecutionFlowGraph.h.

const std::string concurrency::ExecutionFlowGraph::m_name
private

Definition at line 362 of file ExecutionFlowGraph.h.

unsigned int concurrency::ExecutionFlowGraph::m_nodeCounter
private

Total number of nodes in the graph.

Definition at line 359 of file ExecutionFlowGraph.h.

SmartIF<ISvcLocator> concurrency::ExecutionFlowGraph::m_svcLocator
mutable private

Service locator (needed to access the MessageSvc)

Definition at line 361 of file ExecutionFlowGraph.h.


The documentation for this class was generated from the following files: