concurrency::ExecutionFlowGraph Class Reference

#include <src/ExecutionFlowGraph.h>

Inheritance diagram for concurrency::ExecutionFlowGraph:
Collaboration diagram for concurrency::ExecutionFlowGraph:

Public Member Functions

 ExecutionFlowGraph (const std::string &name, SmartIF< ISvcLocator > svc)
 Constructor. More...
 
 ~ExecutionFlowGraph () override
 Destructor. More...
 
StatusCode initialize (const std::unordered_map< std::string, unsigned int > &algname_index_map)
 Initialize graph. More...
 
StatusCode initialize (const std::unordered_map< std::string, unsigned int > &algname_index_map, std::vector< EventSlot > &eventSlots)
 
void registerIODataObjects (const Algorithm *algo)
 Register algorithm in the Data Dependency index. More...
 
StatusCode buildDataDependenciesRealm ()
 Build data dependency realm WITHOUT data object nodes: just interconnect algorithm nodes directly. More...
 
StatusCode buildAugmentedDataDependenciesRealm ()
 Build data dependency realm WITH data object nodes participating. More...
 
void addHeadNode (const std::string &headName, bool modeOR, bool allPass, bool isLazy)
 Add a node, which has no parents. More...
 
StatusCode addAlgorithmNode (Algorithm *daughterAlgo, const std::string &parentName, bool inverted, bool allPass)
 Add algorithm node. More...
 
template<class T >
void attachAlgorithmsToNodes (const std::string &algo_name, const T &container)
 Attach pointers to real Algorithms (and their clones) to Algorithm nodes of the graph. More...
 
AlgorithmNodegetAlgorithmNode (const std::string &algoName) const
 Get the AlgorithmNode from by algorithm name using graph index. More...
 
StatusCode addDataNode (const DataObjID &dataPath)
 Add DataNode that represents DataObject. More...
 
DataNodegetDataNode (const DataObjID &dataPath) const
 Get DataNode by DataObject path using graph index. More...
 
StatusCode addDecisionHubNode (Algorithm *daughterAlgo, const std::string &parentName, bool modeOR, bool allPass, bool isLazy)
 Add a node, which aggregates decisions of direct daughter nodes. More...
 
unsigned int getControlFlowNodeCounter () const
 Get total number of graph nodes. More...
 
void updateEventState (AlgsExecutionStates &states, std::vector< int > &node_decisions) const
 XXX CF tests. Is needed for older CF implementation. More...
 
