22 #include <tbb/concurrent_queue.h>
24 #include <ThreadLocalStorage.h>
26 #include <boost/algorithm/string/predicate.hpp>
34 #include <type_traits>
35 #include <unordered_map>
50 pool_string m_identifierStorage;
51 mutable std::optional<std::string> m_identifier;
55 using allocator_type = LocalAlloc<char>;
59 allocator_type alloc ) noexcept
61 if ( m_data ) m_data->setRegistry(
this );
64 Entry(
const Entry& ) =
delete;
65 Entry& operator=(
const Entry& rhs ) =
delete;
66 Entry( Entry&& rhs ) =
delete;
67 Entry& operator=( Entry&& rhs ) =
delete;
70 unsigned long addRef()
override {
return -1; }
71 unsigned long release()
override {
return -1; }
72 const name_type&
name()
const override {
74 if ( !m_identifier ) m_identifier.emplace( m_identifierStorage );
78 if ( !m_identifier ) m_identifier.emplace( m_identifierStorage );
81 std::string_view identifierView()
const {
return m_identifierStorage; }
86 m_addr.
reset( iAddr );
94 LocalAlloc<std::pair<const std::string_view, Entry>>>;
97 template <
typename Map = UnorderedMap>
99 LocalArena m_resource;
102 std::optional<Map> m_store{ std::in_place, &m_resource };
103 static_assert( std::is_same_v<typename Map::key_type, std::string_view> );
108 auto [i, b] = m_store->try_emplace( k, k,
std::move( d ),
std::move( a ), &m_resource );
110 auto nh = m_store->extract( i );
111 nh.key() = nh.mapped().identifierView();
112 auto r = m_store->insert(
std::move( nh ) );
114 return r.position->second;
119 [[nodiscard]]
bool empty()
const {
return m_store->empty(); }
121 [[nodiscard]]
std::size_t used_bytes() const noexcept {
return m_resource.size(); }
122 [[nodiscard]]
std::size_t used_blocks() const noexcept {
return m_resource.num_blocks(); }
123 [[nodiscard]]
std::size_t used_buckets()
const {
return m_store->bucket_count(); }
124 [[nodiscard]]
std::size_t num_allocations() const noexcept {
return m_resource.num_allocations(); }
129 m_store.emplace( m_est_size, &m_resource );
137 const Entry* d =
find( k );
138 return d ? d->object() :
nullptr;
140 const Entry*
find( std::string_view k )
const noexcept {
141 auto i = m_store->find( k );
142 return i != m_store->end() ? &( i->second ) :
nullptr;
145 [[nodiscard]]
auto begin() const noexcept {
return m_store->begin(); }
146 [[nodiscard]]
auto end() const noexcept {
return m_store->end(); }
147 void clear() noexcept { m_store->clear(); }
148 auto erase( std::string_view k ) {
return m_store->erase( k ); }
149 template <
typename Predicate>
150 void erase_if( Predicate p ) {
151 auto i = m_store->begin();
152 auto end = m_store->end();
154 if ( std::invoke( p, std::as_const( *i ) ) )
155 i = m_store->erase( i );
169 std::string_view normalize_path( std::string_view
path, std::string_view
prefix ) {
172 if ( !
path.empty() &&
path.front() ==
'/' )
path.remove_prefix( 1 );
178 auto status = cnv.
createObj( &addr, pObject );
180 if ( status.isFailure() )
object.reset();
185 struct Partition final {
189 std::optional<Store<>> store;
190 int eventNumber = -1;
193 template <
typename T,
typename Mutex = std::recursive_mutex,
typename ReadLock = std::scoped_lock<Mutex>,
194 typename WriteLock = ReadLock>
200 template <
typename F>
202 WriteLock
lock{ m_mtx };
205 template <
typename F>
206 decltype(
auto )
with_lock( F&& f )
const {
207 ReadLock
lock{ m_mtx };
212 template <
typename Fun>
214 return [f = std::forward<Fun>( f )](
auto& p ) -> decltype(
auto ) {
return p.with_lock( f ); };
217 TTHREAD_TLS( Synced<Partition>* ) s_current =
nullptr;
219 template <
typename Fun>
221 return s_current ? s_current->with_lock( std::forward<Fun>( f ) )
241 "force creation of default leaves on registerObject" };
247 "Estimated number of buckets in the store" };
255 if ( !m_printPoolStats )
return;
256 auto n_allocs = p.store->num_allocations();
258 m_storeEntries += p.store->size();
259 m_usedPoolSize += p.store->used_bytes();
260 m_storeBuckets += p.store->used_buckets();
261 m_usedPoolAllocations += p.store->used_blocks();
262 m_servedPoolAllocations += n_allocs;
271 p.store.emplace( m_estStoreBuckets, poolSize() );
287 "InhibitedPathPrefixes",
289 "Prefixes of TES locations that will not be loaded by the persistency service " };
291 this,
"FollowLinksToAncestors",
true,
292 "Load objects which reside in files other than the one corresponding to the root of the event store" };
296 using extends::extends;
298 CLID rootCLID()
const override;
302 size_t allocateStore(
int evtnumber )
override;
303 StatusCode freeStore(
size_t partition )
override;
304 size_t freeSlots()
override {
return m_freeSlots.unsafe_size(); }
305 StatusCode selectStore(
size_t partition )
override;
307 StatusCode clearStore(
size_t partition )
override;
308 StatusCode setNumberOfStores(
size_t slots )
override;
310 size_t getPartitionNumber(
int eventnumber )
const override;
313 return findObject(
id.fullKey(), pObject ).isSuccess();
321 StatusCode clearSubTree( std::string_view )
override;
340 StatusCode registerObject( std::string_view parentPath, std::string_view objectPath,
DataObject* pObject )
override;
343 StatusCode unregisterObject( std::string_view )
override;
348 return !obj ? unregisterObject(
sr )
376 Entry::setDataProviderSvc(
this );
377 extends::initialize().ignore();
378 if ( !setNumberOfStores( m_slots ).isSuccess() ) {
379 error() <<
"Cannot set number of slots" <<
endmsg;
384 for (
auto& synced_p : m_partitions ) {
385 synced_p.with_lock( [
this]( Partition& p ) { initStore( p ); } );
387 for (
size_t i = 0; i < m_slots; i++ ) { m_freeSlots.push( i ); }
388 selectStore( 0 ).ignore();
392 error() <<
"Cannot get IConversionSvc " << m_loader.value() <<
endmsg;
395 return setDataLoader(
loader,
nullptr );
398 if ( m_printPoolStats ) {
399 info() <<
"Mean memory pool usage: " << float( 1e-3f *
float( m_usedPoolSize.
mean() ) ) <<
" KiB serving "
400 << float( m_servedPoolAllocations.
mean() ) <<
" allocations from " << float( m_usedPoolAllocations.mean() )
401 <<
" to produce " << float( m_storeEntries.mean() ) <<
" entries in " << float( m_storeBuckets.mean() )
404 setDataLoader(
nullptr,
nullptr ).ignore();
405 return extends::finalize();
423 size_t slot = std::string::npos;
425 assert( slot != std::string::npos );
427 [[maybe_unused]]
auto prev =
m_partitions[slot].with_lock(
428 [evtnumber]( Partition& p ) {
return std::exchange( p.eventNumber, evtnumber ); } );
429 assert( prev == -1 );
435 if ( slots <
size_t{ 1 } ) {
436 error() <<
"Invalid number of slots (" << slots <<
")" <<
endmsg;
440 error() <<
"Too late to change the number of slots!" <<
endmsg;
450 with_lock( [eventnumber](
const Partition& p ) {
return p.eventNumber == eventnumber; } ) );
461 auto prev =
m_partitions[partition].with_lock( []( Partition& p ) {
return std::exchange( p.eventNumber, -1 ); } );
469 return m_partitions[partition].with_lock( [
this]( Partition& p ) {
477 return fwd( [&]( Partition& p ) {
478 p.store->erase_if( [
top](
const auto& value ) {
return boost::algorithm::starts_with( value.first,
top ); } );
484 return fwd( [
this]( Partition& p ) {
491 return fwd( [&]( Partition& p ) {
494 auto cmp = [](
const Entry* lhs,
const Entry* rhs ) {
return lhs->identifier() < rhs->identifier(); };
496 for (
const auto&
v : *p.store ) {
497 if ( boost::algorithm::starts_with(
v.second.identifier(),
top ) )
keys.insert( &
v.second );
499 auto k =
keys.begin();
500 while ( k !=
keys.end() ) {
501 const auto&
id = ( *k )->identifier();
503 bool accept = pAgent->
analyse(
const_cast<Entry*
>( *( k++ ) ),
level );
506 [&
id](
const auto& e ) { return boost::algorithm::starts_with( e->identifier(), id ); } );
514 debug() <<
"setRoot( " << root_path <<
", (DataObject*)" << (
void*)pObject <<
" )" <<
endmsg;
516 if ( !fwd( []( Partition& p ) {
526 debug() <<
"setRoot( " << root_path <<
", (IOpaqueAddress*)" << rootAddr.get();
527 if ( rootAddr ) debug() <<
"[ " << rootAddr->par()[0] <<
", " << rootAddr->par()[1] <<
" ]";
528 debug() <<
" )" <<
endmsg;
530 if ( !fwd( []( Partition& p ) {
535 if ( !rootAddr )
return Status::INVALID_OBJ_ADDR;
538 if ( !
object )
return Status::INVALID_OBJECT;
540 LocalArena dummy_arena{ root_path.
size() + 1 };
541 auto dummy = Entry{ root_path, {}, {}, &dummy_arena };
542 object->setRegistry( &dummy );
543 rootAddr->setRegistry( &dummy );
544 auto status =
m_dataLoader->fillObjRefs( rootAddr.get(),
object.get() );
545 if ( status.isSuccess() ) {
546 auto pObject =
object.get();
557 if ( !addr )
return Status::INVALID_OBJ_ADDR;
559 debug() <<
"registerAddress( (IRegistry*)" << (
void*)pReg <<
", " <<
path <<
", (IOpaqueAddress*)" << addr.get()
560 <<
"[ " << addr->
par()[0] <<
", " << addr->
par()[1] <<
" ]"
566 debug() <<
"Attempt to load " << addr->
par()[1] <<
" from file " << addr->
par()[0] <<
" blocked -- different file"
571 [addrPath = addr->
par()[1]]( std::string_view
prefix ) {
572 return boost::algorithm::starts_with( addrPath, prefix );
575 debug() <<
"Attempt to load " << addr->
par()[1] <<
" from file " << addr->
par()[0] <<
" blocked -- path inhibited"
581 if ( !
object )
return Status::INVALID_OBJECT;
584 LocalArena dummy_arena{ fullpath.size() + 1 };
585 auto dummy = Entry{ fullpath, {}, {}, &dummy_arena };
586 object->setRegistry( &dummy );
588 auto status =
m_dataLoader->fillObjRefs( addr.get(),
object.get() );
589 if ( !status.isSuccess() )
return status;
594 auto ptr =
object.
get();
595 debug() <<
"registerAddress: " << std::quoted( normalize_path( fullpath,
rootName() ) ) <<
" (DataObject*)"
599 fwd( [&]( Partition& p ) {
607 return parentPath.empty()
617 for (
auto i = dir.rfind(
'/' ); i != std::string_view::npos; i = dir.rfind(
'/' ) ) {
618 dir = dir.substr( 0, i );
619 if ( !p.store->find( dir ) ) {
620 if ( msgLevel( MSG::DEBUG ) ) {
621 debug() <<
"registerObject: adding directory " << std::quoted( dir ) << endmsg;
623 p.store->put( dir, std::unique_ptr<DataObject>{} );
628 auto ptr =
object.get();
629 debug() <<
"registerObject: " << std::quoted(
path ) <<
" (DataObject*)" <<
static_cast<void*
>( ptr )
638 return fwd( [&]( Partition& p ) {
642 debug() <<
"retrieveObject: " << std::quoted(
path ) <<
" (DataObject*)" << (
void*)pObject