aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-22 20:21:02 +0200
committerGitHub Enterprise <[email protected]>2024-04-22 20:21:02 +0200
commit96f44f2f2d8cbcda254d0b193f5a1aece645daeb (patch)
tree9d1975c4d76d7a577ecfe8e2fe9456738571528b /src/zenstore
parentfix LogRemoteStoreStatsDetails (#53) (diff)
downloadzen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.tar.xz
zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.zip
InsertChunks for CAS store (#55)
- Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import
Diffstat (limited to 'src/zenstore')
-rw-r--r--src/zenstore/blockstore.cpp163
-rw-r--r--src/zenstore/cache/cacherpc.cpp91
-rw-r--r--src/zenstore/cas.cpp123
-rw-r--r--src/zenstore/cas.h17
-rw-r--r--src/zenstore/cidstore.cpp50
-rw-r--r--src/zenstore/compactcas.cpp58
-rw-r--r--src/zenstore/compactcas.h21
-rw-r--r--src/zenstore/include/zenstore/blockstore.h3
-rw-r--r--src/zenstore/include/zenstore/cidstore.h21
9 files changed, 473 insertions, 74 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 644ddf7b4..8ef9e842f 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -453,6 +453,103 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, cons
}
}
+void
+BlockStore::WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback)
+{
+ ZEN_TRACE_CPU("BlockStore::WriteChunks");
+
+ ZEN_ASSERT(!Datas.empty());
+ ZEN_ASSERT(Alignment > 0u);
+
+ const size_t TotalCount = Datas.size();
+ uint64_t TotalSize = 0;
+ uint64_t LargestSize = 0;
+ for (const IoBuffer& Data : Datas)
+ {
+ uint64_t Size = Data.GetSize();
+ ZEN_ASSERT(Size > 0);
+ ZEN_ASSERT(Size <= m_MaxBlockSize);
+ TotalSize += Size;
+ LargestSize = Max(LargestSize, Size);
+ }
+
+ const uint64_t MinSize = Max(LargestSize, 8u * 1024u * 1024u);
+ const uint64_t BufferSize = Min(TotalSize, MinSize);
+ std::vector<uint8_t> Buffer(BufferSize);
+
+ size_t Offset = 0;
+ while (Offset < TotalCount)
+ {
+ size_t Count = 1;
+ uint32_t RangeSize = gsl::narrow<uint32_t>(Datas[Offset].GetSize());
+
+ RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+ uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ uint32_t AlignedInsertOffset = RoundUp(m_CurrentInsertOffset, Alignment);
+ if ((!m_WriteBlock) || ((AlignedInsertOffset + RangeSize) > m_MaxBlockSize))
+ {
+ std::filesystem::path BlockPath;
+ WriteBlockIndex = GetFreeBlockIndex(WriteBlockIndex, InsertLock, BlockPath);
+ if (WriteBlockIndex == (uint32_t)m_MaxBlockCount)
+ {
+ throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath));
+ }
+ Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath));
+ NewBlockFile->Create(m_MaxBlockSize);
+
+ m_ChunkBlocks[WriteBlockIndex] = NewBlockFile;
+ m_WriteBlock = NewBlockFile;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ m_CurrentInsertOffset = 0;
+ AlignedInsertOffset = 0;
+ }
+ while (Offset + Count < TotalCount)
+ {
+ uint32_t NextRangeSize = gsl::narrow<uint32_t>(RoundUp(RangeSize, Alignment) + Datas[Offset + Count].GetSize());
+ if ((AlignedInsertOffset + NextRangeSize) > m_MaxBlockSize || (NextRangeSize > BufferSize))
+ {
+ break;
+ }
+ Count++;
+ RangeSize = NextRangeSize;
+ }
+ m_CurrentInsertOffset = AlignedInsertOffset + RangeSize;
+ Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
+ m_ActiveWriteBlocks.push_back(WriteBlockIndex);
+ InsertLock.ReleaseNow();
+
+ {
+ MutableMemoryView WriteBuffer(Buffer.data(), RangeSize);
+ for (size_t Index = 0; Index < Count; Index++)
+ {
+ MemoryView SourceBuffer = Datas[Index + Offset];
+ WriteBuffer.CopyFrom(SourceBuffer);
+ WriteBuffer.MidInline(RoundUp(SourceBuffer.GetSize(), Alignment));
+ }
+ WriteBlock->Write(Buffer.data(), RangeSize, AlignedInsertOffset);
+ }
+
+ m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed);
+
+ uint32_t ChunkOffset = AlignedInsertOffset;
+ std::vector<BlockStoreLocation> Locations(Count);
+ for (size_t Index = 0; Index < Count; Index++)
+ {
+ uint32_t ChunkSize = gsl::narrow<uint32_t>(Datas[Offset + Index].GetSize());
+ Locations[Index] = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset, .Size = ChunkSize};
+ ChunkOffset += gsl::narrow<uint32_t>(RoundUp(ChunkSize, Alignment));
+ }
+ Callback(Locations);
+
+ {
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex));
+ }
+
+ Offset += Count;
+ }
+}
+
BlockStore::ReclaimSnapshotState
BlockStore::GetReclaimSnapshotState()
{
@@ -1543,6 +1640,72 @@ namespace blockstore::impl {
} // namespace blockstore::impl
+TEST_CASE("blockstore.multichunks")
+{
+ using namespace blockstore::impl;
+
+ ScopedTemporaryDirectory TempDir;
+ auto RootDirectory = TempDir.Path();
+
+ BlockStore Store;
+ Store.Initialize(RootDirectory, 128, 1024);
+
+ std::vector<IoBuffer> MultiChunkData;
+ std::string FirstChunkData = "0123456789012345678901234567890123456789012345678901234567890123";
+ MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FirstChunkData.data(), FirstChunkData.size()));
+
+ std::string SecondChunkData = "12345678901234567890123456789012345678901234567890123456";
+ MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, SecondChunkData.data(), SecondChunkData.size()));
+
+ std::string ThirdChunkData = "789012345678901234567890123456789012345678901234567890";
+ MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, ThirdChunkData.data(), ThirdChunkData.size()));
+
+ BlockStoreLocation Locations[5];
+ size_t ChunkOffset = 0;
+ Store.WriteChunks(MultiChunkData, 4, [&](std::span<BlockStoreLocation> InLocations) {
+ for (const BlockStoreLocation& Location : InLocations)
+ {
+ Locations[ChunkOffset++] = Location;
+ }
+ CHECK(ChunkOffset <= MultiChunkData.size());
+ });
+ CHECK(ChunkOffset == 3);
+
+ CHECK(ReadChunkAsString(Store, Locations[0]) == FirstChunkData);
+ CHECK(Locations[1].BlockIndex == Locations[0].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[1]) == SecondChunkData);
+ CHECK(Locations[2].BlockIndex != Locations[1].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[2]) == ThirdChunkData);
+
+ MultiChunkData.resize(0);
+ std::string FourthChunkData = "ABCDEFGHIJABCDEFGHIJ_23";
+ MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FourthChunkData.data(), FourthChunkData.size()));
+
+ std::string FifthChunkData =
+ "ABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJ_93";
+ MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FifthChunkData.data(), FifthChunkData.size()));
+
+ Store.WriteChunks(MultiChunkData, 4, [&](std::span<BlockStoreLocation> InLocations) {
+ for (const BlockStoreLocation& Location : InLocations)
+ {
+ CHECK(ChunkOffset < 5);
+ Locations[ChunkOffset++] = Location;
+ }
+ });
+ CHECK(ChunkOffset == 5);
+
+ CHECK(ReadChunkAsString(Store, Locations[0]) == FirstChunkData);
+ CHECK(Locations[1].BlockIndex == Locations[0].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[1]) == SecondChunkData);
+ CHECK(Locations[2].BlockIndex != Locations[1].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[2]) == ThirdChunkData);
+
+ CHECK(Locations[3].BlockIndex == Locations[2].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[3]) == FourthChunkData);
+ CHECK(Locations[4].BlockIndex != Locations[3].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[4]) == FifthChunkData);
+}
+
TEST_CASE("blockstore.chunks")
{
using namespace blockstore::impl;
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 3cb1b03a5..60b2fa1a1 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -315,49 +315,58 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
uint64_t RecordObjectSize = Record.GetSize();
uint64_t TransferredSize = RecordObjectSize;
- AttachmentCount Count;
- size_t NumAttachments = Package->GetAttachments().size();
- std::vector<IoHash> ValidAttachments;
- std::vector<IoHash> ReferencedAttachments;
- std::vector<const CbAttachment*> AttachmentsToStoreLocally;
+ AttachmentCount Count;
+ size_t NumAttachments = Package->GetAttachments().size();
+ std::vector<IoHash> ValidAttachments;
+ std::vector<IoHash> ReferencedAttachments;
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
ValidAttachments.reserve(NumAttachments);
- AttachmentsToStoreLocally.reserve(NumAttachments);
+ WriteAttachmentBuffers.reserve(NumAttachments);
+ WriteRawHashes.reserve(NumAttachments);
const bool HasUpstream = m_UpstreamCache.IsActive();
Stopwatch Timer;
- Request.RecordObject.IterateAttachments(
- [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &ReferencedAttachments, &Count, &TransferredSize](
- CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- ReferencedAttachments.push_back(ValueHash);
- if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
- {
- if (Attachment->IsCompressedBinary())
- {
- AttachmentsToStoreLocally.emplace_back(Attachment);
- ValidAttachments.emplace_back(ValueHash);
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
- Request.Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ToString(ZenContentType::kCbPackage),
- ValueHash);
- Count.Invalid++;
- }
- }
- else if (m_CidStore.ContainsChunk(ValueHash))
+ Request.RecordObject.IterateAttachments([this,
+ &Request,
+ Package,
+ &WriteAttachmentBuffers,
+ &WriteRawHashes,
+ &ValidAttachments,
+ &ReferencedAttachments,
+ &Count,
+ &TransferredSize](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ ReferencedAttachments.push_back(ValueHash);
+ if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
+ {
+ if (Attachment->IsCompressedBinary())
{
+ WriteAttachmentBuffers.push_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(ValueHash);
ValidAttachments.emplace_back(ValueHash);
Count.Valid++;
}
- Count.Total++;
- });
+ else
+ {
+ ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ Request.Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ToString(ZenContentType::kCbPackage),
+ ValueHash);
+ Count.Invalid++;
+ }
+ }
+ else if (m_CidStore.ContainsChunk(ValueHash))
+ {
+ ValidAttachments.emplace_back(ValueHash);
+ Count.Valid++;
+ }
+ Count.Total++;
+ });
if (Count.Invalid > 0)
{
@@ -371,16 +380,20 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments);
m_CacheStats.WriteCount++;
- for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
+ if (!WriteAttachmentBuffers.empty())
{
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- if (InsertResult.New)
+ std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < InsertResults.size(); Index++)
{
- Count.New++;
+ if (InsertResults[Index].New)
+ {
+ Count.New++;
+ }
+ TransferredSize += WriteAttachmentBuffers[Index].GetSize();
}
- TransferredSize += Chunk.GetCompressedSize();
}
+ WriteAttachmentBuffers = {};
+ WriteRawHashes = {};
ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}",
Request.Namespace,
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index a1a4a0acc..2fe466028 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -49,15 +49,18 @@ public:
CasImpl(GcManager& Gc);
virtual ~CasImpl();
- virtual void Initialize(const CidStoreConfiguration& InConfig) override;
- virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) override;
- virtual IoBuffer FindChunk(const IoHash& ChunkHash) override;
- virtual bool ContainsChunk(const IoHash& ChunkHash) override;
- virtual void FilterChunks(HashKeySet& InOutChunks) override;
- virtual void Flush() override;
- virtual void ScrubStorage(ScrubContext& Ctx) override;
- virtual void GarbageCollect(GcContext& GcCtx) override;
- virtual CidStoreSize TotalSize() const override;
+ virtual void Initialize(const CidStoreConfiguration& InConfig) override;
+ virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) override;
+ virtual std::vector<InsertResult> InsertChunks(std::span<IoBuffer> Data,
+ std::span<IoHash> ChunkHashes,
+ InsertMode Mode = InsertMode::kMayBeMovedInPlace) override;
+ virtual IoBuffer FindChunk(const IoHash& ChunkHash) override;
+ virtual bool ContainsChunk(const IoHash& ChunkHash) override;
+ virtual void FilterChunks(HashKeySet& InOutChunks) override;
+ virtual void Flush() override;
+ virtual void ScrubStorage(ScrubContext& Ctx) override;
+ virtual void GarbageCollect(GcContext& GcCtx) override;
+ virtual CidStoreSize TotalSize() const override;
private:
CasContainerStrategy m_TinyStrategy;
@@ -250,6 +253,108 @@ CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode)
return m_LargeStrategy.InsertChunk(Chunk, ChunkHash, Mode);
}
+static void
+GetCompactCasResults(CasContainerStrategy& Strategy,
+ std::span<IoBuffer> Data,
+ std::span<IoHash> ChunkHashes,
+ std::span<size_t> Indexes,
+ std::vector<CasStore::InsertResult> Results)
+{
+ const size_t Count = Indexes.size();
+ if (Count == 1)
+ {
+ const size_t Index = Indexes[0];
+ Results[Index] = Strategy.InsertChunk(Data[Index], ChunkHashes[Index]);
+ return;
+ }
+ std::vector<IoBuffer> Chunks;
+ std::vector<IoHash> Hashes;
+ Chunks.reserve(Count);
+ Hashes.reserve(Count);
+ for (size_t Index : Indexes)
+ {
+ Chunks.push_back(Data[Index]);
+ Hashes.push_back(ChunkHashes[Index]);
+ }
+
+ Strategy.InsertChunks(Chunks, Hashes);
+
+ for (size_t Offset = 0; Offset < Count; Offset++)
+ {
+ size_t Index = Indexes[Offset];
+ Results[Index] = Results[Offset];
+ }
+};
+
+static void
+GetFileCasResults(FileCasStrategy& Strategy,
+ CasStore::InsertMode Mode,
+ std::span<IoBuffer> Data,
+ std::span<IoHash> ChunkHashes,
+ std::span<size_t> Indexes,
+ std::vector<CasStore::InsertResult> Results)
+{
+ for (size_t Index : Indexes)
+ {
+ Results[Index] = Strategy.InsertChunk(Data[Index], ChunkHashes[Index], Mode);
+ }
+};
+
+std::vector<CasStore::InsertResult>
+CasImpl::InsertChunks(std::span<IoBuffer> Data, std::span<IoHash> ChunkHashes, CasStore::InsertMode Mode)
+{
+ ZEN_TRACE_CPU("CAS::InsertChunks");
+ ZEN_ASSERT(Data.size() == ChunkHashes.size());
+
+ if (Data.size() == 1)
+ {
+ std::vector<CasStore::InsertResult> Result(1);
+ Result[0] = InsertChunk(Data[0], ChunkHashes[0], Mode);
+ return Result;
+ }
+
+ std::vector<size_t> TinyIndexes;
+ std::vector<size_t> SmallIndexes;
+ std::vector<size_t> LargeIndexes;
+
+ for (size_t Index = 0; Index < Data.size(); Index++)
+ {
+ const uint64_t ChunkSize = Data[Index].Size();
+ if (ChunkSize < m_Config.TinyValueThreshold)
+ {
+ ZEN_ASSERT(ChunkSize);
+ TinyIndexes.push_back(Index);
+ }
+ else if (ChunkSize < m_Config.HugeValueThreshold)
+ {
+ SmallIndexes.push_back(Index);
+ }
+ else
+ {
+ LargeIndexes.push_back(Index);
+ }
+ }
+
+ std::vector<CasStore::InsertResult> Result(Data.size());
+
+ if (!TinyIndexes.empty())
+ {
+ GetCompactCasResults(m_TinyStrategy, Data, ChunkHashes, TinyIndexes, Result);
+ }
+
+ if (!SmallIndexes.empty())
+ {
+ GetCompactCasResults(m_SmallStrategy, Data, ChunkHashes, SmallIndexes, Result);
+ }
+
+ if (!LargeIndexes.empty())
+ {
+ GetFileCasResults(m_LargeStrategy, Mode, Data, ChunkHashes, LargeIndexes, Result);
+ }
+
+ return Result;
+}
+
IoBuffer
CasImpl::FindChunk(const IoHash& ChunkHash)
{
diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h
index b951d6d59..f93724905 100644
--- a/src/zenstore/cas.h
+++ b/src/zenstore/cas.h
@@ -40,13 +40,16 @@ public:
virtual void Initialize(const CidStoreConfiguration& Config) = 0;
virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace) = 0;
- virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0;
- virtual bool ContainsChunk(const IoHash& ChunkHash) = 0;
- virtual void FilterChunks(HashKeySet& InOutChunks) = 0;
- virtual void Flush() = 0;
- virtual void ScrubStorage(ScrubContext& Ctx) = 0;
- virtual void GarbageCollect(GcContext& GcCtx) = 0;
- virtual CidStoreSize TotalSize() const = 0;
+ virtual std::vector<InsertResult> InsertChunks(std::span<IoBuffer> Data,
+ std::span<IoHash> ChunkHashes,
+ InsertMode Mode = InsertMode::kMayBeMovedInPlace) = 0;
+ virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0;
+ virtual bool ContainsChunk(const IoHash& ChunkHash) = 0;
+ virtual void FilterChunks(HashKeySet& InOutChunks) = 0;
+ virtual void Flush() = 0;
+ virtual void ScrubStorage(ScrubContext& Ctx) = 0;
+ virtual void GarbageCollect(GcContext& GcCtx) = 0;
+ virtual CidStoreSize TotalSize() const = 0;
protected:
CidStoreConfiguration m_Config;
diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp
index 67b7e95ac..68bccd06b 100644
--- a/src/zenstore/cidstore.cpp
+++ b/src/zenstore/cidstore.cpp
@@ -45,6 +45,50 @@ struct CidStore::Impl
return {.New = Result.New};
}
+ std::vector<CidStore::InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes, CidStore::InsertMode Mode)
+ {
+ if (ChunkDatas.size() == 1)
+ {
+ std::vector<CidStore::InsertResult> Result(1);
+ Result[0] = AddChunk(ChunkDatas[0], RawHashes[0], Mode);
+ return Result;
+ }
+ ZEN_ASSERT(ChunkDatas.size() == RawHashes.size());
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(ChunkDatas.size());
+#if ZEN_BUILD_DEBUG
+ size_t Offset = 0;
+#endif
+ uint64_t TotalSize = 0;
+ for (const IoBuffer& ChunkData : ChunkDatas)
+ {
+ TotalSize += ChunkData.GetSize();
+#if ZEN_BUILD_DEBUG
+ IoHash VerifyRawHash;
+ uint64_t _;
+ ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _) && RawHashes[Offset++] == VerifyRawHash);
+#endif
+ Chunks.push_back(ChunkData);
+ Chunks.back().SetContentType(ZenContentType::kCompressedBinary);
+ }
+
+ metrics::RequestStats::Scope $(m_AddChunkOps, TotalSize);
+
+ std::vector<CasStore::InsertResult> CasResults =
+ m_CasStore.InsertChunks(Chunks, RawHashes, static_cast<CasStore::InsertMode>(Mode));
+ ZEN_ASSERT(CasResults.size() == ChunkDatas.size());
+ std::vector<CidStore::InsertResult> Result;
+ for (const CasStore::InsertResult& CasResult : CasResults)
+ {
+ if (CasResult.New)
+ {
+ m_WriteCount++;
+ }
+ Result.emplace_back(CidStore::InsertResult{.New = CasResult.New});
+ }
+ return Result;
+ }
+
IoBuffer FindChunkByCid(const IoHash& DecompressedId)
{
metrics::RequestStats::Scope StatsScope(m_FindChunkOps, 0);
@@ -145,6 +189,12 @@ CidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode
return m_Impl->AddChunk(ChunkData, RawHash, Mode);
}
+std::vector<CidStore::InsertResult>
+CidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes, InsertMode Mode)
+{
+ return m_Impl->AddChunks(ChunkDatas, RawHashes, Mode);
+}
+
IoBuffer
CidStore::FindChunkByCid(const IoHash& DecompressedId)
{
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 84905df15..ec2bfbdec 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -204,6 +204,64 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash);
}
+std::vector<CasStore::InsertResult>
+CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes)
+{
+ ZEN_ASSERT(Chunks.size() == ChunkHashes.size());
+ std::vector<CasStore::InsertResult> Result(Chunks.size());
+ std::vector<size_t> NewChunkIndexes;
+ Result.reserve(Chunks.size());
+ {
+ RwLock::SharedLockScope _(m_LocationMapLock);
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++)
+ {
+ const IoHash& ChunkHash = ChunkHashes[ChunkIndex];
+ bool IsNew = !m_LocationMap.contains(ChunkHash);
+ Result[ChunkIndex] = CasStore::InsertResult{.New = IsNew};
+ if (IsNew)
+ {
+ NewChunkIndexes.push_back(ChunkIndex);
+ }
+ }
+ }
+
+ if (NewChunkIndexes.empty())
+ {
+ return Result;
+ }
+
+ std::vector<IoBuffer> Datas;
+ for (size_t ChunkIndex : NewChunkIndexes)
+ {
+ const IoBuffer& Chunk = Chunks[ChunkIndex];
+#if !ZEN_WITH_TESTS
+ ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
+#endif
+ Datas.emplace_back(Chunk);
+ }
+
+ size_t ChunkOffset = 0;
+ m_BlockStore.WriteChunks(Datas, m_PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
+ std::vector<CasDiskIndexEntry> IndexEntries;
+ for (const BlockStoreLocation& Location : Locations)
+ {
+ size_t ChunkIndex = NewChunkIndexes[ChunkOffset++];
+ IndexEntries.emplace_back(
+ CasDiskIndexEntry{.Key = ChunkHashes[ChunkIndex], .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)});
+ }
+ m_CasLog.Append(IndexEntries);
+ {
+ RwLock::ExclusiveLockScope _(m_LocationMapLock);
+ for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries)
+ {
+ m_LocationMap.emplace(DiskIndexEntry.Key, m_Locations.size());
+ m_Locations.push_back(DiskIndexEntry.Location);
+ }
+ }
+ });
+ return Result;
+}
+
IoBuffer
CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h
index 932844da7..eb1d36b31 100644
--- a/src/zenstore/compactcas.h
+++ b/src/zenstore/compactcas.h
@@ -51,16 +51,17 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore
CasContainerStrategy(GcManager& Gc);
~CasContainerStrategy();
- CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
- IoBuffer FindChunk(const IoHash& ChunkHash);
- bool HaveChunk(const IoHash& ChunkHash);
- void FilterChunks(HashKeySet& InOutChunks);
- void Initialize(const std::filesystem::path& RootDirectory,
- const std::string_view ContainerBaseName,
- uint32_t MaxBlockSize,
- uint32_t Alignment,
- bool IsNewStore);
- void Flush();
+ CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
+ std::vector<CasStore::InsertResult> InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes);
+ IoBuffer FindChunk(const IoHash& ChunkHash);
+ bool HaveChunk(const IoHash& ChunkHash);
+ void FilterChunks(HashKeySet& InOutChunks);
+ void Initialize(const std::filesystem::path& RootDirectory,
+ const std::string_view ContainerBaseName,
+ uint32_t MaxBlockSize,
+ uint32_t Alignment,
+ bool IsNewStore);
+ void Flush();
// GcStorage
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index 656040936..c28aa8102 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -161,6 +161,9 @@ public:
void WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, const WriteChunkCallback& Callback);
+ typedef std::function<void(std::span<BlockStoreLocation> Locations)> WriteChunksCallback;
+ void WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback);
+
IoBuffer TryGetChunk(const BlockStoreLocation& Location) const;
void Flush(bool ForceNewBlock);
diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h
index 4c9f30608..54f562767 100644
--- a/src/zenstore/include/zenstore/cidstore.h
+++ b/src/zenstore/include/zenstore/cidstore.h
@@ -73,15 +73,18 @@ public:
kMayBeMovedInPlace
};
- void Initialize(const CidStoreConfiguration& Config);
- InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace);
- virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
- bool ContainsChunk(const IoHash& DecompressedId);
- void FilterChunks(HashKeySet& InOutChunks);
- void Flush();
- void ScrubStorage(ScrubContext& Ctx);
- CidStoreSize TotalSize() const;
- CidStoreStats Stats() const;
+ void Initialize(const CidStoreConfiguration& Config);
+ InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace);
+ std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas,
+ std::span<IoHash> RawHashes,
+ InsertMode Mode = InsertMode::kMayBeMovedInPlace);
+ virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+ bool ContainsChunk(const IoHash& DecompressedId);
+ void FilterChunks(HashKeySet& InOutChunks);
+ void Flush();
+ void ScrubStorage(ScrubContext& Ctx);
+ CidStoreSize TotalSize() const;
+ CidStoreStats Stats() const;
virtual void ReportMetrics(StatsMetrics& Statsd) override;