All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
AlgResourcePool.cpp
Go to the documentation of this file.
1 // Include Files
2 
3 // Framework
4 #include "AlgResourcePool.h"
9 
10 // C++
11 #include <functional>
12 #include <queue>
13 
14 // DP TODO: Manage smartifs and not pointers to algos
15 
16 // Instantiation of a static factory class used by clients to create instances of this service
18 
// ON_DEBUG expands to a bare `if ( msgLevel( MSG::DEBUG ) )`, so the statement that follows it
// only runs when that condition holds. Because it is an unbraced `if`, an `else` written
// directly after an ON_DEBUG / DEBUG_MSG statement would bind to this hidden `if`.
19 #define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
// DEBUG_MSG expands to `ON_DEBUG debug()`: a debug() stream expression guarded by ON_DEBUG.
20 #define DEBUG_MSG ON_DEBUG debug()
21 
22 //---------------------------------------------------------------------------
23 
24 // destructor
// Deletes every queue stored as a mapped value of m_algqueue_map, then deletes m_PRGraph.
// NOTE(review): the line carrying the function signature is not present in this listing.
26 
27  for (auto& algoId_algoQueue : m_algqueue_map){
 // .second is the queue pointer owned by the map entry
28  auto* queue = algoId_algoQueue.second;
29  delete queue;
30  }
31 
32  delete m_PRGraph;
33 }
34 
35 //---------------------------------------------------------------------------
36 
37 // initialize the pool with the list of algos known to the IAlgManager
// Visible steps: warn if `sc` is not a success, fall back to the "TopAlg" property of the
// ApplicationMgr when m_topAlgNames is empty, call decodeTopAlgs(), and return SUCCESS.
// Both failure checks below only emit a warning; the function still returns StatusCode::SUCCESS.
// NOTE(review): the signature, the declaration of `sc` and the statements between the
// "PrecedenceRulesGraph" name and decodeTopAlgs() are not present in this listing.
39 
41  if (!sc.isSuccess())
42  warning () << "Base class could not be started" << endmsg;
43 
44  // Try to recover the topAlgList from the ApplicationManager for backward-compatibility
45  if (m_topAlgNames.value().empty()){
46  info() << "TopAlg list empty. Recovering the one of Application Manager" << endmsg;
47  const Gaudi::Utils::TypeNameString appMgrName("ApplicationMgr/ApplicationMgr");
 // NOTE(review): appMgrProps is dereferenced on the next line without an isValid() check.
48  SmartIF<IProperty> appMgrProps (serviceLocator()->service(appMgrName));
49  m_topAlgNames.assign(appMgrProps->getProperty("TopAlg"));
50  }
51 
52  // Prepare empty graph of precedence rules
 // `name` is a const reference bound to a temporary std::string built from the literal.
53  const std::string& name = "PrecedenceRulesGraph";
56 
57  sc = decodeTopAlgs();
58  if (sc.isFailure())
59  warning() << "Algorithms could not be properly decoded." << endmsg;
60 
61  // let's assume all resources are there
63  return StatusCode::SUCCESS;
64 }
65 
66 //---------------------------------------------------------------------------
67 
69 
 // Starts the base Service first and returns its status unchanged if it is not a success.
 // NOTE(review): the line carrying the function signature is not present in this listing.
70  StatusCode startSc = Service::start();
71  if ( ! startSc.isSuccess() ) return startSc;
72 
73  // sys-Start the algos
 // Calls sysStart() on each entry of m_algList in order; the first failure is logged with
 // error() and returned immediately, so later entries are not started.
74  for (auto& ialgo : m_algList){
75  startSc = ialgo->sysStart();
76  if (startSc.isFailure()){
77  error() << "Unable to start Algorithm: " << ialgo->name() << endmsg;
78  return startSc;
79  }
80  }
81  return StatusCode::SUCCESS;
82 }
83 
84 //---------------------------------------------------------------------------
85 
87 
 // Hands out one instance of the algorithm called `name` through `algo`, taking it from the
 // queue registered in m_algqueue_map under the std::hash of `name`.
 // NOTE(review): the signature line is not present in this listing; `name`, `algo` and
 // `blocking` presumably are the parameters of acquireAlgorithm() - confirm against the header.
