aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-05-02 10:53:15 +0200
committerGitHub Enterprise <[email protected]>2024-05-02 10:53:15 +0200
commit1bafcb32cb48b2256a9d72995388b7df72058039 (patch)
tree2385ec3de36c4f45018832eb36bcab6ac2d3670f /src/zenstore/cache/cachedisklayer.cpp
parentfix get project files loop (#68) (diff)
downloadzen-1bafcb32cb48b2256a9d72995388b7df72058039.tar.xz
zen-1bafcb32cb48b2256a9d72995388b7df72058039.zip
batch cache put (#67)
- Improvement: Batch scope for put of cache values
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp213
1 files changed, 207 insertions, 6 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 51d547b3d..a497c8969 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1162,6 +1162,108 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
return {};
}
+struct ZenCacheDiskLayer::CacheBucket::BatchHandle
+{
+ BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ struct Entry
+ {
+ std::vector<IoHash> HashKeyAndReferences;
+ };
+ std::vector<IoBuffer> Buffers;
+ std::vector<Entry> Entries;
+ std::vector<size_t> EntryResultIndexes;
+
+ std::vector<bool>& OutResults;
+};
+
+ZenCacheDiskLayer::CacheBucket::BatchHandle*
+ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults)
+{
+ return new BatchHandle(OutResults);
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::EndPutBatch(BatchHandle* Batch) noexcept
+{
+ try
+ {
+ ZEN_ASSERT(Batch);
+ if (!Batch->Buffers.empty())
+ {
+ std::vector<uint8_t> EntryFlags;
+ for (const IoBuffer& Buffer : Batch->Buffers)
+ {
+ uint8_t Flags = 0;
+ if (Buffer.GetContentType() == ZenContentType::kCbObject)
+ {
+ Flags |= DiskLocation::kStructured;
+ }
+ else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ Flags |= DiskLocation::kCompressed;
+ }
+ EntryFlags.push_back(Flags);
+ }
+
+ size_t IndexOffset = 0;
+ m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
+ std::vector<DiskIndexEntry> DiskEntries;
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ for (size_t Index = 0; Index < Locations.size(); Index++)
+ {
+ DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]);
+ const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences;
+ ZEN_ASSERT(HashKeyAndReferences.size() > 1);
+ const IoHash HashKey = HashKeyAndReferences[0];
+ DiskEntries.push_back({.Key = HashKey, .Location = Location});
+ if (m_TrackedCacheKeys)
+ {
+ m_TrackedCacheKeys->insert(HashKey);
+ }
+ if (m_TrackedReferences && HashKeyAndReferences.size() > 1)
+ {
+ m_TrackedReferences->insert(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end());
+ }
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
+ {
+ PayloadIndex EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size()));
+ BucketPayload& Payload = m_Payloads[EntryIndex];
+
+ RemoveMemCachedData(IndexLock, Payload);
+ RemoveMetaData(IndexLock, Payload);
+
+ Payload = (BucketPayload{.Location = Location});
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+ }
+ else
+ {
+ PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
+ m_Payloads.emplace_back(BucketPayload{.Location = Location});
+ m_AccessTimes.emplace_back(GcClock::TickCount());
+ m_Index.insert_or_assign(HashKey, EntryIndex);
+ }
+ }
+ }
+ m_SlogFile.Append(DiskEntries);
+ for (size_t Index = 0; Index < Locations.size(); Index++)
+ {
+ size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index];
+ ZEN_ASSERT(ResultIndex < Batch->OutResults.size());
+ Batch->OutResults[ResultIndex] = true;
+ }
+ IndexOffset += Locations.size();
+ });
+ }
+ delete Batch;
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("Exception in cache bucket when ending batch put operation: '{}'", Ex.what());
+ }
+}
+
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -1283,7 +1385,10 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
void
-ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ BatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::Put");
@@ -1292,10 +1397,14 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
{
PutStandaloneCacheValue(HashKey, Value, References);
+ if (OptionalBatchHandle)
+ {
+ OptionalBatchHandle->OutResults.push_back(true);
+ }
}
else
{
- PutInlineCacheValue(HashKey, Value, References);
+ PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle);
}
m_DiskWriteCount++;
@@ -2593,10 +2702,24 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck
}
void
-ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ BatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue");
-
+ if (OptionalBatchHandle != nullptr)
+ {
+ OptionalBatchHandle->Buffers.push_back(Value.Value);
+ OptionalBatchHandle->Entries.push_back({});
+ OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size());
+ OptionalBatchHandle->OutResults.push_back(false);
+ std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences;
+ HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size());
+ HashKeyAndReferences.push_back(HashKey);
+ HashKeyAndReferences.insert(HashKeyAndReferences.end(), HashKeyAndReferences.begin(), HashKeyAndReferences.end());
+ return;
+ }
uint8_t EntryFlags = 0;
if (Value.Value.GetContentType() == ZenContentType::kCbObject)
@@ -3423,6 +3546,79 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
return Result;
}
+struct ZenCacheDiskLayer::BatchHandle
+{
+ BatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ struct BucketHandle
+ {
+ CacheBucket* Bucket;
+ CacheBucket::BatchHandle* Handle;
+ };
+
+ void ForEach(const std::function<void(CacheBucket* Bucket, CacheBucket::BatchHandle* Handle)>& CB) noexcept
+ {
+ RwLock::SharedLockScope _(Lock);
+ for (ZenCacheDiskLayer::BatchHandle::BucketHandle& BucketHandle : BucketHandles)
+ {
+ ZEN_ASSERT(BucketHandle.Bucket);
+ ZEN_ASSERT(BucketHandle.Handle);
+ CB(BucketHandle.Bucket, BucketHandle.Handle);
+ }
+ }
+
+ CacheBucket::BatchHandle* GetHandle(CacheBucket* Bucket)
+ {
+ {
+ RwLock::SharedLockScope _(Lock);
+ if (auto It =
+ std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
+ It != BucketHandles.end())
+ {
+ return It->Handle;
+ }
+ }
+
+ CacheBucket::BatchHandle* NewBucketHandle = Bucket->BeginPutBatch(OutResults);
+ if (NewBucketHandle == nullptr)
+ {
+ return nullptr;
+ }
+
+ RwLock::ExclusiveLockScope _(Lock);
+ if (auto It =
+ std::find_if(BucketHandles.begin(), BucketHandles.end(), [&](BucketHandle& Handle) { return Handle.Bucket == Bucket; });
+ It != BucketHandles.end())
+ {
+ CacheBucket::BatchHandle* Result = It->Handle;
+ ZEN_ASSERT(Result != nullptr);
+ _.ReleaseNow();
+ Bucket->EndPutBatch(NewBucketHandle);
+ return Result;
+ }
+
+ BucketHandles.push_back(ZenCacheDiskLayer::BatchHandle::BucketHandle{.Bucket = Bucket, .Handle = NewBucketHandle});
+
+ return NewBucketHandle;
+ }
+ RwLock Lock;
+ std::vector<BucketHandle> BucketHandles;
+ std::vector<bool>& OutResults;
+};
+
+ZenCacheDiskLayer::BatchHandle*
+ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults)
+{
+ return new BatchHandle(OutResults);
+}
+
+void
+ZenCacheDiskLayer::EndPutBatch(BatchHandle* Batch) noexcept
+{
+ ZEN_ASSERT(Batch);
+ Batch->ForEach([&](CacheBucket* Bucket, CacheBucket::BatchHandle* Handle) { Bucket->EndPutBatch(Handle); });
+ delete Batch;
+}
+
bool
ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -3440,13 +3636,18 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
void
-ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+ZenCacheDiskLayer::Put(std::string_view InBucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ BatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Put");
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
- Bucket->Put(HashKey, Value, References);
+ CacheBucket::BatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket);
+ Bucket->Put(HashKey, Value, References, BucketBatchHandle);
TryMemCacheTrim();
}
}