15 else if ( 1 == stateId )
25 for (
auto node : m_children )
delete node;
32 for (
auto daughter : m_children ) daughter->initialize( algname_index_map );
39 if (
std::find( m_parents.begin(), m_parents.end(), node ) == m_parents.end() ) m_parents.push_back( node );
46 if (
std::find( m_children.begin(), m_children.end(), node ) == m_children.end() ) m_children.push_back( node );
51 const std::vector<int>& node_decisions,
const unsigned int& recursionLevel )
const 57 for (
auto daughter : m_children ) {
58 daughter->printState( output, states, node_decisions, recursionLevel + 2 );
67 int decision = ( ( m_allPass && m_modePromptDecision ) ? 1 : -1 );
68 bool hasUndecidedChild =
false;
69 for (
auto daughter : m_children ) {
70 if ( m_modePromptDecision && ( -1 != decision || hasUndecidedChild ) ) {
75 auto res = daughter->updateState( states, node_decisions );
77 hasUndecidedChild =
true;
78 }
else if (
false == m_modeOR && res == 0 ) {
81 else if (
true == m_modeOR && res == 1 ) {
86 if ( !hasUndecidedChild && -1 == decision ) {
88 if (
true == m_modeOR ) {
98 if ( m_allPass ) decision = 1;
107 int decision = ( ( m_allPass && m_modePromptDecision ) ? 1 : -1 );
108 bool keepGoing =
true;
109 bool hasUndecidedChild =
false;
113 for (
auto daughter : m_children ) {
117 if ( m_modePromptDecision && !keepGoing ) {
125 int& res = node_decisions[daughter->getNodeIndex()];
127 hasUndecidedChild =
true;
130 algod->promoteToControlReadyState( slotNum, states, node_decisions );
131 bool result = algod->promoteToDataReadyState( slotNum, requestor );
132 if ( result ) keepGoing =
false;
134 daughter->updateDecision( slotNum, states, node_decisions, requestor );
138 }
else if (
false == m_modeOR && res == 0 ) {
142 }
else if (
true == m_modeOR && res == 1 ) {
149 if ( !hasUndecidedChild && -1 == decision ) {
151 if (
true == m_modeOR ) {
163 if ( -1 != decision )
164 for (
auto p : m_parents ) p->updateDecision( slotNum, states, node_decisions, requestor );
178 for (
auto daughter : m_children ) {
179 auto res = node_decisions[daughter->getNodeIndex()];
181 daughter->promoteToControlReadyState( slotNum, states, node_decisions );
182 if ( m_modePromptDecision )
return true;
183 }
else if ( m_modePromptDecision ) {
184 if ( (
false == m_modeOR && res == 0 ) || (
true == m_modeOR && res == 1 ) )
return true;
197 bool result = visitor.
visit( *
this );
201 for (
auto parent : m_parents ) {
202 parent->accept( visitor );
208 for (
auto child : m_children ) {
209 bool result = child->accept( visitor );
210 if (!m_modeConcurrent)
224 for (
auto node : m_outputs ) {
233 m_algoIndex = algname_index_map.
at( m_algoName );
241 auto&
state = states[m_algoIndex];
244 if ( State::INITIAL ==
state ) {
248 }
else if ( State::CONTROLREADY ==
state ) {
260 auto&
state = states[m_algoIndex];
263 if ( State::CONTROLREADY ==
state ) {
264 if ( dataDependenciesSatisfied( slotNum ) ) {
266 states.updateState( m_algoIndex, State::DATAREADY ).ignore();
282 }
else if ( State::DATAREADY ==
state ) {
284 }
else if ( State::SCHEDULED ==
state ) {
302 for (
auto algoNode : dataNode->getProducers() )
303 if ( State::EVTACCEPTED == states[algoNode->getAlgoIndex()] ) {
316 const std::vector<int>& node_decisions,
const unsigned int& recursionLevel )
const 332 unsigned int decision = -1;
333 if ( State::INITIAL == state ) {
337 if (
true == m_allPass ) {
339 }
else if ( State::EVTACCEPTED == state ) {
340 decision = !m_inverted;
341 }
else if ( State::EVTREJECTED == state ) {
342 decision = m_inverted;
360 if (
true == m_allPass ) {
362 }
else if ( State::EVTACCEPTED == state ) {
363 decision = !m_inverted;
364 }
else if ( State::EVTREJECTED == state ) {
365 decision = m_inverted;
372 if ( -1 != decision ) {
375 for (
auto output : m_outputs )
376 for (
auto consumer :
output->getConsumers() )
377 if (State::CONTROLREADY == states[consumer->getAlgoIndex()])
378 consumer->accept(promoter);
381 for (
auto p : m_parents ) {
394 visitor.
visit( *
this );
405 if (
std::find( m_parents.begin(), m_parents.end(), node ) == m_parents.end() ) m_parents.push_back( node );
412 if (
std::find( m_outputs.begin(), m_outputs.end(), node ) == m_outputs.end() ) m_outputs.push_back( node );
426 m_headNode->initialize( algname_index_map );
428 StatusCode sc = buildAugmentedDataDependenciesRealm();
430 if ( !sc.
isSuccess() ) error() <<
"Could not build the data dependency realm." <<
endmsg;
440 m_eventSlots = &eventSlots;
441 m_headNode->initialize( algname_index_map );
443 StatusCode sc = buildAugmentedDataDependenciesRealm();
445 if ( !sc.
isSuccess() ) error() <<
"Could not build the data dependency realm." <<
endmsg;
448 debug() << dumpDataFlow() <<
endmsg;
463 debug() <<
"Inputs of " << algoName <<
": ";
465 debug() << tag <<
" | ";
468 debug() <<
"Outputs of " << algoName <<
": ";
470 debug() << tag <<
" | ";
481 for (
auto algo : m_algoNameToAlgoNodeMap ) {
483 auto targetNode = m_algoNameToAlgoNodeMap[algo.first];
486 auto& targetInCollection = m_algoNameToAlgoInputsMap[algo.first];
487 for (
auto inputTag : targetInCollection) {
488 for (
auto producer : m_algoNameToAlgoOutputsMap) {
489 auto& outputs = m_algoNameToAlgoOutputsMap[producer.first];
490 for (
auto outputTag : outputs) {
491 if (inputTag == outputTag) {
492 auto& known_producers = targetNode->getSupplierNodes();
493 auto valid_producer = m_algoNameToAlgoNodeMap[producer.first];
494 auto& known_consumers = valid_producer->getConsumerNodes();
495 if (
std::find( known_producers.begin(), known_producers.end(), valid_producer ) ==
496 known_producers.end() )
497 targetNode->addSupplierNode( valid_producer );
498 if (
std::find( known_consumers.begin(), known_consumers.end(), targetNode ) == known_consumers.end() )
499 valid_producer->addConsumerNode( targetNode );
506 auto& targetOutCollection = m_algoNameToAlgoOutputsMap[algo.first];
507 for (
auto outputTag : targetOutCollection) {
508 for (
auto consumer : m_algoNameToAlgoInputsMap) {
509 auto&
inputs = m_algoNameToAlgoInputsMap[consumer.first];
510 for (
auto inputTag :
inputs) {
511 if (inputTag == outputTag) {
512 auto& known_consumers = targetNode->getConsumerNodes();
513 auto valid_consumer = m_algoNameToAlgoNodeMap[consumer.first];
514 auto& known_producers = valid_consumer->getSupplierNodes();
515 if (
std::find( known_producers.begin(), known_producers.end(), targetNode ) == known_producers.end() )
516 valid_consumer->addSupplierNode( targetNode );
517 if (
std::find( known_consumers.begin(), known_consumers.end(), valid_consumer ) ==
518 known_consumers.end() )
519 targetNode->addConsumerNode( valid_consumer );
536 for (
auto algo : m_algoNameToAlgoNodeMap) {
538 auto& outCollection = m_algoNameToAlgoOutputsMap[algo.first];
539 for (
auto outputTag : outCollection) {
540 const auto sc = addDataNode(outputTag);
541 if (!sc.isSuccess()) {
542 error() <<
"Extra producer (" << algo.first <<
") for DataObject @ " 544 <<
" has been detected: this is not allowed." <<
endmsg;
547 auto dataNode = getDataNode(outputTag);
548 dataNode->addProducerNode(algo.second);
549 algo.second->addOutputDataNode(dataNode);
554 for (
auto algo : m_algoNameToAlgoNodeMap ) {
555 auto& inCollection = m_algoNameToAlgoInputsMap[algo.first];
556 for (
auto inputTag : inCollection) {
558 auto primaryPath = inputTag;
559 auto itP = m_dataPathToDataNodeMap.find(primaryPath);
560 if (itP != m_dataPathToDataNodeMap.end()) {
561 dataNode = getDataNode(primaryPath);
579 algo.second->addInputDataNode(dataNode);
594 auto& algoName = algo->
name();
596 auto itP = m_decisionNameToDecisionHubMap.
find( parentName );
598 if ( itP != m_decisionNameToDecisionHubMap.end() ) {
599 parentNode = itP->second;
600 auto itA = m_algoNameToAlgoNodeMap.find( algoName );
602 if ( itA != m_algoNameToAlgoNodeMap.end() ) {
603 algoNode = itA->second;
607 m_algoNameToAlgoNodeMap[algoName] = algoNode;
609 debug() <<
"AlgoNode " << algoName <<
" added @ " << algoNode <<
endmsg;
610 registerIODataObjects(algo);
617 error() <<
"Decision hub node " << parentName <<
", requested to be parent, is not registered." 628 return m_algoNameToAlgoNodeMap.at( algoName );
637 auto itD = m_dataPathToDataNodeMap.find( dataPath );
639 if ( itD != m_dataPathToDataNodeMap.end() ) {
640 dataNode = itD->second;
644 m_dataPathToDataNodeMap[dataPath] = dataNode;
646 debug() <<
" DataNode for " << dataPath <<
" added @ " << dataNode <<
endmsg;
657 return m_dataPathToDataNodeMap.at( dataPath );
662 bool modeConcurrent,
bool modePromptDecision,
bool modeOR,
bool allPass)
667 auto& decisionHubName = decisionHubAlgo->
name();
669 auto itP = m_decisionNameToDecisionHubMap.
find( parentName );
671 if ( itP != m_decisionNameToDecisionHubMap.end() ) {
672 parentNode = itP->second;
673 auto itA = m_decisionNameToDecisionHubMap.find( decisionHubName );
675 if ( itA != m_decisionNameToDecisionHubMap.end() ) {
676 decisionHubNode = itA->second;
679 new concurrency::DecisionNode( *
this, m_nodeCounter, decisionHubName, modeConcurrent, modePromptDecision, modeOR, allPass);
681 m_decisionNameToDecisionHubMap[decisionHubName] = decisionHubNode;
683 debug() <<
"Decision hub node " << decisionHubName <<
" added @ " << decisionHubNode <<
endmsg;
690 error() <<
"Decision hub node " << parentName <<
", requested to be parent, is not registered." 701 auto itH = m_decisionNameToDecisionHubMap.find( headName );
702 if ( itH != m_decisionNameToDecisionHubMap.end() ) {
703 m_headNode = itH->second;
705 m_headNode =
new concurrency::DecisionNode( *
this, m_nodeCounter, headName, modeConcurrent, modePromptDecision, modeOR, allPass );
707 m_decisionNameToDecisionHubMap[headName] = m_headNode;
714 m_headNode->updateState( algo_states, node_decisions );
724 auto& slot = (*m_eventSlots)[slotNum];
726 getAlgorithmNode( algo_name )->accept(updater);
733 info() <<
"Starting ranking by data outputs .. " <<
endmsg;
734 for (
auto& pair : m_algoNameToAlgoNodeMap) {
736 debug() <<
" Ranking " << pair.first <<
"... " <<
endmsg;
737 pair.second->accept(ranker);
739 debug() <<
" ... rank of " << pair.first <<
": " << pair.second->getRank() <<
endmsg;
749 for (
auto node : m_algoNameToAlgoInputsMap) {
751 if (collection.
empty())
752 result.
push_back(getAlgorithmNode(node.first));
760 dumpControlFlow(ost,m_headNode,0);
766 const int& indent)
const {
771 if (node != m_headNode) {
775 ost << ( (dn->
m_modeOR) ?
" [OR] " :
"" );
776 ost << ( (dn->
m_allPass) ?
" [PASS] " :
"" );
781 itr != dth.
end(); ++itr) {
782 dumpControlFlow(ost,*itr,indent+1);
784 }
else if (an != 0) {
788 ost <<
" [n= " << ar.at(0)->cardinality() <<
"]";
789 ost << ( (! ar.at(0)->isClonable()) ?
" [unclonable] " :
"" );
800 const char idt[] =
" ";
803 ost <<
"\n" << idt <<
"====================================\n";
804 ost << idt <<
"Data origins and destinations:\n";
805 ost << idt <<
"====================================\n";
807 for (
auto& pair : m_dataPathToDataNodeMap ) {
809 for (
auto algoNode : pair.second->getProducers() ) ost << idt <<
" " << algoNode->getNodeName() <<
"\n";
811 ost << idt <<
" V\n";
812 ost << idt <<
" o " << pair.first <<
"\n";
813 ost << idt <<
" V\n";
815 for (
auto algoNode : pair.second->getConsumers() ) ost << idt <<
" " << algoNode->getNodeName() <<
"\n";
817 ost << idt <<
"====================================\n";
830 boost::dynamic_properties dp;
836 boost::write_graphml( myfile, m_ExecPlan, dp );
846 if ( u ==
nullptr ) {
847 auto itT = m_exec_plan_map.find(
"ENTRY" );
848 if ( itT != m_exec_plan_map.end() ) {
849 source = itT->second;
852 m_exec_plan_map[
"ENTRY"] = source;
855 auto itS = m_exec_plan_map.find( u->
getNodeName() );
856 if ( itS != m_exec_plan_map.end() ) {
857 source = itS->second;
861 fatal() <<
"could not convert IAlgorithm to Algorithm!" <<
endmsg;
868 debug() <<
"no AvgRuntime for " << alg->name() <<
endmsg;
879 auto itP = m_exec_plan_map.find( v->
getNodeName() );
880 if ( itP != m_exec_plan_map.end() ) {
881 target = itP->second;
885 fatal() <<
"could not convert IAlgorithm to Algorithm!" <<
endmsg;
892 debug() <<
"no AvgRuntime for " << alg->name() <<
endmsg;
902 debug() <<
"Edge added to execution plan" <<
endmsg;
903 boost::add_edge(source, target, m_ExecPlan);
void dumpExecutionPlan()
Dump the encountered execution plan to a file.
bool promoteToControlReadyState(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
XXX: CF tests.
const unsigned int & getAlgoIndex() const
XXX: CF tests.
StatusCode addAlgorithmNode(Algorithm *daughterAlgo, const std::string &parentName, bool inverted, bool allPass)
Add algorithm node.
const std::string & name() const override
The identifying name of the algorithm object.
void addDaughterNode(ControlFlowNode *node)
Add a daughter node.
bool dataDependenciesSatisfied(const int &slotNum) const
Method to check whether the Algorithm has all of its data dependencies satisfied.
const DataObjIDColl & outputDataObjs() const override
bool isSuccess() const
Test for a status code of SUCCESS.
virtual bool visit(DecisionNode &)
StatusCode addDecisionHubNode(Algorithm *daughterAlgo, const std::string &parentName, bool modeConcurrent, bool modePromptDecision, bool modeOR, bool allPass)
Add a node, which aggregates decisions of direct daughter nodes.
const std::vector< IAlgorithm * > & getAlgorithmRepresentatives() const
get Algorithm representatives
void addHeadNode(const std::string &headName, bool modeConcurrent, bool modePromptDecision, bool modeOR, bool allPass)
Add a node, which has no parents.
void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const override
XXX: CF tests.
std::string stateToString(const int &stateId) const
Translation between state id and name.
void addEdgeToExecutionPlan(const AlgorithmNode *u, const AlgorithmNode *v)
Set a cause-effect connection between two algorithms in the execution plan.
StatusCode addDataNode(const DataObjID &dataPath)
Add DataNode that represents DataObject.
AlgorithmNode * getAlgorithmNode(const std::string &algoName) const
Get the AlgorithmNode by algorithm name using the graph index.
void rankAlgorithms(IGraphVisitor &ranker) const
Rank Algorithm nodes by the number of data outputs.
bool m_allPass
Whether always passing regardless of daughter results.
~AlgorithmNode()
Destructor.
virtual std::string toString() const =0
value -> string
virtual bool visitEnter(DecisionNode &) const
bool m_modeOR
Whether acting as "and" (false) or "or" node (true)
void addInputDataNode(DataNode *node)
Associate a DataNode, which represents an input data object of this AlgorithmNode.
bool accept(IGraphVisitor &visitor) override
void updateDecision(const std::string &algo_name, const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const
A method to update algorithm node decision, and propagate it upwards.
const float & getRank() const
Get Algorithm rank.
void updateDecision(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions, const AlgorithmNode *requestor=nullptr) const override
XXX: CF tests.
std::vector< EventSlot > * m_eventSlots
The AlgsExecutionStates encodes the state machine for the execution of algorithms within a single eve...
PrecedenceRulesGraph * m_graph
This class is used for returning status codes from appropriate routines.
const DataObjIDColl & inputDataObjs() const override
std::string dumpDataFlow() const
Print out all data origins and destinations, as reflected in the EF graph.
const std::vector< ControlFlowNode * > & getDaughters() const
graph_traits< ExecPlan >::vertex_descriptor AlgoVertex
void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map) override
Initialize.
PropertyBase base class allowing PropertyBase* collections to be "homogeneous".
const std::vector< AlgorithmNode * > getDataIndependentNodes() const
DataNode * getDataNode(const DataObjID &dataPath) const
Get DataNode by DataObject path using graph index.
bool m_modePromptDecision
Whether to evaluate the hub decision as soon as its child decisions allow it.
StatusCode buildAugmentedDataDependenciesRealm()
Build data dependency realm WITH data object nodes participating.
AlgsExecutionStates & getAlgoStates(const int &slotNum) const
void addOutputDataNode(DataNode *node)
Associate a DataNode, which represents an output data object of this AlgorithmNode.
Base class from which all concrete algorithm classes should be derived.
StatusCode initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map)
Initialize graph.
std::vector< InputHandle_t< In > > m_inputs
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
void registerIODataObjects(const Algorithm *algo)
Register algorithm in the Data Dependency index.
bool m_modeConcurrent
Whether all daughters will be evaluated concurrently or sequentially.
int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
Method to set algos to CONTROLREADY, if possible.
void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const override
Print a string representing the control flow state.
void updateEventState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const
XXX CF tests. Is needed for older CF implementation.
void addParentNode(DecisionNode *node)
XXX: CF tests. Method to add a parent node.
bool accept(IGraphVisitor &visitor) override
int updateState(AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
Method to set algos to CONTROLREADY, if possible.
bool promoteToControlReadyState(const int &slotNum, AlgsExecutionStates &states, std::vector< int > &node_decisions) const override
XXX: CF tests. Method to set algos to CONTROLREADY, if possible.
const std::string & getNodeName() const
void printState(std::stringstream &output, AlgsExecutionStates &states, const std::vector< int > &node_decisions, const unsigned int &recursionLevel) const override
Print a string representing the control flow state.
StatusCode buildDataDependenciesRealm()
Build data dependency realm WITHOUT data object nodes: just interconnect algorithm nodes directly...
std::string dumpControlFlow() const
Print out control flow of Algorithms and Sequences.
~DecisionNode() override
Destructor.
State
Execution states of the algorithms.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
void initialize(const std::unordered_map< std::string, unsigned int > &algname_index_map) override
Initialize.
bool promoteToDataReadyState(const int &slotNum, const AlgorithmNode *requestor=nullptr) const
static std::map< State, std::string > stateNames
void addConsumerNode(AlgorithmNode *node)
Associate an AlgorithmNode, which is a data consumer of this one.
StatusCode updateState(unsigned int iAlgo, State newState)