88  std::hash<std::string> hash_function;
89  size_t algo_id = hash_function(name);
90  auto itQueueIAlgPtr = m_algqueue_map.find(algo_id);
91 
 // Unknown name: report it, null the output pointer and fail.
92  if (itQueueIAlgPtr == m_algqueue_map.end()) {
93  error() << "Algorithm " << name << " requested, but not recognised"
94  << endmsg;
95  algo = nullptr;
96  return StatusCode::FAILURE;
97  }
98 
99  StatusCode sc;
 // Blocking mode uses pop() and unconditionally records SUCCESS; otherwise the result of
 // try_pop() is converted into the status code.
100  if (blocking) {
101  itQueueIAlgPtr->second->pop(algo);
102  sc = StatusCode::SUCCESS;
103  } else {
104  sc = itQueueIAlgPtr->second->try_pop(algo);
105  }
106 
107  // Note that reentrant algos are not consumed so we put them
108  // back immediately in the queue at the end of this function.
109  // Now we may still be called again in between and get this
110  // error. In such a case, the Scheduler will retry later.
111  // This is of course not optimal, but should only happen very
112  // seldom and thus won't affect the global efficiency
113  if(sc.isFailure())
114  DEBUG_MSG << "No instance of algorithm " << name << " could be retrieved in non-blocking mode" << endmsg;
115 
116  // if (m_lazyCreation ) {
117  // TODO: fill the lazyCreation part
118  // }
119  if (sc.isSuccess()){
 // Copy of the requirement bitset stored for this algorithm id.
120  state_type requirements = m_resource_requirements[algo_id];
 // When every required bit is set in m_available_resources, the XOR below clears exactly
 // those bits, i.e. marks the resources as taken.
122  if (requirements.is_subset_of(m_available_resources)) {
123  m_available_resources^=requirements;
124  } else {
125  sc = StatusCode::FAILURE;
126  error() << "Failure to allocate resources of algorithm " << name << endmsg;
127  // in case of not reentrant, push it back. Reentrant ones are pushed back
128  // in all cases further down
 // NOTE(review): `algo` is not reset on this failure path, so the caller still receives
 // the pointer that is pushed back into the queue here.
129  if (0 != algo->cardinality()) {
130  itQueueIAlgPtr->second->push(algo);
131  }
132  }
 // cardinality() == 0 marks a reentrant algorithm (see the comment below); it is returned to
 // the queue whether or not the resource allocation above succeeded.
134  if (0 == algo->cardinality()) {
135  // push back reentrant algos immediately as it can be reused
136  itQueueIAlgPtr->second->push(algo);
137  }
138  }
139  return sc;
140 }
141 
142 //---------------------------------------------------------------------------
143 
145 
 // Gives an algorithm instance back to the pool; the queue is looked up by the std::hash of `name`.
 // NOTE(review): the signature line is not present in this listing; `name` and `algo`
 // presumably are the parameters of releaseAlgorithm() - confirm against the header.
146  std::hash<std::string> hash_function;
147  size_t algo_id = hash_function(name);
148 
149  // release resources used by the algorithm
 // NOTE(review): the statements that belong to the comment above are not present in this listing.
153 
154  // release algorithm itself if not reentrant
 // m_algqueue_map[algo_id] uses std::map::operator[], which inserts a null queue pointer for an
 // id that was never registered; the push() would then go through that null pointer.
