The Gaudi Framework  v33r0 (d5ea422b)
HiveDataBroker.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 #include "HiveDataBroker.h"
14 #include "GaudiKernel/System.h"
15 #include "boost/lexical_cast.hpp"
16 #include "boost/tokenizer.hpp"
17 #include "range/v3/algorithm/for_each.hpp"
18 #include "range/v3/view/remove_if.hpp"
19 #include "range/v3/view/reverse.hpp"
20 #include "range/v3/view/transform.hpp"
21 #include <Gaudi/Algorithm.h>
22 #include <algorithm>
23 
25 
26 namespace {
27  struct AlgorithmRepr {
28  const Gaudi::Algorithm& parent;
29 
30  friend std::ostream& operator<<( std::ostream& s, const AlgorithmRepr& a ) {
31  std::string typ = System::typeinfoName( typeid( a.parent ) );
32  s << typ;
33  if ( a.parent.name() != typ ) s << "/" << a.parent.name();
34  return s;
35  }
36  };
37 
38  struct DataObjIDSorter {
39  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
40  };
41 
42  // Sort a DataObjIDColl in a well-defined, reproducible manner.
43  // Used for making debugging dumps.
44  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll ) {
46  v.reserve( coll.size() );
47  for ( const DataObjID& id : coll ) v.push_back( &id );
48  std::sort( v.begin(), v.end(), DataObjIDSorter() );
49  return v;
50  }
51 
52  SmartIF<IAlgorithm> createAlgorithm( IAlgManager& am, const std::string& type, const std::string& name ) {
53  // Maybe modify the AppMgr interface to return Algorithm* ??
54  IAlgorithm* tmp;
55  StatusCode sc = am.createAlgorithm( type, name, tmp );
56  return {sc.isSuccess() ? dynamic_cast<Gaudi::Algorithm*>( tmp ) : nullptr};
57  }
58 } // namespace
59 
// NOTE(review): the line that opens this function is missing from this listing
// (the embedded numbering jumps from 59 to 61). Since it chains to
// Service::initialize(), it is presumably the service's initialize() override
// — TODO confirm against the real source.
// Purpose (as visible): initialise the Service base, warn about every producer
// algorithm whose cardinality() is non-zero, dump the producer list at DEBUG
// level, and return the StatusCode obtained from Service::initialize().
61  auto sc = Service::initialize();
62  if ( sc.isFailure() ) return sc;
63  // populate m_algorithms
// NOTE(review): source line 64, which populates m_algorithms, is missing from
// this listing. Unless that line reassigns `sc`, the check below repeats the
// one above and cannot fire.
65  if ( sc.isFailure() ) return sc;
66 
67  // warn about non-reentrant algorithms
// Project each entry to its Gaudi::Algorithm*, drop those whose cardinality()
// is 0, and warn about the rest (the message implies cardinality 0 marks a
// reentrant algorithm).
68  ranges::for_each( m_algorithms | ranges::view::transform( []( const auto& entry ) { return entry.alg; } ) |
69  ranges::view::remove_if( []( const auto* alg ) { return alg->cardinality() == 0; } ),
70  [&]( const Gaudi::Algorithm* alg ) {
71  this->warning() << "non-reentrant algorithm: " << AlgorithmRepr{*alg} << endmsg;
72  } );
73  //== Print the list of the created algorithms
74  if ( msgLevel( MSG::DEBUG ) ) {
75  MsgStream& msg = debug();
76  msg << "Available DataProducers: ";
// NOTE(review): source line 77, the head of the call taking the arguments
// below, is missing from this listing; presumably a joiner of the form
// (stream, range, separator, printer) — TODO confirm.
78  msg, m_algorithms, ", ",
79  []( auto& os, const AlgEntry& e ) -> decltype( auto ) { return os << AlgorithmRepr{*e.alg}; } );
80  msg << endmsg;
81  }
82 
83  // populate m_dependencies
// NOTE(review): source line 84, which populates m_dependencies, is missing
// from this listing (presumably from mapProducers(); TODO confirm).
85  return sc;
86 }
87 
// NOTE(review): the line that opens this function and the declaration of `ss`
// (source lines 88 and 90) are missing from this listing. By analogy with the
// stop fragment's `StatusCode ss = Service::stop();`, `ss` is presumably
// initialised from Service::start() — TODO confirm.
// Purpose (as visible): once the base-class start succeeds, call sysStart() on
// every entry of m_algorithms, then of m_cfnodes. The first failing StatusCode
// is logged and returned. Entries already started before a failure are not
// stopped again here.
89 
91  if ( !ss.isSuccess() ) return ss;
92 
93  // sysStart for m_algorithms
94  for ( AlgEntry& algEntry : m_algorithms ) {
95  ss = algEntry.alg->sysStart();
96  if ( ss.isFailure() ) {
97  error() << "Unable to start Algorithm: " << algEntry.alg->name() << endmsg;
98  return ss;
99  }
100  }
101  // sysStart for m_cfnodes
102  for ( AlgEntry& algEntry : m_cfnodes ) {
103  ss = algEntry.alg->sysStart();
104  if ( ss.isFailure() ) {
105  error() << "Unable to start Algorithm: " << algEntry.alg->name() << endmsg;
106  return ss;
107  }
108  }
109  return ss;
110 }
111 
// NOTE(review): the line that opens this function (source line 112) is missing
// from this listing; presumably the service's stop() override — TODO confirm.
// Purpose (as visible): after Service::stop() succeeds, call sysStop() on every
// entry of m_algorithms, then of m_cfnodes. The first failing StatusCode is
// logged and returned.
113  StatusCode ss = Service::stop();
114  if ( !ss.isSuccess() ) return ss;
115 
116  // sysStop for m_algorithms
117  for ( AlgEntry& algEntry : m_algorithms ) {
118  ss = algEntry.alg->sysStop();
119  if ( ss.isFailure() ) {
120  error() << "Unable to stop Algorithm: " << algEntry.alg->name() << endmsg;
121  return ss;
122  }
123  }
124  // sysStop for m_cfnodes
125  for ( AlgEntry& algEntry : m_cfnodes ) {
126  ss = algEntry.alg->sysStop();
127  if ( ss.isFailure() ) {
128  error() << "Unable to stop Algorithm: " << algEntry.alg->name() << endmsg;
129  return ss;
130  }
131  }
132  return ss;
133 }
134 
// NOTE(review): the line that opens this function (source line 135) is missing
// from this listing; presumably the service's finalize() override.
// Purpose (as visible): call sysFinalize() on every algorithm in m_algorithms,
// clear m_algorithms, then chain to Service::finalize().
// NOTE(review): any result returned by sysFinalize() is discarded. Also,
// m_cfnodes is neither finalized nor cleared here, although the start/stop
// fragments in this file do iterate it — confirm this is intentional.
136  ranges::for_each( m_algorithms | ranges::view::transform( &AlgEntry::alg ),
137  []( Gaudi::Algorithm* alg ) { alg->sysFinalize(); } );
138  m_algorithms.clear();
139  return Service::finalize();
140 }
141 
142 // populate m_algorithms
// NOTE(review): the lines that open this function (source lines 143-144) are
// missing from this listing. The tooltip index lists
// `std::vector<AlgEntry> instantiateAndInitializeAlgorithms(const std::vector<std::string>& names) const`,
// which matches the body below — TODO confirm.
// Purpose (as visible): for each "type/name" entry of `names`, reuse the
// algorithm the ApplicationMgr already knows, or create it. Then run
// sysInitialize() on it and append it to the returned vector. This body does
// not assign m_algorithms itself; it returns the collection to its caller.
// Throws GaudiException if an algorithm cannot be created or fails to
// initialize.
145  std::vector<AlgEntry> algorithms;
146 
147  //= Get the Application manager, to see if the algorithm exists
148  auto appMgr = service<IAlgManager>( "ApplicationMgr" );
// Each string is converted to a TypeNameString ("type/name" parser) by value.
149  for ( const Gaudi::Utils::TypeNameString item : names ) {
150  const std::string& theName = item.name();
151  const std::string& theType = item.type();
152 
153  //== Check whether the specified algorithm already exists. If not, create it
154  SmartIF<IAlgorithm> myIAlg = appMgr->algorithm( item, false ); // do not create it now
155  if ( !myIAlg ) {
156  myIAlg = createAlgorithm( *appMgr, theType, theName );
157  } else {
158  // when the algorithm is not created, the ref count is short by one, so we
159  // have to fix it.
160  myIAlg->addRef();
161  }
162 
// NOTE(review): source line 165, the final argument line of the throw below,
// is missing from this listing; presumably `StatusCode::FAILURE};` like the
// other throws in this file — TODO confirm.
163  if ( !myIAlg ) {
164  throw GaudiException{"Failed to create " + boost::lexical_cast<std::string>( item ), __func__,
166  }
167 
168  // propagate the sub-algorithm into own state.
// Initialization failure is fatal. Source line 172, the tail of the throw
// below, is likewise missing from this listing.
169  StatusCode sc = myIAlg->sysInitialize();
170  if ( sc.isFailure() ) {
171  throw GaudiException{"Failed to initialize " + boost::lexical_cast<std::string>( item ), __func__,
173  }
174 
// The SmartIF is moved into the new AlgEntry (AlgEntry is declared elsewhere).
175  algorithms.emplace_back( std::move( myIAlg ) );
176  }
177 
178  return algorithms;
179 }
180 
// NOTE(review): the lines that open this function (source lines 181-182) are
// missing from this listing. The tooltip index lists
// `std::map<DataObjID, AlgEntry*> mapProducers(std::vector<AlgEntry>& algorithms) const`,
// which matches the body below — TODO confirm.
// Purpose (as visible): map every declared output DataObjID to the AlgEntry
// that produces it. Then, for each algorithm, insert the producers of its
// inputs into its `dependsOn` set. The map is returned.
// Throws GaudiException if two algorithms declare the same output, or if an
// input has no producer.
// NOTE(review): the map stores addresses of elements of `algorithms`; they stay
// valid only as long as that vector is not reallocated or destroyed.
// NOTE(review): the DEBUG dump below iterates the member m_algorithms, not the
// `algorithms` argument; the two differ unless the caller passes m_algorithms.
183  if ( msgLevel( MSG::DEBUG ) ) {
184  debug() << "Data Dependencies for Algorithms:";
185  for ( const auto& entry : m_algorithms ) {
186  debug() << "\n " << entry.alg->name() << " :";
187  for ( const auto* id : sortedDataObjIDColl( entry.alg->inputDataObjs() ) ) {
188  debug() << "\n o INPUT " << id->key();
189  }
190  for ( const auto* id : sortedDataObjIDColl( entry.alg->outputDataObjs() ) ) {
191  debug() << "\n o OUTPUT " << id->key();
192  }
193  }
194  debug() << endmsg;
195  }
196 
197  // figure out all outputs
// NOTE(review): source line 198, which declares the `producers` map returned
// at the end, is missing from this listing.
199  for ( AlgEntry& alg : algorithms ) {
200  const auto& output = alg.alg->outputDataObjs();
201  if ( output.empty() ) { continue; }
// NOTE(review): `auto id` copies each DataObjID; `const auto&` would avoid it.
202  for ( auto id : output ) {
// ':'-separated alternatives are reported for outputs, but the id is still
// registered below.
203  if ( id.key().find( ":" ) != std::string::npos ) {
204  error() << " in Alg " << AlgorithmRepr{*alg.alg} << " alternatives are NOT allowed for outputs! id: " << id
205  << endmsg;
206  }
207 
// emplace reports r.second == false when another algorithm already registered
// this id as its output.
208  auto r = producers.emplace( id, &alg );
209  if ( !r.second ) {
210  throw GaudiException( "multiple algorithms declare " + id.key() + " as output (" + alg.alg->name() + " and " +
211  producers[id]->alg->name() + " at least). This is not allowed",
212  __func__, StatusCode::FAILURE );
213  }
214  }
215  }
216 
217  // resolve dependencies
218  for ( auto& algEntry : algorithms ) {
219  auto input = sortedDataObjIDColl( algEntry.alg->inputDataObjs() );
220  for ( const DataObjID* idp : input ) {
// Work on a copy, so resolving alternatives does not touch the algorithm's
// declared input.
221  DataObjID id = *idp;
// A ':'-separated key lists alternatives; the first token that some algorithm
// produces is used.
222  if ( id.key().find( ":" ) != std::string::npos ) {
223  warning() << AlgorithmRepr{*( algEntry.alg )} << " contains alternatives which require resolution...\n";
224  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
225  auto itok = std::find_if( tokens.begin(), tokens.end(),
226  [&]( DataObjID t ) { return producers.find( t ) != producers.end(); } );
227  if ( itok != tokens.end() ) {
228  warning() << "found matching output for " << *itok << " -- updating info\n";
229  id.updateKey( *itok );
230  warning() << "Please update input to not require alternatives, and "
231  "instead properly configure the dataloader"
232  << endmsg;
233  } else {
// NOTE(review): this streams the raw pointer `algEntry.alg` (an address)
// rather than AlgorithmRepr{*algEntry.alg}, as used elsewhere. The warning
// text started above is also never terminated with endmsg on this path.
234  error() << "failed to find alternate in global output list"
235  << " for id: " << id << " in Alg " << algEntry.alg << endmsg;
236  }
237  }
238  auto iproducer = producers.find( id );
239  if ( iproducer != producers.end() ) {
240  algEntry.dependsOn.insert( iproducer->second );
241  } else {
242  std::ostringstream error_message;
243  error_message << "\nUnknown requested input by " << AlgorithmRepr{*( algEntry.alg )} << " : " << id.key()
244  << " .\n";
245  error_message << "You can set the OutputLevel of HiveDataBrokerSvc to DEBUG to get a list of inputs and "
246  "outputs of every algorithm.\n";
247  throw GaudiException( error_message.str(), __func__, StatusCode::FAILURE );
248  // TODO: assign to dataloader!
249  // algEntry.dependsOn.insert(dataloader.alg);
250  // dataloader.data.emplace( id ); // TODO: we may ask too much of the
251  // dataloader this way...
252  }
253  }
254  }
255  return producers;
256 }
257 
// NOTE(review): the first lines of this function's signature (source lines
// 258-259) and the declarations on source lines 261 and 263 are missing from
// this listing. The tooltip index lists
// `std::vector<Gaudi::Algorithm*> algorithmsRequiredFor(const DataObjIDColl& requested, const std::vector<std::string>& stoppers = {}) const override`,
// and `deps` is used below as a vector of AlgEntry pointers — TODO confirm.
// Purpose (as visible): collect the producers of every id in `requested`
// (resolving ':'-separated alternatives against m_dependencies). Then expand
// their dependsOn sets, except for algorithms whose name is listed in
// `stoppers`. Return the algorithms in reverse order of `deps`, so that
// producers are listed before the algorithms that depend on them.
// Throws GaudiException if a requested id has no known producer.
260  const std::vector<std::string>& stoppers ) const {
262 
264  deps.reserve( requested.size() );
265 
266  // start with seeding from the initial request
// NOTE(review): no de-duplication happens during seeding. If two requested
// ids are produced by the same algorithm, that AlgEntry is pushed twice, and
// the loop below never merges the copies, so it is returned twice.
267  for ( const auto& req : requested ) {
268  DataObjID id = req;
269  if ( id.key().find( ":" ) != std::string::npos ) {
270  warning() << req.key() << " contains alternatives which require resolution...\n";
271  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
272  auto itok = std::find_if( tokens.begin(), tokens.end(),
273  [&]( DataObjID t ) { return m_dependencies.find( t ) != m_dependencies.end(); } );
274  if ( itok != tokens.end() ) {
275  warning() << "found matching output for " << *itok << " -- updating info\n";
276  id.updateKey( *itok );
277  warning() << "Please update input to not require alternatives, and "
278  "instead properly configure the dataloader"
279  << endmsg;
280  } else {
// NOTE(review): the warning text started above is not terminated with endmsg
// on this path before error() is used.
281  error() << "failed to find alternate in global output list"
282  << " for id: " << id << endmsg;
283  }
284  }
285  auto i = m_dependencies.find( id );
286  if ( i == m_dependencies.end() )
287  throw GaudiException( "unknown requested input: " + id.key(), __func__, StatusCode::FAILURE );
288  deps.push_back( i->second );
289  }
290  // insert the (direct) dependencies of 'current' right after 'current', and
291  // iterate until done...
// NOTE(review): with a dependency cycle (A needs B, B needs A) the two entries
// keep being moved past each other and this loop appears never to terminate;
// no cycle check is visible.
292  for ( auto current = deps.begin(); current != deps.end(); ++current ) {
293  if ( std::any_of( std::begin( stoppers ), std::end( stoppers ),
294  [current]( auto& stopper ) { return ( *current )->alg->name() == stopper; } ) ) {
295  continue;
296  }
297  for ( auto* entry : ( *current )->dependsOn ) {
298  if ( std::find( std::next( current ), deps.end(), entry ) != deps.end() ) continue; // already there downstream...
299 
300  auto dup = std::find( deps.begin(), current, entry );
301  // if present upstream, move it downstream. Otherwise, insert
302  // downstream...
// rotate(dup, dup+1, current+1) moves the duplicate into current's old slot
// and shifts `current` one place left. It returns an iterator to the moved
// entry, so std::prev() of it is `current` again. insert() returns an iterator
// to the new element, and its std::prev() is also `current`; reassigning
// `current` also refreshes it after a possible reallocation.
303  current = std::prev( dup != current ? std::rotate( dup, std::next( dup ), std::next( current ) )
304  : deps.insert( std::next( current ), entry ) );
305  }
306  }
307  auto range = ( deps | ranges::view::transform( []( auto& i ) { return i->alg; } ) | ranges::view::reverse );
308  return {begin( range ), end( range )};
309 }
310 
// NOTE(review): the first lines of this overload's signature (source lines
// 311-312), the declaration of `result` (314) and the head of the joiner call
// (338) are missing from this listing. `requested` is used as a
// TypeNameString-like value (name(), type(), streamable) — TODO confirm.
// Purpose (as visible): find the node named requested.name() in m_cfnodes, or
// instantiate and initialize it and append it there. Unless the node's name is
// in `stoppers`, start with the algorithms required for the node's inputs.
// Then append the node itself and return the list.
313  const std::vector<std::string>& stoppers ) const {
315 
316  auto alg = std::find_if( begin( m_cfnodes ), end( m_cfnodes ),
317  [&]( const AlgEntry& ae ) { return ae.alg->name() == requested.name(); } );
318 
// A name match with a different type is only reported; the existing node is
// still used below.
319  if ( alg != end( m_cfnodes ) && alg->alg->type() != requested.type() ) {
320  error() << "requested " << requested << " but have matching name with different type: " << alg->alg->type()
321  << endmsg;
322  }
323  if ( alg == end( m_cfnodes ) ) {
// NOTE(review): m_cfnodes is appended to from a const member function, so it
// is presumably declared mutable in the header. No locking is visible here.
324  auto av = instantiateAndInitializeAlgorithms( {requested.type() + '/' + requested.name()} );
325  assert( av.size() == 1 );
326  m_cfnodes.push_back( std::move( av.front() ) );
// Iterator to the element just appended (equivalent to std::prev(end())).
327  alg = std::next( m_cfnodes.rbegin() ).base();
328  }
329  assert( alg != end( m_cfnodes ) );
330  assert( alg->alg != nullptr );
331  if ( std::find_if( std::begin( stoppers ), std::end( stoppers ),
332  [&requested]( auto& stopper ) { return requested.name() == stopper; } ) == std::end( stoppers ) ) {
333  result = algorithmsRequiredFor( alg->alg->inputDataObjs(), stoppers );
334  }
335  result.push_back( alg->alg );
336  if ( msgLevel( MSG::DEBUG ) ) {
337  debug() << std::endl << "requested " << requested << " returning " << std::endl << " ";
// NOTE(review): source line 338, the head of the call taking the arguments
// below (presumably a joiner), is missing from this listing.
339  debug(), result, ",\n ",
340  []( auto& os, const Gaudi::Algorithm* a ) -> decltype( auto ) { return os << AlgorithmRepr{*a}; } );
341  debug() << std::endl << endmsg;
342  }
343  return result;
344 }
Definition of the MsgStream class used to transmit messages.
Definition: MsgStream.h:34
StatusCode initialize() override
Definition: Service.cpp:70
Define general base for Gaudi exception.
StatusCode finalize() override
Definition: Service.cpp:174
MsgStream & warning() const
shortcut for the method msgStream(MSG::WARNING)
GAUDI_API const std::string typeinfoName(const std::type_info &)
Get platform independent information about the class type.
Definition: System.cpp:308
StatusCode start() override
Definition: Service.cpp:139
::details::reverse_wrapper< T > reverse(T &&iterable)
Definition: reverse.h:59
std::vector< AlgEntry > m_cfnodes
T endl(T... args)
The IAlgManager is the interface implemented by the Algorithm Factory in the Application Manager to s...
Definition: IAlgManager.h:37
virtual StatusCode createAlgorithm(const std::string &algtype, const std::string &algname, IAlgorithm *&alg, bool managed=false, bool checkIfExists=true)=0
Create an instance of a algorithm type that has been declared beforehand and assigns to it a name.
std::map< DataObjID, AlgEntry * > mapProducers(std::vector< AlgEntry > &algorithms) const
const std::string & type() const
StatusCode finalize() override
StatusCode start() override
T end(T... args)
std::map< DataObjID, AlgEntry * > m_dependencies
std::string fullKey() const
Definition: DataObjID.cpp:98
virtual StatusCode sysInitialize()=0
Initialization method invoked by the framework.
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
T prev(T... args)
STL class.
StatusCode stop() override
#define DECLARE_COMPONENT(type)
T push_back(T... args)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
std::set< AlgEntry * > dependsOn
Helper class to parse a string of format "type/name".
T next(T... args)
const std::string & name() const
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:61
Gaudi::Property< std::vector< std::string > > m_producers
T str(T... args)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
def end
Definition: IOTest.py:123
StatusCode stop() override
Definition: Service.cpp:133
bool isSuccess() const
Definition: StatusCode.h:361
std::vector< AlgEntry > m_algorithms
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:38
T move(T... args)
Stream & ostream_joiner(Stream &os, Iterator first, Iterator last, Separator sep, OutputElement output=OutputElement{})
Definition: SerializeSTL.h:47
std::vector< Gaudi::Algorithm * > algorithmsRequiredFor(const DataObjIDColl &requested, const std::vector< std::string > &stoppers={}) const override
T insert(T... args)
T find_if(T... args)
T size(T... args)
STL class.
MsgStream & msg() const
shortcut for the method msgStream(MSG::INFO)
T begin(T... args)
T any_of(T... args)
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:89
T emplace(T... args)
T rotate(T... args)
appMgr
Definition: IOTest.py:103
string s
Definition: gaudirun.py:328
constexpr static const auto FAILURE
Definition: StatusCode.h:97
std::vector< AlgEntry > instantiateAndInitializeAlgorithms(const std::vector< std::string > &names) const
T sort(T... args)
decltype(auto) range(Args &&... args)
Zips multiple containers together to form a single range.
StatusCode initialize() override
bool isFailure() const
Definition: StatusCode.h:141
AttribStringParser::Iterator begin(const AttribStringParser &parser)
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:202
Gaudi::Algorithm * alg
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:556
T reserve(T... args)
std::ostream & operator<<(std::ostream &str, const GaudiAlg::ID &id)
Operator overloading for ostream.
Definition: GaudiHistoID.h:142
T emplace_back(T... args)