void updateDecision (const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
 A method to update algorithm node decision, and propagate it upwards. More...
 
void rankAlgorithms (IGraphVisitor &ranker) const
 Rank Algorithm nodes by the number of data outputs. More...
 
void printState (std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const
 Print a string representing the control flow state. More...
 
const std::vector< AlgorithmNode * > getDataIndependentNodes () const
 
const std::stringname () const override
 Retrieve name of the service. More...
 
SmartIF< ISvcLocator > & serviceLocator () const override
 Retrieve pointer to service locator. More...
 
const std::chrono::system_clock::time_point getInitTime () const
 
AlgsExecutionStatesgetAlgoStates (const int &slotNum) const
 
std::vector< int > & getNodeDecisions (const int &slotNum) const
 
std::string dumpDataFlow () const
 Print out all data origins and destinations, as reflected in the EF graph. More...
 
void dumpExecutionPlan ()
 dump to file encountered execution plan More...
 
void addEdgeToExecutionPlan (const AlgorithmNode *u, const AlgorithmNode *v)
 set cause-effect connection between two algorithms in the execution plan More...
 
- Public Member Functions inherited from CommonMessagingBase
virtual ~CommonMessagingBase ()=default
 Virtual destructor. More...
 
SmartIF< IMessageSvc > & msgSvc () const
 The standard message service. More...
 
MsgStreammsgStream () const
 Return an uninitialized MsgStream. More...
 
MsgStreammsgStream (const MSG::Level level) const
 Predefined configurable message stream for the efficient printouts. More...
 
MsgStreamalways () const
 shortcut for the method msgStream(MSG::ALWAYS) More...
 
MsgStreamfatal () const
 shortcut for the method msgStream(MSG::FATAL) More...
 
MsgStreamerr () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamerror () const
 shortcut for the method msgStream(MSG::ERROR) More...
 
MsgStreamwarning () const
 shortcut for the method msgStream(MSG::WARNING) More...
 
MsgStreaminfo () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MsgStreamdebug () const
 shortcut for the method msgStream(MSG::DEBUG) More...
 
MsgStreamverbose () const
 shortcut for the method msgStream(MSG::VERBOSE) More...
 
MsgStreammsg () const
 shortcut for the method msgStream(MSG::INFO) More...
 
MSG::Level msgLevel () const
 get the output level from the embedded MsgStream More...
 
MSG::Level outputLevel () const __attribute__((deprecated))
 Backward compatibility function for getting the output level. More...
 
bool msgLevel (MSG::Level lvl) const
 get the output level from the embedded MsgStream More...
 

Private Attributes

friend ExecutionFlowManager
 
DecisionNodem_headNode
 the head node of the control flow graph; may want to have multiple ones once supporting trigger paths More...
 
AlgoNodesMap m_algoNameToAlgoNodeMap
 Index: map of algorithm's name to AlgorithmNode. More...
 
DecisionHubsMap m_decisionNameToDecisionHubMap
 Index: map of decision's name to DecisionHub. More...
 
DataNodesMap m_dataPathToDataNodeMap
 Index: map of data path to DataNode. More...
 
AlgoInputsMap m_algoNameToAlgoInputsMap
 Indexes: maps of algorithm's name to algorithm's inputs/outputs. More...
 
AlgoOutputsMap m_algoNameToAlgoOutputsMap
 
unsigned int m_nodeCounter
 Total number of nodes in the graph. More...
 
SmartIF< ISvcLocatorm_svcLocator
 Service locator (needed to access the MessageSvc) More...
 
const std::string m_name
 
const std::chrono::system_clock::time_point m_initTime
 
std::vector< EventSlot > * m_eventSlots
 
boost::ExecPlan m_ExecPlan
 temporary items to experiment with execution planning More...
 
std::map< std::string, boost::AlgoVertexm_exec_plan_map
 

Additional Inherited Members

- Public Types inherited from CommonMessaging< IExecutionFlowGraph >
using base_class = CommonMessaging
 
- Protected Member Functions inherited from CommonMessaging< IExecutionFlowGraph >
void updateMsgStreamOutputLevel (int level)
 Update the output level of the cached MsgStream. More...
 

Detailed Description

Definition at line 277 of file ExecutionFlowGraph.h.

Constructor & Destructor Documentation

concurrency::ExecutionFlowGraph::ExecutionFlowGraph ( const std::string name,
SmartIF< ISvcLocator svc 
)
inline

Constructor.

Definition at line 281 of file ExecutionFlowGraph.h.

281  :
283  m_eventSlots(nullptr) {}
unsigned int m_nodeCounter
Total number of nodes in the graph.
SmartIF< ISvcLocator > m_svcLocator
Service locator (needed to access the MessageSvc)
DecisionNode * m_headNode
the head node of the control flow graph; may want to have multiple ones once supporting trigger paths...
const std::chrono::system_clock::time_point m_initTime
std::vector< EventSlot > * m_eventSlots
concurrency::ExecutionFlowGraph::~ExecutionFlowGraph ( )
inlineoverride

Destructor.

Definition at line 285 of file ExecutionFlowGraph.h.

285  {
286  if (m_headNode != 0) delete m_headNode;
287  }
DecisionNode * m_headNode
the head node of the control flow graph; may want to have multiple ones once supporting trigger paths...

Member Function Documentation

StatusCode concurrency::ExecutionFlowGraph::addAlgorithmNode ( Algorithm daughterAlgo,
const std::string parentName,
bool  inverted,
bool  allPass 
)

Add algorithm node.

Definition at line 589 of file ExecutionFlowGraph.cpp.

591  {
592 
594 
595  auto& algoName = algo->name();
596 
597  auto itP = m_decisionNameToDecisionHubMap.find( parentName );
598  concurrency::DecisionNode* parentNode;
599  if ( itP != m_decisionNameToDecisionHubMap.end() ) {
600  parentNode = itP->second;
601  auto itA = m_algoNameToAlgoNodeMap.find( algoName );
602  concurrency::AlgorithmNode* algoNode;
603  if ( itA != m_algoNameToAlgoNodeMap.end() ) {
604  algoNode = itA->second;
605  } else {
606  algoNode = new concurrency::AlgorithmNode( *this, m_nodeCounter, algoName, inverted, allPass, algo->isIOBound() );
607  ++m_nodeCounter;
608  m_algoNameToAlgoNodeMap[algoName] = algoNode;
609  if (msgLevel(MSG::DEBUG))
610  debug() << "AlgoNode " << algoName << " added @ " << algoNode << endmsg;
611  registerIODataObjects(algo);
612  }
613 
614  parentNode->addDaughterNode( algoNode );
615  algoNode->addParentNode( parentNode );
616  } else {
617  sc = StatusCode::FAILURE;
618  error() << "DecisionHubNode " << parentName << ", meant to be used as parent, is not registered in the EFG."
619  << endmsg;
620  }
621 
622  return sc;
623  }
unsigned int m_nodeCounter
Total number of nodes in the graph.
void addDaughterNode(ControlFlowNode *node)
Add a daughter node.
AlgoNodesMap m_algoNameToAlgoNodeMap
Index: map of algorithm&#39;s name to AlgorithmNode.
void registerIODataObjects(const Algorithm *algo)
Register algorithm in the Data Dependency index.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
DecisionHubsMap m_decisionNameToDecisionHubMap
Index: map of decision&#39;s name to DecisionHub.
T find(T...args)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode concurrency::ExecutionFlowGraph::addDataNode ( const DataObjID dataPath)

Add DataNode that represents DataObject.

Definition at line 633 of file ExecutionFlowGraph.cpp.

634  {
635 
636  StatusCode sc;
637 
638  auto itD = m_dataPathToDataNodeMap.find( dataPath );
639  concurrency::DataNode* dataNode;
640  if ( itD != m_dataPathToDataNodeMap.end() ) {
641  dataNode = itD->second;
642  sc = StatusCode::SUCCESS;
643  } else {
644  dataNode = new concurrency::DataNode( *this, dataPath );
645  m_dataPathToDataNodeMap[dataPath] = dataNode;
646  if (msgLevel(MSG::DEBUG))
647  debug() << " DataNode for " << dataPath << " added @ " << dataNode << endmsg;
648  sc = StatusCode::SUCCESS;
649  }
650 
651  return sc;
652  }
DataNodesMap m_dataPathToDataNodeMap
Index: map of data path to DataNode.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T find(T...args)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode concurrency::ExecutionFlowGraph::addDecisionHubNode ( Algorithm daughterAlgo,
const std::string parentName,
bool  modeOR,
bool  allPass,
bool  isLazy 
)

Add a node, which aggregates decisions of direct daughter nodes.

Definition at line 662 of file ExecutionFlowGraph.cpp.

664  {
665 
667 
668  auto& decisionHubName = decisionHubAlgo->name();
669 
670  auto itP = m_decisionNameToDecisionHubMap.find( parentName );
671  concurrency::DecisionNode* parentNode;
672  if ( itP != m_decisionNameToDecisionHubMap.end() ) {
673  parentNode = itP->second;
674  auto itA = m_decisionNameToDecisionHubMap.find( decisionHubName );
675  concurrency::DecisionNode* decisionHubNode;
676  if ( itA != m_decisionNameToDecisionHubMap.end() ) {
677  decisionHubNode = itA->second;
678  } else {
679  decisionHubNode =
680  new concurrency::DecisionNode( *this, m_nodeCounter, decisionHubName, modeOR, allPass, isLazy );
681  ++m_nodeCounter;
682  m_decisionNameToDecisionHubMap[decisionHubName] = decisionHubNode;
683  if (msgLevel(MSG::DEBUG))
684  debug() << "DecisionHubNode " << decisionHubName << " added @ " << decisionHubNode << endmsg;
685  }
686 
687  parentNode->addDaughterNode( decisionHubNode );
688  decisionHubNode->addParentNode( parentNode );
689  } else {
690  sc = StatusCode::FAILURE;
691  error() << "DecisionHubNode " << parentName << ", meant to be used as parent, is not registered in the EFG."
692  << endmsg;
693  }
694 
695  return sc;
696  }
unsigned int m_nodeCounter
Total number of nodes in the graph.
void addDaughterNode(ControlFlowNode *node)
Add a daughter node.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
DecisionHubsMap m_decisionNameToDecisionHubMap
Index: map of decision&#39;s name to DecisionHub.
T find(T...args)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void concurrency::ExecutionFlowGraph::addEdgeToExecutionPlan ( const AlgorithmNode u,
const AlgorithmNode v 
)

set cause-effect connection between two algorithms in the execution plan

Definition at line 801 of file ExecutionFlowGraph.cpp.

802  {
803 
804  boost::AlgoVertex source;
805  float runtime( 0. );
806  if ( u == nullptr ) {
807  auto itT = m_exec_plan_map.find( "ENTRY" );
808  if ( itT != m_exec_plan_map.end() ) {
809  source = itT->second;
810  } else {
811  source = boost::add_vertex( boost::AlgoNodeStruct( "ENTRY", -999, -999, 0 ), m_ExecPlan );
812  m_exec_plan_map["ENTRY"] = source;
813  }
814  } else {
815  auto itS = m_exec_plan_map.find( u->getNodeName() );
816  if ( itS != m_exec_plan_map.end() ) {
817  source = itS->second;
818  } else {
819  auto alg = dynamic_cast<Algorithm*>( u->getAlgorithmRepresentatives()[0] );
820  if ( alg == 0 ) {
821  fatal() << "could not convert IAlgorithm to Algorithm!" << endmsg;
822  } else {
823  try {
824  const Gaudi::Details::PropertyBase& p = alg->getProperty( "AvgRuntime" );
825  runtime = std::stof( p.toString() );
826  } catch(...) {
827  if (msgLevel(MSG::DEBUG))
828  debug() << "no AvgRuntime for " << alg->name() << endmsg;
829  runtime = 1.;
830  }
831  }
832  source = boost::add_vertex( boost::AlgoNodeStruct( u->getNodeName(), u->getAlgoIndex(), u->getRank(), runtime ),
833  m_ExecPlan );
834  m_exec_plan_map[u->getNodeName()] = source;
835  }
836  }
837 
838  boost::AlgoVertex target;
839  auto itP = m_exec_plan_map.find( v->getNodeName() );
840  if ( itP != m_exec_plan_map.end() ) {
841  target = itP->second;
842  } else {
843  auto alg = dynamic_cast<Algorithm*>( v->getAlgorithmRepresentatives()[0] );
844  if ( alg == 0 ) {
845  fatal() << "could not convert IAlgorithm to Algorithm!" << endmsg;
846  } else {
847  try {
848  const Gaudi::Details::PropertyBase& p = alg->getProperty( "AvgRuntime" );
849  runtime = std::stof( p.toString() );
850  } catch(...) {
851  if (msgLevel(MSG::DEBUG))
852  debug() << "no AvgRuntime for " << alg->name() << endmsg;
853  runtime = 1.;
854  }
855  }
856  target = boost::add_vertex( boost::AlgoNodeStruct( v->getNodeName(), v->getAlgoIndex(), v->getRank(), runtime ),
857  m_ExecPlan );
858  m_exec_plan_map[v->getNodeName()] = target;
859  }
860 
861  if (msgLevel(MSG::DEBUG))
862  debug() << "Edge added to execution plan" << endmsg;
863  boost::add_edge(source, target, m_ExecPlan);
864  }
boost::ExecPlan m_ExecPlan
temporary items to experiment with execution planning
T stof(T...args)
T end(T...args)
std::map< std::string, boost::AlgoVertex > m_exec_plan_map
virtual std::string toString() const =0
value -> string
graph_traits< ExecPlan >::vertex_descriptor AlgoVertex
PropertyBase base class allowing PropertyBase* collections to be "homogeneous".
Definition: Property.h:32
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find(T...args)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void concurrency::ExecutionFlowGraph::addHeadNode ( const std::string headName,
bool  modeOR,
bool  allPass,
bool  isLazy 
)

Add a node, which has no parents.

Definition at line 699 of file ExecutionFlowGraph.cpp.

700  {
701 
702  auto itH = m_decisionNameToDecisionHubMap.find( headName );
703  if ( itH != m_decisionNameToDecisionHubMap.end() ) {
704  m_headNode = itH->second;
705  } else {
706  m_headNode = new concurrency::DecisionNode( *this, m_nodeCounter, headName, modeOR, allPass, isLazy );
707  ++m_nodeCounter;
709  }
710  }
unsigned int m_nodeCounter
Total number of nodes in the graph.
DecisionNode * m_headNode
the head node of the control flow graph; may want to have multiple ones once supporting trigger paths...
DecisionHubsMap m_decisionNameToDecisionHubMap
Index: map of decision&#39;s name to DecisionHub.
T find(T...args)
template<class T >
void concurrency::ExecutionFlowGraph::attachAlgorithmsToNodes ( const std::string algo_name,
const T &  container 
)
inline

Attach pointers to real Algorithms (and their clones) to Algorithm nodes of the graph.

Definition at line 304 of file ExecutionFlowGraph.h.

304  {
305  auto node = getAlgorithmNode(algo_name);
306  for (auto ialgoIt = container.unsafe_begin(); ialgoIt != container.unsafe_end(); ++ialgoIt)
307  node->attachAlgorithm(*ialgoIt);
308  }
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
StatusCode concurrency::ExecutionFlowGraph::buildAugmentedDataDependenciesRealm ( )

Build data dependency realm WITH data object nodes participating.

Definition at line 530 of file ExecutionFlowGraph.cpp.

531  {
532 
533  StatusCode global_sc( StatusCode::SUCCESS, true );
534 
535  // Create the DataObjects (DO) realm (represented by DataNodes in the graph),
536  // connected to DO producers (AlgorithmNodes)
537  for (auto algo : m_algoNameToAlgoNodeMap) {
538 
539  auto& outCollection = m_algoNameToAlgoOutputsMap[algo.first];
540  for (auto outputTag : outCollection) {
541  const auto sc = addDataNode(outputTag);
542  if (!sc.isSuccess()) {
543  error() << "Extra producer (" << algo.first << ") for DataObject @ "
544  << outputTag
545  << " has been detected: this is not allowed." << endmsg;
546  global_sc = sc;
547  }
548  auto dataNode = getDataNode(outputTag);
549  dataNode->addProducerNode(algo.second);
550  algo.second->addOutputDataNode(dataNode);
551  }
552  }
553 
554  // Connect previously created DO realm to DO consumers (AlgorithmNodes)
555  for ( auto algo : m_algoNameToAlgoNodeMap ) {
556  auto& inCollection = m_algoNameToAlgoInputsMap[algo.first];
557  for (auto inputTag : inCollection) {
558  DataNode* dataNode = nullptr;
559  auto primaryPath = inputTag;
560  auto itP = m_dataPathToDataNodeMap.find(primaryPath);
561  if (itP != m_dataPathToDataNodeMap.end()) {
562  dataNode = getDataNode(primaryPath);
563  //if (!inCollection[inputTag].alternativeDataProductNames().empty())
564  // warning() << "Dropping all alternative data dependencies in the graph, but '" << primaryPath
565  // << "', for algorithm " << algo.first << endmsg;
566  //} else {
567  // for (auto alterPath : inCollection[inputTag].alternativeDataProductNames()) {
568  // auto itAP = m_dataPathToDataNodeMap.find(alterPath);
569  // if (itAP != m_dataPathToDataNodeMap.end()) {
570  // dataNode = getDataNode(alterPath);
571  // warning() << "Dropping all alternative data dependencies in the graph, but '" << alterPath
572  // << "', for algorithm " << algo.first << endmsg;
573  // break;
574  // }
575  //}
576  }
577 
578  if (dataNode) {
579  dataNode->addConsumerNode(algo.second);
580  algo.second->addInputDataNode(dataNode);
581  }
582  }
583  }
584 
585  return global_sc;
586  }
DataNodesMap m_dataPathToDataNodeMap
Index: map of data path to DataNode.
AlgoNodesMap m_algoNameToAlgoNodeMap
Index: map of algorithm&#39;s name to AlgorithmNode.
DataNode * getDataNode(const DataObjID &dataPath) const
Get DataNode by DataObject path using graph index.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T find(T...args)
AlgoInputsMap m_algoNameToAlgoInputsMap
Indexes: maps of algorithm&#39;s name to algorithm&#39;s inputs/outputs.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode addDataNode(const DataObjID &dataPath)
Add DataNode that represents DataObject.
StatusCode concurrency::ExecutionFlowGraph::buildDataDependenciesRealm ( )

Build data dependency realm WITHOUT data object nodes: just interconnect algorithm nodes directly.

Definition at line 477 of file ExecutionFlowGraph.cpp.

478  {
479 
480  StatusCode global_sc( StatusCode::SUCCESS );
481 
482  for ( auto algo : m_algoNameToAlgoNodeMap ) {
483 
484  auto targetNode = m_algoNameToAlgoNodeMap[algo.first];
485 
486  // Find producers for all the inputs of the target node
487  auto& targetInCollection = m_algoNameToAlgoInputsMap[algo.first];
488  for (auto inputTag : targetInCollection) {
489  for (auto producer : m_algoNameToAlgoOutputsMap) {
490  auto& outputs = m_algoNameToAlgoOutputsMap[producer.first];
491  for (auto outputTag : outputs) {
492  if (inputTag == outputTag) {
493  auto& known_producers = targetNode->getSupplierNodes();
494  auto valid_producer = m_algoNameToAlgoNodeMap[producer.first];
495  auto& known_consumers = valid_producer->getConsumerNodes();
496  if ( std::find( known_producers.begin(), known_producers.end(), valid_producer ) ==
497  known_producers.end() )
498  targetNode->addSupplierNode( valid_producer );
499  if ( std::find( known_consumers.begin(), known_consumers.end(), targetNode ) == known_consumers.end() )
500  valid_producer->addConsumerNode( targetNode );
501  }
502  }
503  }
504  }
505 
506  // Find consumers for all the outputs of the target node
507  auto& targetOutCollection = m_algoNameToAlgoOutputsMap[algo.first];
508  for (auto outputTag : targetOutCollection) {
509  for (auto consumer : m_algoNameToAlgoInputsMap) {
510  auto& inputs = m_algoNameToAlgoInputsMap[consumer.first];
511  for (auto inputTag : inputs) {
512  if (inputTag == outputTag) {
513  auto& known_consumers = targetNode->getConsumerNodes();
514  auto valid_consumer = m_algoNameToAlgoNodeMap[consumer.first];
515  auto& known_producers = valid_consumer->getSupplierNodes();
516  if ( std::find( known_producers.begin(), known_producers.end(), targetNode ) == known_producers.end() )
517  valid_consumer->addSupplierNode( targetNode );
518  if ( std::find( known_consumers.begin(), known_consumers.end(), valid_consumer ) ==
519  known_consumers.end() )
520  targetNode->addConsumerNode( valid_consumer );
521  }
522  }
523  }
524  }
525  }
526  return global_sc;
527  }
AlgoNodesMap m_algoNameToAlgoNodeMap
Index: map of algorithm&#39;s name to AlgorithmNode.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
T find(T...args)
AlgoInputsMap m_algoNameToAlgoInputsMap
Indexes: maps of algorithm&#39;s name to algorithm&#39;s inputs/outputs.
std::string concurrency::ExecutionFlowGraph::dumpDataFlow ( ) const

Print out all data origins and destinations, as reflected in the EF graph.

Definition at line 757 of file ExecutionFlowGraph.cpp.

758  {
759 
760  const char idt[] = " ";
761  std::ostringstream ost;
762 
763  ost << "\n" << idt << "====================================\n";
764  ost << idt << "Data origins and destinations:\n";
765  ost << idt << "====================================\n";
766 
767  for ( auto& pair : m_dataPathToDataNodeMap ) {
768 
769  for ( auto algoNode : pair.second->getProducers() ) ost << idt << " " << algoNode->getNodeName() << "\n";
770 
771  ost << idt << " V\n";
772  ost << idt << " o " << pair.first << "\n";
773  ost << idt << " V\n";
774 
775  for ( auto algoNode : pair.second->getConsumers() ) ost << idt << " " << algoNode->getNodeName() << "\n";
776 
777  ost << idt << "====================================\n";
778  }
779 
780  return ost.str();
781  }
DataNodesMap m_dataPathToDataNodeMap
Index: map of data path to DataNode.
void concurrency::ExecutionFlowGraph::dumpExecutionPlan ( )

dump to file encountered execution plan

Definition at line 785 of file ExecutionFlowGraph.cpp.

786  {
787  std::ofstream myfile;
788  myfile.open( "ExecutionPlan.graphml", std::ios::app );
789 
790  boost::dynamic_properties dp;
791  dp.property( "name", boost::get( &boost::AlgoNodeStruct::m_name, m_ExecPlan ) );
792  dp.property( "index", boost::get( &boost::AlgoNodeStruct::m_index, m_ExecPlan ) );
793  dp.property( "rank", boost::get( &boost::AlgoNodeStruct::m_rank, m_ExecPlan ) );
794  dp.property( "runtime", boost::get( &boost::AlgoNodeStruct::m_runtime, m_ExecPlan ) );
795 
796  boost::write_graphml( myfile, m_ExecPlan, dp );
797 
798  myfile.close();
799  }
T open(T...args)
boost::ExecPlan m_ExecPlan
temporary items to experiment with execution planning
STL class.
T close(T...args)
AlgorithmNode * concurrency::ExecutionFlowGraph::getAlgorithmNode ( const std::string algoName) const

Get the AlgorithmNode from by algorithm name using graph index.

Definition at line 626 of file ExecutionFlowGraph.cpp.

627  {
628 
629  return m_algoNameToAlgoNodeMap.at( algoName );
630  }
AlgoNodesMap m_algoNameToAlgoNodeMap
Index: map of algorithm&#39;s name to AlgorithmNode.
AlgsExecutionStates& concurrency::ExecutionFlowGraph::getAlgoStates ( const int &  slotNum) const
inline

Definition at line 343 of file ExecutionFlowGraph.h.

343 {return m_eventSlots->at(slotNum).algsStates;}
AlgsExecutionStates algsStates
Vector of algorithms states.
Definition: EventSlot.h:37
T at(T...args)
std::vector< EventSlot > * m_eventSlots
unsigned int concurrency::ExecutionFlowGraph::getControlFlowNodeCounter ( ) const
inline

Get total number of graph nodes.

Definition at line 318 of file ExecutionFlowGraph.h.

318 {return m_nodeCounter;}
unsigned int m_nodeCounter
Total number of nodes in the graph.
const std::vector< AlgorithmNode * > concurrency::ExecutionFlowGraph::getDataIndependentNodes ( ) const

Definition at line 742 of file ExecutionFlowGraph.cpp.

743  {
744 
746 
747  for (auto node : m_algoNameToAlgoInputsMap) {
748  DataObjIDColl collection = (node.second);
749  if (collection.empty())
750  result.push_back(getAlgorithmNode(node.first));
751  }
752 
753  return result;
754  }
T empty(T...args)
T push_back(T...args)
STL class.
AlgoInputsMap m_algoNameToAlgoInputsMap
Indexes: maps of algorithm&#39;s name to algorithm&#39;s inputs/outputs.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
DataNode * concurrency::ExecutionFlowGraph::getDataNode ( const DataObjID dataPath) const

Get DataNode by DataObject path using graph index.

Definition at line 655 of file ExecutionFlowGraph.cpp.

656  {
657 
658  return m_dataPathToDataNodeMap.at( dataPath );
659  }
DataNodesMap m_dataPathToDataNodeMap
Index: map of data path to DataNode.
const std::chrono::system_clock::time_point concurrency::ExecutionFlowGraph::getInitTime ( ) const
inline

Definition at line 341 of file ExecutionFlowGraph.h.

341 {return m_initTime;}
const std::chrono::system_clock::time_point m_initTime
std::vector<int>& concurrency::ExecutionFlowGraph::getNodeDecisions ( const int &  slotNum) const
inline

Definition at line 345 of file ExecutionFlowGraph.h.

345 {return m_eventSlots->at(slotNum).controlFlowState;}
T at(T...args)
std::vector< int > controlFlowState
State of the control flow.
Definition: EventSlot.h:43
std::vector< EventSlot > * m_eventSlots
StatusCode concurrency::ExecutionFlowGraph::initialize ( const std::unordered_map< std::string, unsigned int > &  algname_index_map)

Initialize graph.

Definition at line 420 of file ExecutionFlowGraph.cpp.

421  {
422 
423  m_headNode->initialize( algname_index_map );
424  // StatusCode sc = buildDataDependenciesRealm();
426 
427  if ( !sc.isSuccess() ) error() << "Could not build the data dependency realm." << endmsg;
428 
429  return sc;
430  }
StatusCode buildAugmentedDataDependenciesRealm()
Build data dependency realm WITH data object nodes participating.
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
DecisionNode * m_headNode
the head node of the control flow graph; may want to have multiple ones once supporting trigger paths...
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map) override
Initialize.
StatusCode concurrency::ExecutionFlowGraph::initialize ( const std::unordered_map< std::string, unsigned int > &  algname_index_map,
std::vector< EventSlot > &  eventSlots 
)

Definition at line 433 of file ExecutionFlowGraph.cpp.

435  {
436 
437  m_eventSlots = &eventSlots;
438  m_headNode->initialize( algname_index_map );
439  // StatusCode sc = buildDataDependenciesRealm();
441 
442  if ( !sc.isSuccess() ) error() << "Could not build the data dependency realm." << endmsg;
443 
444  if (msgLevel(MSG::DEBUG))
445  debug() << dumpDataFlow() << endmsg;
446 
447  return sc;
448  }
StatusCode buildAugmentedDataDependenciesRealm()
Build data dependency realm WITH data object nodes participating.
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
std::string dumpDataFlow() const
Print out all data origins and destinations, as reflected in the EF graph.
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
DecisionNode * m_headNode
the head node of the control flow graph; may want to have multiple ones once supporting trigger paths...
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
std::vector< EventSlot > * m_eventSlots
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map) override
Initialize.
const std::string& concurrency::ExecutionFlowGraph::name ( ) const
inlineoverride

