23 #include "ThreadLocalStorage.h" 25 #include "boost/algorithm/string/predicate.hpp" 27 #include "tbb/concurrent_queue.h" 28 #include "tbb/tbb_stddef.h" 29 #if TBB_INTERFACE_VERSION_MAJOR < 12 30 # include "tbb/recursive_mutex.h" 31 #endif // TBB_INTERFACE_VERSION_MAJOR < 12 39 #include <type_traits> 40 #include <unordered_map> 48 using LocalAlloc = Gaudi::Allocator::MonotonicArena<T>;
55 pool_string m_identifierStorage;
56 mutable std::optional<std::string> m_identifier;
60 using allocator_type = LocalAlloc<char>;
64 allocator_type alloc ) noexcept
65 : m_data{
std::move( data )}, m_addr{
std::move( addr )}, m_identifierStorage{id, alloc} {
66 if ( m_data ) m_data->setRegistry(
this );
69 Entry(
const Entry& ) =
delete;
70 Entry& operator=(
const Entry& rhs ) =
delete;
71 Entry( Entry&& rhs ) =
delete;
72 Entry& operator=( Entry&& rhs ) =
delete;
75 unsigned long addRef()
override {
return -1; }
76 unsigned long release()
override {
return -1; }
77 const name_type&
name()
const override {
79 if ( !m_identifier ) m_identifier.emplace( m_identifierStorage );
83 if ( !m_identifier ) m_identifier.emplace( m_identifierStorage );
86 std::string_view identifierView()
const {
return m_identifierStorage; }
88 DataObject*
object()
const override {
return const_cast<DataObject*>( m_data.get() ); }
91 m_addr.
reset( iAddr );
99 LocalAlloc<std::pair<const std::string_view, Entry>>>;
102 template <
typename Map = UnorderedMap>
104 LocalArena m_resource;
107 std::optional<Map> m_store{std::in_place, &m_resource};
108 static_assert( std::is_same_v<typename Map::key_type, std::string_view> );
113 auto [i, b] = m_store->try_emplace( k, k,
std::move( d ),
std::move( a ), &m_resource );
115 auto nh = m_store->extract( i );
116 nh.key() = nh.mapped().identifierView();
117 auto r = m_store->insert(
std::move( nh ) );
119 return r.position->second;
124 [[nodiscard]]
bool empty()
const {
return m_store->empty(); }
126 [[nodiscard]]
std::size_t used_bytes() const noexcept {
return m_resource.size(); }
127 [[nodiscard]]
std::size_t used_blocks() const noexcept {
return m_resource.num_blocks(); }
128 [[nodiscard]]
std::size_t used_buckets()
const {
return m_store->bucket_count(); }
129 [[nodiscard]]
std::size_t num_allocations() const noexcept {
return m_resource.num_allocations(); }
134 m_store.emplace( m_est_size, &m_resource );
142 const Entry* d =
find( k );
143 return d ? d->object() :
nullptr;
145 const Entry*
find( std::string_view k )
const noexcept {
146 auto i = m_store->find( k );
147 return i != m_store->end() ? &( i->second ) :
nullptr;
150 [[nodiscard]]
auto begin() const noexcept {
return m_store->begin(); }
151 [[nodiscard]]
auto end() const noexcept {
return m_store->end(); }
152 void clear() noexcept { m_store->clear(); }
153 auto erase( std::string_view k ) {
return m_store->erase( k ); }
154 template <
typename Predicate>
155 void erase_if( Predicate p ) {
156 auto i = m_store->begin();
157 auto end = m_store->end();
159 if ( std::invoke( p, std::as_const( *i ) ) )
160 i = m_store->erase( i );
174 std::string_view normalize_path( std::string_view
path, std::string_view
prefix ) {
177 if ( !
path.empty() &&
path.front() ==
'/' )
path.remove_prefix( 1 );
183 auto status = cnv.
createObj( &addr, pObject );
185 if ( status.isFailure() )
object.reset();
190 struct Partition final {
194 std::optional<Store<>> store;
195 int eventNumber = -1;
198 #if TBB_INTERFACE_VERSION_MAJOR < 12 199 template <
typename T,
typename Mutex = tbb::recursive_mutex,
typename ReadLock =
typename Mutex::scoped_lock,
203 typename ReadLock = std::scoped_lock<std::recursive_mutex>,
204 #endif // TBB_INTERFACE_VERSION_MAJOR < 12 205 typename WriteLock = ReadLock>
211 template <
typename F>
212 decltype(
auto ) with_lock( F&& f ) {
213 WriteLock
lock{m_mtx};
216 template <
typename F>
217 decltype(
auto ) with_lock( F&& f )
const {
218 ReadLock
lock{m_mtx};
223 template <
typename Fun>
224 auto with_lock( Fun&& f ) {
225 return [f = std::forward<Fun>( f )](
auto& p ) -> decltype(
auto ) {
return p.with_lock( f ); };
228 TTHREAD_TLS( Synced<Partition>* ) s_current =
nullptr;
230 template <
typename Fun>
232 return s_current ? s_current->with_lock( std::forward<Fun>( f ) )
251 Gaudi::Property<bool> m_forceLeaves{
this,
"ForceLeaves",
false,
"force creation of default leaves on registerObject"};
257 mutable Gaudi::Accumulators::AveragingCounter<std::size_t>
m_usedPoolSize, m_servedPoolAllocations,
258 m_usedPoolAllocations, m_storeEntries, m_storeBuckets;
264 if (
LIKELY( !m_printPoolStats ) )
return;
265 auto n_allocs = p.store->num_allocations();
267 m_storeEntries += p.store->size();
268 m_usedPoolSize += p.store->used_bytes();
269 m_storeBuckets += p.store->used_buckets();
270 m_usedPoolAllocations += p.store->used_blocks();
271 m_servedPoolAllocations += n_allocs;
280 p.store.emplace( m_estStoreBuckets, poolSize() );
296 "InhibitedPathPrefixes",
298 "Prefixes of TES locations that will not be loaded by the persistency service "};
300 this,
"FollowLinksToAncestors",
true,
301 "Load objects which reside in files other than the one corresponding to the root of the event store"};
305 using extends::extends;
307 CLID rootCLID()
const override;
311 size_t allocateStore(
int evtnumber )
override;
312 StatusCode freeStore(
size_t partition )
override;
313 size_t freeSlots()
override {
return m_freeSlots.unsafe_size(); }
314 StatusCode selectStore(
size_t partition )
override;
316 StatusCode clearStore(
size_t partition )
override;
317 StatusCode setNumberOfStores(
size_t slots )
override;
319 size_t getPartitionNumber(
int eventnumber )
const override;
322 return findObject(
id.fullKey(), pObject ).isSuccess();
330 StatusCode clearSubTree( std::string_view )
override;
349 StatusCode registerObject( std::string_view parentPath, std::string_view objectPath,
DataObject* pObject )
override;
352 StatusCode unregisterObject( std::string_view )
override;
357 return !obj ? unregisterObject(
sr )
385 Entry::setDataProviderSvc(
this );
386 extends::initialize().ignore();
387 if ( !setNumberOfStores( m_slots ).isSuccess() ) {
388 error() <<
"Cannot set number of slots" <<
endmsg;
393 for (
auto& synced_p : m_partitions ) {
394 synced_p.with_lock( [
this]( Partition& p ) { initStore( p ); } );
396 for (
size_t i = 0; i < m_slots; i++ ) { m_freeSlots.push( i ); }
397 selectStore( 0 ).ignore();
401 error() <<
"Cannot get IConversionSvc " << m_loader.value() <<
endmsg;
404 return setDataLoader(
loader,
nullptr );
407 if ( m_printPoolStats ) {
408 info() <<
"Mean memory pool usage: " << float( 1e-3f * m_usedPoolSize.mean() ) <<
" KiB serving " 409 <<
float( m_servedPoolAllocations.mean() ) <<
" allocations from " <<
float( m_usedPoolAllocations.mean() )
410 <<
" to produce " <<
float( m_storeEntries.mean() ) <<
" entries in " <<
float( m_storeBuckets.mean() )
413 setDataLoader(
nullptr,
nullptr ).ignore();
414 return extends::finalize();
432 size_t slot = std::string::npos;
434 assert( slot != std::string::npos );
436 [[maybe_unused]]
auto prev =
m_partitions[slot].with_lock(
437 [evtnumber]( Partition& p ) {
return std::exchange( p.eventNumber, evtnumber ); } );
438 assert( prev == -1 );
444 if ( slots <
size_t{1} ) {
445 error() <<
"Invalid number of slots (" << slots <<
")" <<
endmsg;
449 error() <<
"Too late to change the number of slots!" <<
endmsg;
459 with_lock( [eventnumber](
const Partition& p ) {
return p.eventNumber == eventnumber; } ) );
470 auto prev =
m_partitions[partition].with_lock( []( Partition& p ) {
return std::exchange( p.eventNumber, -1 ); } );
478 return m_partitions[partition].with_lock( [
this]( Partition& p ) {
485 top = normalize_path( top,
rootName() );
486 return fwd( [&]( Partition& p ) {
487 p.store->erase_if( [top](
const auto& value ) {
return boost::algorithm::starts_with( value.first, top ); } );
493 return fwd( [
this]( Partition& p ) {
500 return fwd( [&]( Partition& p ) {
501 top = normalize_path( top,
rootName() );
502 auto cmp = [](
const Entry* lhs,
const Entry* rhs ) {
return lhs->identifier() < rhs->identifier(); };
504 for (
const auto& v : *p.store ) {
505 if ( boost::algorithm::starts_with( v.second.identifier(), top ) ) keys.insert( &v.second );
507 auto k = keys.begin();
508 while ( k != keys.end() ) {
509 const auto&
id = ( *k )->identifier();
511 bool accept = pAgent->
analyse( const_cast<Entry*>( *( k++ ) ),
level );
514 [&id](
const auto& e ) {
return boost::algorithm::starts_with( e->identifier(), id ); } );
522 debug() <<
"setRoot( " << root_path <<
", (DataObject*)" << (
void*)pObject <<
" )" <<
endmsg;
524 if (
UNLIKELY( !fwd( []( Partition& p ) {
526 } ).isSuccess() ) ) {
534 debug() <<
"setRoot( " << root_path <<
", (IOpaqueAddress*)" << rootAddr.get();
535 if ( rootAddr )
debug() <<
"[ " << rootAddr->par()[0] <<
", " << rootAddr->par()[1] <<
" ]";
538 if (
UNLIKELY( !fwd( []( Partition& p ) {
540 } ).isSuccess() ) ) {
543 if ( !rootAddr )
return Status::INVALID_OBJ_ADDR;
546 if ( !
object )
return Status::INVALID_OBJECT;
548 LocalArena dummy_arena{root_path.
size() + 1};
549 auto dummy = Entry{root_path, {}, {}, &dummy_arena};
550 object->setRegistry( &dummy );
551 rootAddr->setRegistry( &dummy );
553 if ( status.isSuccess() ) {
554 auto pObject =
object.get();
565 if ( !addr )
return Status::INVALID_OBJ_ADDR;
567 debug() <<
"registerAddress( (IRegistry*)" << (
void*)pReg <<
", " <<
path <<
", (IOpaqueAddress*)" << addr.get()
568 <<
"[ " << addr->
par()[0] <<
", " << addr->
par()[1] <<
" ]" 574 debug() <<
"Attempt to load " << addr->
par()[1] <<
" from file " << addr->
par()[0] <<
" blocked -- different file" 579 [addrPath = addr->
par()[1]]( std::string_view
prefix ) {
580 return boost::algorithm::starts_with( addrPath,
prefix );
583 debug() <<
"Attempt to load " << addr->
par()[1] <<
" from file " << addr->
par()[0] <<
" blocked -- path inhibited" 589 if ( !
object )
return Status::INVALID_OBJECT;
592 LocalArena dummy_arena{fullpath.size() + 1};
593 auto dummy = Entry{fullpath, {}, {}, &dummy_arena};
594 object->setRegistry( &dummy );
597 if ( !status.isSuccess() )
return status;
602 auto ptr =
object.get();
603 debug() <<
"registerAddress: " << std::quoted( normalize_path( fullpath,
rootName() ) ) <<
" (DataObject*)" 607 fwd( [&]( Partition& p ) {
615 return parentPath.empty()
625 for (
auto i = dir.rfind(
'/' ); i != std::string_view::npos; i = dir.rfind(
'/' ) ) {
626 dir = dir.substr( 0, i );
627 if ( !p.store->find( dir ) ) {
629 debug() <<
"registerObject: adding directory " << std::quoted( dir ) <<
endmsg;
636 auto ptr =
object.get();
637 debug() <<
"registerObject: " << std::quoted(
path ) <<
" (DataObject*)" << static_cast<void*>( ptr )
646 return fwd( [&]( Partition& p ) {
648 pObject = const_cast<DataObject*>( p.store->get(
path ) );
650 debug() <<
"retrieveObject: " << std::quoted(
path ) <<
" (DataObject*)" << (
void*)pObject
virtual const std::string * par() const =0
Retrieve String parameters.
constexpr auto size(const T &, Args &&...) noexcept
Out1 * put(const DataObjectHandle< Out1 > &out_handle, Out2 &&out)
StatusCode setNumberOfStores(size_t slots) override
Set the number of event slots (copies of DataSvc objects).
Define general base for Gaudi exception.
std::size_t poolSize() const
StatusCode setDataLoader(IConversionSvc *svc, IDataProviderSvc *dpsvc) override
IRegistry * registry() const
Get pointer to Registry.
virtual StatusCode createObj(IOpaqueAddress *pAddress, DataObject *&refpObject)=0
Create the transient representation of an object.
Implementation of property with value of concrete type.
Gaudi::StateMachine::State FSMState() const override
Gaudi::Property< bool > m_followLinksToAncestors
GAUDI_API const std::string typeinfoName(const std::type_info &)
Get platform independent information about the class type.
virtual bool analyse(IRegistry *pObject, int level)=0
Analyse the data object.
StatusCode resetPreLoad() override
StatusCode updateObject(IRegistry *) override
StatusCode initialize() override
SmartIF< IConversionSvc > m_dataLoader
virtual StatusCode setDataProvider(IDataProviderSvc *pService)=0
Set Data provider service.
constexpr static const auto SUCCESS
std::vector< DataStoreItem > m_preLoads
Items to be pre-loaded.
Gaudi::Property< std::string > m_rootName
virtual const name_type & name() const =0
Name of the directory (or key)
StatusCode clearSubTree(DataObject *obj) override
tbb::concurrent_queue< size_t > m_freeSlots
StatusCode removePreLoadItem(const DataStoreItem &) override
StatusCode traverseSubTree(std::string_view, IDataStoreAgent *) override
StatusCode retrieveObject(IRegistry *pDirectory, std::string_view path, DataObject *&pObject) override
auto get(const Handle &handle, const Algo &, const EventContext &) -> decltype(details::deref(handle.get()))
StatusCode traverseTree(IDataStoreAgent *pAgent) override
Data provider interface definition.
Description of the DataStoreItem class.
Gaudi::Property< size_t > m_slots
GAUDI_API int backTrace(void **addresses, const int depth)
size_t freeSlots() override
bool exists(const DataObjID &id) override
StatusCode objectParent(const DataObject *, IRegistry *&) override
MSG::Level msgLevel() const
get the cached level (originally extracted from the embedded MsgStream)
Invalid root path object cannot be retrieved or stored.
StatusCode linkObject(std::string_view, DataObject *) override
#define DECLARE_COMPONENT(type)
Gaudi::Property< bool > m_forceLeaves
MsgStream & error() const
shortcut for the method msgStream(MSG::ERROR)
StatusCode preLoad() override
std::vector< Synced< Partition > > m_partitions
The actual store(s)
StatusCode clearSubTree(std::string_view) override
This class is used for returning status codes from appropriate routines.
StatusCode updateObject(DataObject *) override
StatusCode finalize() override
virtual unsigned long addRef()=0
Add reference to object.
size_t allocateStore(int evtnumber) override
Allocate a store partition for a given event number.
The IRegistry represents the entry door to the environment any data object residing in a transient da...
size_t getNumberOfStores() const override
MsgStream & debug() const
shortcut for the method msgStream(MSG::DEBUG)
unsigned int CLID
Class ID definition.
StatusCode unlinkObject(DataObject *, std::string_view) override
StatusCode unregisterObject(std::string_view) override
virtual IOpaqueAddress * address() const =0
Retrieve opaque storage address.
virtual DataObject * object() const =0
Retrieve object behind the link.
StatusCode objectParent(const IRegistry *, IRegistry *&) override
TTHREAD_TLS(Synced< Partition > *) s_current
StatusCode unregisterAddress(IRegistry *, std::string_view) override
StatusCode objectLeaves(const IRegistry *, std::vector< IRegistry * > &) override
StatusCode unregisterObject(DataObject *obj, std::string_view sr) override
StatusCode unregisterObject(DataObject *obj) override
StatusCode linkObject(IRegistry *, std::string_view, DataObject *) override
StatusCode clearStore() override
const StatusCode & ignore() const
Ignore/check StatusCode.
virtual StatusCode fillObjRefs(IOpaqueAddress *pAddress, DataObject *pObject)=0
Resolve the references of the created transient object.
StatusCode selectStore(size_t partition) override
Activate a partition object. The partition number identifies the partition uniquely.
Generic data agent interface.
size_t getPartitionNumber(int eventnumber) const override
Get the partition number corresponding to a given event.
std::string_view m_onlyThisID
Base class used to extend a class implementing other interfaces.
virtual unsigned long release()=0
release reference to object
virtual void setAddress(IOpaqueAddress *pAddress)=0
Set/Update Opaque storage address.
Gaudi::Accumulators::AveragingCounter< std::size_t > m_usedPoolSize
StatusCode freeStore(size_t partition) override
Free a store partition.
StatusCode registerAddress(std::string_view fullPath, IOpaqueAddress *pAddress) override
StatusCode objectLeaves(const DataObject *, std::vector< IRegistry * > &) override
StatusCode unlinkObject(IRegistry *, std::string_view) override
StatusCode setRoot(std::string root_name, DataObject *pObject) override
constexpr static const auto FAILURE
virtual IDataProviderSvc * dataSvc() const =0
Retrieve pointer to Transient Store.
StatusCode unregisterAddress(std::string_view) override
void initStore(Partition &p) const
virtual const id_type & identifier() const =0
Full identifier (or key)
AttribStringParser::Iterator begin(const AttribStringParser &parser)
virtual void setRegistry(IRegistry *r)=0
Update directory pointer.
Opaque address interface definition.
StatusCode registerObject(std::string_view parentPath, std::string_view objectPath, DataObject *pObject) override
A DataObject is the base class of any identifiable object on any data store.
A fast memory arena that does not track deallocations.
Uses a minimal event store implementation and adds everything required to satisfy the IDataProviderSv...
MsgStream & endmsg(MsgStream &s)
MsgStream Modifier: endmsg. Calls the output method of the MsgStream.
StatusCode addPreLoadItem(const DataStoreItem &) override
Gaudi::Property< std::vector< std::string > > m_inhibitPrefixes
StatusCode findObject(IRegistry *pDirectory, std::string_view path, DataObject *&pObject) override
static GAUDI_API void setNumConcEvents(const std::size_t &nE)
const std::string & rootName() const override
StatusCode traverseSubTree(DataObject *obj, IDataStoreAgent *pAgent) override
void fillStats(Partition &p) const
StatusCode unlinkObject(std::string_view) override