155  if (0 != algo->cardinality()) {
156  m_algqueue_map[algo_id]->push(algo);
157  }
158  return StatusCode::SUCCESS;
159  }
160 
161 //---------------------------------------------------------------------------
162 
167  return StatusCode::SUCCESS;
168 }
169 
170 //---------------------------------------------------------------------------
171 
176  return StatusCode::SUCCESS;
177 }
178 
179 //---------------------------------------------------------------------------
180 
// Recursively walks `algo` and its sub-algorithms, appending every non-sequencer algorithm to
// `alglist` and registering the matching nodes in m_PRGraph.
//   algo           - algorithm or sequencer to process
//   alglist        - receives each non-sequencer algorithm via emplace_back (no de-duplication here)
//   parentName     - name of the graph node under which the new node is added
//   recursionDepth - nesting level; used as the indentation width of the debug messages and
//                    incremented before descending into sub-algorithms
// Returns `sc`. A failure of addDecisionHubNode() or of a nested call is logged with error()
// and returned immediately.
// NOTE(review): `sc` is used below but its declaration is not among the lines of this listing.
181 StatusCode AlgResourcePool::flattenSequencer(Algorithm* algo, ListAlg& alglist, const std::string& parentName, unsigned int recursionDepth){
182 
184 
185  bool isGaudiSequencer(false);
186  bool isAthSequencer(false);
187 
 // Sequencer detection is purely property based: "Members" plus "ShortCircuit" marks a
 // GaudiSequencer; "Members" plus "StopOverride" (without "ShortCircuit") marks an AthSequencer.
188  if (algo->hasProperty("Members")) {
189  if (algo->hasProperty("ShortCircuit"))
190  isGaudiSequencer = true;
191  else if (algo->hasProperty("StopOverride"))
192  isAthSequencer = true;
193  }
194 
 // NOTE(review): the pointer returned by subAlgorithms() is dereferenced without a null check.
195  std::vector<Algorithm*>* subAlgorithms = algo->subAlgorithms();
196  if ( //we only want to add basic algorithms -> have no subAlgs
197  // and exclude the case of empty sequencers
198  (subAlgorithms->empty() && !(isGaudiSequencer || isAthSequencer)) ) {
199 
200  DEBUG_MSG << std::string(recursionDepth, ' ') << algo->name()
201  << " is not a sequencer. Appending it" << endmsg;
202 
203  alglist.emplace_back(algo);
 // The status of addAlgorithmNode() is deliberately discarded via ignore().
204  m_PRGraph->addAlgorithmNode(algo, parentName, false, false).ignore();
205  return sc;
206  }
207 
208  // Recursively unroll
209  ++recursionDepth;
210  DEBUG_MSG << std::string(recursionDepth, ' ') << algo->name() << " is a sequencer. Flattening it." << endmsg;
211  bool modeOR = false;
212  bool allPass = false;
213  bool isLazy = false;
214  bool isSequential = false;
215 
 // Each flag is derived by comparing the property's string form with the exact text "True".
 // If `algo` was classified as neither sequencer type, all four flags keep their `false` defaults.
216  if ( isGaudiSequencer ) {
217  modeOR = (algo->getProperty("ModeOR").toString() == "True")? true : false;
218  allPass = (algo->getProperty("IgnoreFilterPassed").toString() == "True")? true : false;
219  isLazy = (algo->getProperty("ShortCircuit").toString() == "True")? true : false;
220  if (allPass) isLazy = false; // standard GaudiSequencer behavior on all pass is to execute everything
221  isSequential = (algo->hasProperty("Sequential") &&
222  (algo->getProperty("Sequential").toString() == "True") );
223  } else if (isAthSequencer ) {
224  modeOR = (algo->getProperty("ModeOR").toString() == "True")? true : false;
225  allPass = (algo->getProperty("IgnoreFilterPassed").toString() == "True")? true : false;
 // Inverted with respect to the Gaudi branch: lazy unless "StopOverride" is "True".
226  isLazy = (algo->getProperty("StopOverride").toString() == "True")? false : true;
227  isSequential = (algo->hasProperty("Sequential") &&
228  (algo->getProperty("Sequential").toString() == "True") );
229  }
 // The third argument is the node's concurrent mode, hence the negation of isSequential.
230  sc = m_PRGraph->addDecisionHubNode(algo, parentName, !isSequential, isLazy, modeOR, allPass);
231  if (sc.isFailure()) {
232  error() << "Failed to add DecisionHub " << algo->name() << " to graph of precedence rules" << endmsg;
233  return sc;
234  }
235 
 // Descend into every sub-algorithm, using this sequencer's name as the parent node name.
236  for (Algorithm* subalgo : *subAlgorithms ) {
237  sc = flattenSequencer(subalgo,alglist,algo->name(),recursionDepth);
238  if (sc.isFailure()) {
239  error() << "Algorithm " << subalgo->name() << " could not be flattened" << endmsg;
240  return sc;
241  }
242  }
243  return sc;
244 }
245 
246 //---------------------------------------------------------------------------
247 
249 
 // Builds the pool from the names in m_topAlgNames: fills m_topAlgList, flattens it into
 // m_flatUniqueAlgList, creates one queue per unique algorithm, records resource requirements
 // and, unless m_lazyCreation is set, creates the clones. Returns `sc`.
 // NOTE(review): the signature and the declarations of `algMan`, `sc`, `item` and `queue` are
 // not present in this listing; presumably this is the body of decodeTopAlgs().
