AlgResourcePool.cpp
Go to the documentation of this file.
1 // Include Files
2 
3 // Framework
4 #include "AlgResourcePool.h"
9 
10 // C++
11 #include <functional>
12 #include <queue>
13 
14 // DP TODO: Manage smartifs and not pointers to algos
15 
16 // Instantiation of a static factory class used by clients to create instances of this service
18 
19 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
20 #define DEBUG_MSG ON_DEBUG debug()
21 
22 //---------------------------------------------------------------------------
23 
24 // destructor
26 
27  for (auto& algoId_algoQueue : m_algqueue_map){
28  auto* queue = algoId_algoQueue.second;
29  delete queue;
30  }
31 
32  delete m_EFGraph;
33 }
34 
35 //---------------------------------------------------------------------------
36 
37 // initialize the pool with the list of algos known to the IAlgManager
39 
41  if (!sc.isSuccess())
42  warning () << "Base class could not be started" << endmsg;
43 
44  // Try to recover the topAlgList from the ApplicationManager for backward-compatibility
45  if (m_topAlgNames.value().empty()){
46  info() << "TopAlg list empty. Recovering the one of Application Manager" << endmsg;
47  const Gaudi::Utils::TypeNameString appMgrName("ApplicationMgr/ApplicationMgr");
48  SmartIF<IProperty> appMgrProps (serviceLocator()->service(appMgrName));
49  m_topAlgNames.assign(appMgrProps->getProperty("TopAlg"));
50  }
51 
52  // XXX: Prepare empty Control Flow graph
53  const std::string& name = "ExecutionFlowGraph";
56 
57  sc = decodeTopAlgs();
58  if (sc.isFailure())
59  warning() << "Algorithms could not be properly decoded." << endmsg;
60 
61  // let's assume all resources are there
63  return StatusCode::SUCCESS;
64 }
65 
66 //---------------------------------------------------------------------------
67 
69 
70  StatusCode startSc = Service::start();
71  if ( ! startSc.isSuccess() ) return startSc;
72 
73  // sys-Start the algos
74  for (auto& ialgo : m_algList){
75  startSc = ialgo->sysStart();
76  if (startSc.isFailure()){
77  error() << "Unable to start Algorithm: " << ialgo->name() << endmsg;
78  return startSc;
79  }
80  }
81  return StatusCode::SUCCESS;
82 }
83 
84 //---------------------------------------------------------------------------
85 
87 
88  std::hash<std::string> hash_function;
89  size_t algo_id = hash_function(name);
90  auto itQueueIAlgPtr = m_algqueue_map.find(algo_id);
91 
92  if (itQueueIAlgPtr == m_algqueue_map.end()) {
93  error() << "Algorithm " << name << " requested, but not recognised"
94  << endmsg;
95  algo = nullptr;
96  return StatusCode::FAILURE;
97  }
98 
99  StatusCode sc;
100  if (blocking) {
101  itQueueIAlgPtr->second->pop(algo);
102  sc = StatusCode::SUCCESS;
103  } else {
104  sc = itQueueIAlgPtr->second->try_pop(algo);
105  }
106 
107  // Note that reentrant algos are not consumed so we put them
108  // back immediately in the queue at the end of this function.
109  // Now we may still be called again in between and get this
110  // error. In such a case, the Scheduler will retry later.
111  // This is of course not optimal, but should only happen very
112  // seldom and thus won't affect the global efficiency
113  if(sc.isFailure())
114  DEBUG_MSG << "No instance of algorithm " << name << " could be retrieved in non-blocking mode" << endmsg;
115 
116  // if (m_lazyCreation ) {
117  // TODO: fill the lazyCreation part
118  // }
119  if (sc.isSuccess()){
120  state_type requirements = m_resource_requirements[algo_id];
122  if (requirements.is_subset_of(m_available_resources)) {
123  m_available_resources^=requirements;
124  } else {
125  sc = StatusCode::FAILURE;
126  error() << "Failure to allocate resources of algorithm " << name << endmsg;
127  // in case of not reentrant, push it back. Reentrant ones are pushed back
128  // in all cases further down
129  if (0 != algo->cardinality()) {
130  itQueueIAlgPtr->second->push(algo);
131  }
132  }
134  if (0 == algo->cardinality()) {
135  // push back reentrant algos immediately as it can be reused
136  itQueueIAlgPtr->second->push(algo);
137  }
138  }
139  return sc;
140 }
141 
142 //---------------------------------------------------------------------------
143 
145 
146  std::hash<std::string> hash_function;
147  size_t algo_id = hash_function(name);
148 
149  // release resources used by the algorithm
153 
154  // release algorithm itself if not reentrant
155  if (0 != algo->cardinality()) {
156  m_algqueue_map[algo_id]->push(algo);
157  }
158  return StatusCode::SUCCESS;
159  }
160 
161 //---------------------------------------------------------------------------
162 
167  return StatusCode::SUCCESS;
168 }
169 
170 //---------------------------------------------------------------------------
171 
176  return StatusCode::SUCCESS;
177 }
178 
179 //---------------------------------------------------------------------------
180 
181 StatusCode AlgResourcePool::flattenSequencer(Algorithm* algo, ListAlg& alglist, const std::string& parentName, unsigned int recursionDepth){
182 
184 
185  std::vector<Algorithm*>* subAlgorithms = algo->subAlgorithms();
186  auto isGaudiSequencer = dynamic_cast<GaudiSequencer*> (algo);
187  if ( //we only want to add basic algorithms -> have no subAlgs
188  // and exclude the case of empty GaudiSequencers
189  (subAlgorithms->empty() and not isGaudiSequencer)
190  // we want to add non-empty GaudiAtomicSequencers
191  or (algo->type() == "GaudiAtomicSequencer" and not subAlgorithms->empty())){
192 
193  DEBUG_MSG << std::string(recursionDepth, ' ') << algo->name() << " is "
194  << (algo->type() != "GaudiAtomicSequencer" ? "not a sequencer" : "an atomic sequencer")
195  << ". Appending it" << endmsg;
196 
197  alglist.emplace_back(algo);
198  m_EFGraph->addAlgorithmNode(algo, parentName, false, false).ignore();
199  return sc;
200  }
201 
202  // Recursively unroll
203  ++recursionDepth;
204  DEBUG_MSG << std::string(recursionDepth, ' ') << algo->name() << " is a sequencer. Flattening it." << endmsg;
205  bool modeOR = false;
206  bool allPass = false;
207  bool isLazy = false;
208  if ( isGaudiSequencer ) {
209  modeOR = (algo->getProperty("ModeOR").toString() == "True")? true : false;
210  allPass = (algo->getProperty("IgnoreFilterPassed").toString() == "True")? true : false;
211  isLazy = (algo->getProperty("ShortCircuit").toString() == "True")? true : false;
212  if (allPass) isLazy = false; // standard GaudiSequencer behavior on all pass is to execute everything
213  }
214  sc = m_EFGraph->addDecisionHubNode(algo, parentName, modeOR, allPass, isLazy);
215  if (sc.isFailure()) {
216  error() << "Failed to add DecisionHub " << algo->name() << " to execution flow graph" << endmsg;
217  return sc;
218  }
219 
220  for (Algorithm* subalgo : *subAlgorithms ) {
221  sc = flattenSequencer(subalgo,alglist,algo->name(),recursionDepth);
222  if (sc.isFailure()) {
223  error() << "Algorithm " << subalgo->name() << " could not be flattened" << endmsg;
224  return sc;
225  }
226  }
227  return sc;
228 }
229 
230 //---------------------------------------------------------------------------
231 
233 
235  if (!algMan.isValid()){
236  error() << "Algorithm manager could not be properly fetched." << endmsg;
237  return StatusCode::FAILURE;
238  }
239 
240  // Useful lambda not to repeat ourselves --------------------------
241  auto createAlg = [&algMan,this] (const std::string& item_type,
242  const std::string& item_name,
243  IAlgorithm*& algo){
244  StatusCode createAlgSc = algMan->createAlgorithm(item_type,
245  item_name,
246  algo,
247  true,
248  false);
249  if (createAlgSc.isFailure())
250  this->warning() << "Algorithm " << item_type << "/" << item_name
251  << " could not be created." << endmsg;
252  };
253  // End of lambda --------------------------------------------------
254 
256 
257  // Fill the top alg list ----
258  const std::vector<std::string>& topAlgNames = m_topAlgNames.value();
259  for (auto& name : topAlgNames) {
260  IAlgorithm* algo(nullptr);
261 
263  const std::string& item_name = item.name();
264  const std::string& item_type = item.type();
265  SmartIF<IAlgorithm> algoSmartIF (algMan->algorithm(item_name,false));
266 
267  if (!algoSmartIF.isValid()){
268  createAlg(item_type,item_name,algo);
269  algoSmartIF = algo;
270  }
271  // Initialize the algorithm (it is not started here)
272  algoSmartIF->sysInitialize();
273  m_topAlgList.push_back(algoSmartIF);
274  }
275  // Top Alg list filled ----
276 
277  // start forming the control flow graph by adding the head node
278  m_EFGraph->addHeadNode("EVENT LOOP",true,true,false);
279 
280  // Now we unroll it ----
281  for (auto& algoSmartIF : m_topAlgList){
282  Algorithm* algorithm = dynamic_cast<Algorithm*> (algoSmartIF.get());
283  if (!algorithm) fatal() << "Conversion from IAlgorithm to Algorithm failed" << endmsg;
284  sc = flattenSequencer(algorithm, m_flatUniqueAlgList, "EVENT LOOP");
285  }
286  if (msgLevel(MSG::DEBUG)){
287  debug() << "List of algorithms is: " << endmsg;
288  for (auto& algo : m_flatUniqueAlgList)
289  debug() << " o " << algo->type() << "/" << algo->name() << " @ " << algo << endmsg;
290  }
291 
292  // Unrolled ---
293 
294  // Now let's manage the clones
295  unsigned int resource_counter(0);
296  std::hash<std::string> hash_function;
297  for (auto& ialgoSmartIF : m_flatUniqueAlgList) {
298 
299  const std::string& item_name = ialgoSmartIF->name();
300 
301  verbose() << "Treating resource management and clones of " << item_name << endmsg;
302 
303  Algorithm* algo = dynamic_cast<Algorithm*> ( ialgoSmartIF.get() );
304  if (!algo) fatal() << "Conversion from IAlgorithm to Algorithm failed" << endmsg;
305  const std::string& item_type = algo->type();
306 
307  size_t algo_id = hash_function(item_name);
309  m_algqueue_map[algo_id] = queue;
310 
311  // DP TODO Do it properly with SmartIFs, also in the queues
312  IAlgorithm* ialgo(ialgoSmartIF.get());
313 
314  queue->push(ialgo);
315  m_algList.push_back(ialgo);
316  m_n_of_allowed_instances[algo_id] = ialgo->cardinality();
317  m_n_of_created_instances[algo_id] = 1;
318 
319  state_type requirements(0);
320 
321  for (auto& resource_name : ialgo->neededResources()){
322  auto ret = m_resource_indices.insert(std::pair<std::string, unsigned int>(resource_name,resource_counter));
323  // insert successful means == wasn't known before. So increment counter
324  if (ret.second==true) {
325  ++resource_counter;
326  }
327  // Resize for every algo according to the found resources
328  requirements.resize(resource_counter);
329  // in any case the return value holds the proper product index
330  requirements[ret.first->second] = true;
331 
332  }
333 
334  m_resource_requirements[algo_id] = requirements;
335 
336  // potentially create clones; if not lazy creation we have to do it now
337  if (!m_lazyCreation) {
338  for (unsigned int i =1, end =ialgo->cardinality();i<end; ++i){
339  debug() << "type/name to create clone of: " << item_type << "/"
340  << item_name << endmsg;
341  IAlgorithm* ialgoClone(nullptr);
342  createAlg(item_type,item_name,ialgoClone);
343  ialgoClone->setIndex(i);
344  if (ialgoClone->sysInitialize().isFailure()) {
345  error() << "unable to initialize Algorithm clone "
346  << ialgoClone->name() << endmsg;
347  sc = StatusCode::FAILURE;
348  // FIXME: should we delete this failed clone?
349  } else {
350  queue->push(ialgoClone);
351  m_n_of_created_instances[algo_id]+=1;
352  }
353  }
354  }
355 
357 
358  }
359 
360  // Now resize all the requirement bitsets to the same size
361  for (auto& kv : m_resource_requirements) {
362  kv.second.resize(resource_counter);
363  }
364 
365  // Set all resources to be available
366  m_available_resources.resize(resource_counter);
367  m_available_resources.set();
368 
369  return sc;
370 }
371 
372 //---------------------------------------------------------------------------
373 
376  for (auto algoSmartIF :m_flatUniqueAlgList )
377  m_flatUniqueAlgPtrList.push_back(const_cast<IAlgorithm*>(algoSmartIF.get()));
378  return m_flatUniqueAlgPtrList;
379 }
380 
381 //---------------------------------------------------------------------------
382 
385  for (auto algoSmartIF :m_topAlgList )
386  m_topAlgPtrList.push_back(const_cast<IAlgorithm*>(algoSmartIF.get()));
387  return m_topAlgPtrList;
388 }
389 
390 //---------------------------------------------------------------------------
391 
393  auto algBeginRun = [&] (SmartIF<IAlgorithm>& algoSmartIF) -> StatusCode {
394  StatusCode sc = algoSmartIF->sysBeginRun();
395  if (!sc.isSuccess()) {
396  warning() << "beginRun() of algorithm " << algoSmartIF->name() << " failed" << endmsg;
397  return StatusCode::FAILURE;
398  }
399  return StatusCode::SUCCESS;
400  };
401  // Call the beginRun() method of all algorithms
402  for (auto& algoSmartIF : m_flatUniqueAlgList ) {
403  if (algBeginRun(algoSmartIF).isFailure())
404  return StatusCode::FAILURE;
405  }
406 
407  return StatusCode::SUCCESS;
408 }
409 
410 //---------------------------------------------------------------------------
411 
413 
414  auto algEndRun = [&] (SmartIF<IAlgorithm>& algoSmartIF) -> StatusCode {
415  StatusCode sc = algoSmartIF->sysEndRun();
416  if (!sc.isSuccess()) {
417  warning() << "endRun() of algorithm " << algoSmartIF->name() << " failed" << endmsg;
418  return StatusCode::FAILURE;
419  }
420  return StatusCode::SUCCESS;
421  };
422  // Call the endRun() method of all algorithms: the flat list first, then the top algorithms
423  for (auto& algoSmartIF : m_flatUniqueAlgList ) {
424  if (algEndRun(algoSmartIF).isFailure())
425  return StatusCode::FAILURE;
426  }
427  for (auto& algoSmartIF : m_topAlgList ) {
428  if (algEndRun(algoSmartIF).isFailure())
429  return StatusCode::FAILURE;
430  }
431  return StatusCode::SUCCESS;
432 }
433 
434 //---------------------------------------------------------------------------
435 
437 
438  StatusCode stopSc = Service::stop();
439  if ( ! stopSc.isSuccess() ) return stopSc;
440 
441  // sys-Stop the algos
442  for (auto& ialgo : m_algList){
443  stopSc = ialgo->sysStop();
444  if (stopSc.isFailure()){
445  error() << "Unable to stop Algorithm: " << ialgo->name() << endmsg;
446  return stopSc;
447  }
448  }
449  return StatusCode::SUCCESS;
450 }
ListAlg m_flatUniqueAlgList
The flat list of algorithms w/o clones.
StatusCode getProperty(Gaudi::Details::PropertyBase *p) const override
get the property
virtual SmartIF< IAlgorithm > & algorithm(const Gaudi::Utils::TypeNameString &typeName, const bool createIf=true)=0
Returns a smart pointer to an algorithm.
StatusCode initialize() override
Definition: Service.cpp:64
T unlock(T...args)
T empty(T...args)
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:727
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:289
StatusCode addDecisionHubNode(Algorithm *daughterAlgo, const std::string &parentName, bool modeOR, bool allPass, bool isLazy)
Add a node, which aggregates decisions of direct daughter nodes.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
StatusCode start() override
Definition: Service.cpp:137
std::map< std::string, unsigned int > m_resource_indices
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
Sequencer for executing several algorithms, stopping when one is faulty.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
std::mutex m_resource_mutex
virtual StatusCode createAlgorithm(const std::string &algtype, const std::string &algname, IAlgorithm *&alg, bool managed=false, bool checkIfExists=true)=0
Create an instance of an algorithm type that has been declared beforehand and assigns to it a name...
StatusCode beginRun() override
StatusCode stop() override
T end(T...args)
virtual StatusCode getProperty(Gaudi::Details::PropertyBase *p) const =0
Get the property by property.
StatusCode endRun() override
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:84
state_type m_available_resources
virtual StatusCode sysInitialize()=0
Initialization method invoked by the framework.
STL class.
boost::dynamic_bitset state_type
T push_back(T...args)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
Helper class to parse a string of format "type/name".
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
tbb::concurrent_bounded_queue< IAlgorithm * > concurrentQueueIAlgPtr
std::list< IAlgorithm * > m_flatUniqueAlgPtrList
The flat list of algorithms w/o clones which is returned.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
concurrency::ExecutionFlowGraph * m_EFGraph
OMG yet another hack.
std::map< size_t, state_type > m_resource_requirements
StatusCode start() override
T lock(T...args)
~AlgResourcePool() override
std::list< IAlgorithm * > m_topAlgPtrList
The top list of algorithms.
virtual unsigned int cardinality() const =0
Cardinality (maximum number of clones that can exist); the special value 0 means that the algorithm is reentrant.
auto end(reverse_wrapper< T > &w)
Definition: reverse.h:50
virtual void setIndex(const unsigned int &idx)=0
Set instantiation index of Alg.
std::list< IAlgorithm * > getFlatAlgList() override
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:242
StatusCode releaseResource(const std::string &name) override
Release a certain resource.
Gaudi::Property< std::vector< std::string > > m_topAlgNames
std::list< IAlgorithm * > getTopAlgList() override
StatusCode stop() override
Definition: Service.cpp:130
T clear(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
StatusCode acquireResource(const std::string &name) override
Acquire a certain resource.
std::map< size_t, concurrentQueueIAlgPtr * > m_algqueue_map
T insert(T...args)
const std::vector< Algorithm * > * subAlgorithms() const
List of sub-algorithms. Returns a pointer to a vector of (sub) Algorithms.
Definition: Algorithm.cpp:766
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find(T...args)
StatusCode addAlgorithmNode(Algorithm *daughterAlgo, const std::string &parentName, bool inverted, bool allPass)
Add algorithm node.
StatusCode decodeTopAlgs()
Decode the top alg list.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
const std::string & type() const
StatusCode flattenSequencer(Algorithm *sequencer, ListAlg &alglist, const std::string &parentName, unsigned int recursionDepth=0)
Recursively flatten an algList.
StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false) override
Acquire a certain algorithm using its name.
void addHeadNode(const std::string &headName, bool modeOR, bool allPass, bool isLazy)
Add a node, which has no parents.
StatusCode service(const std::string &name, const T *&psvc, bool createIf=true) const
Access a service by name, creating it if it doesn't already exist.
Definition: Service.h:85
std::map< size_t, unsigned int > m_n_of_created_instances
#define DEBUG_MSG
const std::string & name() const
StatusCode initialize() override
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
void ignore() const
Definition: StatusCode.h:106
ListAlg m_topAlgList
The list of top algorithms.
const std::string & type() const override
The type of the algorithm object.
Definition: Algorithm.h:165
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
void attachAlgorithmsToNodes(const std::string &algo_name, const T &container)
Attach pointers to real Algorithms (and their clones) to Algorithm nodes of the graph.
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
std::map< size_t, size_t > m_n_of_allowed_instances
ListAlg m_algList
The list of all algorithms created within the Pool which are not top.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo) override
Release a certain algorithm.
Gaudi::Property< bool > m_lazyCreation
T emplace_back(T...args)