Loading [MathJax]/extensions/tex2jax.js
The Gaudi Framework  v31r0 (aeb156f0)
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
HiveDataBroker.cpp
Go to the documentation of this file.
1 #include "HiveDataBroker.h"
4 #include "GaudiKernel/System.h"
5 #include "boost/lexical_cast.hpp"
6 #include "boost/tokenizer.hpp"
7 #include "range/v3/algorithm/for_each.hpp"
8 #include "range/v3/view/remove_if.hpp"
9 #include "range/v3/view/reverse.hpp"
10 #include "range/v3/view/transform.hpp"
11 #include <Gaudi/Algorithm.h>
12 #include <algorithm>
13 
15 
16 namespace {
17  struct AlgorithmRepr {
18  const Gaudi::Algorithm& parent;
19 
20  friend std::ostream& operator<<( std::ostream& s, const AlgorithmRepr& a ) {
21  std::string typ = System::typeinfoName( typeid( a.parent ) );
22  s << typ;
23  if ( a.parent.name() != typ ) s << "/" << a.parent.name();
24  return s;
25  }
26  };
27 
28  struct DataObjIDSorter {
29  bool operator()( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); }
30  };
31 
32  // Sort a DataObjIDColl in a well-defined, reproducible manner.
33  // Used for making debugging dumps.
34  std::vector<const DataObjID*> sortedDataObjIDColl( const DataObjIDColl& coll ) {
36  v.reserve( coll.size() );
37  for ( const DataObjID& id : coll ) v.push_back( &id );
38  std::sort( v.begin(), v.end(), DataObjIDSorter() );
39  return v;
40  }
41 
42  SmartIF<IAlgorithm> createAlgorithm( IAlgManager& am, const std::string& type, const std::string& name ) {
43  // Maybe modify the AppMgr interface to return Algorithm* ??
44  IAlgorithm* tmp;
45  StatusCode sc = am.createAlgorithm( type, name, tmp );
46  return {sc.isSuccess() ? dynamic_cast<Gaudi::Algorithm*>( tmp ) : nullptr};
47  }
48 } // namespace
49 
51  auto sc = Service::initialize();
52  if ( sc.isFailure() ) return sc;
53  // populate m_algorithms
54  m_algorithms = instantiateAndInitializeAlgorithms( m_producers );
55  if ( sc.isFailure() ) return sc;
56 
57  // warn about non-reentrant algorithms
58  ranges::for_each( m_algorithms | ranges::view::transform( []( const auto& entry ) { return entry.alg; } ) |
59  ranges::view::remove_if( []( const auto* alg ) { return alg->cardinality() == 0; } ),
60  [&]( const Gaudi::Algorithm* alg ) {
61  this->warning() << "non-reentrant algorithm: " << AlgorithmRepr{*alg} << endmsg;
62  } );
63  //== Print the list of the created algorithms
64  if ( msgLevel( MSG::DEBUG ) ) {
65  MsgStream& msg = debug();
66  msg << "Available DataProducers: ";
68  msg, m_algorithms, ", ",
69  []( auto& os, const AlgEntry& e ) -> decltype( auto ) { return os << AlgorithmRepr{*e.alg}; } );
70  msg << endmsg;
71  }
72 
73  // populate m_dependencies
74  m_dependencies = mapProducers( m_algorithms );
75  return sc;
76 }
77 
79 
81  if ( !ss.isSuccess() ) return ss;
82 
83  // sysStart for m_algorithms
84  for ( AlgEntry& algEntry : m_algorithms ) {
85  ss = algEntry.alg->sysStart();
86  if ( ss.isFailure() ) {
87  error() << "Unable to start Algorithm: " << algEntry.alg->name() << endmsg;
88  return ss;
89  }
90  }
91  // sysStart for m_cfnodes
92  for ( AlgEntry& algEntry : m_cfnodes ) {
93  ss = algEntry.alg->sysStart();
94  if ( ss.isFailure() ) {
95  error() << "Unable to start Algorithm: " << algEntry.alg->name() << endmsg;
96  return ss;
97  }
98  }
99  return ss;
100 }
101 
103  StatusCode ss = Service::stop();
104  if ( !ss.isSuccess() ) return ss;
105 
106  // sysStop for m_algorithms
107  for ( AlgEntry& algEntry : m_algorithms ) {
108  ss = algEntry.alg->sysStop();
109  if ( ss.isFailure() ) {
110  error() << "Unable to stop Algorithm: " << algEntry.alg->name() << endmsg;
111  return ss;
112  }
113  }
114  // sysStop for m_cfnodes
115  for ( AlgEntry& algEntry : m_cfnodes ) {
116  ss = algEntry.alg->sysStop();
117  if ( ss.isFailure() ) {
118  error() << "Unable to stop Algorithm: " << algEntry.alg->name() << endmsg;
119  return ss;
120  }
121  }
122  return ss;
123 }
124 
126  ranges::for_each( m_algorithms | ranges::view::transform( &AlgEntry::alg ),
127  []( Gaudi::Algorithm* alg ) { alg->sysFinalize(); } );
128  m_algorithms.clear();
129  return Service::finalize();
130 }
131 
132 // populate m_algorithms
135  std::vector<AlgEntry> algorithms;
136 
137  //= Get the Application manager, to see if the algorithm exists
138  auto appMgr = service<IAlgManager>( "ApplicationMgr" );
139  for ( const Gaudi::Utils::TypeNameString item : names ) {
140  const std::string& theName = item.name();
141  const std::string& theType = item.type();
142 
143  //== Check whether the specified algorithm already exists. If not, create it
144  SmartIF<IAlgorithm> myIAlg = appMgr->algorithm( item, false ); // do not create it now
145  if ( !myIAlg ) {
146  myIAlg = createAlgorithm( *appMgr, theType, theName );
147  } else {
148  // when the algorithm is not created, the ref count is short by one, so we
149  // have to fix it.
150  myIAlg->addRef();
151  }
152 
153  if ( !myIAlg ) {
154  throw GaudiException{"Failed to create " + boost::lexical_cast<std::string>( item ), __func__,
156  }
157 
158  // propagate the sub-algorithm into own state.
159  StatusCode sc = myIAlg->sysInitialize();
160  if ( sc.isFailure() ) {
161  throw GaudiException{"Failed to initialize " + boost::lexical_cast<std::string>( item ), __func__,
163  }
164 
165  algorithms.emplace_back( std::move( myIAlg ) );
166  }
167 
168  return algorithms;
169 }
170 
173  if ( msgLevel( MSG::DEBUG ) ) {
174  debug() << "Data Dependencies for Algorithms:";
175  for ( const auto& entry : m_algorithms ) {
176  debug() << "\n " << entry.alg->name() << " :";
177  for ( const auto& id : entry.alg->inputDataObjs() ) { debug() << "\n o INPUT " << id.key(); }
178  for ( const auto& id : entry.alg->outputDataObjs() ) { debug() << "\n o OUTPUT " << id.key(); }
179  }
180  debug() << endmsg;
181  }
182 
183  // figure out all outputs
185  for ( AlgEntry& alg : algorithms ) {
186  const auto& output = alg.alg->outputDataObjs();
187  if ( output.empty() ) { continue; }
188  for ( auto id : output ) {
189  if ( id.key().find( ":" ) != std::string::npos ) {
190  error() << " in Alg " << AlgorithmRepr{*alg.alg} << " alternatives are NOT allowed for outputs! id: " << id
191  << endmsg;
192  }
193 
194  auto r = producers.emplace( id, &alg );
195  if ( !r.second ) {
196  if ( output.size() == 1 ) {
197  error() << "multiple algorithms declare " << id << " as output! -- IGNORING " << AlgorithmRepr{*alg.alg}
198  << endmsg;
199  } else {
200  error() << "multiple algorithms declare " << id << " as output; given that " << AlgorithmRepr{*alg.alg}
201  << " produces multiple outputs ";
202  //<< output <<
203  error() << " this could lead to clashes in case any of the other "
204  "items is ever requested"
205  << endmsg;
206  }
207  }
208  }
209  }
210 
211  // resolve dependencies
212  for ( auto& algEntry : algorithms ) {
213  auto input = sortedDataObjIDColl( algEntry.alg->inputDataObjs() );
214  for ( const DataObjID* idp : input ) {
215  DataObjID id = *idp;
216  if ( id.key().find( ":" ) != std::string::npos ) {
217  warning() << " contains alternatives which require resolution...\n";
218  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
219  auto itok = std::find_if( tokens.begin(), tokens.end(),
220  [&]( DataObjID t ) { return producers.find( t ) != producers.end(); } );
221  if ( itok != tokens.end() ) {
222  warning() << "found matching output for " << *itok << " -- updating info\n";
223  id.updateKey( *itok );
224  warning() << "Please update input to not require alternatives, and "
225  "instead properly configure the dataloader"
226  << endmsg;
227  } else {
228  error() << "failed to find alternate in global output list"
229  << " for id: " << id << " in Alg " << algEntry.alg << endmsg;
230  }
231  }
232  auto iproducer = producers.find( id );
233  if ( iproducer != producers.end() ) {
234  algEntry.dependsOn.insert( iproducer->second );
235  } else {
236  throw GaudiException( "unknown requested input: " + id.key(), __func__, StatusCode::FAILURE );
237  // TODO: assign to dataloader!
238  // algEntry.dependsOn.insert(dataloader.alg);
239  // dataloader.data.emplace( id ); // TODO: we may ask too much of the
240  // dataloader this way...
241  }
242  }
243  }
244  return producers;
245 }
246 
249  const std::vector<std::string>& stoppers ) const {
251 
253  deps.reserve( requested.size() );
254 
255  // start with seeding from the initial request
256  for ( const auto& req : requested ) {
257  DataObjID id = req;
258  if ( id.key().find( ":" ) != std::string::npos ) {
259  warning() << req.key() << " contains alternatives which require resolution...\n";
260  auto tokens = boost::tokenizer<boost::char_separator<char>>{id.key(), boost::char_separator<char>{":"}};
261  auto itok = std::find_if( tokens.begin(), tokens.end(),
262  [&]( DataObjID t ) { return m_dependencies.find( t ) != m_dependencies.end(); } );
263  if ( itok != tokens.end() ) {
264  warning() << "found matching output for " << *itok << " -- updating info\n";
265  id.updateKey( *itok );
266  warning() << "Please update input to not require alternatives, and "
267  "instead properly configure the dataloader"
268  << endmsg;
269  } else {
270  error() << "failed to find alternate in global output list"
271  << " for id: " << id << endmsg;
272  }
273  }
274  auto i = m_dependencies.find( id );
275  if ( i == m_dependencies.end() )
276  throw GaudiException( "unknown requested input: " + id.key(), __func__, StatusCode::FAILURE );
277  deps.push_back( i->second );
278  }
279  // insert the (direct) dependencies of 'current' right after 'current', and
280  // iterate until done...
281  for ( auto current = deps.begin(); current != deps.end(); ++current ) {
282  if ( std::any_of( std::begin( stoppers ), std::end( stoppers ),
283  [current]( auto& stopper ) { return ( *current )->alg->name() == stopper; } ) ) {
284  continue;
285  }
286  for ( auto* entry : ( *current )->dependsOn ) {
287  if ( std::find( std::next( current ), deps.end(), entry ) != deps.end() ) continue; // already there downstream...
288 
289  auto dup = std::find( deps.begin(), current, entry );
290  // if present upstream, move it downstream. Otherwise, insert
291  // downstream...
292  current = std::prev( dup != current ? std::rotate( dup, std::next( dup ), std::next( current ) )
293  : deps.insert( std::next( current ), entry ) );
294  }
295  }
296  auto range = ( deps | ranges::view::transform( []( auto& i ) { return i->alg; } ) | ranges::view::reverse );
297  return {begin( range ), end( range )};
298 }
299 
302  const std::vector<std::string>& stoppers ) const {
304 
305  auto alg = std::find_if( begin( m_cfnodes ), end( m_cfnodes ),
306  [&]( const AlgEntry& ae ) { return ae.alg->name() == requested.name(); } );
307 
308  if ( alg != end( m_cfnodes ) && alg->alg->type() != requested.type() ) {
309  error() << "requested " << requested << " but have matching name with different type: " << alg->alg->type()
310  << endmsg;
311  }
312  if ( alg == end( m_cfnodes ) ) {
313  auto av = instantiateAndInitializeAlgorithms( {requested.type() + '/' + requested.name()} );
314  assert( av.size() == 1 );
315  m_cfnodes.push_back( std::move( av.front() ) );
316  alg = std::next( m_cfnodes.rbegin() ).base();
317  }
318  assert( alg != end( m_cfnodes ) );
319  assert( alg->alg != nullptr );
320  if ( std::find_if( std::begin( stoppers ), std::end( stoppers ),
321  [&requested]( auto& stopper ) { return requested.name() == stopper; } ) == std::end( stoppers ) ) {
322  result = algorithmsRequiredFor( alg->alg->inputDataObjs(), stoppers );
323  }
324  result.push_back( alg->alg );
325  if ( msgLevel( MSG::DEBUG ) ) {
326  debug() << std::endl << "requested " << requested << " returning " << std::endl << " ";
328  debug(), result, ",\n ",
329  []( auto& os, const Gaudi::Algorithm* a ) -> decltype( auto ) { return os << AlgorithmRepr{*a}; } );
330  debug() << std::endl << endmsg;
331  }
332  return result;
333 }
Definition of the MsgStream class used to transmit messages.
Definition: MsgStream.h:24
StatusCode initialize() override
Definition: Service.cpp:60
Define general base for Gaudi exception.
std::map< DataObjID, AlgEntry * > mapProducers(std::vector< AlgEntry > &algorithms) const
StatusCode finalize() override
Definition: Service.cpp:164
GAUDI_API const std::string typeinfoName(const std::type_info &)
Get platform independent information about the class type.
Definition: System.cpp:309
StatusCode start() override
Definition: Service.cpp:129
const std::string & name() const override
The identifying name of the algorithm object.
Definition: Algorithm.cpp:635
bool isSuccess() const
Definition: StatusCode.h:267
::details::reverse_wrapper< T > reverse(T &&iterable)
Definition: reverse.h:49
T endl(T...args)
The IAlgManager is the interface implemented by the Algorithm Factory in the Application Manager to s...
Definition: IAlgManager.h:27
StatusCode finalize() override
StatusCode start() override
T end(T...args)
bool isFailure() const
Definition: StatusCode.h:130
StatusCode sysFinalize() override
System finalization.
Definition: Algorithm.cpp:558
virtual StatusCode sysInitialize()=0
Initialization method invoked by the framework.
T prev(T...args)
STL class.
StatusCode stop() override
#define DECLARE_COMPONENT(type)
T push_back(T...args)
Helper class to parse a string of format "type/name".
T next(T...args)
This class is used for returning status codes from appropriate routines.
Definition: StatusCode.h:50
std::vector< AlgEntry > instantiateAndInitializeAlgorithms(const std::vector< std::string > &names) const
const std::string & key() const
Definition: DataObjID.h:48
decltype(auto) range(Args &&...args)
Zips multiple containers together to form a single range.
StatusCode stop() override
Definition: Service.cpp:123
The IAlgorithm is the interface implemented by the Algorithm base class.
Definition: IAlgorithm.h:28
T move(T...args)
Stream & ostream_joiner(Stream &os, Iterator first, Iterator last, Separator sep, OutputElement output=OutputElement{})
Definition: SerializeSTL.h:37
T insert(T...args)
T find_if(T...args)
T size(T...args)
STL class.
virtual Out operator()(const vector_of_const_< In > &inputs) const =0
T begin(T...args)
const std::string & type() const
std::vector< Gaudi::Algorithm * > algorithmsRequiredFor(const DataObjIDColl &requested, const std::vector< std::string > &stoppers={}) const override
T any_of(T...args)
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:79
T emplace(T...args)
T rotate(T...args)
virtual StatusCode createAlgorithm(const std::string &algtype, const std::string &algname, IAlgorithm *&alg, bool managed=false, bool checkIfExists=true )=0
Create an instance of an algorithm type that has been declared beforehand and assign to it a name...
appMgr
Definition: IOTest.py:92
string s
Definition: gaudirun.py:312
constexpr static const auto FAILURE
Definition: StatusCode.h:86
T sort(T...args)
StatusCode initialize() override
AttribStringParser::Iterator begin(const AttribStringParser &parser)
const std::string & name() const
std::string fullKey() const
Definition: DataObjID.cpp:88
STL class.
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:192
Gaudi::Algorithm * alg
T reserve(T...args)
std::ostream & operator<<(std::ostream &str, const GaudiAlg::ID &id)
Operator overloading for ostream.
Definition: GaudiHistoID.h:132
T emplace_back(T...args)