Retrieve name of the service.

Definition at line 337 of file ExecutionFlowGraph.h.

337 {return m_name;}
void concurrency::ExecutionFlowGraph::printState ( std::stringstream output,
AlgsExecutionStates states,
const std::vector< int > &  node_decisions,
const unsigned int &  recursionLevel 
) const
inline

Print a string representing the control flow state.

Definition at line 330 of file ExecutionFlowGraph.h.

333  {m_headNode->printState(output,states,node_decisions,recursionLevel);}
DecisionNode * m_headNode
the head node of the control flow graph; may want to have multiple ones once supporting trigger paths...
void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const override
Print a string representing the control flow state.
void concurrency::ExecutionFlowGraph::rankAlgorithms ( IGraphVisitor ranker) const

Rank Algorithm nodes by the number of data outputs.

Definition at line 728 of file ExecutionFlowGraph.cpp.

729  {
730 
731  info() << "Starting ranking by data outputs .. " << endmsg;
732  for (auto& pair : m_algoNameToAlgoNodeMap) {
733  if (msgLevel(MSG::DEBUG))
734  debug() << " Ranking " << pair.first << "... " << endmsg;
735  pair.second->accept(ranker);
736  if (msgLevel(MSG::DEBUG))
737  debug() << " ... rank of " << pair.first << ": " << pair.second->getRank() << endmsg;
738  }
739  }
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
AlgoNodesMap m_algoNameToAlgoNodeMap
Index: map of algorithm&#39;s name to AlgorithmNode.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
void concurrency::ExecutionFlowGraph::registerIODataObjects ( const Algorithm algo)