251  if (!algMan.isValid()){
252  error() << "Algorithm manager could not be properly fetched." << endmsg;
253  return StatusCode::FAILURE;
254  }
255 
256  // Useful lambda not to repeat ourselves --------------------------
 // createAlg forwards to createAlgorithm() with managed=true and checkIfExists=false. A failure
 // only produces a warning; the lambda returns nothing, so callers cannot see the status.
257  auto createAlg = [&algMan,this] (const std::string& item_type,
258  const std::string& item_name,
259  IAlgorithm*& algo){
260  StatusCode createAlgSc = algMan->createAlgorithm(item_type,
261  item_name,
262  algo,
263  true,
264  false);
265  if (createAlgSc.isFailure())
266  this->warning() << "Algorithm " << item_type << "/" << item_name
267  << " could not be created." << endmsg;
268  };
269  // End of lambda --------------------------------------------------
270 
272 
273  // Fill the top alg list ----
274  const std::vector<std::string>& topAlgNames = m_topAlgNames.value();
275  for (auto& name : topAlgNames) {
276  IAlgorithm* algo(nullptr);
277 
279  const std::string& item_name = item.name();
280  const std::string& item_type = item.type();
 // Look the algorithm up by name without creating it (second argument false).
281  SmartIF<IAlgorithm> algoSmartIF (algMan->algorithm(item_name,false));
282 
283  if (!algoSmartIF.isValid()){
284  createAlg(item_type,item_name,algo);
285  algoSmartIF = algo;
286  }
287  // Initialize; the status returned by sysInitialize() is not checked here
 // NOTE(review): if createAlg failed and left `algo` null, algoSmartIF is dereferenced while
 // invalid on the next line - confirm whether createAlgorithm() can leave the pointer unset.
288  algoSmartIF->sysInitialize();
289  m_topAlgList.push_back(algoSmartIF);
290  }
291  // Top Alg list filled ----
292 
293  // start forming the graph of precedence rules by adding the head decision hub
 // Arguments after the name: modeConcurrent=true, modePromptDecision=false, modeOR=true, allPass=true.
294  m_PRGraph->addHeadNode("RootDecisionHub",true,false,true,true);
295 
296  // Now we unroll it ----
297  for (auto& algoSmartIF : m_topAlgList) {
298  Algorithm* algorithm = dynamic_cast<Algorithm*> (algoSmartIF.get());
 // A failed cast is only logged; the (null) pointer is still passed to flattenSequencer().
299  if (!algorithm) fatal() << "Conversion from IAlgorithm to Algorithm failed" << endmsg;
 // `sc` is overwritten on every iteration, so only the status of the last top algorithm survives.
300  sc = flattenSequencer(algorithm, m_flatUniqueAlgList, "RootDecisionHub");
301  }
302  // stupid O(N^2) unique-ification..
 // For each element, erase every later element that compares equal; the first occurrence is kept.
303  for ( auto i = begin(m_flatUniqueAlgList); i!=end(m_flatUniqueAlgList); ++i ) {
304  auto n = next(i);
305  while ( n!=end(m_flatUniqueAlgList) ) {
306  if (*n==*i) n = m_flatUniqueAlgList.erase(n);
307  else ++n;
308  }
309  }
310  if (msgLevel(MSG::DEBUG)){
311  debug() << "List of algorithms is: " << endmsg;
312  for (auto& algo : m_flatUniqueAlgList)
313  debug() << " o " << algo->type() << "/" << algo->name() << " @ " << algo << endmsg;
314  }
315 
316  // Unrolled ---
317 
318  // Now let's manage the clones
 // resource_counter is the number of distinct resource names seen so far; it doubles as the
 // bit index handed to the next new name.
