aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-08-23 14:20:10 +0200
committerGitHub Enterprise <[email protected]>2024-08-23 14:20:10 +0200
commit64f07faa75811ef21d18f78db75242f00499c808 (patch)
treee2f4d0957650f3afc20554b50346bb7a9c2d0894 /src/zenstore/cache/cachedisklayer.cpp
parentv5.5.5 (diff)
downloadzen-64f07faa75811ef21d18f78db75242f00499c808.tar.xz
zen-64f07faa75811ef21d18f78db75242f00499c808.zip
Make sure `noexcept` functions does not leak exceptions (#136)
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp364
1 files changed, 202 insertions, 162 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 5d167fc47..f85d05dce 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1200,12 +1200,15 @@ struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle
ZenCacheDiskLayer::CacheBucket::PutBatchHandle*
ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults)
{
+ ZEN_TRACE_CPU("Z$::Bucket::PutBatched");
return new PutBatchHandle(OutResults);
}
void
ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
{
+ ZEN_TRACE_CPU("Z$::Bucket::PutBatched");
+
try
{
ZEN_ASSERT(Batch);
@@ -1279,9 +1282,9 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
}
delete Batch;
}
- catch (std::exception& Ex)
+ catch (const std::exception& Ex)
{
- ZEN_ERROR("Exception in cache bucket when ending batch put operation: '{}'", Ex.what());
+ ZEN_ERROR("Exception in ZenCacheDiskLayer::CacheBucket::EndPutBatch: '{}'", Ex.what());
}
}
@@ -1311,196 +1314,205 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
{
ZEN_TRACE_CPU("Z$::Bucket::GetBatched");
- ZEN_ASSERT(Batch);
- ZEN_ASSERT(Batch->Keys.size() == Batch->ResultIndexes.size());
+ try
+ {
+ ZEN_ASSERT(Batch);
+ ZEN_ASSERT(Batch->Keys.size() == Batch->ResultIndexes.size());
- metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
+ metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
- if (!Batch->ResultIndexes.empty())
- {
- std::vector<DiskLocation> StandaloneDiskLocations;
- std::vector<size_t> StandaloneKeyIndexes;
- std::vector<DiskLocation> InlineDiskLocations;
- std::vector<BlockStoreLocation> InlineBlockLocations;
- std::vector<size_t> InlineKeyIndexes;
- std::vector<bool> FillRawHashAndRawSize(Batch->Keys.size(), false);
+ if (!Batch->ResultIndexes.empty())
{
- RwLock::SharedLockScope IndexLock(m_IndexLock);
- for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++)
+ std::vector<DiskLocation> StandaloneDiskLocations;
+ std::vector<size_t> StandaloneKeyIndexes;
+ std::vector<DiskLocation> InlineDiskLocations;
+ std::vector<BlockStoreLocation> InlineBlockLocations;
+ std::vector<size_t> InlineKeyIndexes;
+ std::vector<bool> FillRawHashAndRawSize(Batch->Keys.size(), false);
{
- const IoHash& HashKey = Batch->Keys[KeyIndex];
- auto It = m_Index.find(HashKey);
- if (It != m_Index.end())
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++)
{
- size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
- ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
-
- const PayloadIndex PayloadIdx = It.value();
- m_AccessTimes[PayloadIdx] = GcClock::TickCount();
- const BucketPayload& Payload = m_Payloads[PayloadIdx];
- const DiskLocation& Location = Payload.Location;
-
- FillRawHashAndRawSize[KeyIndex] = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0);
- if (Payload.MetaData)
+ const IoHash& HashKey = Batch->Keys[KeyIndex];
+ auto It = m_Index.find(HashKey);
+ if (It != m_Index.end())
{
- const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData];
- OutValue.RawHash = MetaData.RawHash;
- OutValue.RawSize = MetaData.RawSize;
- FillRawHashAndRawSize[KeyIndex] = false;
- }
+ size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
+ ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
- if (Payload.MemCached)
- {
- OutValue.Value = m_MemCachedPayloads[Payload.MemCached].Payload;
- m_MemoryHitCount++;
- }
- else
- {
- if (m_Configuration.MemCacheSizeThreshold > 0)
+ const PayloadIndex PayloadIdx = It.value();
+ m_AccessTimes[PayloadIdx] = GcClock::TickCount();
+ const BucketPayload& Payload = m_Payloads[PayloadIdx];
+ const DiskLocation& Location = Payload.Location;
+
+ FillRawHashAndRawSize[KeyIndex] = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0);
+ if (Payload.MetaData)
{
- m_MemoryMissCount++;
+ const BucketMetaData& MetaData = m_MetaDatas[Payload.MetaData];
+ OutValue.RawHash = MetaData.RawHash;
+ OutValue.RawSize = MetaData.RawSize;
+ FillRawHashAndRawSize[KeyIndex] = false;
}
- if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+
+ if (Payload.MemCached)
{
- StandaloneDiskLocations.push_back(Location);
- StandaloneKeyIndexes.push_back(KeyIndex);
+ OutValue.Value = m_MemCachedPayloads[Payload.MemCached].Payload;
+ m_MemoryHitCount++;
}
else
{
- InlineDiskLocations.push_back(Location);
- InlineBlockLocations.emplace_back(Location.GetBlockLocation(m_Configuration.PayloadAlignment));
- InlineKeyIndexes.push_back(KeyIndex);
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_MemoryMissCount++;
+ }
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ StandaloneDiskLocations.push_back(Location);
+ StandaloneKeyIndexes.push_back(KeyIndex);
+ }
+ else
+ {
+ InlineDiskLocations.push_back(Location);
+ InlineBlockLocations.emplace_back(Location.GetBlockLocation(m_Configuration.PayloadAlignment));
+ InlineKeyIndexes.push_back(KeyIndex);
+ }
}
}
}
}
- }
- auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value) {
- if (!Value)
- {
- return;
- }
- size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
- ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
- OutValue.Value = std::move(Value);
- OutValue.Value.SetContentType(Location.GetContentType());
-
- bool AddToMemCache = false;
- bool SetMetaInfo = FillRawHashAndRawSize[KeyIndex];
- if (m_Configuration.MemCacheSizeThreshold > 0)
- {
- size_t ValueSize = OutValue.Value.GetSize();
- if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
+ auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value) {
+ if (!Value)
{
- OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
- AddToMemCache = true;
+ return;
}
- }
+ size_t ResultIndex = Batch->ResultIndexes[KeyIndex];
+ ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
+ OutValue.Value = std::move(Value);
+ OutValue.Value.SetContentType(Location.GetContentType());
- if (SetMetaInfo)
- {
- ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData");
- if (Location.IsFlagSet(DiskLocation::kCompressed))
+ bool AddToMemCache = false;
+ bool SetMetaInfo = FillRawHashAndRawSize[KeyIndex];
+ if (m_Configuration.MemCacheSizeThreshold > 0)
{
- if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
+ size_t ValueSize = OutValue.Value.GetSize();
+ if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
{
- OutValue = ZenCacheValue{};
- AddToMemCache = false;
- SetMetaInfo = false;
+ OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
+ AddToMemCache = true;
}
}
- else
- {
- OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
- OutValue.RawSize = OutValue.Value.GetSize();
- }
- }
- if (SetMetaInfo || AddToMemCache)
- {
- ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCache");
- const IoHash& Key = Batch->Keys[KeyIndex];
- RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
+ if (SetMetaInfo)
{
- if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData");
+ if (Location.IsFlagSet(DiskLocation::kCompressed))
{
- BucketPayload& Payload = m_Payloads[UpdateIt->second];
-
- // Only update if it has not already been updated by other thread
- if (!Payload.MetaData && SetMetaInfo)
+ if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
{
- SetMetaData(UpdateIndexLock, Payload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash});
- }
- if (!Payload.MemCached && AddToMemCache)
- {
- SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
+ OutValue = ZenCacheValue{};
+ AddToMemCache = false;
+ SetMetaInfo = false;
}
}
+ else
+ {
+ OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
+ OutValue.RawSize = OutValue.Value.GetSize();
+ }
}
- }
- };
- // We don't want to read into memory if they are to big since we might only want to touch the compressed
- // header before sending it along
- if (!InlineDiskLocations.empty())
- {
- ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline");
- m_BlockStore.IterateChunks(InlineBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
- const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 32u * 1024u);
- m_BlockStore.IterateBlock(
- InlineBlockLocations,
- ChunkIndexes,
- [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
- if (Data != nullptr)
+ if (SetMetaInfo || AddToMemCache)
+ {
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MemCache");
+ const IoHash& Key = Batch->Keys[KeyIndex];
+ RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
+ {
+ if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
{
- FillOne(InlineDiskLocations[ChunkIndex],
- InlineKeyIndexes[ChunkIndex],
- IoBufferBuilder::MakeCloneFromMemory(Data, Size));
+ BucketPayload& Payload = m_Payloads[UpdateIt->second];
+
+ // Only update if it has not already been updated by other thread
+ if (!Payload.MetaData && SetMetaInfo)
+ {
+ SetMetaData(UpdateIndexLock, Payload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash});
+ }
+ if (!Payload.MemCached && AddToMemCache)
+ {
+ SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
+ }
}
- return true;
- },
- [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
- BlockStoreFile& File,
- uint64_t Offset,
- uint64_t Size) -> bool {
- FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size));
- return true;
- },
- LargeChunkSizeLimit);
- return true;
- });
- }
+ }
+ }
+ };
- if (!StandaloneDiskLocations.empty())
- {
- ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadStandalone");
- for (size_t Index = 0; Index < StandaloneDiskLocations.size(); Index++)
+ // We don't want to read into memory if they are to big since we might only want to touch the compressed
+ // header before sending it along
+ if (!InlineDiskLocations.empty())
{
- size_t KeyIndex = StandaloneKeyIndexes[Index];
- const DiskLocation& Location = StandaloneDiskLocations[Index];
- FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex]));
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline");
+ m_BlockStore.IterateChunks(InlineBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
+ const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 32u * 1024u);
+ m_BlockStore.IterateBlock(
+ InlineBlockLocations,
+ ChunkIndexes,
+ [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
+ const void* Data,
+ uint64_t Size) -> bool {
+ if (Data != nullptr)
+ {
+ FillOne(InlineDiskLocations[ChunkIndex],
+ InlineKeyIndexes[ChunkIndex],
+ IoBufferBuilder::MakeCloneFromMemory(Data, Size));
+ }
+ return true;
+ },
+ [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
+ BlockStoreFile& File,
+ uint64_t Offset,
+ uint64_t Size) -> bool {
+ FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size));
+ return true;
+ },
+ LargeChunkSizeLimit);
+ return true;
+ });
}
- }
- for (size_t ResultIndex : Batch->ResultIndexes)
- {
- bool Hit = !!Batch->OutResults[ResultIndex].Value;
- if (Hit)
+ if (!StandaloneDiskLocations.empty())
{
- m_DiskHitCount++;
- StatsScope.SetBytes(Batch->OutResults[ResultIndex].Value.GetSize());
+ ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadStandalone");
+ for (size_t Index = 0; Index < StandaloneDiskLocations.size(); Index++)
+ {
+ size_t KeyIndex = StandaloneKeyIndexes[Index];
+ const DiskLocation& Location = StandaloneDiskLocations[Index];
+ FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex]));
+ }
}
- else
+
+ for (size_t ResultIndex : Batch->ResultIndexes)
{
- m_DiskMissCount++;
- if (m_Configuration.MemCacheSizeThreshold > 0)
+ bool Hit = !!Batch->OutResults[ResultIndex].Value;
+ if (Hit)
+ {
+ m_DiskHitCount++;
+ StatsScope.SetBytes(Batch->OutResults[ResultIndex].Value.GetSize());
+ }
+ else
{
- m_MemoryMissCount++;
+ m_DiskMissCount++;
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_MemoryMissCount++;
+ }
}
}
}
}
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Exception in ZenCacheDiskLayer::CacheBucket::EndGetBatch: '{}'", Ex.what());
+ }
}
void
@@ -3794,12 +3806,19 @@ struct ZenCacheDiskLayer::PutBatchHandle
void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle)>& CB) noexcept
{
- RwLock::SharedLockScope _(Lock);
- for (ZenCacheDiskLayer::PutBatchHandle::BucketHandle& BucketHandle : BucketHandles)
+ try
+ {
+ RwLock::SharedLockScope _(Lock);
+ for (ZenCacheDiskLayer::PutBatchHandle::BucketHandle& BucketHandle : BucketHandles)
+ {
+ ZEN_ASSERT(BucketHandle.Bucket);
+ ZEN_ASSERT(BucketHandle.Handle);
+ CB(BucketHandle.Bucket, BucketHandle.Handle);
+ }
+ }
+ catch (const std::exception& Ex)
{
- ZEN_ASSERT(BucketHandle.Bucket);
- ZEN_ASSERT(BucketHandle.Handle);
- CB(BucketHandle.Bucket, BucketHandle.Handle);
+ ZEN_ERROR("Exception in ZenCacheDiskLayer::PutBatchHandle::ForEach: '{}'", Ex.what());
}
}
@@ -3851,9 +3870,16 @@ ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults)
void
ZenCacheDiskLayer::EndPutBatch(PutBatchHandle* Batch) noexcept
{
- ZEN_ASSERT(Batch);
- Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle) { Bucket->EndPutBatch(Handle); });
- delete Batch;
+ try
+ {
+ ZEN_ASSERT(Batch);
+ Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::PutBatchHandle* Handle) { Bucket->EndPutBatch(Handle); });
+ delete Batch;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Exception in ZenCacheDiskLayer::EndPutBatch: '{}'", Ex.what());
+ }
}
struct ZenCacheDiskLayer::GetBatchHandle
@@ -3867,12 +3893,19 @@ struct ZenCacheDiskLayer::GetBatchHandle
void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle)>& CB) noexcept
{
- RwLock::SharedLockScope _(Lock);
- for (ZenCacheDiskLayer::GetBatchHandle::BucketHandle& BucketHandle : BucketHandles)
+ try
{
- ZEN_ASSERT(BucketHandle.Bucket);
- ZEN_ASSERT(BucketHandle.Handle);
- CB(BucketHandle.Bucket, BucketHandle.Handle);
+ RwLock::SharedLockScope _(Lock);
+ for (ZenCacheDiskLayer::GetBatchHandle::BucketHandle& BucketHandle : BucketHandles)
+ {
+ ZEN_ASSERT(BucketHandle.Bucket);
+ ZEN_ASSERT(BucketHandle.Handle);
+ CB(BucketHandle.Bucket, BucketHandle.Handle);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Exception in ZenCacheDiskLayer::GetBatchHandle::ForEach: '{}'", Ex.what());
}
}
@@ -3926,10 +3959,17 @@ void
ZenCacheDiskLayer::EndGetBatch(GetBatchHandle* Batch) noexcept
{
ZEN_TRACE_CPU("Z$::GetBatched");
- ZEN_ASSERT(Batch);
- Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle) { Bucket->EndGetBatch(Handle); });
- TryMemCacheTrim();
- delete Batch;
+ try
+ {
+ ZEN_ASSERT(Batch);
+ Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::GetBatchHandle* Handle) { Bucket->EndGetBatch(Handle); });
+ TryMemCacheTrim();
+ delete Batch;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Exception in ZenCacheDiskLayer::EndGetBatch: '{}'", Ex.what());
+ }
}
bool