22#include <tbb/concurrent_queue.h> 
   24#include <ThreadLocalStorage.h> 
   33#include <unordered_map> 
   43  using pool_string = std::basic_string<char, std::char_traits<char>, LocalAlloc<char>>;
 
   46    std::unique_ptr<DataObject>        m_data;
 
   47    std::unique_ptr<IOpaqueAddress>    m_addr;
 
   48    pool_string                        m_identifierStorage;
 
   49    mutable std::optional<std::string> m_identifier;
 
   50    static IDataProviderSvc*           s_svc;
 
   53    using allocator_type = LocalAlloc<char>;
 
   54    static void setDataProviderSvc( IDataProviderSvc* p ) { s_svc = p; }
 
   // Construct an entry named `id`. Ownership of `data` and `addr` moves into
   // m_data and m_addr; `id` is copied into m_identifierStorage using `alloc`.
   // Each non-null member then has its registry set to this entry.
   // NOTE(review): declared noexcept although building m_identifierStorage goes
   // through `alloc` -- if that allocation can throw, the program would
   // terminate; confirm this is intended.
   56    Entry( std::string_view 
id, std::unique_ptr<DataObject> data, std::unique_ptr<IOpaqueAddress> addr,
 
   57           allocator_type alloc ) noexcept
 
   58        : m_data{ std::move( data ) }, m_addr{ std::move( addr ) }, m_identifierStorage{ 
id, alloc } {
 
   59      if ( m_data ) m_data->setRegistry( 
this );
 
   60      if ( m_addr ) m_addr->setRegistry( 
this );
 
   62    Entry( 
const Entry& )                = 
delete;
 
   63    Entry& operator=( 
const Entry& rhs ) = 
delete;
 
   64    Entry( Entry&& rhs )                 = 
delete;
 
   65    Entry& operator=( Entry&& rhs )      = 
delete;
 
   68    unsigned long    addRef()
 override { 
return -1; }
 
   69    unsigned long    release()
 override { 
return -1; }
 
   70    const name_type& 
name()
 const override {
 
   72      if ( !m_identifier ) m_identifier.emplace( m_identifierStorage );
 
   75    const id_type& identifier()
 const override {
 
   76      if ( !m_identifier ) m_identifier.emplace( m_identifierStorage );
 
   79    std::string_view  identifierView()
 const { 
return m_identifierStorage; }
 
   80    IDataProviderSvc* dataSvc()
 const override { 
return s_svc; }
 
   82    IOpaqueAddress*   address()
 const override { 
return m_addr.get(); }
 
   // Replace the stored address with `iAddr`. m_addr is a unique_ptr, so it
   // takes ownership of `iAddr` and destroys whatever address it held before.
   // A non-null new address then has its registry set to this entry.
   83    void              setAddress( IOpaqueAddress* iAddr )
 override {
 
   84      m_addr.reset( iAddr );
 
   85      if ( m_addr ) m_addr->setRegistry( 
this );
 
   91      std::unordered_map<std::string_view, Entry, std::hash<std::string_view>, std::equal_to<std::string_view>,
 
   92                         LocalAlloc<std::pair<const std::string_view, Entry>>>;
 
   93  using OrderedMap = std::map<std::string_view, Entry>;
 
   95  template <
typename Map = UnorderedMap>
 
   97    LocalArena  m_resource;
 
   98    std::size_t m_est_size;
 
  100    std::optional<Map> m_store{ std::in_place, &m_resource };
 
  101    static_assert( std::is_same_v<typename Map::key_type, std::string_view> );
 
  // Insert a new entry for key `k` that owns `d` and `a`, and return a
  // reference to the stored Entry. Throws std::runtime_error if either
  // insertion step reports failure (e.g. `k` is already present).
  103    const auto& emplace( std::string_view k, std::unique_ptr<DataObject> d, std::unique_ptr<IOpaqueAddress> a = {} ) {
 
  // First insertion: the map key is still a view of the caller's `k`.
  106      auto [i, b] = m_store->try_emplace( k, k, std::move( d ), std::move( a ), &m_resource );
 
  107      if ( !b ) 
throw std::runtime_error( 
"failed to insert " + std::string{ k } );
 
  // Re-key the node: pull it out, point the key at the identifier text stored
  // inside the entry itself, and put the node back.
  108      auto nh  = m_store->extract( i );
 
  109      nh.key() = nh.mapped().identifierView(); 
 
  110      auto r   = m_store->insert( std::move( nh ) );
 
  111      if ( !r.inserted ) 
throw std::runtime_error( 
"failed to insert " + std::string{ k } );
 
  112      return r.position->second;
 
  116    Store( std::size_t est_size, std::size_t pool_size ) : m_resource{ pool_size }, m_est_size{ est_size } {}
 
  117    [[nodiscard]] 
bool        empty()
 const { 
return m_store->empty(); }
 
  118    [[nodiscard]] std::size_t 
size()
 const { 
return m_store->size(); }
 
  119    [[nodiscard]] std::size_t used_bytes() const noexcept { 
return m_resource.size(); }
 
  120    [[nodiscard]] std::size_t used_blocks() const noexcept { 
return m_resource.num_blocks(); }
 
  121    [[nodiscard]] std::size_t used_buckets()
 const { 
return m_store->bucket_count(); }
 
  122    [[nodiscard]] std::size_t num_allocations() const noexcept { 
return m_resource.num_allocations(); }
 
  127      m_store.emplace( m_est_size, &m_resource ); 
 
  // Insert a new entry under key `k` that owns `data` and `addr`, then return
  // the result of object() on the stored entry. Insertion goes through
  // emplace(), which throws std::runtime_error when it cannot insert.
  130    const DataObject* 
put( std::string_view k, std::unique_ptr<DataObject> data,
 
  131                           std::unique_ptr<IOpaqueAddress> addr = {} ) {
 
  132      return emplace( k, std::move( data ), std::move( addr ) ).object();
 
  135      const Entry* d = find( k );
 
  136      return d ? d->object() : 
nullptr;
 
  // Look up key `k` in the underlying map.
  // Returns a pointer to the stored Entry, or nullptr when `k` is not present.
  138    const Entry* find( std::string_view k ) 
const noexcept {
 
  139      auto i = m_store->find( k );
 
  140      return i != m_store->end() ? &( i->second ) : 
nullptr;
 
  143    [[nodiscard]] 
auto begin() const noexcept { 
return m_store->begin(); }
 
  144    [[nodiscard]] 
auto end() const noexcept { 
return m_store->end(); }
 
  145    void               clear() noexcept { m_store->clear(); }
 
  146    auto               erase( std::string_view k ) { 
return m_store->erase( k ); }
 
  147    template <
typename Predicate>
 
  148    void erase_if( Predicate p ) {
 
  149      auto i   = m_store->begin();
 
  150      auto end = m_store->end();
 
  152        if ( std::invoke( p, std::as_const( *i ) ) )
 
  153          i = m_store->erase( i );
 
  163    throw std::logic_error{ 
"Unsupported Function Called: " + 
s + 
"\n" + trace };
 
  167  std::string_view normalize_path( std::string_view path, std::string_view prefix ) {
 
  170    if ( !
path.empty() && 
path.front() == 
'/' ) 
path.remove_prefix( 1 );
 
  176    auto        status  = cnv.
createObj( &addr, pObject ); 
 
  177    auto        object  = std::unique_ptr<DataObject>( pObject );
 
  178    if ( status.isFailure() ) 
object.reset();
 
  183  struct Partition final {
 
  187    std::optional<Store<>> store;
 
  188    int                    eventNumber = -1;
 
  191  template <
typename T, 
typename Mutex = std::recursive_mutex, 
typename ReadLock = std::scoped_lock<Mutex>,
 
  192            typename WriteLock = ReadLock>
 
  198    template <
typename F>
 
  200      WriteLock lock{ m_mtx };
 
  203    template <
typename F>
 
  204    decltype( auto ) 
with_lock( F&& f ) 
const {
 
  205      ReadLock lock{ m_mtx };
 
  210  template <
typename Fun>
 
  212    return [f = std::forward<Fun>( f )]( 
auto& p ) -> 
decltype( 
auto ) { 
return p.with_lock( f ); };
 
  215  TTHREAD_TLS( Synced<Partition>* ) s_current = 
nullptr;
 
  217  template <
typename Fun>
 
  219    return s_current ? s_current->with_lock( std::forward<Fun>( f ) )
 
  239                                       "force creation of default leaves on registerObject" };
 
 
  245                                                  "Estimated number of buckets in the store" };
 
 
  254    auto n_allocs = p.store->num_allocations();
 
 
  285      "InhibitedPathPrefixes",
 
  287      "Prefixes of TES locations that will not be loaded by the persistency service " };
 
 
  289      this, 
"FollowLinksToAncestors", 
true,
 
  290      "Load objects which reside in files other than the one corresponding to the root of the event store" };
 
 
  294  using extends::extends;
 
  297  const std::string& 
rootName() 
const override;
 
  303  StatusCode selectStore( 
size_t partition ) 
override;
 
  305  StatusCode clearStore( 
size_t partition ) 
override;
 
  306  StatusCode setNumberOfStores( 
size_t slots ) 
override;
 
  308  size_t     getPartitionNumber( 
int eventnumber ) 
const override;
 
  311    return findObject( 
id.fullKey(), pObject ).isSuccess();
 
 
  319  StatusCode clearSubTree( std::string_view ) 
override;
 
  338  StatusCode registerObject( std::string_view parentPath, std::string_view objectPath, 
DataObject* pObject ) 
override;
 
  341  StatusCode unregisterObject( std::string_view ) 
override;
 
  374    Entry::setDataProviderSvc( 
this );
 
  375    extends::initialize().ignore();
 
  383      synced_p.with_lock( [
this]( Partition& p ) { 
initStore( p ); } );
 
 
  397      info() << 
"Mean memory pool usage: " << float( 1e-3f * 
float( 
m_usedPoolSize.mean() ) ) << 
" KiB serving " 
  403    return extends::finalize();
 
 
 
  421  size_t slot = std::string::npos;
 
  423    assert( slot != std::string::npos );
 
  425    [[maybe_unused]] 
auto prev = 
m_partitions[slot].with_lock(
 
  426        [evtnumber]( Partition& p ) { 
return std::exchange( p.eventNumber, evtnumber ); } );
 
  427    assert( prev == -1 ); 
 
 
  433  if ( slots < 
size_t{ 1 } ) {
 
  434    error() << 
"Invalid number of slots (" << slots << 
")" << 
endmsg;
 
  438    error() << 
"Too late to change the number of slots!" << 
endmsg;
 
 
  448                         with_lock( [eventnumber]( 
const Partition& p ) { 
return p.eventNumber == eventnumber; } ) );
 
 
  459  auto prev = 
m_partitions[partition].with_lock( []( Partition& p ) { 
return std::exchange( p.eventNumber, -1 ); } );
 
 
  467  return m_partitions[partition].with_lock( [
this]( Partition& p ) {
 
 
  474  top = normalize_path( top, 
rootName() );
 
  475  return fwd( [&]( Partition& p ) {
 
  476    p.store->erase_if( [top]( 
const auto& value ) { 
return value.first.starts_with( top ); } );
 
 
  482  return fwd( [
this]( Partition& p ) {
 
 
  489  return fwd( [&]( Partition& p ) {
 
  490    top                              = normalize_path( top, 
rootName() );
 
  491    unsigned int nbSlashesInRootName = std::count( 
rootName().begin(), 
rootName().end(), 
'/' );
 
  492    auto         cmp = []( 
const Entry* lhs, 
const Entry* rhs ) { 
return lhs->identifier() < rhs->identifier(); };
 
  493    std::set<
const Entry*, 
decltype( cmp )> keys{ std::move( cmp ) };
 
  494    for ( 
const auto& v : *p.store ) {
 
  495      if ( v.second.identifier().starts_with( top ) ) keys.insert( &v.second );
 
  497    auto k = keys.begin();
 
  498    while ( k != keys.end() ) {
 
  499      const auto& 
id     = ( *k )->identifier();
 
  500      int         level  = std::count( 
id.begin(), 
id.end(), 
'/' ) + nbSlashesInRootName;
 
  501      bool        accept = pAgent->
analyse( 
const_cast<Entry*
>( *( k++ ) ), level );
 
  503        k = std::find_if_not( k, keys.end(), [&
id]( 
const auto& e ) { return e->identifier().starts_with( id ); } );
 
 
  511    debug() << 
"setRoot( " << root_path << 
", (DataObject*)" << (
void*)pObject << 
" )" << 
endmsg;
 
  513  if ( !fwd( []( Partition& p ) {
 
 
  521  auto rootAddr = std::unique_ptr<IOpaqueAddress>( pRootAddr );
 
  523    debug() << 
"setRoot( " << root_path << 
", (IOpaqueAddress*)" << rootAddr.get();
 
  524    if ( rootAddr ) 
debug() << 
"[ " << rootAddr->par()[0] << 
", " << rootAddr->par()[1] << 
" ]";
 
  527  if ( !fwd( []( Partition& p ) {
 
  532  if ( !rootAddr ) 
return Status::INVALID_OBJ_ADDR; 
 
  535  if ( !
object ) 
return Status::INVALID_OBJECT;
 
  537  LocalArena dummy_arena{ root_path.size() + 1 };
 
  538  auto       dummy = Entry{ root_path, {}, {}, &dummy_arena };
 
  539  object->setRegistry( &dummy );
 
  540  rootAddr->setRegistry( &dummy );
 
  541  auto status = 
m_dataLoader->fillObjRefs( rootAddr.get(), 
object.get() );
 
  542  if ( status.isSuccess() ) {
 
  543    auto pObject = 
object.get();
 
 
  553  auto addr = std::unique_ptr<IOpaqueAddress>( pAddr );
 
  554  if ( !addr ) 
return Status::INVALID_OBJ_ADDR; 
 
  556    debug() << 
"registerAddress( (IRegistry*)" << (
void*)pReg << 
", " << path << 
", (IOpaqueAddress*)" << addr.get()
 
  557            << 
"[ " << addr->
par()[0] << 
", " << addr->
par()[1] << 
" ]" 
  563      debug() << 
"Attempt to load " << addr->
par()[1] << 
" from file " << addr->
par()[0] << 
" blocked -- different file" 
  569           [addrPath = addr->
par()[1]]( std::string_view prefix ) { return addrPath.starts_with( prefix ); } ) ) {
 
  571      debug() << 
"Attempt to load " << addr->
par()[1] << 
" from file " << addr->
par()[0] << 
" blocked -- path inhibited" 
  577  if ( !
object ) 
return Status::INVALID_OBJECT;
 
  580  LocalArena dummy_arena{ fullpath.size() + 1 };
 
  581  auto       dummy = Entry{ fullpath, {}, {}, &dummy_arena };
 
  582  object->setRegistry( &dummy );
 
  584  auto status = 
m_dataLoader->fillObjRefs( addr.get(), 
object.get() );
 
  585  if ( !status.isSuccess() ) 
return status;
 
  590    auto ptr = 
object.get();
 
  591    debug() << 
"registerAddress: " << std::quoted( normalize_path( fullpath, 
rootName() ) ) << 
" (DataObject*)" 
  592            << 
static_cast<void*
>( ptr ) << ( ptr ? 
" -> " + 
System::typeinfoName( 
typeid( *ptr ) ) : std::string{} )
 
  595  fwd( [&]( Partition& p ) {
 
  596    p.store->put( normalize_path( fullpath, 
rootName() ), std::move( 
object ), std::move( addr ) );
 
 
  603  return parentPath.empty()
 
  605             : 
registerObject( 
nullptr, std::string{ parentPath }.append( 
"/" ).append( objectPath ), pObject );
 
 
  609  return fwd( [&, 
object = std::unique_ptr<DataObject>( pObject ),
 
  610               path = normalize_path( path, 
rootName() )]( Partition& p ) 
mutable {
 
  613      for ( 
auto i = dir.rfind( 
'/' ); i != std::string_view::npos; i = dir.rfind( 
'/' ) ) {
 
  614        dir = dir.substr( 0, i );
 
  615        if ( !p.store->find( dir ) ) {
 
  617            debug() << 
"registerObject: adding directory " << std::quoted( dir ) << 
endmsg;
 
  619          p.store->put( dir, std::unique_ptr<DataObject>{} );
 
  624      auto ptr = 
object.get();
 
  625      debug() << 
"registerObject: " << std::quoted( path ) << 
" (DataObject*)" << 
static_cast<void*
>( ptr )
 
  628    p.store->put( path, std::move( 
object ) );
 
 
  634  return fwd( [&]( Partition& p ) {
 
  635    path    = normalize_path( path, 
rootName() );
 
  636    pObject = 
const_cast<DataObject*
>( p.store->get( path ) );
 
  638      debug() << 
"retrieveObject: " << std::quoted( path ) << 
" (DataObject*)" << (
void*)pObject
 
 
unsigned int CLID
Class ID definition.
TTHREAD_TLS(Synced< Partition > *) s_current
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
#define DECLARE_COMPONENT(type)
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
MsgStream & info() const
shortcut for the method msgStream(MSG::INFO)
MSG::Level msgLevel() const
A DataObject is the base class of any identifiable object on any data store.
IRegistry * registry() const
Get pointer to Registry.
Description of the DataStoreItem class.
Uses a minimal event store implementation and adds everything required to satisfy the IDataProviderSv...
Gaudi::Property< std::size_t > m_poolSize
Gaudi::Accumulators::AveragingCounter< std::size_t > m_usedPoolAllocations
StatusCode updateObject(IRegistry *) override
bool exists(const DataObjID &id) override
StatusCode unlinkObject(IRegistry *, std::string_view) override
Gaudi::Property< std::vector< std::string > > m_inhibitPrefixes
StatusCode objectLeaves(const IRegistry *, std::vector< IRegistry * > &) override
StatusCode setDataLoader(IConversionSvc *svc, IDataProviderSvc *dpsvc) override
StatusCode finalize() override
SmartIF< IConversionSvc > m_dataLoader
StatusCode traverseSubTree(std::string_view, IDataStoreAgent *) override
StatusCode preLoad() override
StatusCode addPreLoadItem(const DataStoreItem &) override
StatusCode retrieveObject(IRegistry *pDirectory, std::string_view path, DataObject *&pObject) override
std::size_t poolSize() const
StatusCode unregisterAddress(IRegistry *, std::string_view) override
Gaudi::Property< bool > m_followLinksToAncestors
StatusCode updateObject(DataObject *) override
Gaudi::Accumulators::AveragingCounter< std::size_t > m_storeBuckets
StatusCode findObject(IRegistry *pDirectory, std::string_view path, DataObject *&pObject) override
StatusCode removePreLoadItem(const DataStoreItem &) override
StatusCode unregisterObject(DataObject *obj) override
size_t freeSlots() override
StatusCode clearSubTree(std::string_view) override
Gaudi::Property< size_t > m_slots
StatusCode traverseTree(IDataStoreAgent *pAgent) override
StatusCode selectStore(size_t partition) override
Activate a partition object. The argument identifies the partition uniquely.
Gaudi::Property< CLID > m_rootCLID
StatusCode initialize() override
StatusCode unregisterAddress(std::string_view) override
StatusCode traverseSubTree(DataObject *obj, IDataStoreAgent *pAgent) override
StatusCode registerAddress(std::string_view fullPath, IOpaqueAddress *pAddress) override
Gaudi::Property< std::string > m_loader
Gaudi::Accumulators::AveragingCounter< std::size_t > m_servedPoolAllocations
StatusCode unregisterObject(std::string_view) override
Gaudi::Property< bool > m_printPoolStats
std::vector< Synced< Partition > > m_partitions
The actual store(s)
void fillStats(Partition &p) const
StatusCode setNumberOfStores(size_t slots) override
Set the number of event slots (copies of DataSvc objects).
StatusCode objectParent(const IRegistry *, IRegistry *&) override
StatusCode objectParent(const DataObject *, IRegistry *&) override
StatusCode unregisterObject(DataObject *obj, std::string_view sr) override
void initStore(Partition &p) const
size_t getPartitionNumber(int eventnumber) const override
Get the partition number corresponding to a given event.
StatusCode setRoot(std::string root_name, DataObject *pObject) override
StatusCode clearSubTree(DataObject *obj) override
size_t getNumberOfStores() const override
StatusCode registerObject(std::string_view parentPath, std::string_view objectPath, DataObject *pObject) override
Gaudi::Accumulators::AveragingCounter< std::size_t > m_usedPoolSize
StatusCode resetPreLoad() override
tbb::concurrent_queue< size_t > m_freeSlots
StatusCode linkObject(IRegistry *, std::string_view, DataObject *) override
Gaudi::Property< bool > m_forceLeaves
Gaudi::Accumulators::AveragingCounter< std::size_t > m_storeEntries
StatusCode freeStore(size_t partition) override
Free a store partition.
size_t allocateStore(int evtnumber) override
Allocate a store partition for a given event number.
const std::string & rootName() const override
Gaudi::Property< std::size_t > m_estStoreBuckets
Gaudi::Property< std::string > m_rootName
std::string_view m_onlyThisID
CLID rootCLID() const override
StatusCode objectLeaves(const DataObject *, std::vector< IRegistry * > &) override
StatusCode linkObject(std::string_view, DataObject *) override
StatusCode clearStore() override
StatusCode unlinkObject(std::string_view) override
StatusCode unlinkObject(DataObject *, std::string_view) override
std::vector< DataStoreItem > m_preLoads
Items to be pre-loaded.
A fast memory arena that does not track deallocations.
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
Implementation of property with value of concrete type.
Define general base for Gaudi exception.
virtual StatusCode createObj(IOpaqueAddress *pAddress, DataObject *&refpObject)=0
Create the transient representation of an object.
Data provider interface definition.
@ INVALID_ROOT
Invalid root path object cannot be retrieved or stored.
Generic data agent interface.
virtual bool analyse(IRegistry *pObject, int level)=0
Analyse the data object.
Opaque address interface definition.
virtual void setRegistry(IRegistry *r)=0
Update directory pointer.
virtual const std::string * par() const =0
Retrieve String parameters.
The IRegistry represents the entry door to the environment any data object residing in a transient da...
virtual const id_type & identifier() const =0
Full identifier (or key)
virtual void setAddress(IOpaqueAddress *pAddress)=0
Set/Update Opaque storage address.
Gaudi::StateMachine::State FSMState() const override
SmartIF< ISvcLocator > & serviceLocator() const override
Retrieve pointer to service locator.
Small smart pointer class with automatic reference counting for IInterface.
This class is used for returning status codes from appropriate routines.
const StatusCode & ignore() const
Allow discarding a StatusCode without warning.
constexpr static const auto SUCCESS
constexpr static const auto FAILURE
Base class used to extend a class implementing other interfaces.
::Gaudi::Allocator::Arena<::Gaudi::Arena::Monotonic< Alignment, UpstreamAllocator >, T, DefaultResource > MonotonicArena
auto put(const DataObjectHandle< Out1 > &out_handle, Out2 &&out)
GAUDI_API std::string path(const AIDA::IBaseHistogram *aida)
get the path in THS for AIDA histogram
AttribStringParser::Iterator begin(const AttribStringParser &parser)
get
decorate the vector of properties
GAUDI_API int backTrace(void **addresses, const int depth)
GAUDI_API const std::string typeinfoName(const std::type_info &)
Get platform independent information about the class type.
constexpr auto size(const T &, Args &&...) noexcept
A counter aiming at computing sum and average.