319  unsigned int resource_counter(0);
320  std::hash<std::string> hash_function;
321  for (auto& ialgoSmartIF : m_flatUniqueAlgList) {
322 
323  const std::string& item_name = ialgoSmartIF->name();
324 
325  verbose() << "Treating resource management and clones of " << item_name << endmsg;
326 
327  Algorithm* algo = dynamic_cast<Algorithm*> ( ialgoSmartIF.get() );
 // A failed cast is only logged; `algo` is dereferenced on the following line regardless.
328  if (!algo) fatal() << "Conversion from IAlgorithm to Algorithm failed" << endmsg;
329  const std::string& item_type = algo->type();
330 
 // The hash of the algorithm name is the key used by all per-algorithm maps below.
331  size_t algo_id = hash_function(item_name);
333  m_algqueue_map[algo_id] = queue;
334 
335  // DP TODO Do it properly with SmartIFs, also in the queues
336  IAlgorithm* ialgo(ialgoSmartIF.get());
337 
 // The original instance is the first entry of its queue and counts as one created instance.
338  queue->push(ialgo);
339  m_algList.push_back(ialgo);
340  m_n_of_allowed_instances[algo_id] = ialgo->cardinality();
341  m_n_of_created_instances[algo_id] = 1;
342 
343  state_type requirements(0);
344 
345  for (auto& resource_name : ialgo->neededResources()){
346  auto ret = m_resource_indices.emplace(resource_name,resource_counter);
347  // insert successful means == wasn't known before. So increment counter
348  if (ret.second) ++resource_counter;
349  // Resize for every algo according to the found resources
350  requirements.resize(resource_counter);
351  // in any case the return value holds the proper product index
352  requirements[ret.first->second] = true;
353 
354  }
355 
356  m_resource_requirements[algo_id] = requirements;
357 
358  // potentially create clones; if not lazy creation we have to do it now
359  if (!m_lazyCreation) {
 // Index 0 is the original instance, so clones are numbered from 1 up to cardinality()-1.
360  for (unsigned int i =1, end =ialgo->cardinality();i<end; ++i){
361  debug() << "type/name to create clone of: " << item_type << "/"
362  << item_name << endmsg;
363  IAlgorithm* ialgoClone(nullptr);
364  createAlg(item_type,item_name,ialgoClone);
 // NOTE(review): ialgoClone is dereferenced without a null check after createAlg.
365  ialgoClone->setIndex(i);
366  if (ialgoClone->sysInitialize().isFailure()) {
367  error() << "unable to initialize Algorithm clone "
368  << ialgoClone->name() << endmsg;
369  sc = StatusCode::FAILURE;
370  // FIXME: should we delete this failed clone?
371  } else {
372  queue->push(ialgoClone);
373  m_n_of_created_instances[algo_id]+=1;
374  }
375  }
376  }
377 
379 
380  }
381 
382  // Now resize all the requirement bitsets to the same size
383  for (auto& kv : m_resource_requirements) {
384  kv.second.resize(resource_counter);
385  }
386 
387  // Set all resources to be available
388  m_available_resources.resize(resource_counter);
389  m_available_resources.set();
390 
391  return sc;
392 }
393 
394 //---------------------------------------------------------------------------
395 
 // Appends the raw pointer held by each entry of m_flatUniqueAlgList to m_flatUniqueAlgPtrList
 // and returns that member list. Each loop element is taken by value (`auto`, not `auto&`).
 // NOTE(review): the signature and any statement preceding the loop are not present in this
 // listing; presumably this is the tail of getFlatAlgList().
398  for (auto algoSmartIF :m_flatUniqueAlgList )
399  m_flatUniqueAlgPtrList.push_back(const_cast<IAlgorithm*>(algoSmartIF.get()));
400  return m_flatUniqueAlgPtrList;
401 }
402 
403 //---------------------------------------------------------------------------
404 
 // Appends the raw pointer held by each entry of m_topAlgList to m_topAlgPtrList and returns
 // that member list. Each loop element is taken by value (`auto`, not `auto&`).
 // NOTE(review): the signature and any statement preceding the loop are not present in this
 // listing; presumably this is the tail of getTopAlgList().
407  for (auto algoSmartIF :m_topAlgList )
408  m_topAlgPtrList.push_back(const_cast<IAlgorithm*>(algoSmartIF.get()));
409  return m_topAlgPtrList;
410 }
411 
412 //---------------------------------------------------------------------------
413 
 // Helper: runs sysBeginRun() on one algorithm; any non-success status is logged as a warning
 // and mapped to StatusCode::FAILURE.
 // NOTE(review): the line carrying the function signature is not present in this listing.