Register algorithm in the Data Dependency index.

Definition at line 451 of file ExecutionFlowGraph.cpp.

452  {
453 
454  const std::string& algoName = algo->name();
455 
456  DataObjIDColl inputObjs, outputObjs;
457  DHHVisitor avis( inputObjs, outputObjs );
458  algo->acceptDHVisitor( &avis );
459 
460  m_algoNameToAlgoInputsMap[algoName] = inputObjs;
461  m_algoNameToAlgoOutputsMap[algoName] = outputObjs;
462 
463  if (msgLevel(MSG::DEBUG)) {
464  debug() << "Inputs of " << algoName << ": ";
465  for (auto tag : inputObjs)
466  debug() << tag << " | ";
467  debug() << endmsg;
468 
469  debug() << "Outputs of " << algoName << ": ";
470  for (auto tag : outputObjs)
471  debug() << tag << " | ";
472  debug() << endmsg;
473  }
474  }
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:727
virtual void acceptDHVisitor(IDataHandleVisitor *) const override
Definition: Algorithm.cpp:205
STL class.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
AlgoInputsMap m_algoNameToAlgoInputsMap
Indexes: maps of algorithm&#39;s name to algorithm&#39;s inputs/outputs.
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
SmartIF<ISvcLocator>& concurrency::ExecutionFlowGraph::serviceLocator ( ) const
inlineoverride

Retrieve pointer to service locator.

Definition at line 339 of file ExecutionFlowGraph.h.

339 {return m_svcLocator;}
SmartIF< ISvcLocator > m_svcLocator
Service locator (needed to access the MessageSvc)
void concurrency::ExecutionFlowGraph::updateDecision ( const std::string algo_name,
const int &  slotNum,
AlgsExecutionStates states,
std::vector< int > &  node_decisions 
) const

A method to update algorithm node decision, and propagate it upwards.

Definition at line 719 of file ExecutionFlowGraph.cpp.

721  {
722  //if (msgLevel(MSG::DEBUG))
723  // debug() << "(UPDATING)Setting decision of algorithm " << algo_name << " and propagating it upwards.." << endmsg;
724  getAlgorithmNode( algo_name )->updateDecision( slotNum, algo_states, node_decisions );
725  }
void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const override
XXX: CF tests.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
void concurrency::ExecutionFlowGraph::updateEventState ( AlgsExecutionStates states,
std::vector< int > &  node_decisions 
) const

XXX CF tests. Is needed for older CF implementation.

Definition at line 713 of file ExecutionFlowGraph.cpp.

714  {
715  m_headNode->updateState( algo_states, node_decisions );
716  }
DecisionNode * m_headNode
the head node of the control flow graph; may want to have multiple ones once supporting trigger paths...
int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
Method to set algos to CONTROLREADY, if possible.

Member Data Documentation

friend concurrency::ExecutionFlowGraph::ExecutionFlowManager
private

Definition at line 278 of file ExecutionFlowGraph.h.

AlgoInputsMap concurrency::ExecutionFlowGraph::m_algoNameToAlgoInputsMap
private

Indexes: maps of algorithm's name to algorithm's inputs/outputs.

Definition at line 363 of file ExecutionFlowGraph.h.

AlgoNodesMap concurrency::ExecutionFlowGraph::m_algoNameToAlgoNodeMap
private

Index: map of algorithm's name to AlgorithmNode.

Definition at line 357 of file ExecutionFlowGraph.h.

AlgoOutputsMap concurrency::ExecutionFlowGraph::m_algoNameToAlgoOutputsMap
private

Definition at line 364 of file ExecutionFlowGraph.h.

DataNodesMap concurrency::ExecutionFlowGraph::m_dataPathToDataNodeMap
private

Index: map of data path to DataNode.

Definition at line 361 of file ExecutionFlowGraph.h.

DecisionHubsMap concurrency::ExecutionFlowGraph::m_decisionNameToDecisionHubMap
private

Index: map of decision's name to DecisionHub.

Definition at line 359 of file ExecutionFlowGraph.h.

std::vector<EventSlot>* concurrency::ExecutionFlowGraph::m_eventSlots
private

