aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-22 20:21:02 +0200
committerGitHub Enterprise <[email protected]>2024-04-22 20:21:02 +0200
commit96f44f2f2d8cbcda254d0b193f5a1aece645daeb (patch)
tree9d1975c4d76d7a577ecfe8e2fe9456738571528b /src
parentfix LogRemoteStoreStatsDetails (#53) (diff)
downloadzen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.tar.xz
zen-96f44f2f2d8cbcda254d0b193f5a1aece645daeb.zip
InsertChunks for CAS store (#55)
- Improvement: Add batching when writing multiple small chunks to block store - decreases I/O load significantly on oplog import
Diffstat (limited to 'src')
-rw-r--r--src/zenserver-test/zenserver-test.cpp4
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp77
-rw-r--r--src/zenserver/projectstore/projectstore.cpp59
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp139
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h4
-rw-r--r--src/zenstore/blockstore.cpp163
-rw-r--r--src/zenstore/cache/cacherpc.cpp91
-rw-r--r--src/zenstore/cas.cpp123
-rw-r--r--src/zenstore/cas.h17
-rw-r--r--src/zenstore/cidstore.cpp50
-rw-r--r--src/zenstore/compactcas.cpp58
-rw-r--r--src/zenstore/compactcas.h21
-rw-r--r--src/zenstore/include/zenstore/blockstore.h3
-rw-r--r--src/zenstore/include/zenstore/cidstore.h21
14 files changed, 653 insertions, 177 deletions
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index 4d2c507fe..ccea53942 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -2873,6 +2873,10 @@ TEST_CASE("project.remote")
CHECK(IsHttpSuccessCode(Response.status_code));
CbPackage ResponsePackage = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()));
CHECK(ResponsePackage.GetAttachments().size() == AttachmentHashes.size());
+ for (auto A : ResponsePackage.GetAttachments())
+ {
+ CHECK(IoHash::HashBuffer(A.AsCompressedBinary().DecompressToComposite()) == A.GetHash());
+ }
};
auto ValidateOplog = [&SourceOps, &AddOp, &Servers, &Session](int ServerIndex, std::string_view Project, std::string_view Oplog) {
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index f64b9c5a5..8106e9db9 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -848,17 +848,20 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
CbPackage Package;
if (Package.TryLoad(ClientResultValue.Value))
{
- CbObject CacheRecord = Package.GetObject();
- AttachmentCount Count;
- size_t NumAttachments = Package.GetAttachments().size();
- std::vector<const CbAttachment*> AttachmentsToStoreLocally;
- std::vector<IoHash> ReferencedAttachments;
- AttachmentsToStoreLocally.reserve(NumAttachments);
+ CbObject CacheRecord = Package.GetObject();
+ AttachmentCount Count;
+ size_t NumAttachments = Package.GetAttachments().size();
+ std::vector<IoHash> ReferencedAttachments;
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ WriteAttachmentBuffers.reserve(NumAttachments);
+ std::vector<IoHash> WriteRawHashes;
+ WriteRawHashes.reserve(NumAttachments);
CacheRecord.IterateAttachments([this,
&Package,
&Ref,
- &AttachmentsToStoreLocally,
+ &WriteAttachmentBuffers,
+ &WriteRawHashes,
&ReferencedAttachments,
&Count,
QueryLocal,
@@ -872,7 +875,9 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
{
if (StoreLocal)
{
- AttachmentsToStoreLocally.emplace_back(Attachment);
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ WriteAttachmentBuffers.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(Attachment->GetHash());
}
Count.Valid++;
}
@@ -923,18 +928,22 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
m_CacheStore
.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments);
m_CacheStats.WriteCount++;
- }
- for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
- {
- ZEN_ASSERT_SLOW(StoreLocal);
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult =
- m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- if (InsertResult.New)
+ if (!WriteAttachmentBuffers.empty())
{
- Count.New++;
+ std::vector<CidStore::InsertResult> InsertResults =
+ m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (const CidStore::InsertResult& Result : InsertResults)
+ {
+ if (Result.New)
+ {
+ Count.New++;
+ }
+ }
}
+
+ WriteAttachmentBuffers = {};
+ WriteRawHashes = {};
}
BinaryWriter MemStream;
@@ -1151,23 +1160,27 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
CbObject CacheRecord = Package.GetObject();
- AttachmentCount Count;
- size_t NumAttachments = Package.GetAttachments().size();
- std::vector<IoHash> ValidAttachments;
- std::vector<IoHash> ReferencedAttachments;
- std::vector<const CbAttachment*> AttachmentsToStoreLocally;
+ AttachmentCount Count;
+ size_t NumAttachments = Package.GetAttachments().size();
+ std::vector<IoHash> ValidAttachments;
+ std::vector<IoHash> ReferencedAttachments;
ValidAttachments.reserve(NumAttachments);
- AttachmentsToStoreLocally.reserve(NumAttachments);
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+ WriteAttachmentBuffers.reserve(NumAttachments);
+ WriteRawHashes.reserve(NumAttachments);
CacheRecord.IterateAttachments(
- [this, &Ref, &Package, &AttachmentsToStoreLocally, &ValidAttachments, &ReferencedAttachments, &Count](CbFieldView HashView) {
+ [this, &Ref, &Package, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, &ReferencedAttachments, &Count](
+ CbFieldView HashView) {
const IoHash Hash = HashView.AsHash();
ReferencedAttachments.push_back(Hash);
if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
{
if (Attachment->IsCompressedBinary())
{
- AttachmentsToStoreLocally.emplace_back(Attachment);
+ WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(Hash);
ValidAttachments.emplace_back(Hash);
Count.Valid++;
}
@@ -1202,14 +1215,18 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments);
m_CacheStats.WriteCount++;
- for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
+ if (!WriteAttachmentBuffers.empty())
{
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- if (InsertResult.New)
+ std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (const CidStore::InsertResult& InsertResult : InsertResults)
{
- Count.New++;
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
}
+ WriteAttachmentBuffers = {};
+ WriteRawHashes = {};
}
ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}",
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 3a7922aaf..3c281275e 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1448,19 +1448,36 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
auto Attachments = OpPackage.GetAttachments();
- for (const auto& Attach : Attachments)
+ if (!Attachments.empty())
{
- ZEN_ASSERT(Attach.IsCompressedBinary());
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+ std::vector<uint64_t> WriteRawSizes;
- CompressedBuffer AttachmentData = Attach.AsCompressedBinary();
- const uint64_t AttachmentSize = AttachmentData.DecodeRawSize();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData.GetCompressed().Flatten().AsIoBuffer(), Attach.GetHash());
+ WriteAttachmentBuffers.reserve(Attachments.size());
+ WriteRawHashes.reserve(Attachments.size());
+ WriteRawSizes.reserve(Attachments.size());
- if (InsertResult.New)
+ for (const auto& Attach : Attachments)
{
- NewAttachmentBytes += AttachmentSize;
+ ZEN_ASSERT(Attach.IsCompressedBinary());
+
+ CompressedBuffer AttachmentData = Attach.AsCompressedBinary();
+ const uint64_t AttachmentSize = AttachmentData.DecodeRawSize();
+ WriteAttachmentBuffers.push_back(AttachmentData.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(Attach.GetHash());
+ WriteRawSizes.push_back(AttachmentSize);
+ AttachmentBytes += AttachmentSize;
+ }
+
+ std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < InsertResults.size(); Index++)
+ {
+ if (InsertResults[Index].New)
+ {
+ NewAttachmentBytes += WriteRawSizes[Index];
+ }
}
- AttachmentBytes += AttachmentSize;
}
ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes));
@@ -3354,12 +3371,25 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
return true;
}
+
std::span<const CbAttachment> Attachments = Package.GetAttachments();
- for (const CbAttachment& Attachment : Attachments)
+ if (!Attachments.empty())
{
- IoHash RawHash = Attachment.GetHash();
- CompressedBuffer Compressed = Attachment.AsCompressedBinary();
- m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), RawHash, CidStore::InsertMode::kCopyOnly);
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+
+ WriteAttachmentBuffers.reserve(Attachments.size());
+ WriteRawHashes.reserve(Attachments.size());
+
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ IoHash RawHash = Attachment.GetHash();
+ CompressedBuffer Compressed = Attachment.AsCompressedBinary();
+ WriteAttachmentBuffers.push_back(Compressed.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(RawHash);
+ }
+
+ m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
}
HttpReq.WriteResponse(HttpResponseCode::OK);
return true;
@@ -4703,9 +4733,8 @@ TEST_CASE("project.store.block")
return CompositeBuffer(SharedBuffer(Buffer));
}));
}
- CompressedBuffer Block = GenerateBlock(std::move(Chunks));
- IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer();
- CHECK(IterateBlock(Block.DecodeRawHash(), std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {}));
+ CompressedBuffer Block = GenerateBlock(std::move(Chunks));
+ CHECK(IterateBlock(Block.Decompress(), [](CompressedBuffer&&, const IoHash&) {}));
}
#endif
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index e9c6964c5..e922fcf1c 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -143,20 +143,8 @@ LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats)
}
bool
-IterateBlock(const IoHash& BlockHash,
- IoBuffer&& CompressedBlock,
- std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
+IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
{
- IoHash RawHash;
- uint64_t RawSize;
- IoBuffer BlockPayload =
- CompressedBuffer::FromCompressed(SharedBuffer(std::move(CompressedBlock)), RawHash, RawSize).Decompress().AsIoBuffer();
- if (RawHash != BlockHash)
- {
- ZEN_WARN("Header rawhash for downloaded block {} does not match, got {}", BlockHash, RawHash);
- return false;
- }
-
MemoryView BlockView = BlockPayload.GetView();
const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData());
uint32_t NumberSize;
@@ -2126,20 +2114,6 @@ ParseOplogContainer(const CbObject& ContainerObject,
Stopwatch Timer;
- size_t NeedAttachmentCount = 0;
- CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
- for (CbFieldView LargeChunksField : LargeChunksArray)
- {
- IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
- if (HasAttachment(AttachmentHash))
- {
- continue;
- }
- OnNeedAttachment(AttachmentHash);
- NeedAttachmentCount++;
- };
- ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num()));
-
size_t NeedBlockCount = 0;
CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
for (CbFieldView BlockField : BlocksArray)
@@ -2185,6 +2159,21 @@ ParseOplogContainer(const CbObject& ContainerObject,
}
}
}
+ ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
+
+ size_t NeedAttachmentCount = 0;
+ CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
+ for (CbFieldView LargeChunksField : LargeChunksArray)
+ {
+ IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
+ if (HasAttachment(AttachmentHash))
+ {
+ continue;
+ }
+ OnNeedAttachment(AttachmentHash);
+ NeedAttachmentCount++;
+ };
+ ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num()));
CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
for (CbFieldView ChunkedFileField : ChunkedFilesArray)
@@ -2215,8 +2204,6 @@ ParseOplogContainer(const CbObject& ContainerObject,
Chunked.ChunkHashes.size());
}
- ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
-
MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
@@ -2474,17 +2461,31 @@ LoadOplog(CidStore& ChunkStore,
{
return;
}
- for (const auto& It : Chunks)
+
+ if (!Chunks.empty())
{
- uint64_t ChunkSize = It.second.GetCompressedSize();
- Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
- CidStore::InsertResult InsertResult = ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(),
- It.first,
- CidStore::InsertMode::kCopyOnly);
- if (InsertResult.New)
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+ WriteAttachmentBuffers.reserve(Chunks.size());
+ WriteRawHashes.reserve(Chunks.size());
+
+ for (const auto& It : Chunks)
{
- Info.AttachmentBytesStored.fetch_add(ChunkSize);
- Info.AttachmentsStored.fetch_add(1);
+ uint64_t ChunkSize = It.second.GetCompressedSize();
+ Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(It.first);
+ }
+ std::vector<CidStore::InsertResult> InsertResults =
+ ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
+
+ for (size_t Index = 0; Index < InsertResults.size(); Index++)
+ {
+ if (InsertResults[Index].New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
}
}
});
@@ -2558,23 +2559,36 @@ LoadOplog(CidStore& ChunkStore,
std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
WantedChunks.reserve(Chunks.size());
WantedChunks.insert(Chunks.begin(), Chunks.end());
- bool StoreChunksOK =
- IterateBlock(BlockHash,
- IoBuffer(Bytes),
- [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
- if (WantedChunks.contains(AttachmentRawHash))
- {
- uint64_t ChunkSize = Chunk.GetCompressedSize();
- CidStore::InsertResult InsertResult =
- ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
- if (InsertResult.New)
- {
- Info.AttachmentBytesStored.fetch_add(ChunkSize);
- Info.AttachmentsStored.fetch_add(1);
- }
- WantedChunks.erase(AttachmentRawHash);
- }
- });
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+
+ IoHash RawHash;
+ uint64_t RawSize;
+ SharedBuffer BlockPayload = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize).Decompress();
+ if (RawHash != BlockHash)
+ {
+ ReportMessage(OptionalContext, fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash));
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash),
+ {});
+ }
+
+ bool StoreChunksOK = IterateBlock(
+ BlockPayload,
+ [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk,
+ const IoHash& AttachmentRawHash) {
+ if (WantedChunks.contains(AttachmentRawHash))
+ {
+ WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ IoHash RawHash;
+ uint64_t RawSize;
+ ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize));
+ ZEN_ASSERT(RawHash == AttachmentRawHash);
+ WriteRawHashes.emplace_back(AttachmentRawHash);
+ WantedChunks.erase(AttachmentRawHash);
+ }
+ });
+
if (!StoreChunksOK)
{
ReportMessage(OptionalContext,
@@ -2587,7 +2601,22 @@ LoadOplog(CidStore& ChunkStore,
{});
return;
}
+
ZEN_ASSERT(WantedChunks.empty());
+
+ if (!WriteAttachmentBuffers.empty())
+ {
+ auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < Results.size(); Index++)
+ {
+ const auto& Result = Results[Index];
+ if (Result.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
+ }
});
});
};
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index d4ccd8c7b..d6e064bdf 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -162,8 +162,6 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
JobContext* OptionalContext);
CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks);
-bool IterateBlock(const IoHash& BlockHash,
- IoBuffer&& CompressedBlock,
- std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
+bool IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
} // namespace zen
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 644ddf7b4..8ef9e842f 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -453,6 +453,103 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, cons
}
}
+void
+BlockStore::WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback)
+{
+ ZEN_TRACE_CPU("BlockStore::WriteChunks");
+
+ ZEN_ASSERT(!Datas.empty());
+ ZEN_ASSERT(Alignment > 0u);
+
+ const size_t TotalCount = Datas.size();
+ uint64_t TotalSize = 0;
+ uint64_t LargestSize = 0;
+ for (const IoBuffer& Data : Datas)
+ {
+ uint64_t Size = Data.GetSize();
+ ZEN_ASSERT(Size > 0);
+ ZEN_ASSERT(Size <= m_MaxBlockSize);
+ TotalSize += Size;
+ LargestSize = Max(LargestSize, Size);
+ }
+
+ const uint64_t MinSize = Max(LargestSize, 8u * 1024u * 1024u);
+ const uint64_t BufferSize = Min(TotalSize, MinSize);
+ std::vector<uint8_t> Buffer(BufferSize);
+
+ size_t Offset = 0;
+ while (Offset < TotalCount)
+ {
+ size_t Count = 1;
+ uint32_t RangeSize = gsl::narrow<uint32_t>(Datas[Offset].GetSize());
+
+ RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+ uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ uint32_t AlignedInsertOffset = RoundUp(m_CurrentInsertOffset, Alignment);
+ if ((!m_WriteBlock) || ((AlignedInsertOffset + RangeSize) > m_MaxBlockSize))
+ {
+ std::filesystem::path BlockPath;
+ WriteBlockIndex = GetFreeBlockIndex(WriteBlockIndex, InsertLock, BlockPath);
+ if (WriteBlockIndex == (uint32_t)m_MaxBlockCount)
+ {
+ throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath));
+ }
+ Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath));
+ NewBlockFile->Create(m_MaxBlockSize);
+
+ m_ChunkBlocks[WriteBlockIndex] = NewBlockFile;
+ m_WriteBlock = NewBlockFile;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ m_CurrentInsertOffset = 0;
+ AlignedInsertOffset = 0;
+ }
+ while (Offset + Count < TotalCount)
+ {
+ uint32_t NextRangeSize = gsl::narrow<uint32_t>(RoundUp(RangeSize, Alignment) + Datas[Offset + Count].GetSize());
+ if ((AlignedInsertOffset + NextRangeSize) > m_MaxBlockSize || (NextRangeSize > BufferSize))
+ {
+ break;
+ }
+ Count++;
+ RangeSize = NextRangeSize;
+ }
+ m_CurrentInsertOffset = AlignedInsertOffset + RangeSize;
+ Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
+ m_ActiveWriteBlocks.push_back(WriteBlockIndex);
+ InsertLock.ReleaseNow();
+
+ {
+ MutableMemoryView WriteBuffer(Buffer.data(), RangeSize);
+ for (size_t Index = 0; Index < Count; Index++)
+ {
+ MemoryView SourceBuffer = Datas[Index + Offset];
+ WriteBuffer.CopyFrom(SourceBuffer);
+ WriteBuffer.MidInline(RoundUp(SourceBuffer.GetSize(), Alignment));
+ }
+ WriteBlock->Write(Buffer.data(), RangeSize, AlignedInsertOffset);
+ }
+
+ m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed);
+
+ uint32_t ChunkOffset = AlignedInsertOffset;
+ std::vector<BlockStoreLocation> Locations(Count);
+ for (size_t Index = 0; Index < Count; Index++)
+ {
+ uint32_t ChunkSize = gsl::narrow<uint32_t>(Datas[Offset + Index].GetSize());
+ Locations[Index] = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset, .Size = ChunkSize};
+ ChunkOffset += gsl::narrow<uint32_t>(RoundUp(ChunkSize, Alignment));
+ }
+ Callback(Locations);
+
+ {
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex));
+ }
+
+ Offset += Count;
+ }
+}
+
BlockStore::ReclaimSnapshotState
BlockStore::GetReclaimSnapshotState()
{
@@ -1543,6 +1640,72 @@ namespace blockstore::impl {
} // namespace blockstore::impl
+TEST_CASE("blockstore.multichunks")
+{
+ using namespace blockstore::impl;
+
+ ScopedTemporaryDirectory TempDir;
+ auto RootDirectory = TempDir.Path();
+
+ BlockStore Store;
+ Store.Initialize(RootDirectory, 128, 1024);
+
+ std::vector<IoBuffer> MultiChunkData;
+ std::string FirstChunkData = "0123456789012345678901234567890123456789012345678901234567890123";
+ MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FirstChunkData.data(), FirstChunkData.size()));
+
+ std::string SecondChunkData = "12345678901234567890123456789012345678901234567890123456";
+ MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, SecondChunkData.data(), SecondChunkData.size()));
+
+ std::string ThirdChunkData = "789012345678901234567890123456789012345678901234567890";
+ MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, ThirdChunkData.data(), ThirdChunkData.size()));
+
+ BlockStoreLocation Locations[5];
+ size_t ChunkOffset = 0;
+ Store.WriteChunks(MultiChunkData, 4, [&](std::span<BlockStoreLocation> InLocations) {
+ for (const BlockStoreLocation& Location : InLocations)
+ {
+ Locations[ChunkOffset++] = Location;
+ }
+ CHECK(ChunkOffset <= MultiChunkData.size());
+ });
+ CHECK(ChunkOffset == 3);
+
+ CHECK(ReadChunkAsString(Store, Locations[0]) == FirstChunkData);
+ CHECK(Locations[1].BlockIndex == Locations[0].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[1]) == SecondChunkData);
+ CHECK(Locations[2].BlockIndex != Locations[1].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[2]) == ThirdChunkData);
+
+ MultiChunkData.resize(0);
+ std::string FourthChunkData = "ABCDEFGHIJABCDEFGHIJ_23";
+ MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FourthChunkData.data(), FourthChunkData.size()));
+
+ std::string FifthChunkData =
+ "ABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJABCDEFGHIJ_93";
+ MultiChunkData.push_back(IoBuffer(IoBuffer::Wrap, FifthChunkData.data(), FifthChunkData.size()));
+
+ Store.WriteChunks(MultiChunkData, 4, [&](std::span<BlockStoreLocation> InLocations) {
+ for (const BlockStoreLocation& Location : InLocations)
+ {
+ CHECK(ChunkOffset < 5);
+ Locations[ChunkOffset++] = Location;
+ }
+ });
+ CHECK(ChunkOffset == 5);
+
+ CHECK(ReadChunkAsString(Store, Locations[0]) == FirstChunkData);
+ CHECK(Locations[1].BlockIndex == Locations[0].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[1]) == SecondChunkData);
+ CHECK(Locations[2].BlockIndex != Locations[1].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[2]) == ThirdChunkData);
+
+ CHECK(Locations[3].BlockIndex == Locations[2].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[3]) == FourthChunkData);
+ CHECK(Locations[4].BlockIndex != Locations[3].BlockIndex);
+ CHECK(ReadChunkAsString(Store, Locations[4]) == FifthChunkData);
+}
+
TEST_CASE("blockstore.chunks")
{
using namespace blockstore::impl;
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 3cb1b03a5..60b2fa1a1 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -315,49 +315,58 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
uint64_t RecordObjectSize = Record.GetSize();
uint64_t TransferredSize = RecordObjectSize;
- AttachmentCount Count;
- size_t NumAttachments = Package->GetAttachments().size();
- std::vector<IoHash> ValidAttachments;
- std::vector<IoHash> ReferencedAttachments;
- std::vector<const CbAttachment*> AttachmentsToStoreLocally;
+ AttachmentCount Count;
+ size_t NumAttachments = Package->GetAttachments().size();
+ std::vector<IoHash> ValidAttachments;
+ std::vector<IoHash> ReferencedAttachments;
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
ValidAttachments.reserve(NumAttachments);
- AttachmentsToStoreLocally.reserve(NumAttachments);
+ WriteAttachmentBuffers.reserve(NumAttachments);
+ WriteRawHashes.reserve(NumAttachments);
const bool HasUpstream = m_UpstreamCache.IsActive();
Stopwatch Timer;
- Request.RecordObject.IterateAttachments(
- [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &ReferencedAttachments, &Count, &TransferredSize](
- CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- ReferencedAttachments.push_back(ValueHash);
- if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
- {
- if (Attachment->IsCompressedBinary())
- {
- AttachmentsToStoreLocally.emplace_back(Attachment);
- ValidAttachments.emplace_back(ValueHash);
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
- Request.Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ToString(ZenContentType::kCbPackage),
- ValueHash);
- Count.Invalid++;
- }
- }
- else if (m_CidStore.ContainsChunk(ValueHash))
+ Request.RecordObject.IterateAttachments([this,
+ &Request,
+ Package,
+ &WriteAttachmentBuffers,
+ &WriteRawHashes,
+ &ValidAttachments,
+ &ReferencedAttachments,
+ &Count,
+ &TransferredSize](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ ReferencedAttachments.push_back(ValueHash);
+ if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
+ {
+ if (Attachment->IsCompressedBinary())
{
+ WriteAttachmentBuffers.push_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(ValueHash);
ValidAttachments.emplace_back(ValueHash);
Count.Valid++;
}
- Count.Total++;
- });
+ else
+ {
+ ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ Request.Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ToString(ZenContentType::kCbPackage),
+ ValueHash);
+ Count.Invalid++;
+ }
+ }
+ else if (m_CidStore.ContainsChunk(ValueHash))
+ {
+ ValidAttachments.emplace_back(ValueHash);
+ Count.Valid++;
+ }
+ Count.Total++;
+ });
if (Count.Invalid > 0)
{
@@ -371,16 +380,20 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments);
m_CacheStats.WriteCount++;
- for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
+ if (!WriteAttachmentBuffers.empty())
{
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- if (InsertResult.New)
+ std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < InsertResults.size(); Index++)
{
- Count.New++;
+ if (InsertResults[Index].New)
+ {
+ Count.New++;
+ }
+ TransferredSize += WriteAttachmentBuffers[Index].GetSize();
}
- TransferredSize += Chunk.GetCompressedSize();
}
+ WriteAttachmentBuffers = {};
+ WriteRawHashes = {};
ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}",
Request.Namespace,
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index a1a4a0acc..2fe466028 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -49,15 +49,18 @@ public:
CasImpl(GcManager& Gc);
virtual ~CasImpl();
- virtual void Initialize(const CidStoreConfiguration& InConfig) override;
- virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) override;
- virtual IoBuffer FindChunk(const IoHash& ChunkHash) override;
- virtual bool ContainsChunk(const IoHash& ChunkHash) override;
- virtual void FilterChunks(HashKeySet& InOutChunks) override;
- virtual void Flush() override;
- virtual void ScrubStorage(ScrubContext& Ctx) override;
- virtual void GarbageCollect(GcContext& GcCtx) override;
- virtual CidStoreSize TotalSize() const override;
+ virtual void Initialize(const CidStoreConfiguration& InConfig) override;
+ virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) override;
+ virtual std::vector<InsertResult> InsertChunks(std::span<IoBuffer> Data,
+ std::span<IoHash> ChunkHashes,
+ InsertMode Mode = InsertMode::kMayBeMovedInPlace) override;
+ virtual IoBuffer FindChunk(const IoHash& ChunkHash) override;
+ virtual bool ContainsChunk(const IoHash& ChunkHash) override;
+ virtual void FilterChunks(HashKeySet& InOutChunks) override;
+ virtual void Flush() override;
+ virtual void ScrubStorage(ScrubContext& Ctx) override;
+ virtual void GarbageCollect(GcContext& GcCtx) override;
+ virtual CidStoreSize TotalSize() const override;
private:
CasContainerStrategy m_TinyStrategy;
@@ -250,6 +253,108 @@ CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode)
return m_LargeStrategy.InsertChunk(Chunk, ChunkHash, Mode);
}
+static void
+GetCompactCasResults(CasContainerStrategy& Strategy,
+ std::span<IoBuffer> Data,
+ std::span<IoHash> ChunkHashes,
+ std::span<size_t> Indexes,
+ std::vector<CasStore::InsertResult> Results)
+{
+ const size_t Count = Indexes.size();
+ if (Count == 1)
+ {
+ const size_t Index = Indexes[0];
+ Results[Index] = Strategy.InsertChunk(Data[Index], ChunkHashes[Index]);
+ return;
+ }
+ std::vector<IoBuffer> Chunks;
+ std::vector<IoHash> Hashes;
+ Chunks.reserve(Count);
+ Hashes.reserve(Count);
+ for (size_t Index : Indexes)
+ {
+ Chunks.push_back(Data[Index]);
+ Hashes.push_back(ChunkHashes[Index]);
+ }
+
+ Strategy.InsertChunks(Chunks, Hashes);
+
+ for (size_t Offset = 0; Offset < Count; Offset++)
+ {
+ size_t Index = Indexes[Offset];
+ Results[Index] = Results[Offset];
+ }
+};
+
+static void
+GetFileCasResults(FileCasStrategy& Strategy,
+ CasStore::InsertMode Mode,
+ std::span<IoBuffer> Data,
+ std::span<IoHash> ChunkHashes,
+ std::span<size_t> Indexes,
+ std::vector<CasStore::InsertResult> Results)
+{
+ for (size_t Index : Indexes)
+ {
+ Results[Index] = Strategy.InsertChunk(Data[Index], ChunkHashes[Index], Mode);
+ }
+};
+
+std::vector<CasStore::InsertResult>
+CasImpl::InsertChunks(std::span<IoBuffer> Data, std::span<IoHash> ChunkHashes, CasStore::InsertMode Mode)
+{
+ ZEN_TRACE_CPU("CAS::InsertChunks");
+ ZEN_ASSERT(Data.size() == ChunkHashes.size());
+
+ if (Data.size() == 1)
+ {
+ std::vector<CasStore::InsertResult> Result(1);
+ Result[0] = InsertChunk(Data[0], ChunkHashes[0], Mode);
+ return Result;
+ }
+
+ std::vector<size_t> TinyIndexes;
+ std::vector<size_t> SmallIndexes;
+ std::vector<size_t> LargeIndexes;
+
+ for (size_t Index = 0; Index < Data.size(); Index++)
+ {
+ const uint64_t ChunkSize = Data[Index].Size();
+ if (ChunkSize < m_Config.TinyValueThreshold)
+ {
+ ZEN_ASSERT(ChunkSize);
+ TinyIndexes.push_back(Index);
+ }
+ else if (ChunkSize < m_Config.HugeValueThreshold)
+ {
+ SmallIndexes.push_back(Index);
+ }
+ else
+ {
+ LargeIndexes.push_back(Index);
+ }
+ }
+
+ std::vector<CasStore::InsertResult> Result(Data.size());
+
+ if (!TinyIndexes.empty())
+ {
+ GetCompactCasResults(m_TinyStrategy, Data, ChunkHashes, TinyIndexes, Result);
+ }
+
+ if (!SmallIndexes.empty())
+ {
+ GetCompactCasResults(m_SmallStrategy, Data, ChunkHashes, SmallIndexes, Result);
+ }
+
+ if (!LargeIndexes.empty())
+ {
+ GetFileCasResults(m_LargeStrategy, Mode, Data, ChunkHashes, LargeIndexes, Result);
+ }
+
+ return Result;
+}
+
IoBuffer
CasImpl::FindChunk(const IoHash& ChunkHash)
{
diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h
index b951d6d59..f93724905 100644
--- a/src/zenstore/cas.h
+++ b/src/zenstore/cas.h
@@ -40,13 +40,16 @@ public:
virtual void Initialize(const CidStoreConfiguration& Config) = 0;
virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace) = 0;
- virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0;
- virtual bool ContainsChunk(const IoHash& ChunkHash) = 0;
- virtual void FilterChunks(HashKeySet& InOutChunks) = 0;
- virtual void Flush() = 0;
- virtual void ScrubStorage(ScrubContext& Ctx) = 0;
- virtual void GarbageCollect(GcContext& GcCtx) = 0;
- virtual CidStoreSize TotalSize() const = 0;
+ virtual std::vector<InsertResult> InsertChunks(std::span<IoBuffer> Data,
+ std::span<IoHash> ChunkHashes,
+ InsertMode Mode = InsertMode::kMayBeMovedInPlace) = 0;
+ virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0;
+ virtual bool ContainsChunk(const IoHash& ChunkHash) = 0;
+ virtual void FilterChunks(HashKeySet& InOutChunks) = 0;
+ virtual void Flush() = 0;
+ virtual void ScrubStorage(ScrubContext& Ctx) = 0;
+ virtual void GarbageCollect(GcContext& GcCtx) = 0;
+ virtual CidStoreSize TotalSize() const = 0;
protected:
CidStoreConfiguration m_Config;
diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp
index 67b7e95ac..68bccd06b 100644
--- a/src/zenstore/cidstore.cpp
+++ b/src/zenstore/cidstore.cpp
@@ -45,6 +45,50 @@ struct CidStore::Impl
return {.New = Result.New};
}
+ std::vector<CidStore::InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes, CidStore::InsertMode Mode)
+ {
+ if (ChunkDatas.size() == 1)
+ {
+ std::vector<CidStore::InsertResult> Result(1);
+ Result[0] = AddChunk(ChunkDatas[0], RawHashes[0], Mode);
+ return Result;
+ }
+ ZEN_ASSERT(ChunkDatas.size() == RawHashes.size());
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(ChunkDatas.size());
+#if ZEN_BUILD_DEBUG
+ size_t Offset = 0;
+#endif
+ uint64_t TotalSize = 0;
+ for (const IoBuffer& ChunkData : ChunkDatas)
+ {
+ TotalSize += ChunkData.GetSize();
+#if ZEN_BUILD_DEBUG
+ IoHash VerifyRawHash;
+ uint64_t _;
+ ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _) && RawHashes[Offset++] == VerifyRawHash);
+#endif
+ Chunks.push_back(ChunkData);
+ Chunks.back().SetContentType(ZenContentType::kCompressedBinary);
+ }
+
+ metrics::RequestStats::Scope $(m_AddChunkOps, TotalSize);
+
+ std::vector<CasStore::InsertResult> CasResults =
+ m_CasStore.InsertChunks(Chunks, RawHashes, static_cast<CasStore::InsertMode>(Mode));
+ ZEN_ASSERT(CasResults.size() == ChunkDatas.size());
+ std::vector<CidStore::InsertResult> Result;
+ for (const CasStore::InsertResult& CasResult : CasResults)
+ {
+ if (CasResult.New)
+ {
+ m_WriteCount++;
+ }
+ Result.emplace_back(CidStore::InsertResult{.New = CasResult.New});
+ }
+ return Result;
+ }
+
IoBuffer FindChunkByCid(const IoHash& DecompressedId)
{
metrics::RequestStats::Scope StatsScope(m_FindChunkOps, 0);
@@ -145,6 +189,12 @@ CidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode
return m_Impl->AddChunk(ChunkData, RawHash, Mode);
}
+std::vector<CidStore::InsertResult>
+CidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes, InsertMode Mode)
+{
+ return m_Impl->AddChunks(ChunkDatas, RawHashes, Mode);
+}
+
IoBuffer
CidStore::FindChunkByCid(const IoHash& DecompressedId)
{
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 84905df15..ec2bfbdec 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -204,6 +204,64 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash);
}
+std::vector<CasStore::InsertResult>
+CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes)
+{
+ ZEN_ASSERT(Chunks.size() == ChunkHashes.size());
+ std::vector<CasStore::InsertResult> Result(Chunks.size());
+ std::vector<size_t> NewChunkIndexes;
+ Result.reserve(Chunks.size());
+ {
+ RwLock::SharedLockScope _(m_LocationMapLock);
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++)
+ {
+ const IoHash& ChunkHash = ChunkHashes[ChunkIndex];
+ bool IsNew = !m_LocationMap.contains(ChunkHash);
+ Result[ChunkIndex] = CasStore::InsertResult{.New = IsNew};
+ if (IsNew)
+ {
+ NewChunkIndexes.push_back(ChunkIndex);
+ }
+ }
+ }
+
+ if (NewChunkIndexes.empty())
+ {
+ return Result;
+ }
+
+ std::vector<IoBuffer> Datas;
+ for (size_t ChunkIndex : NewChunkIndexes)
+ {
+ const IoBuffer& Chunk = Chunks[ChunkIndex];
+#if !ZEN_WITH_TESTS
+ ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
+#endif
+ Datas.emplace_back(Chunk);
+ }
+
+ size_t ChunkOffset = 0;
+ m_BlockStore.WriteChunks(Datas, m_PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
+ std::vector<CasDiskIndexEntry> IndexEntries;
+ for (const BlockStoreLocation& Location : Locations)
+ {
+ size_t ChunkIndex = NewChunkIndexes[ChunkOffset++];
+ IndexEntries.emplace_back(
+ CasDiskIndexEntry{.Key = ChunkHashes[ChunkIndex], .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)});
+ }
+ m_CasLog.Append(IndexEntries);
+ {
+ RwLock::ExclusiveLockScope _(m_LocationMapLock);
+ for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries)
+ {
+ m_LocationMap.emplace(DiskIndexEntry.Key, m_Locations.size());
+ m_Locations.push_back(DiskIndexEntry.Location);
+ }
+ }
+ });
+ return Result;
+}
+
IoBuffer
CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h
index 932844da7..eb1d36b31 100644
--- a/src/zenstore/compactcas.h
+++ b/src/zenstore/compactcas.h
@@ -51,16 +51,17 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore
CasContainerStrategy(GcManager& Gc);
~CasContainerStrategy();
- CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
- IoBuffer FindChunk(const IoHash& ChunkHash);
- bool HaveChunk(const IoHash& ChunkHash);
- void FilterChunks(HashKeySet& InOutChunks);
- void Initialize(const std::filesystem::path& RootDirectory,
- const std::string_view ContainerBaseName,
- uint32_t MaxBlockSize,
- uint32_t Alignment,
- bool IsNewStore);
- void Flush();
+ CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
+ std::vector<CasStore::InsertResult> InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes);
+ IoBuffer FindChunk(const IoHash& ChunkHash);
+ bool HaveChunk(const IoHash& ChunkHash);
+ void FilterChunks(HashKeySet& InOutChunks);
+ void Initialize(const std::filesystem::path& RootDirectory,
+ const std::string_view ContainerBaseName,
+ uint32_t MaxBlockSize,
+ uint32_t Alignment,
+ bool IsNewStore);
+ void Flush();
// GcStorage
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index 656040936..c28aa8102 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -161,6 +161,9 @@ public:
void WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, const WriteChunkCallback& Callback);
+ typedef std::function<void(std::span<BlockStoreLocation> Locations)> WriteChunksCallback;
+ void WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback);
+
IoBuffer TryGetChunk(const BlockStoreLocation& Location) const;
void Flush(bool ForceNewBlock);
diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h
index 4c9f30608..54f562767 100644
--- a/src/zenstore/include/zenstore/cidstore.h
+++ b/src/zenstore/include/zenstore/cidstore.h
@@ -73,15 +73,18 @@ public:
kMayBeMovedInPlace
};
- void Initialize(const CidStoreConfiguration& Config);
- InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace);
- virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
- bool ContainsChunk(const IoHash& DecompressedId);
- void FilterChunks(HashKeySet& InOutChunks);
- void Flush();
- void ScrubStorage(ScrubContext& Ctx);
- CidStoreSize TotalSize() const;
- CidStoreStats Stats() const;
+ void Initialize(const CidStoreConfiguration& Config);
+ InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace);
+ std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas,
+ std::span<IoHash> RawHashes,
+ InsertMode Mode = InsertMode::kMayBeMovedInPlace);
+ virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+ bool ContainsChunk(const IoHash& DecompressedId);
+ void FilterChunks(HashKeySet& InOutChunks);
+ void Flush();
+ void ScrubStorage(ScrubContext& Ctx);
+ CidStoreSize TotalSize() const;
+ CidStoreStats Stats() const;
virtual void ReportMetrics(StatsMetrics& Statsd) override;