15 else if ( 1 == stateId )
25 for (
auto node : m_children )
delete node;
32 for (
auto daughter : m_children ) daughter->initialize( algname_index_map );
39 if (
std::find( m_parents.begin(), m_parents.end(), node ) == m_parents.end() ) m_parents.push_back( node );
46 if (
std::find( m_children.begin(), m_children.end(), node ) == m_children.end() ) m_children.push_back( node );
51 const std::vector<int>& node_decisions,
const unsigned int& recursionLevel )
const 57 for (
auto daughter : m_children ) {
58 daughter->printState( output, states, node_decisions, recursionLevel + 2 );
67 int decision = ( ( m_allPass && m_modePromptDecision ) ? 1 : -1 );
68 bool hasUndecidedChild =
false;
69 for (
auto daughter : m_children ) {
70 if ( m_modePromptDecision && ( -1 != decision || hasUndecidedChild ) ) {
75 auto res = daughter->updateState( states, node_decisions );
77 hasUndecidedChild =
true;
78 }
else if (
false == m_modeOR && res == 0 ) {
81 else if (
true == m_modeOR && res == 1 ) {
86 if ( !hasUndecidedChild && -1 == decision ) {
88 if (
true == m_modeOR ) {
98 if ( m_allPass ) decision = 1;
107 int decision = ( ( m_allPass && m_modePromptDecision ) ? 1 : -1 );
108 bool keepGoing =
true;
109 bool hasUndecidedChild =
false;
113 for (
auto daughter : m_children ) {
117 if ( m_modePromptDecision && !keepGoing ) {
125 int& res = node_decisions[daughter->getNodeIndex()];
127 hasUndecidedChild =
true;
130 algod->promoteToControlReadyState( slotNum, states, node_decisions );
131 bool result = algod->promoteToDataReadyState( slotNum, requestor );
132 if ( result ) keepGoing =
false;
134 daughter->updateDecision( slotNum, states, node_decisions, requestor );
138 }
else if (
false == m_modeOR && res == 0 ) {
142 }
else if (
true == m_modeOR && res == 1 ) {
149 if ( !hasUndecidedChild && -1 == decision ) {
151 if (
true == m_modeOR ) {
163 if ( -1 != decision )
164 for (
auto p : m_parents ) p->updateDecision( slotNum, states, node_decisions, requestor );
178 for (
auto daughter : m_children ) {
179 auto res = node_decisions[daughter->getNodeIndex()];
181 daughter->promoteToControlReadyState( slotNum, states, node_decisions );
182 if ( m_modePromptDecision )
return true;
183 }
else if ( m_modePromptDecision ) {
184 if ( (
false == m_modeOR && res == 0 ) || (
true == m_modeOR && res == 1 ) )
return true;
197 bool result = visitor.
visit( *
this );
201 for (
auto parent : m_parents ) {
202 parent->accept( visitor );
208 for (
auto child : m_children ) {
209 bool result = child->accept( visitor );
210 if (!m_modeConcurrent)
224 for (
auto node : m_outputs ) {
233 m_algoIndex = algname_index_map.
at( m_algoName );
241 auto&
state = states[m_algoIndex];
244 if ( State::INITIAL ==
state ) {
248 }
else if ( State::CONTROLREADY ==
state ) {
260 auto&
state = states[m_algoIndex];
263 if ( State::CONTROLREADY ==
state ) {
264 if ( dataDependenciesSatisfied( slotNum ) ) {
266 states.updateState( m_algoIndex, State::DATAREADY ).ignore();
282 }
else if ( State::DATAREADY ==
state ) {
284 }
else if ( State::SCHEDULED ==
state ) {
302 for (
auto algoNode : dataNode->getProducers() )
303 if ( State::EVTACCEPTED == states[algoNode->getAlgoIndex()] ) {
322 for (
auto algoNode : dataNode->getProducers() )
323 if ( State::EVTACCEPTED == states[algoNode->getAlgoIndex()] ) {
328 if ( !result )
break;
336 const std::vector<int>& node_decisions,
const unsigned int& recursionLevel )
const 352 unsigned int decision = -1;
353 if ( State::INITIAL == state ) {
357 if (
true == m_allPass ) {
359 }
else if ( State::EVTACCEPTED == state ) {
360 decision = !m_inverted;
361 }
else if ( State::EVTREJECTED == state ) {
362 decision = m_inverted;
380 if (
true == m_allPass ) {
382 }
else if ( State::EVTACCEPTED == state ) {
383 decision = !m_inverted;
384 }
else if ( State::EVTREJECTED == state ) {
385 decision = m_inverted;
392 if ( -1 != decision ) {
393 for (
auto output : m_outputs )
394 for (
auto consumer :
output->getConsumers() ) consumer->promoteToDataReadyState( slotNum, requestor );
397 for (
auto p : m_parents ) {
410 visitor.
visit( *
this );
421 if (
std::find( m_parents.begin(), m_parents.end(), node ) == m_parents.end() ) m_parents.push_back( node );
428 if (
std::find( m_outputs.begin(), m_outputs.end(), node ) == m_outputs.end() ) m_outputs.push_back( node );
442 m_headNode->initialize( algname_index_map );
444 StatusCode sc = buildAugmentedDataDependenciesRealm();
446 if ( !sc.
isSuccess() ) error() <<
"Could not build the data dependency realm." <<
endmsg;
456 m_eventSlots = &eventSlots;
457 m_headNode->initialize( algname_index_map );
459 StatusCode sc = buildAugmentedDataDependenciesRealm();
461 if ( !sc.
isSuccess() ) error() <<
"Could not build the data dependency realm." <<
endmsg;
464 debug() << dumpDataFlow() <<
endmsg;
479 m_algoNameToAlgoInputsMap[algoName] = inputObjs;
480 m_algoNameToAlgoOutputsMap[algoName] = outputObjs;
483 debug() <<
"Inputs of " << algoName <<
": ";
484 for (
auto tag : inputObjs)
485 debug() << tag <<
" | ";
488 debug() <<
"Outputs of " << algoName <<
": ";
489 for (
auto tag : outputObjs)
490 debug() << tag <<
" | ";
501 for (
auto algo : m_algoNameToAlgoNodeMap ) {
503 auto targetNode = m_algoNameToAlgoNodeMap[algo.first];
506 auto& targetInCollection = m_algoNameToAlgoInputsMap[algo.first];
507 for (
auto inputTag : targetInCollection) {
508 for (
auto producer : m_algoNameToAlgoOutputsMap) {
509 auto& outputs = m_algoNameToAlgoOutputsMap[producer.first];
510 for (
auto outputTag : outputs) {
511 if (inputTag == outputTag) {
512 auto& known_producers = targetNode->getSupplierNodes();
513 auto valid_producer = m_algoNameToAlgoNodeMap[producer.first];
514 auto& known_consumers = valid_producer->getConsumerNodes();
515 if (
std::find( known_producers.begin(), known_producers.end(), valid_producer ) ==
516 known_producers.end() )
517 targetNode->addSupplierNode( valid_producer );
518 if (
std::find( known_consumers.begin(), known_consumers.end(), targetNode ) == known_consumers.end() )
519 valid_producer->addConsumerNode( targetNode );
526 auto& targetOutCollection = m_algoNameToAlgoOutputsMap[algo.first];
527 for (
auto outputTag : targetOutCollection) {
528 for (
auto consumer : m_algoNameToAlgoInputsMap) {
529 auto&
inputs = m_algoNameToAlgoInputsMap[consumer.first];
530 for (
auto inputTag :
inputs) {
531 if (inputTag == outputTag) {
532 auto& known_consumers = targetNode->getConsumerNodes();
533 auto valid_consumer = m_algoNameToAlgoNodeMap[consumer.first];
534 auto& known_producers = valid_consumer->getSupplierNodes();
535 if (
std::find( known_producers.begin(), known_producers.end(), targetNode ) == known_producers.end() )
536 valid_consumer->addSupplierNode( targetNode );
537 if (
std::find( known_consumers.begin(), known_consumers.end(), valid_consumer ) ==
538 known_consumers.end() )
539 targetNode->addConsumerNode( valid_consumer );
556 for (
auto algo : m_algoNameToAlgoNodeMap) {
558 auto& outCollection = m_algoNameToAlgoOutputsMap[algo.first];
559 for (
auto outputTag : outCollection) {
560 const auto sc = addDataNode(outputTag);
561 if (!sc.isSuccess()) {
562 error() <<
"Extra producer (" << algo.first <<
") for DataObject @ " 564 <<
" has been detected: this is not allowed." <<
endmsg;
567 auto dataNode = getDataNode(outputTag);
568 dataNode->addProducerNode(algo.second);
569 algo.second->addOutputDataNode(dataNode);
574 for (
auto algo : m_algoNameToAlgoNodeMap ) {
575 auto& inCollection = m_algoNameToAlgoInputsMap[algo.first];
576 for (
auto inputTag : inCollection) {
578 auto primaryPath = inputTag;
579 auto itP = m_dataPathToDataNodeMap.find(primaryPath);
580 if (itP != m_dataPathToDataNodeMap.end()) {
581 dataNode = getDataNode(primaryPath);
599 algo.second->addInputDataNode(dataNode);
614 auto& algoName = algo->
name();
616 auto itP = m_decisionNameToDecisionHubMap.
find( parentName );
618 if ( itP != m_decisionNameToDecisionHubMap.end() ) {
619 parentNode = itP->second;
620 auto itA = m_algoNameToAlgoNodeMap.find( algoName );
622 if ( itA != m_algoNameToAlgoNodeMap.end() ) {
623 algoNode = itA->second;
627 m_algoNameToAlgoNodeMap[algoName] = algoNode;
629 debug() <<
"AlgoNode " << algoName <<
" added @ " << algoNode <<
endmsg;
630 registerIODataObjects(algo);
637 error() <<
"Decision hub node " << parentName <<
", requested to be parent, is not registered." 648 return m_algoNameToAlgoNodeMap.at( algoName );
657 auto itD = m_dataPathToDataNodeMap.find( dataPath );
659 if ( itD != m_dataPathToDataNodeMap.end() ) {
660 dataNode = itD->second;
664 m_dataPathToDataNodeMap[dataPath] = dataNode;
666 debug() <<
" DataNode for " << dataPath <<
" added @ " << dataNode <<
endmsg;
677 return m_dataPathToDataNodeMap.at( dataPath );
682 bool modeConcurrent,
bool modePromptDecision,
bool modeOR,
bool allPass)
687 auto& decisionHubName = decisionHubAlgo->
name();
689 auto itP = m_decisionNameToDecisionHubMap.
find( parentName );
691 if ( itP != m_decisionNameToDecisionHubMap.end() ) {
692 parentNode = itP->second;
693 auto itA = m_decisionNameToDecisionHubMap.find( decisionHubName );
695 if ( itA != m_decisionNameToDecisionHubMap.end() ) {
696 decisionHubNode = itA->second;
699 new concurrency::DecisionNode( *
this, m_nodeCounter, decisionHubName, modeConcurrent, modePromptDecision, modeOR, allPass);
701 m_decisionNameToDecisionHubMap[decisionHubName] = decisionHubNode;
703 debug() <<
"Decision hub node " << decisionHubName <<
" added @ " << decisionHubNode <<
endmsg;
710 error() <<
"Decision hub node " << parentName <<
", requested to be parent, is not registered." 721 auto itH = m_decisionNameToDecisionHubMap.find( headName );
722 if ( itH != m_decisionNameToDecisionHubMap.end() ) {
723 m_headNode = itH->second;
725 m_headNode =
new concurrency::DecisionNode( *
this, m_nodeCounter, headName, modeConcurrent, modePromptDecision, modeOR, allPass );
727 m_decisionNameToDecisionHubMap[headName] = m_headNode;
734 m_headNode->updateState( algo_states, node_decisions );
743 getAlgorithmNode( algo_name )->updateDecision( slotNum, algo_states, node_decisions );
750 info() <<
"Starting ranking by data outputs .. " <<
endmsg;
751 for (
auto& pair : m_algoNameToAlgoNodeMap) {
753 debug() <<
" Ranking " << pair.first <<
"... " <<
endmsg;
754 pair.second->accept(ranker);
756 debug() <<
" ... rank of " << pair.first <<
": " << pair.second->getRank() <<
endmsg;
766 for (
auto node : m_algoNameToAlgoInputsMap) {
768 if (collection.
empty())
769 result.
push_back(getAlgorithmNode(node.first));
779 const char idt[] =
" ";
782 ost <<
"\n" << idt <<
"====================================\n";
783 ost << idt <<
"Data origins and destinations:\n";
784 ost << idt <<
"====================================\n";
786 for (
auto& pair : m_dataPathToDataNodeMap ) {
788 for (
auto algoNode : pair.second->getProducers() ) ost << idt <<
" " << algoNode->getNodeName() <<
"\n";
790 ost << idt <<
" V\n";
791 ost << idt <<
" o " << pair.first <<
"\n";
792 ost << idt <<
" V\n";
794 for (
auto algoNode : pair.second->getConsumers() ) ost << idt <<
" " << algoNode->getNodeName() <<
"\n";
796 ost << idt <<
"====================================\n";
809 boost::dynamic_properties dp;
815 boost::write_graphml( myfile, m_ExecPlan, dp );
825 if ( u ==
nullptr ) {
826 auto itT = m_exec_plan_map.find(
"ENTRY" );
827 if ( itT != m_exec_plan_map.end() ) {
828 source = itT->second;
831 m_exec_plan_map[
"ENTRY"] = source;
834 auto itS = m_exec_plan_map.find( u->
getNodeName() );
835 if ( itS != m_exec_plan_map.end() ) {
836 source = itS->second;
840 fatal() <<
"could not convert IAlgorithm to Algorithm!" <<
endmsg;
847 debug() <<
"no AvgRuntime for " << alg->name() <<
endmsg;
858 auto itP = m_exec_plan_map.find( v->
getNodeName() );
859 if ( itP != m_exec_plan_map.end() ) {
860 target = itP->second;
864 fatal() <<
"could not convert IAlgorithm to Algorithm!" <<
endmsg;
871 debug() <<
"no AvgRuntime for " << alg->name() <<
endmsg;
881 debug() <<
"Edge added to execution plan" <<
endmsg;
882 boost::add_edge(source, target, m_ExecPlan);
void dumpExecutionPlan()
dump to file encountered execution plan
bool promoteToControlReadyState(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
XXX: CF tests.
virtual bool visitEnter(DecisionNode &) const =0
const unsigned int & getAlgoIndex() const
XXX: CF tests.
StatusCode addAlgorithmNode(Algorithm *daughterAlgo, const std::string &parentName, bool inverted, bool allPass)
Add algorithm node.
const std::string & name() const override
The identifying name of the algorithm object.
void addDaughterNode(ControlFlowNode *node)
Add a daughter node.
virtual void acceptDHVisitor(IDataHandleVisitor *) const override
bool dataDependenciesSatisfied(const int &slotNum) const
Method to check whether the Algorithm has its all data dependency satisfied.
bool isSuccess() const
Test for a status code of SUCCESS.
StatusCode addDecisionHubNode(Algorithm *daughterAlgo, const std::string &parentName, bool modeConcurrent, bool modePromptDecision, bool modeOR, bool allPass)
Add a node, which aggregates decisions of direct daughter nodes.
const std::vector< IAlgorithm * > & getAlgorithmRepresentatives() const
get Algorithm representatives
void addHeadNode(const std::string &headName, bool modeConcurrent, bool modePromptDecision, bool modeOR, bool allPass)
Add a node, which has no parents.
void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const override
XXX: CF tests.
std::string stateToString(const int &stateId) const
Translation between state id and name.
void addEdgeToExecutionPlan(const AlgorithmNode *u, const AlgorithmNode *v)
set cause-effect connection between two algorithms in the execution plan
StatusCode addDataNode(const DataObjID &dataPath)
Add DataNode that represents DataObject.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
void rankAlgorithms(IGraphVisitor &ranker) const
Rank Algorithm nodes by the number of data outputs.
~AlgorithmNode()
Destructor.
virtual std::string toString() const =0
value -> string
void addInputDataNode(DataNode *node)
Associate an AlgorithmNode, which is a data consumer of this one.
bool accept(IGraphVisitor &visitor) override
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
A method to update algorithm node decision, and propagate it upwards.
const float & getRank() const
Get Algorithm rank.
void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const override
XXX: CF tests.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
PrecedenceRulesGraph * m_graph
This class is used for returning status codes from appropriate routines.
std::string dumpDataFlow() const
Print out all data origins and destinations, as reflected in the EF graph.
graph_traits< ExecPlan >::vertex_descriptor AlgoVertex
void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map) override
Initialize.
virtual bool visit(DecisionNode &)=0
PropertyBase base class allowing PropertyBase* collections to be "homogeneous".
const std::vector< AlgorithmNode * > getDataIndependentNodes() const
DataNode * getDataNode(const DataObjID &dataPath) const
Get DataNode by DataObject path using graph index.
StatusCode buildAugmentedDataDependenciesRealm()
Build data dependency realm WITH data object nodes participating.
AlgsExecutionStates & getAlgoStates(const int &slotNum) const
void addOutputDataNode(DataNode *node)
Associate an AlgorithmNode, which is a data supplier for this one.
Base class from which all concrete algorithm classes should be derived.
StatusCode initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize graph.
std::vector< InputHandle_t< In > > m_inputs
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
void registerIODataObjects(const Algorithm *algo)
Register algorithm in the Data Dependency index.
int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
Method to set algos to CONTROLREADY, if possible.
void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const override
Print a string representing the control flow state.
void updateEventState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const
XXX CF tests. Is needed for older CF implementation.
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
bool accept(IGraphVisitor &visitor) override
int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
Method to set algos to CONTROLREADY, if possible.
bool promoteToControlReadyState(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
XXX: CF tests. Method to set algos to CONTROLREADY, if possible.
const std::string & getNodeName() const
void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const override
Print a string representing the control flow state.
StatusCode buildDataDependenciesRealm()
Build data dependency realm WITHOUT data object nodes: just interconnect algorithm nodes directly...
~DecisionNode() override
Destructor.
State
Execution states of the algorithms.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map) override
Initialize.
bool promoteToDataReadyState(const int &slotNum, const AlgorithmNode *requestor=nullptr) const
static std::map< State, std::string > stateNames
void addConsumerNode(AlgorithmNode *node)
Associate an AlgorithmNode, which is a data consumer of this one.
StatusCode updateState(unsigned int iAlgo, State newState)