13 else if ( 1 == stateId )
23 for (
auto node : m_children )
delete node;
30 for (
auto daughter : m_children ) daughter->initialize( algname_index_map );
37 if (
std::find( m_parents.begin(), m_parents.end(), node ) == m_parents.end() ) m_parents.push_back( node );
44 if (
std::find( m_children.begin(), m_children.end(), node ) == m_children.end() ) m_children.push_back( node );
49 const std::vector<int>& node_decisions,
const unsigned int& recursionLevel )
const 55 for (
auto daughter : m_children ) {
56 daughter->printState( output, states, node_decisions, recursionLevel + 2 );
65 int decision = ( ( m_allPass && m_isLazy ) ? 1 : -1 );
66 bool hasUndecidedChild =
false;
67 for (
auto daughter : m_children ) {
68 if ( m_isLazy && ( -1 != decision || hasUndecidedChild ) ) {
73 auto res = daughter->updateState( states, node_decisions );
75 hasUndecidedChild =
true;
76 }
else if (
false == m_modeOR && res == 0 ) {
79 else if (
true == m_modeOR && res == 1 ) {
84 if ( !hasUndecidedChild && -1 == decision ) {
86 if (
true == m_modeOR ) {
96 if ( m_allPass ) decision = 1;
105 int decision = ( ( m_allPass && m_isLazy ) ? 1 : -1 );
106 bool keepGoing =
true;
107 bool hasUndecidedChild =
false;
111 for (
auto daughter : m_children ) {
115 if ( m_isLazy && !keepGoing ) {
123 int& res = node_decisions[daughter->getNodeIndex()];
125 hasUndecidedChild =
true;
128 algod->promoteToControlReadyState( slotNum, states, node_decisions );
129 bool result = algod->promoteToDataReadyState( slotNum, requestor );
130 if ( result ) keepGoing =
false;
132 daughter->updateDecision( slotNum, states, node_decisions, requestor );
136 }
else if (
false == m_modeOR && res == 0 ) {
140 }
else if (
true == m_modeOR && res == 1 ) {
147 if ( !hasUndecidedChild && -1 == decision ) {
149 if (
true == m_modeOR ) {
161 if ( -1 != decision )
162 for (
auto p : m_parents ) p->updateDecision( slotNum, states, node_decisions, requestor );
176 for (
auto daughter : m_children ) {
177 auto res = node_decisions[daughter->getNodeIndex()];
179 daughter->promoteToControlReadyState( slotNum, states, node_decisions );
180 if ( m_isLazy )
return true;
181 }
else if ( m_isLazy ) {
182 if ( (
false == m_modeOR && res == 0 ) || (
true == m_modeOR && res == 1 ) )
return true;
194 bool result = visitor.
visit( *
this );
195 if ( result )
return visitor.
visitLeave( *
this );
197 for (
auto child : m_children ) {
198 bool keepGoing = child->accept( visitor );
199 if ( m_isLazy && !keepGoing )
return false;
210 for (
auto node : m_outputs ) {
219 m_algoIndex = algname_index_map.
at( m_algoName );
227 auto&
state = states[m_algoIndex];
230 if ( State::INITIAL ==
state ) {
234 }
else if ( State::CONTROLREADY ==
state ) {
246 auto&
state = states[m_algoIndex];
249 if ( State::CONTROLREADY ==
state ) {
250 if ( dataDependenciesSatisfied( slotNum ) ) {
252 states.updateState( m_algoIndex, State::DATAREADY ).ignore();
268 }
else if ( State::DATAREADY ==
state ) {
270 }
else if ( State::SCHEDULED ==
state ) {
284 for (
auto dataNode : m_inputs ) {
288 for (
auto algoNode : dataNode->getProducers() )
289 if ( State::EVTACCEPTED == states[algoNode->getAlgoIndex()] ) {
305 for (
auto dataNode : m_inputs ) {
308 for (
auto algoNode : dataNode->getProducers() )
309 if ( State::EVTACCEPTED == states[algoNode->getAlgoIndex()] ) {
314 if ( !result )
break;
322 const std::vector<int>& node_decisions,
const unsigned int& recursionLevel )
const 338 unsigned int decision = -1;
339 if ( State::INITIAL == state ) {
343 if (
true == m_allPass ) {
345 }
else if ( State::EVTACCEPTED == state ) {
346 decision = !m_inverted;
347 }
else if ( State::EVTREJECTED == state ) {
348 decision = m_inverted;
366 if (
true == m_allPass ) {
368 }
else if ( State::EVTACCEPTED == state ) {
369 decision = !m_inverted;
370 }
else if ( State::EVTREJECTED == state ) {
371 decision = m_inverted;
378 if ( -1 != decision ) {
379 for (
auto output : m_outputs )
380 for (
auto consumer :
output->getConsumers() ) consumer->promoteToDataReadyState( slotNum, requestor );
382 for (
auto p : m_parents ) p->updateDecision( slotNum, states, node_decisions, requestor );
391 bool result = visitor.
visit( *
this );
392 if ( result )
return false;
402 if (
std::find( m_parents.begin(), m_parents.end(), node ) == m_parents.end() ) m_parents.push_back( node );
409 if (
std::find( m_outputs.begin(), m_outputs.end(), node ) == m_outputs.end() ) m_outputs.push_back( node );
416 if (
std::find( m_inputs.begin(), m_inputs.end(), node ) == m_inputs.end() ) m_inputs.push_back( node );
423 m_headNode->initialize( algname_index_map );
425 StatusCode sc = buildAugmentedDataDependenciesRealm();
427 if ( !sc.
isSuccess() ) error() <<
"Could not build the data dependency realm." <<
endmsg;
437 m_eventSlots = &eventSlots;
438 m_headNode->initialize( algname_index_map );
440 StatusCode sc = buildAugmentedDataDependenciesRealm();
442 if ( !sc.
isSuccess() ) error() <<
"Could not build the data dependency realm." <<
endmsg;
445 debug() << dumpDataFlow() <<
endmsg;
460 m_algoNameToAlgoInputsMap[algoName] = inputObjs;
461 m_algoNameToAlgoOutputsMap[algoName] = outputObjs;
464 debug() <<
"Inputs of " << algoName <<
": ";
465 for (
auto tag : inputObjs)
466 debug() << tag <<
" | ";
469 debug() <<
"Outputs of " << algoName <<
": ";
470 for (
auto tag : outputObjs)
471 debug() << tag <<
" | ";
482 for (
auto algo : m_algoNameToAlgoNodeMap ) {
484 auto targetNode = m_algoNameToAlgoNodeMap[algo.first];
487 auto& targetInCollection = m_algoNameToAlgoInputsMap[algo.first];
488 for (
auto inputTag : targetInCollection) {
489 for (
auto producer : m_algoNameToAlgoOutputsMap) {
490 auto& outputs = m_algoNameToAlgoOutputsMap[producer.first];
491 for (
auto outputTag : outputs) {
492 if (inputTag == outputTag) {
493 auto& known_producers = targetNode->getSupplierNodes();
494 auto valid_producer = m_algoNameToAlgoNodeMap[producer.first];
495 auto& known_consumers = valid_producer->getConsumerNodes();
496 if (
std::find( known_producers.begin(), known_producers.end(), valid_producer ) ==
497 known_producers.end() )
498 targetNode->addSupplierNode( valid_producer );
499 if (
std::find( known_consumers.begin(), known_consumers.end(), targetNode ) == known_consumers.end() )
500 valid_producer->addConsumerNode( targetNode );
507 auto& targetOutCollection = m_algoNameToAlgoOutputsMap[algo.first];
508 for (
auto outputTag : targetOutCollection) {
509 for (
auto consumer : m_algoNameToAlgoInputsMap) {
510 auto&
inputs = m_algoNameToAlgoInputsMap[consumer.first];
511 for (
auto inputTag :
inputs) {
512 if (inputTag == outputTag) {
513 auto& known_consumers = targetNode->getConsumerNodes();
514 auto valid_consumer = m_algoNameToAlgoNodeMap[consumer.first];
515 auto& known_producers = valid_consumer->getSupplierNodes();
516 if (
std::find( known_producers.begin(), known_producers.end(), targetNode ) == known_producers.end() )
517 valid_consumer->addSupplierNode( targetNode );
518 if (
std::find( known_consumers.begin(), known_consumers.end(), valid_consumer ) ==
519 known_consumers.end() )
520 targetNode->addConsumerNode( valid_consumer );
537 for (
auto algo : m_algoNameToAlgoNodeMap) {
539 auto& outCollection = m_algoNameToAlgoOutputsMap[algo.first];
540 for (
auto outputTag : outCollection) {
541 const auto sc = addDataNode(outputTag);
542 if (!sc.isSuccess()) {
543 error() <<
"Extra producer (" << algo.first <<
") for DataObject @ " 545 <<
" has been detected: this is not allowed." <<
endmsg;
548 auto dataNode = getDataNode(outputTag);
549 dataNode->addProducerNode(algo.second);
550 algo.second->addOutputDataNode(dataNode);
555 for (
auto algo : m_algoNameToAlgoNodeMap ) {
556 auto& inCollection = m_algoNameToAlgoInputsMap[algo.first];
557 for (
auto inputTag : inCollection) {
559 auto primaryPath = inputTag;
560 auto itP = m_dataPathToDataNodeMap.find(primaryPath);
561 if (itP != m_dataPathToDataNodeMap.end()) {
562 dataNode = getDataNode(primaryPath);
580 algo.second->addInputDataNode(dataNode);
595 auto& algoName = algo->
name();
597 auto itP = m_decisionNameToDecisionHubMap.
find( parentName );
599 if ( itP != m_decisionNameToDecisionHubMap.end() ) {
600 parentNode = itP->second;
601 auto itA = m_algoNameToAlgoNodeMap.find( algoName );
603 if ( itA != m_algoNameToAlgoNodeMap.end() ) {
604 algoNode = itA->second;
608 m_algoNameToAlgoNodeMap[algoName] = algoNode;
610 debug() <<
"AlgoNode " << algoName <<
" added @ " << algoNode <<
endmsg;
611 registerIODataObjects(algo);
618 error() <<
"DecisionHubNode " << parentName <<
", meant to be used as parent, is not registered in the EFG." 629 return m_algoNameToAlgoNodeMap.at( algoName );
638 auto itD = m_dataPathToDataNodeMap.find( dataPath );
640 if ( itD != m_dataPathToDataNodeMap.end() ) {
641 dataNode = itD->second;
645 m_dataPathToDataNodeMap[dataPath] = dataNode;
647 debug() <<
" DataNode for " << dataPath <<
" added @ " << dataNode <<
endmsg;
658 return m_dataPathToDataNodeMap.at( dataPath );
663 bool modeOR,
bool allPass,
bool isLazy )
668 auto& decisionHubName = decisionHubAlgo->
name();
670 auto itP = m_decisionNameToDecisionHubMap.
find( parentName );
672 if ( itP != m_decisionNameToDecisionHubMap.end() ) {
673 parentNode = itP->second;
674 auto itA = m_decisionNameToDecisionHubMap.find( decisionHubName );
676 if ( itA != m_decisionNameToDecisionHubMap.end() ) {
677 decisionHubNode = itA->second;
682 m_decisionNameToDecisionHubMap[decisionHubName] = decisionHubNode;
684 debug() <<
"DecisionHubNode " << decisionHubName <<
" added @ " << decisionHubNode <<
endmsg;
691 error() <<
"DecisionHubNode " << parentName <<
", meant to be used as parent, is not registered in the EFG." 702 auto itH = m_decisionNameToDecisionHubMap.find( headName );
703 if ( itH != m_decisionNameToDecisionHubMap.end() ) {
704 m_headNode = itH->second;
708 m_decisionNameToDecisionHubMap[headName] = m_headNode;
715 m_headNode->updateState( algo_states, node_decisions );
724 getAlgorithmNode( algo_name )->updateDecision( slotNum, algo_states, node_decisions );
731 info() <<
"Starting ranking by data outputs .. " <<
endmsg;
732 for (
auto& pair : m_algoNameToAlgoNodeMap) {
734 debug() <<
" Ranking " << pair.first <<
"... " <<
endmsg;
735 pair.second->accept(ranker);
737 debug() <<
" ... rank of " << pair.first <<
": " << pair.second->getRank() <<
endmsg;
747 for (
auto node : m_algoNameToAlgoInputsMap) {
749 if (collection.
empty())
750 result.
push_back(getAlgorithmNode(node.first));
760 const char idt[] =
" ";
763 ost <<
"\n" << idt <<
"====================================\n";
764 ost << idt <<
"Data origins and destinations:\n";
765 ost << idt <<
"====================================\n";
767 for (
auto& pair : m_dataPathToDataNodeMap ) {
769 for (
auto algoNode : pair.second->getProducers() ) ost << idt <<
" " << algoNode->getNodeName() <<
"\n";
771 ost << idt <<
" V\n";
772 ost << idt <<
" o " << pair.first <<
"\n";
773 ost << idt <<
" V\n";
775 for (
auto algoNode : pair.second->getConsumers() ) ost << idt <<
" " << algoNode->getNodeName() <<
"\n";
777 ost << idt <<
"====================================\n";
790 boost::dynamic_properties dp;
796 boost::write_graphml( myfile, m_ExecPlan, dp );
806 if ( u ==
nullptr ) {
807 auto itT = m_exec_plan_map.find(
"ENTRY" );
808 if ( itT != m_exec_plan_map.end() ) {
809 source = itT->second;
812 m_exec_plan_map[
"ENTRY"] = source;
815 auto itS = m_exec_plan_map.find( u->
getNodeName() );
816 if ( itS != m_exec_plan_map.end() ) {
817 source = itS->second;
821 fatal() <<
"could not convert IAlgorithm to Algorithm!" <<
endmsg;
828 debug() <<
"no AvgRuntime for " << alg->name() <<
endmsg;
839 auto itP = m_exec_plan_map.find( v->
getNodeName() );
840 if ( itP != m_exec_plan_map.end() ) {
841 target = itP->second;
845 fatal() <<
"could not convert IAlgorithm to Algorithm!" <<
endmsg;
852 debug() <<
"no AvgRuntime for " << alg->name() <<
endmsg;
862 debug() <<
"Edge added to execution plan" <<
endmsg;
863 boost::add_edge(source, target, m_ExecPlan);
std::string stateToString(const int &stateId) const
Translation between state id and name.
bool promoteToControlReadyState(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
XXX: CF tests.
virtual bool visitEnter(DecisionNode &) const =0
const std::vector< AlgorithmNode * > getDataIndependentNodes() const
const unsigned int & getAlgoIndex() const
XXX: CF tests.
StatusCode initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize graph.
const std::string & name() const override
The identifying name of the algorithm object.
StatusCode buildAugmentedDataDependenciesRealm()
Build data dependency realm WITH data object nodes participating.
void addDaughterNode(ControlFlowNode *node)
Add a daughter node.
StatusCode addDecisionHubNode(Algorithm *daughterAlgo, const std::string &parentName, bool modeOR, bool allPass, bool isLazy)
Add a node, which aggregates decisions of direct daughter nodes.
virtual void acceptDHVisitor(IDataHandleVisitor *) const override
bool dataDependenciesSatisfied(const int &slotNum) const
Method to check whether the Algorithm has its all data dependency satisfied.
bool isSuccess() const
Test for a status code of SUCCESS.
const std::vector< IAlgorithm * > & getAlgorithmRepresentatives() const
get Algorithm representatives
void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const override
XXX: CF tests.
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
A method to update algorithm node decision, and propagate it upwards.
std::string dumpDataFlow() const
Print out all data origins and destinations, as reflected in the EF graph.
ExecutionFlowGraph * m_graph
~AlgorithmNode()
Destructor.
virtual std::string toString() const =0
value -> string
DataNode * getDataNode(const DataObjID &dataPath) const
Get DataNode by DataObject path using graph index.
AlgsExecutionStates & getAlgoStates(const int &slotNum) const
void registerIODataObjects(const Algorithm *algo)
Register algorithm in the Data Dependency index.
void addInputDataNode(DataNode *node)
Associate an AlgorithmNode, which is a data consumer of this one.
bool accept(IGraphVisitor &visitor) override
const float & getRank() const
Get Algorithm rank.
void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const override
XXX: CF tests.
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
This class is used for returning status codes from appropriate routines.
graph_traits< ExecPlan >::vertex_descriptor AlgoVertex
void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map) override
Initialize.
virtual bool visit(DecisionNode &)=0
PropertyBase base class allowing PropertyBase* collections to be "homogeneous".
StatusCode buildDataDependenciesRealm()
Build data dependency realm WITHOUT data object nodes: just interconnect algorithm nodes directly...
void addOutputDataNode(DataNode *node)
Associate an AlgorithmNode, which is a data supplier for this one.
Base class from which all concrete algorithm classes should be derived.
void rankAlgorithms(IGraphVisitor &ranker) const
Rank Algorithm nodes by the number of data outputs.
void addEdgeToExecutionPlan(const AlgorithmNode *u, const AlgorithmNode *v)
set cause-effect connection between two algorithms in the execution plan
StatusCode addAlgorithmNode(Algorithm *daughterAlgo, const std::string &parentName, bool inverted, bool allPass)
Add algorithm node.
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
virtual bool visitLeave(DecisionNode &) const =0
void dumpExecutionPlan()
dump to file encountered execution plan
int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
Method to set algos to CONTROLREADY, if possible.
void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const override
Print a string representing the control flow state.
void addHeadNode(const std::string &headName, bool modeOR, bool allPass, bool isLazy)
Add a node, which has no parents.
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
bool accept(IGraphVisitor &visitor) override
int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
Method to set algos to CONTROLREADY, if possible.
bool promoteToControlReadyState(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
XXX: CF tests. Method to set algos to CONTROLREADY, if possible.
const std::string & getNodeName() const
void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const override
Print a string representing the control flow state.
~DecisionNode() override
Destructor.
State
Execution states of the algorithms.
void updateEventState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const
XXX CF tests. Is needed for older CF implementation.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode from by algorithm name using graph index.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map) override
Initialize.
bool promoteToDataReadyState(const int &slotNum, const AlgorithmNode *requestor=nullptr) const
static std::map< State, std::string > stateNames
StatusCode addDataNode(const DataObjID &dataPath)
Add DataNode that represents DataObject.
void addConsumerNode(AlgorithmNode *node)
Associate an AlgorithmNode, which is a data consumer of this one.
StatusCode updateState(unsigned int iAlgo, State newState)