Definition at line 372 of file ExecutionFlowGraph.h.

std::map<std::string,boost::AlgoVertex> concurrency::ExecutionFlowGraph::m_exec_plan_map
private

Definition at line 375 of file ExecutionFlowGraph.h.

boost::ExecPlan concurrency::ExecutionFlowGraph::m_ExecPlan
private

temporary items to experiment with execution planning

Definition at line 374 of file ExecutionFlowGraph.h.

DecisionNode* concurrency::ExecutionFlowGraph::m_headNode
private

the head node of the control flow graph; may want to have multiple ones once supporting trigger paths

Definition at line 355 of file ExecutionFlowGraph.h.

const std::chrono::system_clock::time_point concurrency::ExecutionFlowGraph::m_initTime
private

Definition at line 370 of file ExecutionFlowGraph.h.

const std::string concurrency::ExecutionFlowGraph::m_name
private

Definition at line 369 of file ExecutionFlowGraph.h.

unsigned int concurrency::ExecutionFlowGraph::m_nodeCounter
private

Total number of nodes in the graph.

Definition at line 366 of file ExecutionFlowGraph.h.

SmartIF<ISvcLocator> concurrency::ExecutionFlowGraph::m_svcLocator
mutableprivate

Service locator (needed to access the MessageSvc)

Definition at line 368 of file ExecutionFlowGraph.h.


The documentation for this class was generated from the following files: