The Gaudi Framework  v39r1 (adb068b2)
HiveDataBroker.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2024 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 #include <Gaudi/Algorithm.h>
15 #include <GaudiKernel/Service.h>
16 #include <GaudiKernel/System.h>
17 #include <algorithm>
18 #include <boost/lexical_cast.hpp>
19 #include <boost/tokenizer.hpp>
20 #include <iomanip>
21 #include <stdexcept>
22 #ifdef __cpp_lib_ranges
23 # include <ranges>
24 namespace ranges = std::ranges;
25 #else
26 # include <range/v3/algorithm/for_each.hpp>
27 # include <range/v3/view/filter.hpp>
28 # include <range/v3/view/reverse.hpp>
29 # include <range/v3/view/transform.hpp>
30 // upstream has renamed namespace ranges::view to ranges::views
31 # if RANGE_V3_VERSION < 900
32 namespace ranges::views {
33  using namespace ranges::view;
34 }
35 # endif
36 #endif
37 
38 class HiveDataBrokerSvc final : public extends<Service, IDataBroker> {
39 public:
40  using extends::extends;
41 
42  std::vector<Gaudi::Algorithm*> algorithmsRequiredFor( const DataObjIDColl& requested,
43  const std::vector<std::string>& stoppers = {} ) const override;
45  const std::vector<std::string>& stoppers = {} ) const override;
46 
47  StatusCode initialize() override;
48  StatusCode start() override;
49  StatusCode stop() override;
50  StatusCode finalize() override;
51 
52 private:
53  Gaudi::Property<std::string> m_dataLoader{ this, "DataLoader", "",
54  "Attribute any unmet input dependencies to this Algorithm" };
56  this, "DataProducers", {}, "List of algorithms to be used to resolve data dependencies" };
57 
58  struct AlgEntry {
59  size_t index;
63 
64  friend bool operator<( AlgEntry const& lhs, AlgEntry const& rhs ) { return lhs.index < rhs.index; }
65 
// Two entries compare equal when their `index` members are equal; no other member is inspected.
66  friend bool operator==( AlgEntry const& lhs, AlgEntry const& rhs ) { return lhs.index == rhs.index; }
67 
// Build an entry from an index and an algorithm interface handle.
//  - `i` is stored as `index`.
//  - `p` is moved into `ialg`.
//  - `alg` is the result of dynamic_cast<Gaudi::Algorithm*> on the pointer held by `ialg`.
// Throws std::runtime_error when that cast yields a null pointer.
68  AlgEntry( size_t i, SmartIF<IAlgorithm>&& p )
// NOTE: `alg` is computed from `ialg`, so this initializer list relies on `ialg` being
// declared (and therefore initialized) before `alg` in the struct.
69  : index{ i }, ialg{ std::move( p ) }, alg{ dynamic_cast<Gaudi::Algorithm*>( ialg.get() ) } {
70  if ( !alg ) throw std::runtime_error( "algorithm pointer == nullptr???" );
71  }
72  };
73 
75  instantiateAndInitializeAlgorithms( const std::vector<std::string>& names ) const; // algorithms must be fully
76  // initialized first, as
77  // doing so may create
78  // additional data
79  // dependencies...
80 
82 
84 
86 
87  void visit( AlgEntry const& alg, std::vector<std::string> const& stoppers, std::vector<Gaudi::Algorithm*>& sorted,
88  std::vector<bool>& visited, std::vector<bool>& visiting ) const;
89 };
90 
92 
93 namespace {
// Streaming helper: wraps a reference to an algorithm so that it can be written to an
// std::ostream as "<type>" or "<type>/<name>".
// Only a reference is stored, so the wrapped algorithm must still exist when the
// wrapper is streamed.
94  struct AlgorithmRepr {
95  const Gaudi::Algorithm& parent;
96 
// Writes the type name obtained from System::typeinfoName( typeid( parent ) ); the
// "/<name>" suffix is appended only when parent.name() differs from that type string.
97  friend std::ostream& operator<<( std::ostream& s, const AlgorithmRepr& a ) {
98  std::string typ = System::typeinfoName( typeid( a.parent ) );
99  s << typ;
100  if ( a.parent.name() != typ ) s << "/" << a.parent.name();
101  return s;
102  }
103  };
104 
105  // Sort a DataObjIDColl in a well-defined, reproducible manner.
106  // Used for making debugging dumps.
107  std::vector<const DataObjID*> sorted_( const DataObjIDColl& coll ) {
109  v.reserve( coll.size() );
110  for ( const DataObjID& id : coll ) v.push_back( &id );
111  std::sort( v.begin(), v.end(),
112  []( const DataObjID* a, const DataObjID* b ) { return a->fullKey() < b->fullKey(); } );
113  return v;
114  }
115 
// Copy the pointers held in a std::set<T*> into a vector and sort that vector by
// comparing the pointed-to objects with operator< (not by comparing pointer values).
// Returns the sorted vector; the input set is not modified.
116  template <typename T>
117  std::vector<const T*> sorted_( const std::set<T*>& s ) {
118  std::vector<const T*> v{ s.begin(), s.end() };
// the comparator dereferences both pointers, so T must provide operator<
119  std::sort( v.begin(), v.end(), []( const auto* lhs, const auto* rhs ) { return *lhs < *rhs; } );
120  return v;
121  }
122 
// Ask the algorithm manager `am` to create an algorithm of the given `type` and `name`.
// Returns a SmartIF<IAlgorithm> for the created object when am.createAlgorithm reports
// success, and an empty (null) SmartIF otherwise.
123  SmartIF<IAlgorithm> createAlgorithm( IAlgManager& am, const std::string& type, const std::string& name ) {
124  // Maybe modify the AppMgr interface to return Algorithm* ??
125  IAlgorithm* tmp = nullptr;
126  StatusCode sc = am.createAlgorithm( type, name, tmp );
// On success the raw pointer is dynamic_cast to Gaudi::Algorithm*, so an object that is
// not a Gaudi::Algorithm also results in a null SmartIF being returned.
127  return sc.isSuccess() ? dynamic_cast<Gaudi::Algorithm*>( tmp ) : nullptr;
128  }
129 } // namespace
130 
132  return Service::initialize().andThen( [&] {
133  // populate m_algorithms
134  m_algorithms = instantiateAndInitializeAlgorithms( m_producers );
135 
136  // warn about non-reentrant algorithms
137  ranges::for_each( m_algorithms | ranges::views::transform( []( const auto& entry ) { return entry.second.alg; } ) |
138  ranges::views::filter( []( const auto* alg ) { return alg->cardinality() > 0; } ),
139  [&]( const Gaudi::Algorithm* alg ) {
140  this->warning() << "non-reentrant algorithm: " << AlgorithmRepr{ *alg } << endmsg;
141  } );
142  //== Print the list of the created algorithms
143  if ( msgLevel( MSG::DEBUG ) ) {
144  MsgStream& msg = debug();
145  msg << "Available DataProducers: ";
147  msg, m_algorithms, ", ", []( auto& os, const std::pair<std::string, AlgEntry>& e ) -> decltype( auto ) {
148  return os << AlgorithmRepr{ *e.second.alg };
149  } );
150  msg << endmsg;
151  }
152 
153  // populate m_dependencies and set AlgEntry::dependsOn
154  m_dependencies = mapProducers( m_algorithms );
155  } );
156 }
157 
159 
160  StatusCode ss = Service::start();
161  if ( !ss.isSuccess() ) return ss;
162 
163  // sysStart for m_algorithms
164  for ( auto& [name, algEntry] : m_algorithms ) {
165  ss = algEntry.alg->sysStart();
166  if ( ss.isFailure() ) {
167  error() << "Unable to start Algorithm: " << name << endmsg;
168  return ss;
169  }
170  }
171  return ss;
172 }
173 
175  StatusCode ss = Service::stop();
176  if ( !ss.isSuccess() ) return ss;
177 
178  // sysStop for m_algorithms
179  for ( auto& [name, algEntry] : m_algorithms ) {
180  ss = algEntry.alg->sysStop();
181  if ( ss.isFailure() ) {
182  error() << "Unable to stop Algorithm: " << name << endmsg;
183  return ss;
184  }
185  }
186  return ss;
187 }
188 
190  for ( auto& [name, algEntry] : m_algorithms ) {
191  algEntry.alg->sysFinalize().ignore( /* AUTOMATICALLY ADDED FOR gaudi/Gaudi!763 */ );
192  }
193  m_algorithms.clear();
194  return Service::finalize();
195 }
196 
200 
201  //= Get the Application manager, to see if algorithm exist
202  auto appMgr = service<IAlgManager>( "ApplicationMgr" );
203  size_t index = 0;
204  for ( const Gaudi::Utils::TypeNameString item : names ) {
205  const std::string& theName = item.name();
206  const std::string& theType = item.type();
207 
208  //== Check whether the specified algorithm already exists. If not, create it
209  SmartIF<IAlgorithm> myIAlg = appMgr->algorithm( item, false ); // do not create it now
210  if ( !myIAlg ) {
211  myIAlg = createAlgorithm( *appMgr, theType, theName );
212  } else {
213  // when the algorithm is not created, the ref count is short by one, so we
214  // have to fix it.
215  myIAlg->addRef();
216  }
217 
218  if ( !myIAlg ) {
219  throw GaudiException{ "Failed to create " + boost::lexical_cast<std::string>( item ), __func__,
221  }
222 
223  // propagate the sub-algorithm into own state.
224  StatusCode sc = myIAlg->sysInitialize();
225  if ( sc.isFailure() ) {
226  throw GaudiException{ "Failed to initialize " + boost::lexical_cast<std::string>( item ), __func__,
228  }
229 
230  algorithms.emplace( theName, AlgEntry{ index++, std::move( myIAlg ) } );
231  }
232 
233  return algorithms;
234 }
235 
238  if ( msgLevel( MSG::DEBUG ) ) {
239  debug() << "Data Dependencies for Algorithms:";
240  for ( const auto& [name, entry] : m_algorithms ) {
241  debug() << "\n " << name << " :";
242  for ( const auto* id : sorted_( entry.alg->inputDataObjs() ) ) { debug() << "\n o INPUT " << id->key(); }
243  for ( const auto* id : sorted_( entry.alg->outputDataObjs() ) ) { debug() << "\n o OUTPUT " << id->key(); }
244  }
245  debug() << endmsg;
246  }
247 
248  // figure out all outputs
250  for ( auto& [name, alg] : algorithms ) {
251  const auto& output = alg.alg->outputDataObjs();
252  if ( output.empty() ) { continue; }
253  for ( auto id : output ) {
254  auto r = producers.emplace( id, &alg );
255  if ( !r.second ) {
256  throw GaudiException( "multiple algorithms declare " + id.key() + " as output (" + name + " and " +
257  producers[id]->alg->name() + " at least). This is not allowed",
258  __func__, StatusCode::FAILURE );
259  }
260  }
261  }
262 
263  // resolve dependencies
264  for ( auto& [name, algEntry] : algorithms ) {
265  auto input = sorted_( algEntry.alg->inputDataObjs() );
266  for ( const DataObjID* idp : input ) {
267  DataObjID id = *idp;
268  auto iproducer = producers.find( id );
269  if ( iproducer != producers.end() ) {
270  algEntry.dependsOn.insert( iproducer->second );
271  } else {
272  std::ostringstream error_message;
273  error_message << "\nUnknown requested input by " << AlgorithmRepr{ *( algEntry.alg ) } << " : "
274  << std::quoted( id.key(), '\'' ) << ".\n";
275  error_message << "You can set the OutputLevel of HiveDataBrokerSvc to DEBUG to get a list of inputs and "
276  "outputs of every registered algorithm.\n";
277  throw GaudiException( error_message.str(), __func__, StatusCode::FAILURE );
278  // TODO: assign to dataloader!
279  // algEntry.dependsOn.insert(dataloader.alg);
280  // dataloader.data.emplace( id ); // TODO: we may ask too much of the
281  // dataloader this way...
282  }
283  }
284  }
285  return producers;
286 }
287 
291  std::vector<bool>& visiting ) const {
292  assert( visited.size() == m_algorithms.size() );
293  assert( visiting.size() == m_algorithms.size() );
294  if ( visited[alg.index] ) { return; }
295  if ( visiting[alg.index] ) { throw GaudiException( "Cycle detected ", __func__, StatusCode::FAILURE ); }
296 
297  if ( std::none_of( std::begin( stoppers ), std::end( stoppers ),
298  [alg]( auto& stopper ) { return alg.alg->name() == stopper; } ) ) {
299  visiting[alg.index] = true;
300  for ( auto* dep : sorted_( alg.dependsOn ) ) { visit( *dep, stoppers, sorted, visited, visiting ); }
301  visiting[alg.index] = false;
302  }
303 
304  visited[alg.index] = true;
305  sorted.push_back( alg.alg );
306 }
307 
310  const std::vector<std::string>& stoppers ) const {
312 
314  deps.reserve( requested.size() );
315 
316  // start with seeding from the initial request
317  for ( const auto& id : requested ) {
318  auto i = m_dependencies.find( id );
319  if ( i == m_dependencies.end() )
320  throw GaudiException( "unknown requested input: " + id.key(), __func__, StatusCode::FAILURE );
321  deps.push_back( i->second );
322  }
323  // producers may be responsible for multiple requested DataObjID -- make sure they are only mentioned once
324  std::sort( deps.begin(), deps.end(), []( auto const* lhs, auto const* rhs ) { return *lhs < *rhs; } );
325  deps.erase( std::unique( deps.begin(), deps.end(), []( auto const& lhs, auto const& rhs ) { return *lhs == *rhs; } ),
326  deps.end() );
327 
328  std::vector<bool> visited( m_algorithms.size() );
329  std::vector<bool> visiting( m_algorithms.size() );
330  for ( auto* alg : deps ) { visit( *alg, stoppers, result, visited, visiting ); }
331  return result;
332 }
333 
336  const std::vector<std::string>& stoppers ) const {
338 
339  auto it = m_algorithms.find( requested.name() );
340  if ( it == end( m_algorithms ) ) {
341  throw GaudiException{ "No algorithm with name " + requested.name() + " in DataProducers. Type is " +
342  ( requested.haveType() ? requested.type() : "not specified" ),
343  __func__, StatusCode::FAILURE };
344  }
345  auto const& alg = it->second;
346  if ( requested.haveType() && alg.alg->type() != requested.type() ) {
347  error() << "requested " << requested << " but have matching name with different type: " << alg.alg->type()
348  << endmsg;
349  }
350  assert( alg.alg != nullptr );
351 
352  std::vector<bool> visited( m_algorithms.size() );
353  std::vector<bool> visiting( m_algorithms.size() );
354  visit( alg, stoppers, result, visited, visiting );
355 
356  if ( msgLevel( MSG::DEBUG ) ) {
357  debug() << std::endl << "requested " << requested << " returning " << std::endl << " ";
359  debug(), result, ",\n ",
360  []( auto& os, const Gaudi::Algorithm* a ) -> decltype( auto ) { return os << AlgorithmRepr{ *a }; } );
361  debug() << std::endl << endmsg;
362  }
363  return result;
364 }
MSG::DEBUG
@ DEBUG
Definition: IMessageSvc.h:25
HiveDataBrokerSvc::initialize
StatusCode initialize() override
Definition: HiveDataBroker.cpp:131
IAlgManager.h
Service::initialize
StatusCode initialize() override
Definition: Service.cpp:118
std::string
STL class.
Gaudi::Utils::TypeNameString::name
const std::string & name() const
Definition: TypeNameString.h:49
HiveDataBrokerSvc::AlgEntry::index
size_t index
Definition: HiveDataBroker.cpp:59
StatusCode::andThen
StatusCode andThen(F &&f, ARGS &&... args) const
Chain code blocks making the execution conditional a success result.
Definition: StatusCode.h:163
std::move
T move(T... args)
StatusCode::isSuccess
bool isSuccess() const
Definition: StatusCode.h:314
Service::start
StatusCode start() override
Definition: Service.cpp:187
std::unordered_set< DataObjID, DataObjID_Hasher >
System.h
std::pair
std::vector::reserve
T reserve(T... args)
GaudiException.h
gaudirun.s
string s
Definition: gaudirun.py:346
std::vector< Gaudi::Algorithm * >
std::map::find
T find(T... args)
std::unordered_set::size
T size(T... args)
GaudiException
Definition: GaudiException.h:31
HiveDataBrokerSvc::AlgEntry::dependsOn
std::set< AlgEntry const * > dependsOn
Definition: HiveDataBroker.cpp:62
GaudiMP.FdsRegistry.msg
msg
Definition: FdsRegistry.py:19
HiveDataBrokerSvc::AlgEntry::operator==
friend bool operator==(AlgEntry const &lhs, AlgEntry const &rhs)
Definition: HiveDataBroker.cpp:66
std::map::emplace
T emplace(T... args)
ranges
Definition: details.h:30
HiveDataBrokerSvc::mapProducers
std::map< DataObjID, AlgEntry const * > mapProducers(std::map< std::string, AlgEntry > &algorithms) const
Definition: HiveDataBroker.cpp:237
HiveDataBrokerSvc::AlgEntry::AlgEntry
AlgEntry(size_t i, SmartIF< IAlgorithm > &&p)
Definition: HiveDataBroker.cpp:68
std::none_of
T none_of(T... args)
ranges::views
Definition: details.h:30
System::typeinfoName
GAUDI_API const std::string typeinfoName(const std::type_info &)
Get platform independent information about the class type.
Definition: System.cpp:315
gaudirun.output
output
Definition: gaudirun.py:521
IOTest.start
start
Definition: IOTest.py:110
HiveDataBrokerSvc::stop
StatusCode stop() override
Definition: HiveDataBroker.cpp:174
std::sort
T sort(T... args)
Service::finalize
StatusCode finalize() override
Definition: Service.cpp:222
std::vector::push_back
T push_back(T... args)
ManySmallAlgs.alg
alg
Definition: ManySmallAlgs.py:81
IAlgManager
Definition: IAlgManager.h:37
Gaudi::Utils::TypeNameString
Helper class to parse a string of format "type/name".
Definition: TypeNameString.h:20
StatusCode
Definition: StatusCode.h:65
Gaudi::cxx::for_each
void for_each(ContainerOfSynced &c, Fun &&f)
Definition: SynchronizedValue.h:105
IAlgorithm
Definition: IAlgorithm.h:38
Gaudi::Parsers::operator<<
std::ostream & operator<<(std::ostream &o, const Catalog &c)
printout operator
Definition: Catalog.h:68
std::ostream
STL class.
HiveDataBrokerSvc::start
StatusCode start() override
Definition: HiveDataBroker.cpp:158
HiveDataBrokerSvc::AlgEntry::operator<
friend bool operator<(AlgEntry const &lhs, AlgEntry const &rhs)
Definition: HiveDataBroker.cpp:64
HiveDataBrokerSvc::m_algorithms
std::map< std::string, AlgEntry > m_algorithms
Definition: HiveDataBroker.cpp:81
HiveDataBrokerSvc::algorithmsRequiredFor
std::vector< Gaudi::Algorithm * > algorithmsRequiredFor(const DataObjIDColl &requested, const std::vector< std::string > &stoppers={}) const override
Definition: HiveDataBroker.cpp:309
Gaudi::Algorithm
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:90
HiveDataBrokerSvc::instantiateAndInitializeAlgorithms
std::map< std::string, AlgEntry > instantiateAndInitializeAlgorithms(const std::vector< std::string > &names) const
Definition: HiveDataBroker.cpp:198
std::vector::erase
T erase(T... args)
Algorithm.h
std::runtime_error
STL class.
SmartIF< IAlgorithm >
HiveDataBrokerSvc::m_dependencies
std::map< DataObjID, AlgEntry const * > m_dependencies
Definition: HiveDataBroker.cpp:85
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:203
std::map
STL class.
extends
Base class used to extend a class implementing other interfaces.
Definition: extends.h:20
MsgStream
Definition: MsgStream.h:34
IAlgManager::createAlgorithm
virtual StatusCode createAlgorithm(std::string algtype, std::string algname, IAlgorithm *&alg, bool managed=false, bool checkIfExists=true)=0
Create an instance of a algorithm type that has been declared beforehand and assigns to it a name.
Gaudi::Utils::TypeNameString::type
const std::string & type() const
Definition: TypeNameString.h:48
DataObjID
Definition: DataObjID.h:47
IDataBroker.h
StatusCode::ignore
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
Definition: StatusCode.h:139
Service.h
std::ostringstream
STL class.
StatusCode::isFailure
bool isFailure() const
Definition: StatusCode.h:129
gaudirun.type
type
Definition: gaudirun.py:160
ConditionsStallTest.name
name
Definition: ConditionsStallTest.py:77
std::endl
T endl(T... args)
Service::stop
StatusCode stop() override
Definition: Service.cpp:181
event_timeout_check.algorithms
algorithms
Definition: event_timeout_check.py:40
HiveDataBrokerSvc
Definition: HiveDataBroker.cpp:38
SmartIF::get
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:86
std::begin
T begin(T... args)
HiveDataBrokerSvc::AlgEntry::ialg
SmartIF< IAlgorithm > ialg
Definition: HiveDataBroker.cpp:60
DECLARE_COMPONENT
#define DECLARE_COMPONENT(type)
Definition: PluginServiceV1.h:46
Gaudi::Utils::TypeNameString::haveType
bool haveType() const
Definition: TypeNameString.h:50
std::unique
T unique(T... args)
Properties.v
v
Definition: Properties.py:122
std::ostringstream::str
T str(T... args)
std::map::end
T end(T... args)
IOTest.end
end
Definition: IOTest.py:125
HiveDataBrokerSvc::AlgEntry
Definition: HiveDataBroker.cpp:58
HiveDataBrokerSvc::AlgEntry::alg
Gaudi::Algorithm * alg
Definition: HiveDataBroker.cpp:61
StatusCode::FAILURE
constexpr static const auto FAILURE
Definition: StatusCode.h:101
HiveDataBrokerSvc::visit
void visit(AlgEntry const &alg, std::vector< std::string > const &stoppers, std::vector< Gaudi::Algorithm * > &sorted, std::vector< bool > &visited, std::vector< bool > &visiting) const
Implements DFS topological sorting.
Definition: HiveDataBroker.cpp:289
HiveDataBrokerSvc::finalize
StatusCode finalize() override
Definition: HiveDataBroker.cpp:189
ProduceConsume.key
key
Definition: ProduceConsume.py:84
IOTest.appMgr
appMgr
Definition: IOTest.py:105
std::set
STL class.
Gaudi::Property< std::string >
GaudiUtils::details::ostream_joiner
Stream & ostream_joiner(Stream &os, Iterator first, Iterator last, Separator sep, OutputElement output=OutputElement{})
Definition: SerializeSTL.h:75
Gaudi::ParticleProperties::index
size_t index(const Gaudi::ParticleProperty *property, const Gaudi::Interfaces::IParticlePropertySvc *service)
helper utility for mapping of Gaudi::ParticleProperty object into non-negative integral sequential id...
Definition: IParticlePropertySvc.cpp:39