concurrency::ExecutionFlowGraph Class Reference

#include <src/ExecutionFlowGraph.h>

Inheritance diagram for concurrency::ExecutionFlowGraph:
Collaboration diagram for concurrency::ExecutionFlowGraph:

Public Member Functions

 ExecutionFlowGraph (const std::string &name, SmartIF< ISvcLocator > svc)
 Constructor. More...
 
 ~ExecutionFlowGraph () override
 Destructor. More...
 
StatusCode initialize (const std::unordered_map< std::string, unsigned int > &algname_index_map)
 Initialize graph. More...
 
StatusCode initialize (const std::unordered_map< std::string, unsigned int > &algname_index_map, std::vector< EventSlot > &eventSlots)
 
void registerIODataObjects (const Algorithm *algo)
 Register algorithm in the Data Dependency index. More...
 
StatusCode buildDataDependenciesRealm ()
 Build data dependency realm WITHOUT data object nodes: just interconnect algorithm nodes directly. More...
 
StatusCode buildAugmentedDataDependenciesRealm ()
 Build data dependency realm WITH data object nodes participating. More...
 
void addHeadNode (const std::string &headName, bool modeOR, bool allPass, bool isLazy)
 Add a node, which has no parents. More...
 
StatusCode addAlgorithmNode (Algorithm *daughterAlgo, const std::string &parentName, bool inverted, bool allPass)
 Add algorithm node. More...
 
template<class T >
void attachAlgorithmsToNodes (const std::string &algo_name, const T &container)
 Attach pointers to real Algorithms (and their clones) to Algorithm nodes of the graph. More...
 
AlgorithmNodegetAlgorithmNode (const std::string &algoName) const
 Get the AlgorithmNode from by algorithm name using graph index. More...
 
StatusCode addDataNode (const DataObjID &dataPath)
 Add DataNode that represents DataObject. More...
 
DataNodegetDataNode (const DataObjID &dataPath) const
 Get DataNode by DataObject path using graph index. More...
 
StatusCode addDecisionHubNode (Algorithm *daughterAlgo, const std::string &parentName, bool modeOR, bool allPass, bool isLazy)
 Add a node, which aggregates decisions of direct daughter nodes. More...
 
unsigned int getControlFlowNodeCounter () const
 Get total number of graph nodes. More...
 
void updateEventState (AlgsExecutionStates &states, std::vector< int > &node_decisions) const
 XXX CF tests. Is needed for older CF implementation. More...
 
void updateDecision (const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
 A method to update algorithm node decision, and propagate it upwards. More...
 
void rankAlgorithms (IGraphVisitor &ranker) const
 Rank Algorithm nodes by the number of data outputs. More...
 
void printState (std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
 Print a string representing the control flow state. More...
 
const std::vector< AlgorithmNode * > getDataIndependentNodes () const
 
const std::stringname () const override
 Retrieve name of the service. More...
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator. More...
 
const std::chrono::system_clock::time_point getInitTime () const
 
AlgsExecutionStatesgetAlgoStates (const int &slotNum) const
 
std::vector< int > & getNodeDecisions (const int &slotNum) const
 
std::string dumpDataFlow () const
 Print out all data origins and destinations, as reflected in the EF graph. More...
 
void dumpExecutionPlan ()
 dump to file encountered execution plan More...
 
void addEdgeToExecutionPlan (const AlgorithmNode *u, const AlgorithmNode *v)
 set cause-effect connection between two algorithms in the execution plan More...
 
- Public Member Functions inherited from CommonMessagingBase
virtual ~CommonMessagingBase ()=default
 Virtual destructor. More...
 
SmartIF< IMessageSvc > & msgSvc () const
 The standard message service. More...
 
MsgStreammsgStream () const
 Return an uninitialized MsgStream. More...
 
MsgStreammsgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts. More...
 
MsgStreamalways () const
 shortcut for the method msgStream(MSG::ALWAYS) More...
 
MsgStreamfatal () const
 shortcut for the method msgStream(MSG::FATAL) More...
 
MsgStreamerr () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamerror () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamwarning () const
 shortcut for the method msgStream(MSG::WARNING) More...
 
MsgStreaminfo () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MsgStreamdebug () const
 shortcut for the method msgStream(MSG::DEBUG) More...
 
MsgStreamverbose () const
 shortcut for the method msgStream(MSG::VERBOSE) More...
 
MsgStreammsg () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MSG::Level msgLevel () const
 get the output level from the embedded MsgStream More...
 
MSG::Level outputLevel () const __attribute__((deprecated))
 Backward compatibility function for getting the output level. More...
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream More...
 

Private Attributes

friend ExecutionFlowManager
 
DecisionNodem_headNode
 the head node of the control flow graph; may want to have multiple ones once supporting trigger paths More...
 
AlgoNodesMap m_algoNameToAlgoNodeMap
 Index: map of algorithm's name to AlgorithmNode. More...
 
DecisionHubsMap m_decisionNameToDecisionHubMap
 Index: map of decision's name to DecisionHub. More...
 
DataNodesMap m_dataPathToDataNodeMap
 Index: map of data path to DataNode. More...
 
AlgoInputsMap m_algoNameToAlgoInputsMap
 Indexes: maps of algorithm's name to algorithm's inputs/outputs. More...
 
AlgoOutputsMap m_algoNameToAlgoOutputsMap
 
unsigned int m_nodeCounter
 Total number of nodes in the graph. More...
 
SmartIF< ISvcLocatorm_svcLocator
 Service locator (needed to access the MessageSvc) More...
 
const std::string m_name
 
const std::chrono::system_clock::time_point m_initTime
 
std::vector< EventSlot > * m_eventSlots
 
boost::ExecPlan m_ExecPlan
 temporary items to experiment with execution planning More...
 
std::map< std::string, boost::AlgoVertexm_exec_plan_map
 

Additional Inherited Members

- Public Types inherited from CommonMessaging< IExecutionFlowGraph >
using base_class = CommonMessaging
 
- Protected Member Functions inherited from CommonMessaging< IExecutionFlowGraph >
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream. More...
 

Detailed Description

Definition at line 271 of file ExecutionFlowGraph.h.

Constructor & Destructor Documentation

concurrency::ExecutionFlowGraph::ExecutionFlowGraph ( const std::string name,
SmartIF< ISvcLocator svc 
)
inline

Constructor.

Definition at line 275 of file ExecutionFlowGraph.h.

275  :
277  m_eventSlots(nullptr) {}
unsigned int m_nodeCounter
Total number of nodes in the graph.
SmartIF< ISvcLocator > m_svcLocator
Service locator (needed to access the MessageSvc)
DecisionNode * m_headNode
the head node of the control flow graph; may want to have multiple ones once supporting trigger paths...
const std::chrono::system_clock::time_point m_initTime
std::vector< EventSlot > * m_eventSlots
concurrency::ExecutionFlowGraph::~ExecutionFlowGraph ( )
inlineoverride

Destructor.

Definition at line 279 of file ExecutionFlowGraph.h.

279  {
280  if (m_headNode != 0) delete m_headNode;
281  }
DecisionNode * m_headNode
the head node of the control flow graph; may want to have multiple ones once supporting trigger paths...

Member Function Documentation

StatusCode concurrency::ExecutionFlowGraph::addAlgorithmNode ( Algorithm daughterAlgo,
const std::string parentName,
bool  inverted,
bool  allPass 
)

Add algorithm node.

Definition at line 581 of file ExecutionFlowGraph.cpp.

581  {
582 
584 
585  auto& algoName = algo->name();
586 
587  auto itP = m_decisionNameToDecisionHubMap.find(parentName);
588  concurrency::DecisionNode* parentNode;
589  if ( itP != m_decisionNameToDecisionHubMap.end()) {
590  parentNode = itP->second;
591  auto itA = m_algoNameToAlgoNodeMap.find(algoName);
592  concurrency::AlgorithmNode* algoNode;
593  if ( itA != m_algoNameToAlgoNodeMap.end()) {
594  algoNode = itA->second;
595  } else {
596  algoNode = new concurrency::AlgorithmNode(*this,m_nodeCounter,algoName,inverted,allPass);
597  ++m_nodeCounter;
598  m_algoNameToAlgoNodeMap[algoName] = algoNode;
599  debug() << "AlgoNode " << algoName << " added @ " << algoNode << endmsg;
600  registerIODataObjects(algo);
601  }
602 
603  parentNode->addDaughterNode(algoNode);
604  algoNode->addParentNode(parentNode);
605  } else {
606  sc = StatusCode::FAILURE;
607  error() << "DecisionHubNode " << parentName << ", meant to be used as parent, is not registered in the EFG." << endmsg;
608  }
609 
610  return sc;
611  }
unsigned int m_nodeCounter
Total number of nodes in the graph.
void addDaughterNode(ControlFlowNode *node)
Add a daughter node.
AlgoNodesMap m_algoNameToAlgoNodeMap
Index: map of algorithm's name to AlgorithmNode.
void registerIODataObjects(const Algorithm *algo)
Register algorithm in the Data Dependency index.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
DecisionHubsMap m_decisionNameToDecisionHubMap
Index: map of decision's name to DecisionHub.
T find(T...args)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode concurrency::ExecutionFlowGraph::addDataNode ( const DataObjID dataPath)

Add DataNode that represents DataObject.

Definition at line 620 of file ExecutionFlowGraph.cpp.

620  {
621 
622  StatusCode sc;
623 
624  auto itD = m_dataPathToDataNodeMap.find(dataPath);
625  concurrency::DataNode* dataNode;
626  if ( itD != m_dataPathToDataNodeMap.end()) {
627  dataNode = itD->second;
628  //sc = StatusCode::FAILURE;
629  sc = StatusCode::SUCCESS;
630  } else {
631  dataNode = new concurrency::DataNode(*this,dataPath);
632  m_dataPathToDataNodeMap[dataPath] = dataNode;
633  debug() << " DataNode for " << dataPath << " added @ " << dataNode << endmsg;
634  sc = StatusCode::SUCCESS;
635  }
636 
637  return sc;
638  }
DataNodesMap m_dataPathToDataNodeMap
Index: map of data path to DataNode.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T find(T...args)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode concurrency::ExecutionFlowGraph::addDecisionHubNode ( Algorithm daughterAlgo,
const std::string parentName,
bool  modeOR,
bool  allPass,
bool  isLazy 
)

Add a node, which aggregates decisions of direct daughter nodes.

Definition at line 647 of file ExecutionFlowGraph.cpp.

647  {
648 
650 
651  auto& decisionHubName = decisionHubAlgo->name();
652 
653  auto itP = m_decisionNameToDecisionHubMap.find(parentName);
654  concurrency::DecisionNode* parentNode;
655  if ( itP != m_decisionNameToDecisionHubMap.end()) {
656  parentNode = itP->second;
657  auto itA = m_decisionNameToDecisionHubMap.find(decisionHubName);
658  concurrency::DecisionNode* decisionHubNode;
659  if ( itA != m_decisionNameToDecisionHubMap.end()) {
660  decisionHubNode = itA->second;
661  } else {
662  decisionHubNode = new concurrency::DecisionNode(*this,m_nodeCounter,decisionHubName,modeOR,allPass,isLazy);
663  ++m_nodeCounter;
664  m_decisionNameToDecisionHubMap[decisionHubName] = decisionHubNode;
665  debug() << "DecisionHubNode " << decisionHubName << " added @ " << decisionHubNode << endmsg;
666  }
667 
668  parentNode->addDaughterNode(decisionHubNode);
669  decisionHubNode->addParentNode(parentNode);
670  } else {
671  sc = StatusCode::FAILURE;
672  error() << "DecisionHubNode " << parentName << ", meant to be used as parent, is not registered in the EFG." << endmsg;
673  }
674 
675  return sc;
676  }
unsigned int m_nodeCounter
Total number of nodes in the graph.
void addDaughterNode(ControlFlowNode *node)
Add a daughter node.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
DecisionHubsMap m_decisionNameToDecisionHubMap
Index: map of decision's name to DecisionHub.
T find(T...args)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void concurrency::ExecutionFlowGraph::addEdgeToExecutionPlan ( const AlgorithmNode u,
const AlgorithmNode v 
)

set cause-effect connection between two algorithms in the execution plan

Definition at line 780 of file ExecutionFlowGraph.cpp.

780  {
781 
782  boost::AlgoVertex source;
783  float runtime(0.);
784  if (u == nullptr) {
785  auto itT = m_exec_plan_map.find("ENTRY");
786  if ( itT != m_exec_plan_map.end()) {
787  source = itT->second;
788  } else {
789  source = boost::add_vertex(boost::AlgoNodeStruct("ENTRY",-999,-999, 0),
790  m_ExecPlan);
791  m_exec_plan_map["ENTRY"] = source;
792  }
793  } else {
794  auto itS = m_exec_plan_map.find(u->getNodeName());
795  if ( itS != m_exec_plan_map.end()) {
796  source = itS->second;
797  } else {
798  auto alg = dynamic_cast<Algorithm*> ( u->getAlgorithmRepresentatives()[0] );
799  if (alg == 0) {
800  fatal() << "could not convert IAlgorithm to Algorithm!" << endmsg;
801  } else {
802  try {
803  const Property& p = alg->getProperty( "AvgRuntime" );
804  runtime = std::stof( p.toString() );
805  } catch(...) {
806  debug() << "no AvgRuntime for " << alg->name() << endmsg;
807  runtime = 1.;
808  }
809  }
810  source = boost::add_vertex(boost::AlgoNodeStruct(u->getNodeName(),
811  u->getAlgoIndex(),
812  u->getRank(),runtime),
813  m_ExecPlan);
814  m_exec_plan_map[u->getNodeName()] = source;
815  }
816  }
817 
818  boost::AlgoVertex target;
819  auto itP = m_exec_plan_map.find(v->getNodeName());
820  if ( itP != m_exec_plan_map.end()) {
821  target = itP->second;
822  } else {
823  auto alg = dynamic_cast<Algorithm*> ( v->getAlgorithmRepresentatives()[0] );
824  if (alg == 0) {
825  fatal() << "could not convert IAlgorithm to Algorithm!" << endmsg;
826  } else {
827  try {
828  const Property& p = alg->getProperty( "AvgRuntime" );
829  runtime = std::stof( p.toString() );
830  } catch(...) {
831  debug() << "no AvgRuntime for " << alg->name() << endmsg;
832  runtime = 1.;
833  }
834  }
835  target = boost::add_vertex(boost::AlgoNodeStruct(v->getNodeName(),
836  v->getAlgoIndex(),
837  v->getRank(),runtime),
838  m_ExecPlan);
839  m_exec_plan_map[v->getNodeName()] = target;
840  }
841 
842  debug() << "Edge added to execution plan" << endmsg;
843  boost::add_edge(source, target, m_ExecPlan);
844  }
boost::ExecPlan m_ExecPlan
temporary items to experiment with execution planning
virtual std::string toString() const =0
value -> string
T stof(T...args)
T end(T...args)
std::map< std::string, boost::AlgoVertex > m_exec_plan_map
graph_traits< ExecPlan >::vertex_descriptor AlgoVertex
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:74
T find(T...args)
Property base class allowing Property* collections to be "homogeneous".
Definition: Property.h:38
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void concurrency::ExecutionFlowGraph::addHeadNode ( const std::string headName,
bool  modeOR,
bool  allPass,
bool  isLazy 
)

Add a node, which has no parents.

Definition at line 679 of file ExecutionFlowGraph.cpp.

679  {
680 
681  auto itH = m_decisionNameToDecisionHubMap.find(headName);
682  if ( itH != m_decisionNameToDecisionHubMap.end()) {
683  m_headNode = itH->second;
684  } else {
685  m_headNode = new concurrency::DecisionNode(*this,m_nodeCounter,headName,modeOR,allPass,isLazy);
686  ++m_nodeCounter;
688  }
689 
690  }
unsigned int m_nodeCounter
Total number of nodes in the graph.
DecisionNode * m_headNode
the head node of the control flow graph; may want to have multiple ones once supporting trigger paths...
DecisionHubsMap m_decisionNameToDecisionHubMap
Index: map of decision's name to DecisionHub.
T find(T...args)
template<class T >
void concurrency::ExecutionFlowGraph::attachAlgorithmsToNodes ( const std::string algo_name,
const T &  container 
)
inline

Attach pointers to real Algorithms (and their clones) to Algorithm nodes of the graph.

Definition at line 298 of file ExecutionFlowGraph.h.

298  {
299  auto node = getAlgorithmNode(algo_name);
300  for (auto ialgoIt = container.unsafe_begin(); ialgoIt != container.unsafe_end(); ++ialgoIt)
301  node->attachAlgorithm(*ialgoIt);
302  }
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
StatusCode concurrency::ExecutionFlowGraph::buildAugmentedDataDependenciesRealm ( )

Build data dependency realm WITH data object nodes participating.

Definition at line 517 of file ExecutionFlowGraph.cpp.

517  {
518 
519  StatusCode global_sc(StatusCode::SUCCESS);
520 
521  // Create the DataObjects (DO) realm (represented by DataNodes in the graph), connected to DO producers (AlgorithmNodes)
522  for (auto algo : m_algoNameToAlgoNodeMap) {
523 
524  StatusCode sc;
525  auto& outCollection = m_algoNameToAlgoOutputsMap[algo.first];
526  for (auto outputTag : outCollection) {
527  // if (outCollection[outputTag].isValid()) {
528  // auto& output = outCollection[outputTag].dataProductName();
529  sc = addDataNode(outputTag);
530  if (!sc.isSuccess()) {
531  error() << "Extra producer (" << algo.first << ") for DataObject @ "
532  << outputTag
533  << " has been detected: this is not allowed." << endmsg;
534  global_sc = StatusCode::FAILURE;
535  }
536  auto dataNode = getDataNode(outputTag);
537  dataNode->addProducerNode(algo.second);
538  algo.second->addOutputDataNode(dataNode);
539  // }
540  }
541  }
542 
543  // Connect previously created DO realm to DO consumers (AlgorithmNodes)
544  for (auto algo : m_algoNameToAlgoNodeMap) {
545  auto& inCollection = m_algoNameToAlgoInputsMap[algo.first];
546  for (auto inputTag : inCollection) {
547  // if (inCollection[inputTag].isValid()) {
548  DataNode* dataNode = nullptr;
549  // auto& primaryPath = inCollection[inputTag].dataProductName();
550  auto primaryPath = inputTag;
551  auto itP = m_dataPathToDataNodeMap.find(primaryPath);
552  if (itP != m_dataPathToDataNodeMap.end()) {
553  dataNode = getDataNode(primaryPath);
554  // if (!inCollection[inputTag].alternativeDataProductNames().empty())
555  // warning() << "Dropping all alternative data dependencies in the graph, but '" << primaryPath
556  // << "', for algorithm " << algo.first << endmsg;
557  // } else {
558  // for (auto alterPath : inCollection[inputTag].alternativeDataProductNames()) {
559  // auto itAP = m_dataPathToDataNodeMap.find(alterPath);
560  // if (itAP != m_dataPathToDataNodeMap.end()) {
561  // dataNode = getDataNode(alterPath);
562  // warning() << "Dropping all alternative data dependencies in the graph, but '" << alterPath
563  // << "', for algorithm " << algo.first << endmsg;
564  // break;
565  // }
566  // }
567  }
568  if (dataNode) {
569  dataNode->addConsumerNode(algo.second);
570  algo.second->addInputDataNode(dataNode);
571  }
572 
573  // }
574  }
575  }
576 
577  return global_sc;
578  }
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
DataNodesMap m_dataPathToDataNodeMap
Index: map of data path to DataNode.
AlgoNodesMap m_algoNameToAlgoNodeMap
Index: map of algorithm's name to AlgorithmNode.
DataNode * getDataNode(const DataObjID &dataPath) const
Get DataNode by DataObject path using graph index.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T find(T...args)
AlgoInputsMap m_algoNameToAlgoInputsMap
Indexes: maps of algorithm's name to algorithm's inputs/outputs.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode addDataNode(const DataObjID &dataPath)
Add DataNode that represents DataObject.
StatusCode concurrency::ExecutionFlowGraph::buildDataDependenciesRealm ( )

Build data dependency realm WITHOUT data object nodes: just interconnect algorithm nodes directly.

Definition at line 462 of file ExecutionFlowGraph.cpp.

462  {
463 
464  StatusCode global_sc(StatusCode::SUCCESS);
465 
466  for (auto algo : m_algoNameToAlgoNodeMap) {
467 
468  auto targetNode = m_algoNameToAlgoNodeMap[algo.first];
469 
470  // Find producers for all the inputs of the target node
471  auto& targetInCollection = m_algoNameToAlgoInputsMap[algo.first];
472  for (auto inputTag : targetInCollection) {
473  // auto& input2Match = targetInCollection[inputTag].dataProductName();
474  for (auto producer : m_algoNameToAlgoOutputsMap) {
475  auto& outputs = m_algoNameToAlgoOutputsMap[producer.first];
476  for (auto outputTag : outputs) {
477  // if (outputs[outputTag].isValid() && outputs[outputTag].dataProductName() == input2Match) {
478  if (inputTag == outputTag) {
479  auto& known_producers = targetNode->getSupplierNodes();
480  auto valid_producer = m_algoNameToAlgoNodeMap[producer.first];
481  auto& known_consumers = valid_producer->getConsumerNodes();
482  if (std::find(known_producers.begin(),known_producers.end(),valid_producer) == known_producers.end())
483  targetNode->addSupplierNode(valid_producer);
484  if (std::find(known_consumers.begin(),known_consumers.end(),targetNode) == known_consumers.end())
485  valid_producer->addConsumerNode(targetNode);
486  }
487  }
488  }
489  }
490 
491  // Find consumers for all the outputs of the target node
492  auto& targetOutCollection = m_algoNameToAlgoOutputsMap[algo.first];
493  for (auto outputTag : targetOutCollection) {
494  // auto& output2Match = targetOutCollection[outputTag].dataProductName();
495  for (auto consumer : m_algoNameToAlgoInputsMap) {
496  auto& inputs = m_algoNameToAlgoInputsMap[consumer.first];
497  for (auto inputTag : inputs) {
498  // if (inputs[inputTag].isValid() && inputs[inputTag].dataProductName() == output2Match) {
499  if (inputTag == outputTag) {
500  auto& known_consumers = targetNode->getConsumerNodes();
501  auto valid_consumer = m_algoNameToAlgoNodeMap[consumer.first];
502  auto& known_producers = valid_consumer->getSupplierNodes();
503  if (std::find(known_producers.begin(),known_producers.end(),targetNode) == known_producers.end())
504  valid_consumer->addSupplierNode(targetNode);
505  if (std::find(known_consumers.begin(),known_consumers.end(),valid_consumer) == known_consumers.end())
506  targetNode->addConsumerNode(valid_consumer);
507  }
508  }
509  }
510  }
511 
512  }
513  return global_sc;
514  }
AlgoNodesMap m_algoNameToAlgoNodeMap
Index: map of algorithm's name to AlgorithmNode.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T find(T...args)
AlgoInputsMap m_algoNameToAlgoInputsMap
Indexes: maps of algorithm's name to algorithm's inputs/outputs.
std::string concurrency::ExecutionFlowGraph::dumpDataFlow ( ) const

Print out all data origins and destinations, as reflected in the EF graph.

Definition at line 735 of file ExecutionFlowGraph.cpp.

735  {
736 
737  const char idt[] = " ";
738  std::ostringstream ost;
739 
740 
741  ost << "\n" << idt << "====================================\n";
742  ost << idt << "Data origins and destinations:\n";
743  ost << idt << "====================================\n";
744 
745  for (auto& pair : m_dataPathToDataNodeMap) {
746 
747  for (auto algoNode : pair.second->getProducers())
748  ost << idt << " " << algoNode->getNodeName() << "\n";
749 
750  ost << idt << " V\n";
751  ost << idt << " o " << pair.first << "\n";
752  ost << idt << " V\n";
753 
754  for (auto algoNode : pair.second->getConsumers())
755  ost << idt << " " << algoNode->getNodeName() << "\n";
756 
757  ost << idt << "====================================\n";
758  }
759 
760  return ost.str();
761  }
DataNodesMap m_dataPathToDataNodeMap
Index: map of data path to DataNode.
void concurrency::ExecutionFlowGraph::dumpExecutionPlan ( )

dump to file encountered execution plan

Definition at line 765 of file ExecutionFlowGraph.cpp.

765  {
766  std::ofstream myfile;
767  myfile.open("ExecutionPlan.graphml", std::ios::app);
768 
769  boost::dynamic_properties dp;
770  dp.property("name", boost::get(&boost::AlgoNodeStruct::m_name, m_ExecPlan));
771  dp.property("index", boost::get(&boost::AlgoNodeStruct::m_index, m_ExecPlan));
772  dp.property("rank", boost::get(&boost::AlgoNodeStruct::m_rank, m_ExecPlan));
773  dp.property("runtime", boost::get(&boost::AlgoNodeStruct::m_runtime, m_ExecPlan));
774 
775  boost::write_graphml(myfile, m_ExecPlan, dp);
776 
777  myfile.close();
778  }
T open(T...args)
boost::ExecPlan m_ExecPlan
temporary items to experiment with execution planning
STL class.
T close(T...args)
AlgorithmNode * concurrency::ExecutionFlowGraph::getAlgorithmNode ( const std::string algoName) const

Get the AlgorithmNode from by algorithm name using graph index.

Definition at line 614 of file ExecutionFlowGraph.cpp.

614  {
615 
616  return m_algoNameToAlgoNodeMap.at(algoName);
617  }
AlgoNodesMap m_algoNameToAlgoNodeMap
Index: map of algorithm's name to AlgorithmNode.
AlgsExecutionStates& concurrency::ExecutionFlowGraph::getAlgoStates ( const int &  slotNum) const
inline

Definition at line 337 of file ExecutionFlowGraph.h.

337 {return m_eventSlots->at(slotNum).algsStates;};
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
T at(T...args)
std::vector< EventSlot > * m_eventSlots
unsigned int concurrency::ExecutionFlowGraph::getControlFlowNodeCounter ( ) const
inline

Get total number of graph nodes.

Definition at line 312 of file ExecutionFlowGraph.h.

312 {return m_nodeCounter;}
unsigned int m_nodeCounter
Total number of nodes in the graph.
const std::vector< AlgorithmNode * > concurrency::ExecutionFlowGraph::getDataIndependentNodes ( ) const

Definition at line 718 of file ExecutionFlowGraph.cpp.

718  {
719 
721 
722  for (auto node : m_algoNameToAlgoInputsMap) {
723  DataObjIDColl collection = (node.second);
724  for (auto tag : collection)
725  // if (collection[tag].isValid()) {
726  result.push_back(getAlgorithmNode(node.first));
727  break;
728  // }
729  }
730 
731  return result;
732  }
T push_back(T...args)
STL class.
AlgoInputsMap m_algoNameToAlgoInputsMap
Indexes: maps of algorithm's name to algorithm's inputs/outputs.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
DataNode * concurrency::ExecutionFlowGraph::getDataNode ( const DataObjID dataPath) const

Get DataNode by DataObject path using graph index.

Definition at line 641 of file ExecutionFlowGraph.cpp.

641  {
642 
643  return m_dataPathToDataNodeMap.at(dataPath);
644  }
DataNodesMap m_dataPathToDataNodeMap
Index: map of data path to DataNode.
const std::chrono::system_clock::time_point concurrency::ExecutionFlowGraph::getInitTime ( ) const
inline

Definition at line 335 of file ExecutionFlowGraph.h.

335 {return m_initTime;};
const std::chrono::system_clock::time_point m_initTime
std::vector<int>& concurrency::ExecutionFlowGraph::getNodeDecisions ( const int &  slotNum) const
inline

Definition at line 339 of file ExecutionFlowGraph.h.

339 {return m_eventSlots->at(slotNum).controlFlowState;}
T at(T...args)
std::vector< int > controlFlowState
State of the control flow.
Definition: EventSlot.h:43
std::vector< EventSlot > * m_eventSlots
StatusCode concurrency::ExecutionFlowGraph::initialize ( const std::unordered_map< std::string, unsigned int > &  algname_index_map)

Initialize graph.

Definition at line 405 of file ExecutionFlowGraph.cpp.

405  {
406 
407  m_headNode->initialize(algname_index_map);
408  //StatusCode sc = buildDataDependenciesRealm();
410 
411  if (!sc.isSuccess())
412  error() << "Could not build the data dependency realm." << endmsg;
413 
414  return sc;
415  }
StatusCode buildAugmentedDataDependenciesRealm()
Build data dependency realm WITH data object nodes participating.
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
virtual void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
DecisionNode * m_headNode
the head node of the control flow graph; may want to have multiple ones once supporting trigger paths...
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode concurrency::ExecutionFlowGraph::initialize ( const std::unordered_map< std::string, unsigned int > &  algname_index_map,
std::vector< EventSlot > &  eventSlots 
)

Definition at line 418 of file ExecutionFlowGraph.cpp.

419  {
420 
421  m_eventSlots = &eventSlots;
422  m_headNode->initialize(algname_index_map);
423  //StatusCode sc = buildDataDependenciesRealm();
425 
426  if (!sc.isSuccess())
427  error() << "Could not build the data dependency realm." << endmsg;
428 
429  debug() << dumpDataFlow() << endmsg;
430 
431  return sc;
432  }
StatusCode buildAugmentedDataDependenciesRealm()
Build data dependency realm WITH data object nodes participating.
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:76
std::string dumpDataFlow() const
Print out all data origins and destinations, as reflected in the EF graph.
virtual void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
DecisionNode * m_headNode
the head node of the control flow graph; may want to have multiple ones once supporting trigger paths...
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
std::vector< EventSlot > * m_eventSlots
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
const std::string& concurrency::ExecutionFlowGraph::name ( ) const
inlineoverride

Retrieve name of the service.

Definition at line 331 of file ExecutionFlowGraph.h.

331 {return m_name;}
void concurrency::ExecutionFlowGraph::printState ( std::stringstream output,
AlgsExecutionStates states,
const std::vector< int > &  node_decisions,
const unsigned int &  recursionLevel 
) const
inline

Print a string representing the control flow state.

Definition at line 324 of file ExecutionFlowGraph.h.

327  {m_headNode->printState(output,states,node_decisions,recursionLevel);};
DecisionNode * m_headNode
the head node of the control flow graph; may want to have multiple ones once supporting trigger paths...
virtual void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
Print a string representing the control flow state.
void concurrency::ExecutionFlowGraph::rankAlgorithms ( IGraphVisitor ranker) const

Rank Algorithm nodes by the number of data outputs.

Definition at line 708 of file ExecutionFlowGraph.cpp.

708  {
709 
710  info() << "Starting ranking by data outputs .. " << endmsg;
711  for (auto& pair : m_algoNameToAlgoNodeMap) {
712  pair.second->accept(ranker);
713  debug() << " Rank of " << pair.first << ": " << pair.second->getRank() << endmsg;
714  }
715  }
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
AlgoNodesMap m_algoNameToAlgoNodeMap
Index: map of algorithm's name to AlgorithmNode.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void concurrency::ExecutionFlowGraph::registerIODataObjects ( const Algorithm algo)

Register algorithm in the Data Dependency index.

Definition at line 435 of file ExecutionFlowGraph.cpp.

435  {
436 
437  const std::string& algoName = algo->name();
438 
439 
440  DataObjIDColl inputObjs, outputObjs;
441  DHHVisitor avis(inputObjs, outputObjs);
442  algo->acceptDHVisitor(&avis);
443 
444  m_algoNameToAlgoInputsMap[algoName] = inputObjs;
445 
446  debug() << "Inputs of " << algoName << ": ";
447  for (auto tag : inputObjs) {
448  debug() << tag << " | ";
449  }
450  debug() << endmsg;
451 
452  m_algoNameToAlgoOutputsMap[algoName] = outputObjs;
453 
454  debug() << "Outputs of " << algoName << ": ";
455  for (auto tag : outputObjs) {
456  debug() << tag << " | ";
457  }
458  debug() << endmsg;
459  }
STL class.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:820
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
virtual void acceptDHVisitor(IDataHandleVisitor *) const override
Definition: Algorithm.cpp:226
AlgoInputsMap m_algoNameToAlgoInputsMap
Indexes: maps of algorithm's name to algorithm's inputs/outputs.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
SmartIF<ISvcLocator>& concurrency::ExecutionFlowGraph::serviceLocator ( ) const
inlineoverride

Retrieve pointer to service locator.

Definition at line 333 of file ExecutionFlowGraph.h.

333 {return m_svcLocator;}
SmartIF< ISvcLocator > m_svcLocator
Service locator (needed to access the MessageSvc)
void concurrency::ExecutionFlowGraph::updateDecision ( const std::string algo_name,
const int &  slotNum,
AlgsExecutionStates states,
std::vector< int > &  node_decisions 
) const

A method to update algorithm node decision, and propagate it upwards.

Definition at line 699 of file ExecutionFlowGraph.cpp.

702  {
703  //debug() << "(UPDATING)Setting decision of algorithm " << algo_name << " and propagating it upwards.." << endmsg;
704  getAlgorithmNode(algo_name)->updateDecision(slotNum, algo_states, node_decisions);
705  }
virtual void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const
XXX: CF tests.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
void concurrency::ExecutionFlowGraph::updateEventState ( AlgsExecutionStates states,
std::vector< int > &  node_decisions 
) const

XXX CF tests. Is needed for older CF implementation.

Definition at line 693 of file ExecutionFlowGraph.cpp.

694  {
695  m_headNode->updateState(algo_states, node_decisions);
696  }
DecisionNode * m_headNode
the head node of the control flow graph; may want to have multiple ones once supporting trigger paths...
virtual int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const
Method to set algos to CONTROLREADY, if possible.

Member Data Documentation

friend concurrency::ExecutionFlowGraph::ExecutionFlowManager
private

Definition at line 272 of file ExecutionFlowGraph.h.

AlgoInputsMap concurrency::ExecutionFlowGraph::m_algoNameToAlgoInputsMap
private

Indexes: maps of algorithm's name to algorithm's inputs/outputs.

Definition at line 357 of file ExecutionFlowGraph.h.

AlgoNodesMap concurrency::ExecutionFlowGraph::m_algoNameToAlgoNodeMap
private

Index: map of algorithm's name to AlgorithmNode.

Definition at line 351 of file ExecutionFlowGraph.h.

AlgoOutputsMap concurrency::ExecutionFlowGraph::m_algoNameToAlgoOutputsMap
private

Definition at line 358 of file ExecutionFlowGraph.h.

DataNodesMap concurrency::ExecutionFlowGraph::m_dataPathToDataNodeMap
private

Index: map of data path to DataNode.

Definition at line 355 of file ExecutionFlowGraph.h.

DecisionHubsMap concurrency::ExecutionFlowGraph::m_decisionNameToDecisionHubMap
private

Index: map of decision's name to DecisionHub.

Definition at line 353 of file ExecutionFlowGraph.h.

std::vector<EventSlot>* concurrency::ExecutionFlowGraph::m_eventSlots
private

Definition at line 366 of file ExecutionFlowGraph.h.

std::map<std::string,boost::AlgoVertex> concurrency::ExecutionFlowGraph::m_exec_plan_map
private

Definition at line 369 of file ExecutionFlowGraph.h.

boost::ExecPlan concurrency::ExecutionFlowGraph::m_ExecPlan
private

temporary items to experiment with execution planning

Definition at line 368 of file ExecutionFlowGraph.h.

DecisionNode* concurrency::ExecutionFlowGraph::m_headNode
private

the head node of the control flow graph; may want to have multiple ones once supporting trigger paths

Definition at line 349 of file ExecutionFlowGraph.h.

const std::chrono::system_clock::time_point concurrency::ExecutionFlowGraph::m_initTime
private

Definition at line 364 of file ExecutionFlowGraph.h.

const std::string concurrency::ExecutionFlowGraph::m_name
private

Definition at line 363 of file ExecutionFlowGraph.h.

unsigned int concurrency::ExecutionFlowGraph::m_nodeCounter
private

Total number of nodes in the graph.

Definition at line 360 of file ExecutionFlowGraph.h.

SmartIF<ISvcLocator> concurrency::ExecutionFlowGraph::m_svcLocator
mutableprivate

Service locator (needed to access the MessageSvc)

Definition at line 362 of file ExecutionFlowGraph.h.


The documentation for this class was generated from the following files: