The Gaudi Framework  master (01b473db)
HiveDataBroker.cpp
Go to the documentation of this file.
1 /***********************************************************************************\
2 * (c) Copyright 1998-2025 CERN for the benefit of the LHCb and ATLAS collaborations *
3 * *
4 * This software is distributed under the terms of the Apache version 2 licence, *
5 * copied verbatim in the file "LICENSE". *
6 * *
7 * In applying this licence, CERN does not waive the privileges and immunities *
8 * granted to it by virtue of its status as an Intergovernmental Organization *
9 * or submit itself to any jurisdiction. *
10 \***********************************************************************************/
11 #include "GraphDumper.h"
12 
13 #include <Gaudi/Algorithm.h>
17 #include <GaudiKernel/Service.h>
18 #include <GaudiKernel/System.h>
19 #include <algorithm>
20 #include <boost/lexical_cast.hpp>
21 #include <boost/tokenizer.hpp>
22 #include <iomanip>
23 #include <ranges>
24 #include <stdexcept>
25 
26 namespace {
27  struct AlgEntry {
28  size_t index;
31  std::set<AlgEntry const*> dependsOn;
32  std::vector<DataObjID const*> inputs;
33  std::vector<DataObjID const*> outputs;
34 
35  friend bool operator<( AlgEntry const& lhs, AlgEntry const& rhs ) { return lhs.index < rhs.index; }
36 
37  friend bool operator==( AlgEntry const& lhs, AlgEntry const& rhs ) { return lhs.index == rhs.index; }
38 
39  AlgEntry( size_t i, SmartIF<IAlgorithm>&& p )
40  : index{ i }, ialg{ std::move( p ) }, alg{ dynamic_cast<Gaudi::Algorithm*>( ialg.get() ) } {
41  if ( !alg ) throw std::runtime_error( "algorithm pointer == nullptr???" );
42  // gather _all_ the inputs and outputs in a well-defined, reproducible manner,
43  // removing any duplication (sometimes, extra{Out,In}putDeps entries will already appear in {Out,In}putDataObjs)
44  constexpr auto gather = []( auto& c, auto const& in1, auto const& in2 ) {
45  for ( const DataObjID& id : in1 ) c.push_back( &id );
46  for ( const DataObjID& id : in2 ) c.push_back( &id );
47  constexpr auto by_key = []( const DataObjID* id ) { return id->fullKey(); };
48  std::ranges::sort( c, std::less{}, by_key );
49  auto od = std::ranges::unique( c, std::equal_to{}, by_key );
50  c.erase( od.begin(), od.end() );
51  };
52  gather( outputs, alg->outputDataObjs(), alg->extraOutputDeps() );
53  gather( inputs, alg->inputDataObjs(), alg->extraInputDeps() );
54  }
55  };
56 
57  template <std::ranges::range R>
58  requires std::common_reference_with<std::ranges::range_reference_t<R>, const AlgEntry&>
59  void dumpGraphFile( std::string const& fname, R const& algorithms ) {
60  Gaudi::Hive::Graph g{ fname };
61 
62  // loop over all algorithms to create nodes
63  for ( const AlgEntry& entry : algorithms ) { g.addNode( entry.alg->name(), std::to_string( entry.index ) ); }
64 
65  // loop over all algorithms to create list of outputs with corresponding alg indexes
66  std::unordered_map<std::string, size_t> output2Idx;
67  for ( const AlgEntry& entry : algorithms ) {
68  for ( const auto* id : entry.outputs ) { output2Idx[id->key()] = entry.index; }
69  }
70 
71  // loop over all algorithms to create edges
72  for ( const AlgEntry& entry : algorithms ) {
73  for ( const auto* id : entry.inputs ) {
74  g.addEdge( entry.alg->name(), std::to_string( entry.index ), id->key(), std::to_string( output2Idx[id->key()] ),
75  id->key() );
76  }
77  }
78  }
79 
80  struct AlgorithmRepr {
81  const Gaudi::Algorithm& parent;
82 
83  friend std::ostream& operator<<( std::ostream& s, const AlgorithmRepr& a ) {
84  std::string typ = System::typeinfoName( typeid( a.parent ) );
85  s << typ;
86  if ( a.parent.name() != typ ) s << "/" << a.parent.name();
87  return s;
88  }
89  };
90 
// Used for making debugging dumps.
//
// Copies the pointers held in `s` into a vector and orders that vector by the
// pointed-to values (via operator< on T), not by the pointer addresses used
// by the set itself.
template <typename T>
std::vector<const T*> sorted_( const std::set<T*>& s ) {
  std::vector<const T*> ordered( s.begin(), s.end() );
  const auto byPointee = []( const T* a, const T* b ) { return *a < *b; };
  std::sort( ordered.begin(), ordered.end(), byPointee );
  return ordered;
}
98 
99  SmartIF<IAlgorithm> createAlgorithm( IAlgManager& am, const std::string& type, const std::string& name ) {
100  // Maybe modify the AppMgr interface to return Algorithm* ??
101  IAlgorithm* tmp = nullptr;
102  StatusCode sc = am.createAlgorithm( type, name, tmp );
103  return sc.isSuccess() ? dynamic_cast<Gaudi::Algorithm*>( tmp ) : nullptr;
104  }
105 } // namespace
106 
107 class HiveDataBrokerSvc final : public extends<Service, IDataBroker> {
108 public:
109  using extends::extends;
110 
111  std::vector<Gaudi::Algorithm*> algorithmsRequiredFor( const DataObjIDColl& requested,
112  const std::vector<std::string>& stoppers = {} ) const override;
113  std::vector<Gaudi::Algorithm*> algorithmsRequiredFor( const Gaudi::Utils::TypeNameString& alg,
114  const std::vector<std::string>& stoppers = {} ) const override;
115 
116  StatusCode initialize() override;
117  StatusCode start() override;
118  StatusCode stop() override;
119  StatusCode finalize() override;
120 
121 private:
123  "Attribute any unmet input dependencies to this Algorithm" };
125  this, "DataProducers", {}, "List of algorithms to be used to resolve data dependencies" };
126 
128  this, "DataDepsGraphFile", "",
129  "Name of the output file (.dot or .md extensions allowed) containing the data dependency graph. If empty, no "
130  "graph is dumped" };
131 
132  std::map<std::string, AlgEntry>
133  instantiateAndInitializeAlgorithms( const std::vector<std::string>& names ) const; // algorithms must be fully
134  // initialized first, as
135  // doing so may create
136  // additional data
137  // dependencies...
138 
139  std::map<std::string, AlgEntry> m_algorithms;
140 
141  std::map<DataObjID, AlgEntry const*> mapProducers( std::map<std::string, AlgEntry>& algorithms ) const;
142 
143  std::map<DataObjID, AlgEntry const*> m_dependencies;
144 
145  void visit( AlgEntry const& alg, std::vector<std::string> const& stoppers, std::vector<Gaudi::Algorithm*>& sorted,
146  std::vector<bool>& visited, std::vector<bool>& visiting ) const;
147 };
148 
150 
152  return Service::initialize().andThen( [&] {
153  // populate m_algorithms
154  m_algorithms = instantiateAndInitializeAlgorithms( m_producers );
155 
156  // warn about non-reentrant algorithms
157  std::ranges::for_each( m_algorithms | std::ranges::views::transform( []( const auto& entry ) -> decltype( auto ) {
158  return entry.second.alg;
159  } ) |
160  std::ranges::views::filter( []( const auto* alg ) { return alg->cardinality() > 0; } ),
161  [&]( const Gaudi::Algorithm* alg ) {
162  this->warning() << "non-reentrant algorithm: " << AlgorithmRepr{ *alg } << endmsg;
163  } );
164  //== Print the list of the created algorithms
165  if ( msgLevel( MSG::DEBUG ) ) {
166  debug() << "Available DataProducers:\n";
167  std::ranges::for_each( m_algorithms | std::ranges::views::transform( []( const auto& entry ) -> decltype( auto ) {
168  return entry.second.alg;
169  } ),
170  [&]( const Gaudi::Algorithm* alg ) {
171  this->debug() << " " << AlgorithmRepr{ *alg } << " " << alg->outputDataObjs() << " "
172  << alg->extraOutputDeps() << endmsg;
173  } );
174  }
175 
176  // populate m_dependencies and set AlgEntry::dependsOn
177  m_dependencies = mapProducers( m_algorithms );
178  } );
179 }
180 
182 
183  StatusCode ss = Service::start();
184  if ( !ss.isSuccess() ) return ss;
185 
186  // sysStart for m_algorithms
187  for ( auto& [name, algEntry] : m_algorithms ) {
188  ss = algEntry.alg->sysStart();
189  if ( ss.isFailure() ) {
190  error() << "Unable to start Algorithm: " << name << endmsg;
191  return ss;
192  }
193  }
194  return ss;
195 }
196 
198  StatusCode ss = Service::stop();
199  if ( !ss.isSuccess() ) return ss;
200 
201  // sysStop for m_algorithms
202  for ( auto& [name, algEntry] : m_algorithms ) {
203  ss = algEntry.alg->sysStop();
204  if ( ss.isFailure() ) {
205  error() << "Unable to stop Algorithm: " << name << endmsg;
206  return ss;
207  }
208  }
209  return ss;
210 }
211 
213  for ( auto& [name, algEntry] : m_algorithms ) {
214  algEntry.alg->sysFinalize().ignore( /* AUTOMATICALLY ADDED FOR gaudi/Gaudi!763 */ );
215  }
216  m_algorithms.clear();
217  return Service::finalize();
218 }
219 
220 std::map<std::string, AlgEntry>
221 HiveDataBrokerSvc::instantiateAndInitializeAlgorithms( const std::vector<std::string>& names ) const {
222  std::map<std::string, AlgEntry> algorithms;
223 
224  //= Get the Application manager, to see if algorithm exist
225  auto appMgr = service<IAlgManager>( "ApplicationMgr" );
226  size_t index = 0;
227  for ( const std::string& item : names ) {
228  const Gaudi::Utils::TypeNameString tn( item );
229 
230  //== Check whether the specified algorithm already exists. If not, create it
231  SmartIF<IAlgorithm> myIAlg = appMgr->algorithm( item, false ); // do not create it now
232  if ( !myIAlg ) {
233  myIAlg = createAlgorithm( *appMgr, tn.type(), tn.name() );
234  } else {
235  // when the algorithm is not created, the ref count is short by one, so we
236  // have to fix it.
237  myIAlg->addRef();
238  }
239 
240  if ( !myIAlg ) {
241  throw GaudiException{ "Failed to create " + boost::lexical_cast<std::string>( item ), __func__,
243  }
244 
245  // propagate the sub-algorithm into own state.
246  StatusCode sc = myIAlg->sysInitialize();
247  if ( sc.isFailure() ) {
248  throw GaudiException{ "Failed to initialize " + boost::lexical_cast<std::string>( item ), __func__,
250  }
251 
252  algorithms.emplace( tn.name(), AlgEntry{ index++, std::move( myIAlg ) } );
253  }
254 
255  return algorithms;
256 }
257 
258 std::map<DataObjID, AlgEntry const*>
259 HiveDataBrokerSvc::mapProducers( std::map<std::string, AlgEntry>& algorithms ) const {
260  if ( msgLevel( MSG::DEBUG ) ) {
261  debug() << "Data Dependencies for Algorithms:";
262  for ( const auto& [name, entry] : m_algorithms ) {
263  debug() << "\n " << name << " :";
264  for ( const auto* id : entry.inputs ) { debug() << "\n o INPUT " << id->key(); }
265  for ( const auto* id : entry.outputs ) { debug() << "\n o OUTPUT " << id->key(); }
266  }
267  debug() << endmsg;
268  }
269 
270  // If requested, dump a graph of the data dependencies in a .dot or .md file
271  if ( !m_dataDepsGraphFile.empty() ) {
272  info() << "Dumping data dependencies graph to file: " << m_dataDepsGraphFile.value() << endmsg;
273  dumpGraphFile( m_dataDepsGraphFile, m_algorithms | std::views::values );
274  }
275 
276  // figure out all outputs
277  std::map<DataObjID, const AlgEntry*> producers;
278  for ( auto& [name, alg] : algorithms ) {
279  for ( auto id : alg.outputs ) {
280  auto r = producers.emplace( *id, &alg );
281  if ( !r.second ) {
282  throw GaudiException( "multiple algorithms declare " + id->key() + " as output (" + name + " and " +
283  producers[*id]->alg->name() + " at least). This is not allowed",
284  __func__, StatusCode::FAILURE );
285  }
286  }
287  }
288 
289  // resolve dependencies
290  for ( auto& [name, algEntry] : algorithms ) {
291  for ( const DataObjID* idp : algEntry.inputs ) {
292  DataObjID id = *idp;
293  auto iproducer = producers.find( id );
294  if ( iproducer != producers.end() ) {
295  algEntry.dependsOn.insert( iproducer->second );
296  } else {
297  std::ostringstream error_message;
298  error_message << "\nUnknown requested input by " << AlgorithmRepr{ *( algEntry.alg ) } << " : "
299  << std::quoted( id.key(), '\'' ) << ".\n";
300  error_message << "You can set the OutputLevel of HiveDataBrokerSvc to DEBUG to get a list of inputs and "
301  "outputs of every registered algorithm.\n";
302  throw GaudiException( error_message.str(), __func__, StatusCode::FAILURE );
303  // TODO: assign to dataloader!
304  // algEntry.dependsOn.insert(dataloader.alg);
305  // dataloader.data.emplace( id ); // TODO: we may ask to much of the
306  // dataloader this way...
307  }
308  }
309  }
310  return producers;
311 }
312 
314 void HiveDataBrokerSvc::visit( AlgEntry const& alg, std::vector<std::string> const& stoppers,
315  std::vector<Gaudi::Algorithm*>& sorted, std::vector<bool>& visited,
316  std::vector<bool>& visiting ) const {
317  assert( visited.size() == m_algorithms.size() );
318  assert( visiting.size() == m_algorithms.size() );
319  if ( visited[alg.index] ) { return; }
320  if ( visiting[alg.index] ) { throw GaudiException( "Cycle detected ", __func__, StatusCode::FAILURE ); }
321 
322  if ( std::none_of( std::begin( stoppers ), std::end( stoppers ),
323  [alg]( auto& stopper ) { return alg.alg->name() == stopper; } ) ) {
324  visiting[alg.index] = true;
325  for ( auto* dep : sorted_( alg.dependsOn ) ) { visit( *dep, stoppers, sorted, visited, visiting ); }
326  visiting[alg.index] = false;
327  }
328 
329  visited[alg.index] = true;
330  sorted.push_back( alg.alg );
331 }
332 
333 std::vector<Gaudi::Algorithm*>
335  const std::vector<std::string>& stoppers ) const {
336  std::vector<Gaudi::Algorithm*> result;
337 
338  std::vector<const AlgEntry*> deps;
339  deps.reserve( requested.size() );
340 
341  // start with seeding from the initial request
342  for ( const auto& id : requested ) {
343  auto i = m_dependencies.find( id );
344  if ( i == m_dependencies.end() )
345  throw GaudiException( "unknown requested input: " + id.key(), __func__, StatusCode::FAILURE );
346  deps.push_back( i->second );
347  }
348  // producers may be responsible for multiple requested DataObjID -- make sure they are only mentioned once
349  std::sort( deps.begin(), deps.end(), []( auto const* lhs, auto const* rhs ) { return *lhs < *rhs; } );
350  deps.erase( std::unique( deps.begin(), deps.end(), []( auto const& lhs, auto const& rhs ) { return *lhs == *rhs; } ),
351  deps.end() );
352 
353  std::vector<bool> visited( m_algorithms.size() );
354  std::vector<bool> visiting( m_algorithms.size() );
355  for ( auto* alg : deps ) { visit( *alg, stoppers, result, visited, visiting ); }
356  return result;
357 }
358 
359 std::vector<Gaudi::Algorithm*>
361  const std::vector<std::string>& stoppers ) const {
362  std::vector<Gaudi::Algorithm*> result;
363 
364  auto it = m_algorithms.find( requested.name() );
365  if ( it == end( m_algorithms ) ) {
366  throw GaudiException{ "No algorithm with name " + requested.name() + " in DataProducers. Type is " +
367  ( requested.haveType() ? requested.type() : "not specified" ),
368  __func__, StatusCode::FAILURE };
369  }
370  auto const& alg = it->second;
371  if ( requested.haveType() && alg.alg->type() != requested.type() ) {
372  error() << "requested " << requested << " but have matching name with different type: " << alg.alg->type()
373  << endmsg;
374  }
375  assert( alg.alg != nullptr );
376 
377  std::vector<bool> visited( m_algorithms.size() );
378  std::vector<bool> visiting( m_algorithms.size() );
379  visit( alg, stoppers, result, visited, visiting );
380 
381  if ( msgLevel( MSG::DEBUG ) ) {
382  debug() << std::endl << "requested " << requested << " returning " << std::endl << " ";
384  debug(), result, ",\n ",
385  []( auto& os, const Gaudi::Algorithm* a ) -> decltype( auto ) { return os << AlgorithmRepr{ *a }; } );
386  debug() << std::endl << endmsg;
387  }
388  return result;
389 }
MSG::DEBUG
@ DEBUG
Definition: IMessageSvc.h:22
Histograms_with_global.algorithms
algorithms
Definition: Histograms_with_global.py:19
HiveDataBrokerSvc::initialize
StatusCode initialize() override
Definition: HiveDataBroker.cpp:151
GraphDumper.h
IAlgManager.h
Service::initialize
StatusCode initialize() override
Definition: Service.cpp:118
HiveDataBrokerSvc::m_producers
Gaudi::Property< std::vector< std::string > > m_producers
Definition: HiveDataBroker.cpp:124
operator<
bool operator<(backwards_compatibility_hack_time_timespan, backwards_compatibility_hack_time_timespan)
Definition: Time.icpp:219
Gaudi::Utils::TypeNameString::name
const std::string & name() const
Definition: TypeNameString.h:48
StatusCode::andThen
StatusCode andThen(F &&f, ARGS &&... args) const
Chain code blocks making the execution conditional a success result.
Definition: StatusCode.h:163
StatusCode::isSuccess
bool isSuccess() const
Definition: StatusCode.h:314
Service::start
StatusCode start() override
Definition: Service.cpp:187
System.h
GaudiException.h
gaudirun.s
string s
Definition: gaudirun.py:346
HiveDataBrokerSvc::instantiateAndInitializeAlgorithms
std::map< std::string, AlgEntry > instantiateAndInitializeAlgorithms(const std::vector< std::string > &names) const
Definition: HiveDataBroker.cpp:221
GaudiException
Definition: GaudiException.h:29
gaudirun.c
c
Definition: gaudirun.py:525
Gaudi::Hive::Graph
utilities to dump graphs in different formats
Definition: GraphDumper.h:30
System::typeinfoName
GAUDI_API const std::string typeinfoName(const std::type_info &)
Get platform independent information about the class type.
Definition: System.cpp:260
Gaudi::Functional::details::detail2::requires
requires requires
Definition: details.h:419
GaudiPartProp.tests.id
id
Definition: tests.py:111
HiveDataBrokerSvc::stop
StatusCode stop() override
Definition: HiveDataBroker.cpp:197
CommonMessaging< implements< IService, IProperty, IStateful > >::msgLevel
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
Definition: CommonMessaging.h:147
Service::finalize
StatusCode finalize() override
Definition: Service.cpp:223
HiveDataBrokerSvc::m_dataLoader
Gaudi::Property< std::string > m_dataLoader
Definition: HiveDataBroker.cpp:122
ManySmallAlgs.alg
alg
Definition: ManySmallAlgs.py:81
IAlgManager
Definition: IAlgManager.h:34
Gaudi::Utils::begin
AttribStringParser::Iterator begin(const AttribStringParser &parser)
Definition: AttribStringParser.h:135
Gaudi::Utils::TypeNameString
Helper class to parse a string of format "type/name".
Definition: TypeNameString.h:19
Service::name
const std::string & name() const override
Retrieve name of the service
Definition: Service.cpp:333
StatusCode
Definition: StatusCode.h:64
Gaudi::cxx::for_each
void for_each(ContainerOfSynced &c, Fun &&f)
Definition: SynchronizedValue.h:98
IAlgorithm
Definition: IAlgorithm.h:36
gaudirun.g
dictionary g
Definition: gaudirun.py:582
Gaudi::Parsers::operator<<
std::ostream & operator<<(std::ostream &o, const Catalog &c)
printout operator
Definition: Catalog.h:49
HiveDataBrokerSvc::start
StatusCode start() override
Definition: HiveDataBroker.cpp:181
HiveDataBrokerSvc::m_algorithms
std::map< std::string, AlgEntry > m_algorithms
Definition: HiveDataBroker.cpp:139
HiveDataBrokerSvc::algorithmsRequiredFor
std::vector< Gaudi::Algorithm * > algorithmsRequiredFor(const DataObjIDColl &requested, const std::vector< std::string > &stoppers={}) const override
Definition: HiveDataBroker.cpp:334
Gaudi::Algorithm
Base class from which all concrete algorithm classes should be derived.
Definition: Algorithm.h:87
Gaudi::Property::value
const ValueType & value() const
Definition: Property.h:229
Algorithm.h
SmartIF< IAlgorithm >
HiveDataBrokerSvc::m_dataDepsGraphFile
Gaudi::Property< std::string > m_dataDepsGraphFile
Definition: HiveDataBroker.cpp:127
HiveDataBrokerSvc::m_dependencies
std::map< DataObjID, AlgEntry const * > m_dependencies
Definition: HiveDataBroker.cpp:143
DataObjIDColl
std::unordered_set< DataObjID, DataObjID_Hasher > DataObjIDColl
Definition: DataObjID.h:121
endmsg
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
Definition: MsgStream.h:198
extends
Base class used to extend a class implementing other interfaces.
Definition: extends.h:19
Gaudi::Utils::TypeNameString::type
const std::string & type() const
Definition: TypeNameString.h:47
DataObjID
Definition: DataObjID.h:47
IDataBroker.h
StatusCode::ignore
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
Definition: StatusCode.h:139
Service.h
StatusCode::isFailure
bool isFailure() const
Definition: StatusCode.h:129
gaudirun.type
type
Definition: gaudirun.py:160
ConditionsStallTest.name
name
Definition: ConditionsStallTest.py:77
Service::stop
StatusCode stop() override
Definition: Service.cpp:181
HiveDataBrokerSvc
Definition: HiveDataBroker.cpp:107
SmartIF::get
TYPE * get() const
Get interface pointer.
Definition: SmartIF.h:82
DECLARE_COMPONENT
#define DECLARE_COMPONENT(type)
Definition: PluginServiceV1.h:45
Gaudi::Utils::TypeNameString::haveType
bool haveType() const
Definition: TypeNameString.h:49
IAlgManager::createAlgorithm
virtual StatusCode createAlgorithm(const std::string &algtype, const std::string &algname, IAlgorithm *&alg, bool managed=false, bool checkIfExists=true)=0
Create an instance of a algorithm type that has been declared beforehand and assigns to it a name.
Properties.v
v
Definition: Properties.py:122
HiveDataBrokerSvc::mapProducers
std::map< DataObjID, AlgEntry const * > mapProducers(std::map< std::string, AlgEntry > &algorithms) const
Definition: HiveDataBroker.cpp:259
IOTest.end
end
Definition: IOTest.py:125
StatusCode::FAILURE
constexpr static const auto FAILURE
Definition: StatusCode.h:100
HiveDataBrokerSvc::visit
void visit(AlgEntry const &alg, std::vector< std::string > const &stoppers, std::vector< Gaudi::Algorithm * > &sorted, std::vector< bool > &visited, std::vector< bool > &visiting) const
Implements DFS topological sorting.
Definition: HiveDataBroker.cpp:314
HiveDataBrokerSvc::finalize
StatusCode finalize() override
Definition: HiveDataBroker.cpp:212
ProduceConsume.key
key
Definition: ProduceConsume.py:84
IOTest.appMgr
appMgr
Definition: IOTest.py:105
Gaudi::Details::operator==
bool operator==(const PropertyId &lhs, const PropertyId &rhs)
Definition: PropertyId.h:103
Gaudi::Property< std::string >
GaudiUtils::details::ostream_joiner
Stream & ostream_joiner(Stream &os, Iterator first, Iterator last, Separator sep, OutputElement output=OutputElement{})
Definition: SerializeSTL.h:86
Gaudi::ParticleProperties::index
size_t index(const Gaudi::ParticleProperty *property, const Gaudi::Interfaces::IParticlePropertySvc *service)
helper utility for mapping of Gaudi::ParticleProperty object into non-negative integral sequential id...
Definition: IParticlePropertySvc.cpp:39