415  auto algBeginRun = [&] (SmartIF<IAlgorithm>& algoSmartIF) -> StatusCode {
416  StatusCode sc = algoSmartIF->sysBeginRun();
417  if (!sc.isSuccess()) {
418  warning() << "beginRun() of algorithm " << algoSmartIF->name() << " failed" << endmsg;
419  return StatusCode::FAILURE;
420  }
421  return StatusCode::SUCCESS;
422  };
423  // Call the beginRun() method of all algorithms
 // Stops at the first failure; the remaining entries of m_flatUniqueAlgList are not visited.
424  for (auto& algoSmartIF : m_flatUniqueAlgList ) {
425  if (algBeginRun(algoSmartIF).isFailure())
426  return StatusCode::FAILURE;
427  }
428 
429  return StatusCode::SUCCESS;
430 }
431 
432 //---------------------------------------------------------------------------
433 
435 
 // Helper: runs sysEndRun() on one algorithm; any non-success status is logged as a warning
 // and mapped to StatusCode::FAILURE.
 // NOTE(review): the line carrying the function signature is not present in this listing.
436  auto algEndRun = [&] (SmartIF<IAlgorithm>& algoSmartIF) -> StatusCode {
437  StatusCode sc = algoSmartIF->sysEndRun();
438  if (!sc.isSuccess()) {
439  warning() << "endRun() of algorithm " << algoSmartIF->name() << " failed" << endmsg;
440  return StatusCode::FAILURE;
441  }
442  return StatusCode::SUCCESS;
443  };
444  // Call the endRun() method of all flattened algorithms, then of all top algorithms
 // Either loop stops at the first failure. An algorithm that appears in both lists has
 // sysEndRun() invoked once per list.
445  for (auto& algoSmartIF : m_flatUniqueAlgList ) {
446  if (algEndRun(algoSmartIF).isFailure())
447  return StatusCode::FAILURE;
448  }
449  for (auto& algoSmartIF : m_topAlgList ) {
450  if (algEndRun(algoSmartIF).isFailure())
451  return StatusCode::FAILURE;
452  }
453  return StatusCode::SUCCESS;
454 }
455 
456 //---------------------------------------------------------------------------
457 
459 
 // Stops the base Service first and returns its status unchanged if it is not a success.
 // NOTE(review): the line carrying the function signature is not present in this listing.
460  StatusCode stopSc = Service::stop();
461  if ( ! stopSc.isSuccess() ) return stopSc;
462 
463  // sys-Stop the algos
 // Calls sysStop() on each entry of m_algList in order; the first failure is logged with
 // error() and returned immediately, so later entries are not stopped.
464  for (auto& ialgo : m_algList){
465  stopSc = ialgo->sysStop();
466  if (stopSc.isFailure()){
467  error() << "Unable to stop Algorithm: " << ialgo->name() << endmsg;
468  return stopSc;
469  }
470  }
471  return StatusCode::SUCCESS;
472 }
ListAlg m_flatUniqueAlgList
The flat list of algorithms w/o clones.
StatusCode getProperty(Gaudi::Details::PropertyBase *p) const override
get the property
virtual SmartIF< IAlgorithm > & algorithm(const Gaudi::Utils::TypeNameString &typeName, const bool createIf=true)=0
Returns a smart pointer to an algorithm.
StatusCode initialize() override
Definition: Service.cpp:64
T unlock(T...args)
T empty(T...args)
StatusCode addAlgorithmNode(Algorithm *daughterAlgo, const std::string &parentName, bool inverted, bool allPass)
Add algorithm node.
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:715
const std::string & name() const override
Retrieve name of the service.
Definition: Service.cpp:289
void attachAlgorithmsToNodes(const std::string &algo_name, const T &container)
Attach pointers to real Algorithms (and their clones) to Algorithm nodes of the graph.
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
StatusCode start() override
Definition: Service.cpp:137
std::map< std::string, unsigned int > m_resource_indices
bool isSuccess() const
Test for a status code of SUCCESS.
Definition: StatusCode.h:74
bool hasProperty(const std::string &name) const override
Return true if we have a property with the given name.
MsgStream & verbose() const
shortcut for the method msgStream(MSG::VERBOSE)
StatusCode addDecisionHubNode(Algorithm *daughterAlgo, const std::string &parentName, bool modeConcurrent, bool modePromptDecision, bool modeOR, bool allPass)
Add a node, which aggregates decisions of direct daughter nodes.
std::mutex m_resource_mutex
virtual StatusCode createAlgorithm(const std::string &algtype, const std::string &algname, IAlgorithm *&alg, bool managed=false, bool checkIfExists=true)=0
Create an instance of a algorithm type that has been declared beforehand and assigns to it a name...
StatusCode beginRun() override
void addHeadNode(const std::string &headName, bool modeConcurrent, bool modePromptDecision, bool modeOR, bool allPass)
Add a node, which has no parents.
StatusCode stop() override
T end(T...args)
virtual StatusCode getProperty(Gaudi::Details::PropertyBase *p) const =0
Get the property by property.
StatusCode endRun() override
The AlgResourcePool is a concrete implementation of the IAlgResourcePool interface.
bool isFailure() const
Test for a status code of FAILURE.
Definition: StatusCode.h:84
state_type m_available_resources
virtual StatusCode sysInitialize()=0
Initialization method invoked by the framework.
auto begin(reverse_wrapper< T > &w)
Definition: reverse.h:48
STL class.
boost::dynamic_bitset state_type
T push_back(T...args)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
Helper class to parse a string of format "type/name".
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
tbb::concurrent_bounded_queue< IAlgorithm * > concurrentQueueIAlgPtr
std::list< IAlgorithm * > m_flatUniqueAlgPtrList
The flat list of algorithms w/o clones which is returned.
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:26
std::map< size_t, state_type > m_resource_requirements
T erase(T...args)
StatusCode start() override
T lock(T...args)
~AlgResourcePool() override
std::list< IAlgorithm * > m_topAlgPtrList
The top list of algorithms.
virtual unsigned int cardinality() const =0
Cardinality (Maximum number of clones that can exist) special value 0 means that algorithm is reentra...
auto end(reverse_wrapper< T > &w)
Definition: reverse.h:50
virtual void setIndex(const unsigned int &idx)=0
Set instantiation index of Alg.
std::list< IAlgorithm * > getFlatAlgList() override
#define DECLARE_SERVICE_FACTORY(x)
Definition: Service.h:242
StatusCode releaseResource(const std::string &name) override
Release a certain resource.
Gaudi::Property< std::vector< std::string > > m_topAlgNames
std::list< IAlgorithm * > getTopAlgList() override
StatusCode stop() override
Definition: Service.cpp:130
T clear(T...args)
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:27
StatusCode acquireResource(const std::string &name) override
Acquire a certain resource.
std::map< size_t, concurrentQueueIAlgPtr * > m_algqueue_map
const std::vector< Algorithm * > * subAlgorithms() const
List of sub-algorithms. Returns a pointer to a vector of (sub) Algorithms.
Definition: Algorithm.cpp:754
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:78
T find(T...args)
StatusCode decodeTopAlgs()
Decode the top alg list.
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
bool isValid() const
Allow for check if smart pointer is valid.
Definition: SmartIF.h:62
const std::string & type() const
StatusCode flattenSequencer(Algorithm *sequencer, ListAlg &alglist, const std::string &parentName, unsigned int recursionDepth=0)
Recursively flatten an algList.
T emplace(T...args)
StatusCode acquireAlgorithm(const std::string &name, IAlgorithm *&algo, bool blocking=false) override
Acquire a certain algorithm using its name.
StatusCode service(const std::string &name, const T *&psvc, bool createIf=true) const
Access a service by name, creating it if it doesn't already exist.
Definition: Service.h:85
std::map< size_t, unsigned int > m_n_of_created_instances
#define DEBUG_MSG
const std::string & name() const
StatusCode initialize() override
MsgStream & fatal() const
shortcut for the method msgStream(MSG::FATAL)
void ignore() const
Definition: StatusCode.h:106
concurrency::PrecedenceRulesGraph * m_PRGraph
OMG yet another hack.
ListAlg m_topAlgList
The list of top algorithms.
const std::string & type() const override
The type of the algorithm object.
Definition: Algorithm.h:165
MSG::Level msgLevel() const
get the output level from the embedded MsgStream
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Definition: Service.cpp:292
std::map< size_t, size_t > m_n_of_allowed_instances
ListAlg m_algList
The list of all algorithms created within the Pool which are not top.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:244
StatusCode releaseAlgorithm(const std::string &name, IAlgorithm *&algo) override
Release a certain algorithm.
Gaudi::Property< bool > m_lazyCreation
T emplace_back(T...args)