aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/remoteprojectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-03-14 16:50:18 +0100
committerGitHub Enterprise <[email protected]>2024-03-14 16:50:18 +0100
commit0a935231009cb21680d364ef125f0296a5a5bed6 (patch)
tree7e55a67ae60883b0eab71a0d636aeec23f307d14 /src/zenserver/projectstore/remoteprojectstore.cpp
parentclean up test linking (#4) (diff)
downloadzen-0a935231009cb21680d364ef125f0296a5a5bed6.tar.xz
zen-0a935231009cb21680d364ef125f0296a5a5bed6.zip
special treatment large oplog attachments v2 (#5)
- Bugfix: Install Ctrl+C handler earlier when doing `zen oplog-export` and `zen oplog-import` to properly cancel jobs - Improvement: Add ability to block a set of CAS entries from GC in project store - Improvement: Large attachments and loose files are now split into smaller chunks and stored in blocks during oplog export
Diffstat (limited to 'src/zenserver/projectstore/remoteprojectstore.cpp')
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp1652
1 files changed, 1070 insertions, 582 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 672292290..ce3411114 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -12,6 +12,7 @@
#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zencore/workthreadpool.h>
+#include <zenstore/chunkedfile.h>
#include <zenstore/cidstore.h>
#include <zenutil/workerpools.h>
@@ -38,6 +39,14 @@ namespace zen {
CbArray("chunks") // Optional, only if we are not creating blocks (Zen)
CbFieldType::BinaryAttachment // Chunk attachment hashes
+ CbArray("chunkedfiles");
+ CbFieldType::Hash "rawhash"
+ CbFieldType::Integer "rawsize"
+ CbArray("chunks");
+ CbFieldType::Hash "chunkhash"
+ CbArray("sequence");
+ CbFieldType::Integer chunks index
+
CompressedBinary ChunkBlock
{
VarUInt ChunkCount
@@ -143,30 +152,36 @@ IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& C
};
CompressedBuffer
-GenerateBlock(std::vector<SharedBuffer>&& Chunks)
+GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks)
{
- size_t ChunkCount = Chunks.size();
- SharedBuffer SizeBuffer;
+ std::vector<SharedBuffer> ChunkSegments;
+ ChunkSegments.resize(1);
+ ChunkSegments.reserve(1 + FetchChunks.size());
+ size_t ChunkCount = FetchChunks.size();
{
IoBuffer TempBuffer(ChunkCount * 9);
MutableMemoryView View = TempBuffer.GetMutableView();
uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData());
uint8_t* BufferEndPtr = BufferStartPtr;
BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr);
- auto It = Chunks.begin();
- while (It != Chunks.end())
+ for (const auto& It : FetchChunks)
{
- BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(It->GetSize()), BufferEndPtr);
- It++;
+ CompositeBuffer Chunk = It.second(It.first);
+ uint64_t ChunkSize = 0;
+ std::span<const SharedBuffer> Segments = Chunk.GetSegments();
+ for (const SharedBuffer& Segment : Segments)
+ {
+ ChunkSize += Segment.GetSize();
+ ChunkSegments.push_back(Segment);
+ }
+ BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr);
}
ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd());
ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr);
- SizeBuffer = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength)));
+ ChunkSegments[0] = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength)));
}
CompressedBuffer CompressedBlock =
- CompressedBuffer::Compress(CompositeBuffer(std::move(SizeBuffer), CompositeBuffer(std::move(Chunks))),
- OodleCompressor::Mermaid,
- OodleCompressionLevel::None);
+ CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None);
return CompressedBlock;
}
@@ -180,7 +195,7 @@ struct Block
void
CreateBlock(WorkerThreadPool& WorkerPool,
Latch& OpSectionsLatch,
- std::vector<SharedBuffer>&& ChunksInBlock,
+ std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock,
RwLock& SectionsLock,
std::vector<Block>& Blocks,
size_t BlockIndex,
@@ -251,12 +266,16 @@ WriteToTempFile(CompressedBuffer&& CompressedBuffer, std::filesystem::path Path)
RetriesLeft--;
return true;
});
-
uint64_t Offset = 0;
- for (const SharedBuffer& Buffer : CompressedBuffer.GetCompressed().GetSegments())
{
- BlockFile.Write(Buffer.GetView(), Offset);
- Offset += Buffer.GetSize();
+ CompositeBuffer Compressed = std::move(CompressedBuffer).GetCompressed();
+ BasicFileWriter BlockWriter(BlockFile, 64u * 1024u);
+ for (const SharedBuffer& Segment : Compressed.GetSegments())
+ {
+ size_t SegmentSize = Segment.GetSize();
+ BlockWriter.Write(Segment.GetData(), SegmentSize, Offset);
+ Offset += SegmentSize;
+ }
}
void* FileHandle = BlockFile.Detach();
BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true);
@@ -270,13 +289,14 @@ BuildContainer(CidStore& ChunkStore,
ProjectStore::Oplog& Oplog,
size_t MaxBlockSize,
size_t MaxChunkEmbedSize,
+ size_t ChunkFileSizeLimit,
bool BuildBlocks,
bool IgnoreMissingAttachments,
const std::vector<Block>& KnownBlocks,
WorkerThreadPool& WorkerPool,
const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
- const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
+ const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles,
JobContext* OptionalContext,
AsyncRemoteResult& RemoteResult)
@@ -287,22 +307,24 @@ BuildContainer(CidStore& ChunkStore,
CbObject OplogContainerObject;
{
- std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
- std::unordered_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseAttachments;
+ struct FoundAttachment
+ {
+ std::filesystem::path RawPath; // If not stored in cid
+ uint64_t Size = 0;
+ Oid Key = Oid::Zero;
+ };
+
+ std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments;
RwLock BlocksLock;
std::vector<Block> Blocks;
CompressedBuffer OpsBuffer;
- std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
-
- std::unordered_map<IoHash, int, IoHash::Hasher> Attachments;
-
std::filesystem::path AttachmentTempPath = Oplog.TempPath();
AttachmentTempPath.append(".pending");
CreateDirectories(AttachmentTempPath);
- auto RewriteOp = [&](int LSN, CbObjectView Op, const std::function<void(CbObjectView)>& CB) {
+ auto RewriteOp = [&](const Oid& Key, CbObjectView Op, const std::function<void(CbObjectView)>& CB) {
bool OpRewritten = false;
CbArrayView Files = Op["files"sv].AsArrayView();
if (Files.Num() == 0)
@@ -316,6 +338,15 @@ BuildContainer(CidStore& ChunkStore,
for (CbFieldView& Field : Files)
{
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ CB(Op);
+ return;
+ }
+
bool CopyField = true;
if (CbObjectView View = Field.AsObjectView())
@@ -344,73 +375,35 @@ BuildContainer(CidStore& ChunkStore,
throw std::runtime_error(fmt::format("failed to open file '{}'", FilePath));
}
}
- SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath));
- // Loose file, just hash it for now and leave compression for later via callback
-
- const uint64_t RawSize = DataBuffer.GetSize();
- if (RawSize > MaxChunkEmbedSize)
- {
- IoHashStream Hasher;
- CompositeBuffer RawData(DataBuffer);
- UniqueBuffer RawBlockCopy;
- CompositeBuffer::Iterator It = RawData.GetIterator(0);
- const uint64_t BlockSize = MaxChunkEmbedSize;
- for (uint64_t RawOffset = 0; RawOffset < RawSize;)
- {
- const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize);
- const MemoryView RawBlock = RawData.ViewOrCopyRange(It, RawBlockSize, RawBlockCopy);
- Hasher.Append(RawBlock);
- RawOffset += RawBlockSize;
- }
- DataHash = Hasher.GetHash();
- LargeChunkHashes.insert(DataHash);
- }
- else
{
- DataHash = IoHash::HashBuffer(DataBuffer);
+ Stopwatch HashTimer;
+ SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath));
+ DataHash = IoHash::HashBuffer(CompositeBuffer(DataBuffer));
+ ZEN_INFO("Hashed loose file '{}' {}: {} in {}",
+ FilePath,
+ NiceBytes(DataBuffer.GetSize()),
+ DataHash,
+ NiceTimeSpanMs(HashTimer.GetElapsedTimeMs()));
}
- LooseAttachments.insert_or_assign(
- DataHash,
- [AttachmentBuffer = std::move(DataBuffer), &Oplog, AttachmentTempPath](const IoHash& DataHash) -> IoBuffer {
- Stopwatch AttachmentTimer;
- uint64_t RawSize = AttachmentBuffer.GetSize();
- CompressedBuffer Compressed =
- CompressedBuffer::Compress(AttachmentBuffer, OodleCompressor::Mermaid, OodleCompressionLevel::Normal);
- ZEN_ASSERT(Compressed.DecodeRawHash() == DataHash);
- uint64_t PayloadSize = Compressed.GetCompressedSize();
- ZEN_INFO("Compressed loose file attachment {} ({} -> {}) in {}",
- DataHash,
- NiceBytes(RawSize),
- NiceBytes(PayloadSize),
- NiceTimeSpanMs(static_cast<uint64_t>(AttachmentTimer.GetElapsedTimeMs())));
- std::filesystem::path AttachmentPath = AttachmentTempPath;
- AttachmentPath.append(DataHash.ToHexString());
-
- IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed), AttachmentPath);
- ZEN_INFO("Saved temp attachment to '{}', {}", AttachmentPath, NiceBytes(TempAttachmentBuffer.GetSize()));
- return TempAttachmentBuffer;
- });
+ // Rewrite file array entry with new data reference
+ CbObjectWriter Writer;
+ RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
+ if (Field.GetName() == "data"sv)
+ {
+ // omit this field as we will write it explicitly ourselves
+ return true;
+ }
+ return false;
+ });
+ Writer.AddBinaryAttachment("data"sv, DataHash);
+ UploadAttachments.insert_or_assign(DataHash, FoundAttachment{.RawPath = FilePath, .Key = Key});
+
+ CbObject RewrittenOp = Writer.Save();
+ Cbo.AddObject(std::move(RewrittenOp));
+ CopyField = false;
}
-
- // Rewrite file array entry with new data reference
- CbObjectWriter Writer;
- RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
- if (Field.GetName() == "data"sv)
- {
- // omit this field as we will write it explicitly ourselves
- return true;
- }
- return false;
- });
- Writer.AddBinaryAttachment("data"sv, DataHash);
-
- CbObject RewrittenOp = Writer.Save();
- Cbo.AddObject(std::move(RewrittenOp));
- CopyField = false;
-
- Attachments.insert_or_assign(DataHash, LSN);
}
if (CopyField)
@@ -449,24 +442,24 @@ BuildContainer(CidStore& ChunkStore,
Stopwatch Timer;
- tsl::robin_map<int, std::string> OpLSNToKey;
- CompressedBuffer CompressedOpsSection;
+ size_t TotalOpCount = Oplog.GetOplogEntryCount();
+ CompressedBuffer CompressedOpsSection;
{
+ Stopwatch RewriteOplogTimer;
CbObjectWriter SectionOpsWriter;
SectionOpsWriter.BeginArray("ops"sv);
{
- Stopwatch RewriteOplogTimer;
- Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObjectView Op) {
+ Oplog.IterateOplogWithKey([&](int, const Oid& Key, CbObjectView Op) {
if (RemoteResult.IsError())
{
return;
}
- std::string_view Key = Op["key"sv].AsString();
- OpLSNToKey.insert({LSN, std::string(Key)});
- Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); });
+ Op.IterateAttachments([&](CbFieldView FieldView) {
+ UploadAttachments.insert_or_assign(FieldView.AsAttachment(), FoundAttachment{.Key = Key});
+ });
if (EmbedLooseFiles)
{
- RewriteOp(LSN, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; });
+ RewriteOp(Key, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; });
}
else
{
@@ -476,30 +469,42 @@ BuildContainer(CidStore& ChunkStore,
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return;
}
- if (OpCount % 100000 == 0)
+ if (OpCount % 1000 == 0)
{
- ReportMessage(OptionalContext, fmt::format("Building oplog, at op {}...", OpCount));
+ ReportProgress(OptionalContext,
+ fmt::format("Building oplog: {} ops processed", OpCount),
+ TotalOpCount,
+ TotalOpCount - OpCount);
}
});
+ if (RemoteResult.IsError())
+ {
+ return {};
+ }
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return {};
}
- ReportMessage(OptionalContext,
- fmt::format("Rewrote {} ops to new oplog in {}",
- OpCount,
- NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs()))));
+ ReportProgress(OptionalContext, fmt::format("Building oplog: {} ops processed", OpCount), TotalOpCount, 0);
}
SectionOpsWriter.EndArray(); // "ops"
+ ReportMessage(OptionalContext,
+ fmt::format("Rewrote {} ops to new oplog in {}",
+ OpCount,
+ NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs()))));
+
{
Stopwatch CompressOpsTimer;
- CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer(),
- OodleCompressor::Mermaid,
- OodleCompressionLevel::Normal);
+ CompressedOpsSection =
+ CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer(), OodleCompressor::Mermaid, OodleCompressionLevel::Fast);
ReportMessage(OptionalContext,
fmt::format("Compressed oplog section {} ({} -> {}) in {}",
CompressedOpsSection.DecodeRawHash(),
@@ -512,357 +517,706 @@ BuildContainer(CidStore& ChunkStore,
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return {};
}
- if (!Attachments.empty() && !KnownBlocks.empty())
- {
- ReportMessage(OptionalContext, fmt::format("Checking {} known blocks for reuse", KnownBlocks.size()));
- Stopwatch ReuseTimer;
-
- size_t SkippedAttachmentCount = 0;
- for (const Block& KnownBlock : KnownBlocks)
+ auto FindReuseBlocks = [](const std::vector<Block>& KnownBlocks,
+ const std::unordered_set<IoHash, IoHash::Hasher>& Attachments,
+ JobContext* OptionalContext) -> std::vector<size_t> {
+ std::vector<size_t> ReuseBlockIndexes;
+ if (!Attachments.empty() && !KnownBlocks.empty())
{
- size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size();
- if (BlockAttachmentCount == 0)
- {
- continue;
- }
- size_t FoundAttachmentCount = 0;
- for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
+ ReportMessage(
+ OptionalContext,
+ fmt::format("Checking {} Attachments against {} known blocks for reuse", Attachments.size(), KnownBlocks.size()));
+ Stopwatch ReuseTimer;
+
+ for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++)
{
- if (Attachments.contains(KnownHash))
+ const Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size();
+ if (BlockAttachmentCount == 0)
{
- FoundAttachmentCount++;
+ continue;
}
- }
-
- size_t ReusePercent = (FoundAttachmentCount * 100) / BlockAttachmentCount;
- // TODO: Configure reuse-level
- if (ReusePercent > 80)
- {
- ZEN_DEBUG("Reusing block {}. {} attachments found, usage level: {}%",
- KnownBlock.BlockHash,
- FoundAttachmentCount,
- ReusePercent);
+ size_t FoundAttachmentCount = 0;
for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
{
- Attachments.erase(KnownHash);
- SkippedAttachmentCount++;
+ if (Attachments.contains(KnownHash))
+ {
+ FoundAttachmentCount++;
+ }
}
- Blocks.push_back(KnownBlock);
- }
- else if (FoundAttachmentCount > 0)
- {
- ZEN_DEBUG("Skipping block {}. {} attachments found, usage level: {}%",
- KnownBlock.BlockHash,
- FoundAttachmentCount,
- ReusePercent);
+ size_t ReusePercent = (FoundAttachmentCount * 100) / BlockAttachmentCount;
+ // TODO: Configure reuse-level
+ if (ReusePercent > 80)
+ {
+ ZEN_DEBUG("Reusing block {}. {} attachments found, usage level: {}%",
+ KnownBlock.BlockHash,
+ FoundAttachmentCount,
+ ReusePercent);
+ ReuseBlockIndexes.push_back(KnownBlockIndex);
+ }
+ else if (FoundAttachmentCount > 0)
+ {
+ ZEN_DEBUG("Skipping block {}. {} attachments found, usage level: {}%",
+ KnownBlock.BlockHash,
+ FoundAttachmentCount,
+ ReusePercent);
+ }
}
}
- ReportMessage(OptionalContext,
- fmt::format("Reusing {} out of {} known blocks, skipping upload of {} attachments, completed in {}",
- Blocks.size(),
- KnownBlocks.size(),
- SkippedAttachmentCount,
- NiceTimeSpanMs(static_cast<uint64_t>(ReuseTimer.GetElapsedTimeMs()))));
- }
+ return ReuseBlockIndexes;
+ };
- if (IsCancelled(OptionalContext))
+ std::unordered_set<IoHash, IoHash::Hasher> FoundHashes;
+ FoundHashes.reserve(UploadAttachments.size());
+ for (const auto& It : UploadAttachments)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- return {};
+ FoundHashes.insert(It.first);
}
- ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size()));
-
- // Sort attachments so we get predictable blocks for the same oplog upload
- std::vector<IoHash> SortedAttachments;
+ size_t ReusedAttachmentCount = 0;
+ std::vector<size_t> ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext);
+ for (size_t KnownBlockIndex : ReusedBlockIndexes)
{
- SortedAttachments.reserve(Attachments.size());
- for (const auto& It : Attachments)
+ const Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
{
- SortedAttachments.push_back(It.first);
+ if (UploadAttachments.erase(KnownHash) == 1)
+ {
+ ReusedAttachmentCount++;
+ }
}
- std::sort(SortedAttachments.begin(),
- SortedAttachments.end(),
- [&Attachments, &OpLSNToKey](const IoHash& Lhs, const IoHash& Rhs) {
- auto LhsLNSIt = Attachments.find(Lhs);
- ZEN_ASSERT_SLOW(LhsLNSIt != Attachments.end());
- auto RhsLNSIt = Attachments.find(Rhs);
- ZEN_ASSERT_SLOW(RhsLNSIt != Attachments.end());
- if (LhsLNSIt->second == RhsLNSIt->second)
- {
- return Lhs < Rhs;
- }
- auto LhsKeyIt = OpLSNToKey.find(LhsLNSIt->second);
- ZEN_ASSERT_SLOW(LhsKeyIt != OpLSNToKey.end());
- auto RhsKeyIt = OpLSNToKey.find(RhsLNSIt->second);
- ZEN_ASSERT_SLOW(RhsKeyIt != OpLSNToKey.end());
- return LhsKeyIt->second < RhsKeyIt->second;
- });
}
- if (IsCancelled(OptionalContext))
+ struct ChunkedFile
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- return {};
- }
- ReportMessage(OptionalContext,
- fmt::format("Assembling {} attachments from {} ops into blocks and loose attachments",
- SortedAttachments.size(),
- OpLSNToKey.size()));
+ IoBuffer Source;
- for (const IoHash& AttachmentHash : LargeChunkHashes)
- {
- if (IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- return {};
- }
+ ChunkedInfoWithSource Chunked;
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkLoookup;
+ };
+ std::vector<ChunkedFile> ChunkedFiles;
- if (auto It = LooseAttachments.find(AttachmentHash); It != LooseAttachments.end())
- {
- OnLargeAttachment(AttachmentHash, std::move(It->second));
- LooseAttachments.erase(It);
- }
- else
- {
- OnLargeAttachment(AttachmentHash,
- [&ChunkStore](const IoHash& AttachmentHash) { return ChunkStore.FindChunkByCid(AttachmentHash); });
- }
- }
- size_t LargeAttachmentCount = LargeChunkHashes.size();
-
- Latch BlockCreateLatch(1);
- size_t GeneratedBlockCount = 0;
- size_t BlockSize = 0;
- std::vector<SharedBuffer> ChunksInBlock;
- int LastLSNOp = -1;
- auto GetPayload = [&](const IoHash& AttachmentHash) {
- if (auto It = LooseAttachments.find(AttachmentHash); It != LooseAttachments.end())
- {
- IoBuffer Payload = It->second(AttachmentHash);
- LooseAttachments.erase(It);
- return Payload;
- }
- return ChunkStore.FindChunkByCid(AttachmentHash);
+ auto ChunkFile = [AttachmentTempPath](const IoHash& RawHash,
+ IoBuffer& RawData,
+ const IoBufferFileReference& FileRef,
+ JobContext*) -> ChunkedFile {
+ ChunkedFile Chunked;
+ Stopwatch Timer;
+
+ uint64_t Offset = FileRef.FileChunkOffset;
+ uint64_t Size = FileRef.FileChunkSize;
+
+ BasicFile SourceFile;
+ SourceFile.Attach(FileRef.FileHandle);
+ auto __ = MakeGuard([&SourceFile]() { SourceFile.Detach(); });
+
+ Chunked.Chunked = ChunkData(SourceFile, Offset, Size, UShaderByteCodeParams);
+ Chunked.Source = RawData;
+
+ ZEN_INFO("Chunked large attachment '{}' {} into {} chunks in {}",
+ RawHash,
+ NiceBytes(Chunked.Chunked.Info.RawSize),
+ Chunked.Chunked.Info.ChunkHashes.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+
+ return Chunked;
};
- uint32_t ResolvedLargeCount = 0;
- uint32_t ResolvedSmallCount = 0;
- uint32_t ResolvedFailedCount = 0;
- uint32_t ComposedBlocks = 0;
+ RwLock ResolveLock;
+ std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes;
+ std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
+ std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments;
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> LooseUploadAttachments;
+ std::unordered_set<IoHash, IoHash::Hasher> MissingHashes;
- uint64_t FetchAttachmentsStartMS = Timer.GetElapsedTimeMs();
+ ReportMessage(OptionalContext, fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount));
- for (auto HashIt = SortedAttachments.begin(); HashIt != SortedAttachments.end(); HashIt++)
+ Latch ResolveAttachmentsLatch(1);
+ for (auto& It : UploadAttachments)
{
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- break;
- }
- if ((ResolvedLargeCount + ResolvedSmallCount) % 1000 == 0)
- {
- ReportProgress(OptionalContext,
- fmt::format("Resolving attachments: {} large, {} small, {} blocks assembled",
- ResolvedLargeCount,
- ResolvedSmallCount,
- ComposedBlocks),
- SortedAttachments.size(),
- SortedAttachments.size() - (ResolvedLargeCount + ResolvedSmallCount + ResolvedFailedCount));
- }
- const IoHash& AttachmentHash(*HashIt);
- if (LargeChunkHashes.contains(AttachmentHash))
- {
- ResolvedLargeCount++;
- continue;
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
}
- IoBuffer Payload = GetPayload(AttachmentHash);
- if (!Payload)
- {
- auto It = Attachments.find(AttachmentHash);
- ZEN_ASSERT(It != Attachments.end());
- std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second);
- ZEN_ASSERT(Op.has_value());
- ExtendableStringBuilder<1024> Sb;
- Sb.Append("Failed to find attachment '");
- Sb.Append(AttachmentHash.ToHexString());
- Sb.Append("' for op: \n");
- Op.value().ToJson(Sb);
- ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView()));
-
- if (IgnoreMissingAttachments)
- {
- ResolvedFailedCount++;
- continue;
- }
- else
+ ResolveAttachmentsLatch.AddCount(1);
+
+ WorkerPool.ScheduleWork([&ChunkStore,
+ UploadAttachment = &It.second,
+ RawHash = It.first,
+ &ResolveAttachmentsLatch,
+ &ResolveLock,
+ &ChunkedHashes,
+ &LargeChunkHashes,
+ &ChunkedUploadAttachments,
+ &LooseUploadAttachments,
+ &MissingHashes,
+ &OnLargeAttachment,
+ &AttachmentTempPath,
+ &ChunkFile,
+ &ChunkedFiles,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ &RemoteResult,
+ OptionalContext]() {
+ auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); });
+ try
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
- BlockCreateLatch.CountDown();
- while (!BlockCreateLatch.Wait(1000))
+ if (IsCancelled(OptionalContext))
{
- ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
+ return;
+ }
+
+ if (!UploadAttachment->RawPath.empty())
+ {
+ const std::filesystem::path& FilePath = UploadAttachment->RawPath;
+ IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath);
+ if (RawData)
+ {
+ if (RawData.GetSize() > ChunkFileSizeLimit)
+ {
+ IoBufferFileReference FileRef;
+ (void)RawData.GetFileReference(FileRef);
+
+ ChunkedFile Chunked = ChunkFile(RawHash, RawData, FileRef, OptionalContext);
+ ResolveLock.WithExclusiveLock(
+ [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() {
+ ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size());
+ ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size());
+ for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
+ {
+ ChunkedHashes.insert(ChunkHash);
+ }
+ ChunkedFiles.emplace_back(std::move(Chunked));
+ });
+ }
+ else if (RawData.GetSize() > (MaxChunkEmbedSize * 2))
+ {
+ // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't
+ // it will be a loose attachment instead of going into a block
+ OnLargeAttachment(RawHash, [RawData = std::move(RawData), AttachmentTempPath](const IoHash& RawHash) {
+ size_t RawSize = RawData.GetSize();
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::VeryFast);
+
+ std::filesystem::path AttachmentPath = AttachmentTempPath;
+ AttachmentPath.append(RawHash.ToHexString());
+
+ IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed), AttachmentPath);
+ ZEN_INFO("Saved temp attachment to '{}', {} ({})",
+ AttachmentPath,
+ NiceBytes(RawSize),
+ NiceBytes(TempAttachmentBuffer.GetSize()));
+ return TempAttachmentBuffer;
+ });
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ }
+ else
+ {
+ size_t RawSize = RawData.GetSize();
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::VeryFast);
+
+ std::filesystem::path AttachmentPath = AttachmentTempPath;
+ AttachmentPath.append(RawHash.ToHexString());
+
+ IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed), AttachmentPath);
+ ZEN_INFO("Saved temp attachment to '{}', {} ({})",
+ AttachmentPath,
+ NiceBytes(RawSize),
+ NiceBytes(TempAttachmentBuffer.GetSize()));
+
+ if (Compressed.GetCompressedSize() > MaxChunkEmbedSize)
+ {
+ OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; });
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ }
+ else
+ {
+ UploadAttachment->Size = Compressed.GetCompressedSize();
+ ResolveLock.WithExclusiveLock(
+ [RawHash, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() {
+ LooseUploadAttachments.insert_or_assign(RawHash, std::move(Data));
+ });
+ }
+ }
+ }
+ else
+ {
+ ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
+ }
+ }
+ else
+ {
+ IoBuffer Data = ChunkStore.FindChunkByCid(RawHash);
+ if (Data)
+ {
+ auto GetForChunking =
+ [](size_t ChunkFileSizeLimit, const IoBuffer& Data, IoBufferFileReference& OutFileRef) -> bool {
+ if (Data.IsWholeFile())
+ {
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize);
+ if (Compressed)
+ {
+ if (VerifyRawSize > ChunkFileSizeLimit)
+ {
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ uint64_t BlockSize;
+ if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+ {
+ if (CompressionLevel == OodleCompressionLevel::None)
+ {
+ CompositeBuffer Decompressed = Compressed.DecompressToComposite();
+ if (Decompressed)
+ {
+ std::span<const SharedBuffer> Segments = Decompressed.GetSegments();
+ if (Segments.size() == 1)
+ {
+ IoBuffer DecompressedData = Segments[0].AsIoBuffer();
+ if (DecompressedData.GetFileReference(OutFileRef))
+ {
+ return true;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ return false;
+ };
+
+ IoBufferFileReference FileRef;
+ if (GetForChunking(ChunkFileSizeLimit, Data, FileRef))
+ {
+ ChunkedFile Chunked = ChunkFile(RawHash, Data, FileRef, OptionalContext);
+ ResolveLock.WithExclusiveLock(
+ [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() {
+ ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size());
+ ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size());
+ for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
+ {
+ ChunkedHashes.insert(ChunkHash);
+ }
+ ChunkedFiles.emplace_back(std::move(Chunked));
+ });
+ }
+ else if (Data.GetSize() > MaxChunkEmbedSize)
+ {
+ OnLargeAttachment(RawHash,
+ [&ChunkStore](const IoHash& RawHash) { return ChunkStore.FindChunkByCid(RawHash); });
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ }
+ else
+ {
+ UploadAttachment->Size = Data.GetSize();
+ }
+ }
+ else
+ {
+ ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
+ }
}
- return {};
}
- }
+ catch (std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to resolve attachment {}", RawHash),
+ Ex.what());
+ }
+ });
+ }
+ ResolveAttachmentsLatch.CountDown();
- uint64_t PayloadSize = Payload.GetSize();
- if (PayloadSize > MaxChunkEmbedSize)
+ while (!ResolveAttachmentsLatch.Wait(1000))
+ {
+ ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining();
+ if (IsCancelled(OptionalContext))
{
- if (LargeChunkHashes.insert(AttachmentHash).second)
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ while (!ResolveAttachmentsLatch.Wait(1000))
{
- OnLargeAttachment(AttachmentHash, [Payload = std::move(Payload)](const IoHash&) { return std::move(Payload); });
- LargeAttachmentCount++;
+ Remaining = ResolveAttachmentsLatch.Remaining();
+ ReportProgress(OptionalContext,
+ fmt::format("Aborting, {} attachments remaining...", Remaining),
+ UploadAttachments.size(),
+ Remaining);
}
- ResolvedLargeCount++;
- continue;
+ ReportProgress(OptionalContext, fmt::format("Resolving attachments, {} remaining...", 0), UploadAttachments.size(), 0);
+ return {};
}
- else
+ ReportProgress(OptionalContext,
+ fmt::format("Resolving attachments, {} remaining...", Remaining),
+ UploadAttachments.size(),
+ Remaining);
+ }
+ if (UploadAttachments.size() > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Resolving attachments, {} remaining...", 0), UploadAttachments.size(), 0);
+ }
+
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
+
+ for (const IoHash& AttachmentHash : MissingHashes)
+ {
+ auto It = UploadAttachments.find(AttachmentHash);
+ ZEN_ASSERT(It != UploadAttachments.end());
+ std::optional<CbObject> Op = Oplog.GetOpByKey(It->second.Key);
+ ZEN_ASSERT(Op.has_value());
+ ExtendableStringBuilder<1024> Sb;
+ Sb.Append("Failed to find attachment '");
+ Sb.Append(AttachmentHash.ToHexString());
+ Sb.Append("' for op: \n");
+ Op.value().ToJson(Sb);
+
+ if (IgnoreMissingAttachments)
{
- ResolvedSmallCount++;
+ ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView()));
}
-
- if (!BlockAttachmentHashes.insert(AttachmentHash).second)
+ else
{
- continue;
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
+ return {};
}
+ UploadAttachments.erase(AttachmentHash);
+ }
- auto It = Attachments.find(AttachmentHash);
- const int CurrentOpLSN = It->second;
+ for (const auto& It : ChunkedUploadAttachments)
+ {
+ UploadAttachments.erase(It.first);
+ }
+ for (const auto& It : LargeChunkHashes)
+ {
+ UploadAttachments.erase(It);
+ }
- BlockSize += PayloadSize;
- if (BuildBlocks)
+ std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext);
+ for (size_t KnownBlockIndex : ReusedBlockIndexes)
+ {
+ const Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
{
- ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload)));
+ if (ChunkedHashes.erase(KnownHash) == 1)
+ {
+ ReusedAttachmentCount++;
+ }
}
- else
+ }
+
+ ReusedBlockIndexes.insert(ReusedBlockIndexes.end(), ReusedBlockFromChunking.begin(), ReusedBlockFromChunking.end());
+ std::sort(ReusedBlockIndexes.begin(), ReusedBlockIndexes.end());
+ auto UniqueKnownBlocksEnd = std::unique(ReusedBlockIndexes.begin(), ReusedBlockIndexes.end());
+ size_t ReuseBlockCount = std::distance(ReusedBlockIndexes.begin(), UniqueKnownBlocksEnd);
+ if (ReuseBlockCount > 0)
+ {
+ Blocks.reserve(ReuseBlockCount);
+ for (auto It = ReusedBlockIndexes.begin(); It != UniqueKnownBlocksEnd; It++)
{
- Payload = {};
+ Blocks.push_back(KnownBlocks[*It]);
}
+ ReportMessage(OptionalContext, fmt::format("Reused {} attachments from {} blocks", ReusedAttachmentCount, ReuseBlockCount));
+ }
+
+ std::vector<std::pair<IoHash, Oid>> SortedUploadAttachments;
+ SortedUploadAttachments.reserve(UploadAttachments.size());
+ for (const auto& It : UploadAttachments)
+ {
+ SortedUploadAttachments.push_back(std::make_pair(It.first, It.second.Key));
+ }
+
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
+
+ ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", SortedUploadAttachments.size(), TotalOpCount));
+
+ // Sort attachments so we get predictable blocks for the same oplog upload
+ std::sort(SortedUploadAttachments.begin(),
+ SortedUploadAttachments.end(),
+ [](const std::pair<IoHash, Oid>& Lhs, const std::pair<IoHash, Oid>& Rhs) {
+ if (Lhs.second == Rhs.second)
+ {
+ // Same key, sort by raw hash
+ return Lhs.first < Rhs.first;
+ }
+ // Sort by key
+ return Lhs.second < Rhs.second;
+ });
+
+ std::vector<size_t> ChunkedFilesOrder;
+ ChunkedFilesOrder.reserve(ChunkedFiles.size());
+ for (size_t Index = 0; Index < ChunkedFiles.size(); Index++)
+ {
+ ChunkedFilesOrder.push_back(Index);
+ }
+ std::sort(ChunkedFilesOrder.begin(), ChunkedFilesOrder.end(), [&ChunkedFiles](size_t Lhs, size_t Rhs) {
+ return ChunkedFiles[Lhs].Chunked.Info.RawHash < ChunkedFiles[Rhs].Chunked.Info.RawHash;
+ });
+
+ // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded
+ // ChunkedHashes contains all chunked up chunks to be composed into blocks
+
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
+ ReportMessage(OptionalContext,
+ fmt::format("Assembling {} attachments and {} chunked parts from {} ops into blocks and loose attachments",
+ SortedUploadAttachments.size(),
+ ChunkedHashes.size(),
+ TotalOpCount));
+
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
+
+ // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded
+ // ChunkedHashes contains all chunked up chunks to be composed into blocks
+
+ size_t ChunkAssembleCount = SortedUploadAttachments.size() + ChunkedHashes.size();
+ size_t ChunksAssembled = 0;
+ ReportMessage(OptionalContext, fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount));
- if (BlockSize >= MaxBlockSize && (CurrentOpLSN != LastLSNOp))
+ Latch BlockCreateLatch(1);
+ size_t GeneratedBlockCount = 0;
+ size_t BlockSize = 0;
+ std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock;
+
+ Oid LastOpKey = Oid::Zero;
+ uint32_t ComposedBlocks = 0;
+
+ uint64_t CreateBlocksStartMS = Timer.GetElapsedTimeMs();
+ try
+ {
+ uint64_t FetchAttachmentsStartMS = Timer.GetElapsedTimeMs();
+ std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
+ auto NewBlock = [&]() {
+ size_t BlockIndex = AddBlock(BlocksLock, Blocks);
+ size_t ChunkCount = ChunksInBlock.size();
+ if (BuildBlocks)
+ {
+ CreateBlock(WorkerPool,
+ BlockCreateLatch,
+ std::move(ChunksInBlock),
+ BlocksLock,
+ Blocks,
+ BlockIndex,
+ AsyncOnBlock,
+ RemoteResult);
+ ComposedBlocks++;
+ }
+ else
+ {
+ ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
+ OnBlockChunks(std::move(ChunksInBlock));
+ }
+ {
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope _(BlocksLock);
+ Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
+ BlockAttachmentHashes.begin(),
+ BlockAttachmentHashes.end());
+ }
+ uint64_t NowMS = Timer.GetElapsedTimeMs();
+ ZEN_INFO("Assembled block {} with {} chunks in {} ({})",
+ BlockIndex,
+ ChunkCount,
+ NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS),
+ NiceBytes(BlockSize));
+ FetchAttachmentsStartMS = NowMS;
+ BlockAttachmentHashes.clear();
+ ChunksInBlock.clear();
+ BlockSize = 0;
+ GeneratedBlockCount++;
+ };
+
+ for (auto HashIt = SortedUploadAttachments.begin(); HashIt != SortedUploadAttachments.end(); HashIt++)
{
- size_t BlockIndex = AddBlock(BlocksLock, Blocks);
- size_t ChunkCount = ChunksInBlock.size();
- if (BuildBlocks)
+ if (IsCancelled(OptionalContext))
{
- CreateBlock(WorkerPool,
- BlockCreateLatch,
- std::move(ChunksInBlock),
- BlocksLock,
- Blocks,
- BlockIndex,
- AsyncOnBlock,
- RemoteResult);
- ComposedBlocks++;
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ break;
}
- else
+ if (ChunksAssembled % 1000 == 0)
{
- ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
- OnBlockChunks(BlockAttachmentHashes);
+ ReportProgress(
+ OptionalContext,
+ fmt::format("Assembling blocks: {} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
+ ChunkAssembleCount,
+ ChunkAssembleCount - ChunksAssembled);
}
+ const IoHash& RawHash(HashIt->first);
+ const Oid CurrentOpKey = HashIt->second;
+ const IoHash& AttachmentHash(HashIt->first);
+ auto InfoIt = UploadAttachments.find(RawHash);
+ ZEN_ASSERT(InfoIt != UploadAttachments.end());
+ uint64_t PayloadSize = InfoIt->second.Size;
+
+ if (BlockAttachmentHashes.insert(AttachmentHash).second)
{
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope _(BlocksLock);
- Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
- BlockAttachmentHashes.begin(),
- BlockAttachmentHashes.end());
+ if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end())
+ {
+ ChunksInBlock.emplace_back(std::make_pair(RawHash, [IoBuffer = SharedBuffer(It->second)](const IoHash&) {
+ return CompositeBuffer(IoBuffer);
+ }));
+ LooseUploadAttachments.erase(It);
+ }
+ else
+ {
+ ChunksInBlock.emplace_back(std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) {
+ return CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash)));
+ }));
+ }
+ BlockSize += PayloadSize;
+
+ if (BlockSize >= MaxBlockSize && (CurrentOpKey != LastOpKey))
+ {
+ NewBlock();
+ }
+ LastOpKey = CurrentOpKey;
+ ChunksAssembled++;
}
- uint64_t NowMS = Timer.GetElapsedTimeMs();
- ZEN_INFO("Assembled block {} with {} chunks in {} ({})",
- BlockIndex,
- ChunkCount,
- NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS),
- NiceBytes(BlockSize));
- FetchAttachmentsStartMS = NowMS;
- BlockAttachmentHashes.clear();
- ChunksInBlock.clear();
- BlockSize = 0;
- GeneratedBlockCount++;
}
- LastLSNOp = CurrentOpLSN;
- }
-
- if (BlockSize > 0)
- {
- if (!IsCancelled(OptionalContext))
+ if (!RemoteResult.IsError())
{
- size_t BlockIndex = AddBlock(BlocksLock, Blocks);
- size_t ChunkCount = ChunksInBlock.size();
- if (BuildBlocks)
+ // Keep the chunked files as separate blocks to make the blocks generated
+ // more consistent
+ if (BlockSize > 0)
{
- CreateBlock(WorkerPool,
- BlockCreateLatch,
- std::move(ChunksInBlock),
- BlocksLock,
- Blocks,
- BlockIndex,
- AsyncOnBlock,
- RemoteResult);
- ComposedBlocks++;
+ NewBlock();
}
- else
+
+ for (size_t ChunkedFileIndex : ChunkedFilesOrder)
{
- ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
- OnBlockChunks(BlockAttachmentHashes);
+ const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex];
+ const ChunkedInfoWithSource& Chunked = ChunkedFile.Chunked;
+ size_t ChunkCount = Chunked.Info.ChunkHashes.size();
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++)
+ {
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ break;
+ }
+ if (ChunksAssembled % 1000 == 0)
+ {
+ ReportProgress(OptionalContext,
+ fmt::format("Assembling blocks: {} attachments processed, {} blocks assembled",
+ ChunksAssembled,
+ ComposedBlocks),
+ ChunkAssembleCount,
+ ChunkAssembleCount - ChunksAssembled);
+ }
+ const IoHash& ChunkHash = ChunkedFile.Chunked.Info.ChunkHashes[ChunkIndex];
+ if (auto FindIt = ChunkedHashes.find(ChunkHash); FindIt != ChunkedHashes.end())
+ {
+ if (BlockAttachmentHashes.insert(ChunkHash).second)
+ {
+ const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex];
+ ChunksInBlock.emplace_back(std::make_pair(
+ ChunkHash,
+ [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](const IoHash&) {
+ return CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::None)
+ .GetCompressed();
+ }));
+ BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size;
+ if (BuildBlocks)
+ {
+ if (BlockSize >= MaxBlockSize)
+ {
+ NewBlock();
+ }
+ }
+ ChunksAssembled++;
+ }
+ ChunkedHashes.erase(FindIt);
+ }
+ }
}
+ }
+
+ if (BlockSize > 0 && !RemoteResult.IsError())
+ {
+ if (!IsCancelled(OptionalContext))
{
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope _(BlocksLock);
- Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
- BlockAttachmentHashes.begin(),
- BlockAttachmentHashes.end());
+ NewBlock();
}
- uint64_t NowMS = Timer.GetElapsedTimeMs();
- ZEN_INFO("Assembled block {} with {} chunks in {} ({})",
- BlockIndex,
- ChunkCount,
- NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS),
- NiceBytes(BlockSize));
- FetchAttachmentsStartMS = NowMS;
-
- BlockAttachmentHashes.clear();
- ChunksInBlock.clear();
- BlockSize = 0;
- GeneratedBlockCount++;
}
- }
- ReportProgress(OptionalContext,
- fmt::format("Resolving attachments: {} large, {} small, {} blocks assembled",
- ResolvedLargeCount,
- ResolvedSmallCount,
- ComposedBlocks),
- SortedAttachments.size(),
- 0);
- ReportMessage(OptionalContext,
- fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}",
- SortedAttachments.size(),
- OpLSNToKey.size(),
- GeneratedBlockCount,
- LargeAttachmentCount,
- NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs()))));
+ ReportProgress(OptionalContext,
+ fmt::format("Assembling blocks: {} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
+ ChunkAssembleCount,
+ 0);
- if (IsCancelled(OptionalContext))
+ ReportMessage(OptionalContext,
+ fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and in {}",
+ ChunkAssembleCount,
+ TotalOpCount,
+ GeneratedBlockCount,
+ NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs()))));
+
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ptrdiff_t Remaining = BlockCreateLatch.Remaining();
+ ReportProgress(OptionalContext,
+ fmt::format("Aborting, {} blocks remaining...", Remaining),
+ GeneratedBlockCount,
+ Remaining);
+ }
+ if (GeneratedBlockCount > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", 0), GeneratedBlockCount, 0);
+ }
+ return {};
+ }
+ }
+ catch (std::exception& Ex)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
BlockCreateLatch.CountDown();
while (!BlockCreateLatch.Wait(1000))
{
- ptrdiff_t Remaining = BlockCreateLatch.Remaining();
- ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", Remaining), GeneratedBlockCount, Remaining);
}
- if (GeneratedBlockCount > 0)
- {
- ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", 0), GeneratedBlockCount, 0);
- }
- return {};
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), "Block creation failed", Ex.what());
+ throw;
}
BlockCreateLatch.CountDown();
@@ -872,6 +1226,7 @@ BuildContainer(CidStore& ChunkStore,
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
while (!BlockCreateLatch.Wait(1000))
{
Remaining = BlockCreateLatch.Remaining();
@@ -885,9 +1240,13 @@ BuildContainer(CidStore& ChunkStore,
}
ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", Remaining), GeneratedBlockCount, Remaining);
}
+
if (GeneratedBlockCount > 0)
{
+ uint64_t NowMS = Timer.GetElapsedTimeMs();
ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0);
+ ReportMessage(OptionalContext,
+ fmt::format("Created {} blocks in {}", GeneratedBlockCount, NiceTimeSpanMs(NowMS - CreateBlocksStartMS)));
}
if (!RemoteResult.IsError())
@@ -937,6 +1296,35 @@ BuildContainer(CidStore& ChunkStore,
}
}
OplogContinerWriter.EndArray(); // "blocks"sv
+ OplogContinerWriter.BeginArray("chunkedfiles"sv);
+ {
+ for (const ChunkedFile& F : ChunkedFiles)
+ {
+ OplogContinerWriter.BeginObject();
+ {
+ OplogContinerWriter.AddHash("rawhash"sv, F.Chunked.Info.RawHash);
+ OplogContinerWriter.AddInteger("rawsize"sv, F.Chunked.Info.RawSize);
+ OplogContinerWriter.BeginArray("chunks"sv);
+ {
+ for (const IoHash& RawHash : F.Chunked.Info.ChunkHashes)
+ {
+ OplogContinerWriter.AddHash(RawHash);
+ }
+ }
+ OplogContinerWriter.EndArray(); // "chunks"
+ OplogContinerWriter.BeginArray("sequence"sv);
+ {
+ for (uint32_t ChunkIndex : F.Chunked.Info.ChunkSequence)
+ {
+ OplogContinerWriter.AddInteger(ChunkIndex);
+ }
+ }
+ OplogContinerWriter.EndArray(); // "sequence"
+ }
+ OplogContinerWriter.EndObject();
+ }
+ }
+ OplogContinerWriter.EndArray(); // "chunkedfiles"sv
OplogContinerWriter.BeginArray("chunks"sv);
{
@@ -959,11 +1347,12 @@ BuildContainer(CidStore& ChunkStore,
ProjectStore::Oplog& Oplog,
size_t MaxBlockSize,
size_t MaxChunkEmbedSize,
+ size_t ChunkFileSizeLimit,
bool BuildBlocks,
bool IgnoreMissingAttachments,
const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
- const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
+ const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles)
{
WorkerThreadPool& WorkerPool = GetSmallWorkerPool();
@@ -974,6 +1363,7 @@ BuildContainer(CidStore& ChunkStore,
Oplog,
MaxBlockSize,
MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
BuildBlocks,
IgnoreMissingAttachments,
{},
@@ -1001,7 +1391,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments,
- const std::vector<std::vector<IoHash>>& BlockChunks,
+ const std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>>& BlockChunks,
const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks,
const tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LooseFileAttachments,
const std::unordered_set<IoHash, IoHash::Hasher>& Needs,
@@ -1019,11 +1409,12 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
ReportMessage(OptionalContext, "Filtering needed attachments for upload...");
- std::unordered_set<IoHash, IoHash::Hasher> AttachmentsToUpload;
+ std::unordered_set<IoHash, IoHash::Hasher> AttachmentsToUpload;
+ std::unordered_map<IoHash, FetchChunkFunc, IoHash::Hasher> BulkBlockAttachmentsToUpload;
- size_t BlockAttachmentCountToUpload = 0;
- size_t LargeAttachmentCountToUpload = 0;
- std::atomic<ptrdiff_t> BulkAttachmentCountToUpload = 0;
+ size_t BlockAttachmentCountToUpload = 0;
+ size_t LargeAttachmentCountToUpload = 0;
+ size_t BulkAttachmentCountToUpload = 0;
AttachmentsToUpload.reserve(ForceAll ? CreatedBlocks.size() + LargeAttachments.size() : Needs.size());
for (const auto& CreatedBlock : CreatedBlocks)
@@ -1042,39 +1433,19 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
LargeAttachmentCountToUpload++;
}
}
- for (const std::vector<IoHash>& BlockHashes : BlockChunks)
+ for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& BlockHashes : BlockChunks)
{
- if (ForceAll)
- {
- AttachmentsToUpload.insert(BlockHashes.begin(), BlockHashes.end());
- BulkAttachmentCountToUpload += BlockHashes.size();
- continue;
- }
- for (const IoHash& Hash : BlockHashes)
+ for (const std::pair<IoHash, FetchChunkFunc>& Chunk : BlockHashes)
{
- if (Needs.contains(Hash))
+ if (ForceAll || Needs.contains(Chunk.first))
{
- AttachmentsToUpload.insert(Hash);
+ BulkBlockAttachmentsToUpload.insert(std::make_pair(Chunk.first, Chunk.second));
BulkAttachmentCountToUpload++;
}
}
}
- for (const IoHash& Needed : Needs)
- {
- if (!AttachmentsToUpload.contains(Needed))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- "Invalid attachment",
- fmt::format("Upload requested an unknown attachment '{}'", Needed));
- ReportMessage(
- OptionalContext,
- fmt::format("Failed to upload attachment '{}'. ({}): {}", Needed, RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return;
- }
- }
-
- if (AttachmentsToUpload.empty())
+ if (AttachmentsToUpload.empty() && BulkBlockAttachmentsToUpload.empty())
{
ReportMessage(OptionalContext, "No attachments needed");
return;
@@ -1085,30 +1456,29 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
if (!RemoteResult.IsError())
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
}
return;
}
ReportMessage(OptionalContext,
fmt::format("Saving {} attachments ({} blocks, {} attachments, {} bulk attachments)",
- AttachmentsToUpload.size(),
+ AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(),
BlockAttachmentCountToUpload,
LargeAttachmentCountToUpload,
- BulkAttachmentCountToUpload.load()));
+ BulkAttachmentCountToUpload));
+
+ Stopwatch Timer;
ptrdiff_t AttachmentsToSave(0);
Latch SaveAttachmentsLatch(1);
- for (const IoHash& RawHash : LargeAttachments)
+ for (const IoHash& RawHash : AttachmentsToUpload)
{
if (RemoteResult.IsError())
{
break;
}
- if (!AttachmentsToUpload.contains(RawHash))
- {
- continue;
- }
SaveAttachmentsLatch.AddCount(1);
AttachmentsToSave++;
@@ -1126,10 +1496,12 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
{
return;
}
+ bool IsBlock = false;
IoBuffer Payload;
if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
{
Payload = BlockIt->second;
+ IsBlock = true;
}
else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
{
@@ -1161,76 +1533,24 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
RemoteResult.GetErrorReason()));
return;
}
- Info.AttachmentsUploaded.fetch_add(1);
- Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
- ZEN_INFO("Saved large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(PayloadSize));
- return;
- });
- }
-
- if (IsCancelled(OptionalContext))
- {
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- }
- return;
- }
-
- for (auto& It : CreatedBlocks)
- {
- if (RemoteResult.IsError())
- {
- break;
- }
- const IoHash& RawHash = It.first;
- if (!AttachmentsToUpload.contains(RawHash))
- {
- continue;
- }
- IoBuffer Payload = It.second;
- ZEN_ASSERT(Payload);
- SaveAttachmentsLatch.AddCount(1);
- AttachmentsToSave++;
- WorkerPool.ScheduleWork([&ChunkStore,
- &RemoteStore,
- &SaveAttachmentsLatch,
- &RemoteResult,
- Payload = std::move(Payload),
- RawHash,
- &Info,
- OptionalContext]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
+ if (IsBlock)
{
- return;
+ Info.AttachmentBlocksUploaded.fetch_add(1);
+ Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
+ ZEN_INFO("Saved block attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(PayloadSize));
}
-
- size_t PayloadSize = Payload.GetSize();
- RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash);
- if (Result.ErrorCode)
+ else
{
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachment '{}', {} ({}): {}",
- RawHash,
- NiceBytes(PayloadSize),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- return;
+ Info.AttachmentsUploaded.fetch_add(1);
+ Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
+ ZEN_INFO("Saved large attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(PayloadSize));
}
-
- Info.AttachmentBlocksUploaded.fetch_add(1);
- Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
-
- ZEN_INFO("Saved block attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(PayloadSize));
return;
});
}
@@ -1240,80 +1560,85 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
if (!RemoteResult.IsError())
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
}
return;
}
- for (const std::vector<IoHash>& Chunks : BlockChunks)
+ if (!BulkBlockAttachmentsToUpload.empty())
{
- if (RemoteResult.IsError())
- {
- break;
- }
-
- std::vector<IoHash> NeededChunks;
- NeededChunks.reserve(Chunks.size());
- for (const IoHash& Chunk : Chunks)
+ for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& Chunks : BlockChunks)
{
- if (AttachmentsToUpload.contains(Chunk))
+ if (RemoteResult.IsError())
{
- NeededChunks.push_back(Chunk);
+ break;
}
- }
- if (NeededChunks.empty())
- {
- continue;
- }
- SaveAttachmentsLatch.AddCount(1);
- AttachmentsToSave++;
- WorkerPool.ScheduleWork([&RemoteStore,
- &ChunkStore,
- &SaveAttachmentsLatch,
- &RemoteResult,
- &Chunks,
- NeededChunks = std::move(NeededChunks),
- &BulkAttachmentCountToUpload,
- &Info,
- OptionalContext]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- size_t ChunksSize = 0;
- std::vector<SharedBuffer> ChunkBuffers;
- ChunkBuffers.reserve(NeededChunks.size());
- for (const IoHash& Chunk : NeededChunks)
+ std::vector<IoHash> NeededChunks;
+ NeededChunks.reserve(Chunks.size());
+ for (const std::pair<IoHash, FetchChunkFunc>& Chunk : Chunks)
{
- IoBuffer ChunkPayload = ChunkStore.FindChunkByCid(Chunk);
- if (!ChunkPayload)
+ const IoHash& ChunkHash = Chunk.first;
+ if (BulkBlockAttachmentsToUpload.contains(ChunkHash) && !AttachmentsToUpload.contains(ChunkHash))
{
- RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
- fmt::format("Missing chunk {}"sv, Chunk),
- fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
- ChunkBuffers.clear();
- break;
+ NeededChunks.push_back(Chunk.first);
}
- ChunksSize += ChunkPayload.GetSize();
- ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload)));
}
- RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
- if (Result.ErrorCode)
+ if (NeededChunks.empty())
{
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachments with {} chunks ({}): {}",
- Chunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- return;
+ continue;
}
- Info.AttachmentsUploaded.fetch_add(NeededChunks.size());
- Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
-
- ZEN_INFO("Saved {} bulk attachments in {} ({})",
- Chunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(ChunksSize));
- BulkAttachmentCountToUpload.fetch_sub(Chunks.size());
- });
+
+ SaveAttachmentsLatch.AddCount(1);
+ AttachmentsToSave++;
+ WorkerPool.ScheduleWork([&RemoteStore,
+ &ChunkStore,
+ &SaveAttachmentsLatch,
+ &RemoteResult,
+ NeededChunks = std::move(NeededChunks),
+ &BulkBlockAttachmentsToUpload,
+ &Info,
+ OptionalContext]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ size_t ChunksSize = 0;
+ std::vector<SharedBuffer> ChunkBuffers;
+ ChunkBuffers.reserve(NeededChunks.size());
+ for (const IoHash& Chunk : NeededChunks)
+ {
+ auto It = BulkBlockAttachmentsToUpload.find(Chunk);
+ ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
+ CompositeBuffer ChunkPayload = It->second(It->first);
+ if (!ChunkPayload)
+ {
+ RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
+ fmt::format("Missing chunk {}"sv, Chunk),
+ fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
+ ChunkBuffers.clear();
+ break;
+ }
+ ChunksSize += ChunkPayload.GetSize();
+ ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer()));
+ }
+ RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to save attachments with {} chunks ({}): {}",
+ NeededChunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ return;
+ }
+ Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size());
+ Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
+
+ ZEN_INFO("Saved {} bulk attachments in {} ({})",
+ NeededChunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(ChunksSize));
+ });
+ }
}
SaveAttachmentsLatch.CountDown();
@@ -1325,18 +1650,22 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
if (!RemoteResult.IsError())
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
}
}
- ReportProgress(
- OptionalContext,
- fmt::format("Saving attachments, {} remaining...", BlockChunks.empty() ? Remaining : BulkAttachmentCountToUpload.load()),
- AttachmentsToSave,
- Remaining);
+ ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", Remaining), AttachmentsToSave, Remaining);
}
if (AttachmentsToSave > 0)
{
ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0);
}
+ ReportMessage(OptionalContext,
+ fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {}",
+ AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(),
+ BlockAttachmentCountToUpload,
+ LargeAttachmentCountToUpload,
+ BulkAttachmentCountToUpload,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs())));
}
RemoteProjectStore::Result
@@ -1346,6 +1675,7 @@ SaveOplog(CidStore& ChunkStore,
ProjectStore::Oplog& Oplog,
size_t MaxBlockSize,
size_t MaxChunkEmbedSize,
+ size_t ChunkFileSizeLimit,
bool EmbedLooseFiles,
bool ForceUpload,
bool IgnoreMissingAttachments,
@@ -1409,8 +1739,8 @@ SaveOplog(CidStore& ChunkStore,
ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()));
};
- std::vector<std::vector<IoHash>> BlockChunks;
- auto OnBlockChunks = [&BlockChunks](const std::unordered_set<IoHash, IoHash::Hasher>& Chunks) {
+ std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>> BlockChunks;
+ auto OnBlockChunks = [&BlockChunks](std::vector<std::pair<IoHash, FetchChunkFunc>>&& Chunks) {
BlockChunks.push_back({Chunks.begin(), Chunks.end()});
ZEN_DEBUG("Found {} block chunks", Chunks.size());
};
@@ -1480,11 +1810,16 @@ SaveOplog(CidStore& ChunkStore,
}
}
+ // TODO: We need to check if remote store actually *has* all KnownBlocks
+ // We can't reconstruct known blocks on demand as they may contain chunks that we don't have
+ // and we don't care about :(
+
CbObject OplogContainerObject = BuildContainer(ChunkStore,
Project,
Oplog,
MaxBlockSize,
MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
RemoteStoreInfo.CreateBlocks,
IgnoreMissingAttachments,
KnownBlocks,
@@ -1504,6 +1839,7 @@ SaveOplog(CidStore& ChunkStore,
RemoteProjectStore::Result Result = {.ErrorCode = 0,
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
.Text = "Operation cancelled"};
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return Result;
}
@@ -1550,6 +1886,7 @@ SaveOplog(CidStore& ChunkStore,
RemoteProjectStore::Result Result = {.ErrorCode = 0,
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
.Text = "Operation cancelled"};
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Text));
return Result;
}
@@ -1629,6 +1966,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
const std::function<bool(const IoHash& RawHash)>& HasAttachment,
const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
JobContext* OptionalContext)
{
using namespace std::literals;
@@ -1657,10 +1995,11 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView();
+
+ std::vector<IoHash> NeededChunks;
+ NeededChunks.reserve(ChunksArray.Num());
if (BlockHash == IoHash::Zero)
{
- std::vector<IoHash> NeededChunks;
- NeededChunks.reserve(ChunksArray.GetSize());
for (CbFieldView ChunkField : ChunksArray)
{
IoHash ChunkHash = ChunkField.AsBinaryAttachment();
@@ -1670,27 +2009,55 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
}
NeededChunks.emplace_back(ChunkHash);
}
-
- if (!NeededChunks.empty())
+ }
+ else
+ {
+ for (CbFieldView ChunkField : ChunksArray)
{
- OnNeedBlock(IoHash::Zero, std::move(NeededChunks));
+ const IoHash ChunkHash = ChunkField.AsHash();
+ if (HasAttachment(ChunkHash))
+ {
+ continue;
+ }
+ NeededChunks.emplace_back(ChunkHash);
}
- continue;
}
- for (CbFieldView ChunkField : ChunksArray)
+ if (!NeededChunks.empty())
{
- IoHash ChunkHash = ChunkField.AsHash();
- if (HasAttachment(ChunkHash))
+ OnNeedBlock(BlockHash, std::move(NeededChunks));
+ if (BlockHash != IoHash::Zero)
{
- continue;
+ NeedBlockCount++;
}
+ }
+ }
- OnNeedBlock(BlockHash, {});
- NeedBlockCount++;
- break;
+ CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
+ for (CbFieldView ChunkedFileField : ChunkedFilesArray)
+ {
+ CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView();
+ ChunkedInfo Chunked;
+ Chunked.RawHash = ChunkedFileView["rawhash"sv].AsHash();
+ Chunked.RawSize = ChunkedFileView["rawsize"sv].AsUInt64();
+ CbArrayView ChunksArray = ChunkedFileView["chunks"sv].AsArrayView();
+ Chunked.ChunkHashes.reserve(ChunksArray.Num());
+ for (CbFieldView ChunkField : ChunksArray)
+ {
+ const IoHash ChunkHash = ChunkField.AsHash();
+ Chunked.ChunkHashes.emplace_back(ChunkHash);
}
- };
+ CbArrayView SequenceArray = ChunkedFileView["sequence"sv].AsArrayView();
+ Chunked.ChunkSequence.reserve(SequenceArray.Num());
+ for (CbFieldView SequenceField : SequenceArray)
+ {
+ uint32_t SequenceIndex = SequenceField.AsUInt32();
+ ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size());
+ Chunked.ChunkSequence.push_back(SequenceIndex);
+ }
+ OnChunkedAttachment(Chunked);
+ }
+
ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
@@ -1788,7 +2155,6 @@ LoadOplog(CidStore& ChunkStore,
WorkerThreadPool& WorkerPool = GetSmallWorkerPool();
std::unordered_set<IoHash, IoHash::Hasher> Attachments;
- std::vector<std::vector<IoHash>> ChunksInBlocks;
RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo();
ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName));
@@ -1815,12 +2181,19 @@ LoadOplog(CidStore& ChunkStore,
std::atomic_size_t AttachmentCount = 0;
auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) {
- return !ForceDownload && ChunkStore.ContainsChunk(RawHash);
+ if (ForceDownload)
+ {
+ return false;
+ }
+ if (ChunkStore.ContainsChunk(RawHash))
+ {
+ return true;
+ }
+ return false;
};
auto OnNeedBlock = [&RemoteStore,
&ChunkStore,
&WorkerPool,
- &ChunksInBlocks,
&AttachmentsWorkLatch,
&AttachmentCount,
&RemoteResult,
@@ -1896,6 +2269,7 @@ LoadOplog(CidStore& ChunkStore,
&RemoteStore,
BlockHash,
&RemoteResult,
+ Chunks = std::move(Chunks),
&Info,
IgnoreMissingAttachments,
OptionalContext]() {
@@ -1922,30 +2296,36 @@ LoadOplog(CidStore& ChunkStore,
}
return;
}
- Info.AttachmentBlocksDownloaded.fetch_add(1);
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
uint64_t BlockSize = BlockResult.Bytes.GetSize();
+ Info.AttachmentBlocksDownloaded.fetch_add(1);
ZEN_INFO("Loaded block attachment '{}' in {} ({})",
BlockHash,
NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
NiceBytes(BlockSize));
- if (RemoteResult.IsError())
- {
- return;
- }
Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
-
+ std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
+ WantedChunks.reserve(Chunks.size());
+ WantedChunks.insert(Chunks.begin(), Chunks.end());
bool StoreChunksOK =
- IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
- uint64_t ChunkSize = Chunk.GetCompressedSize();
- CidStore::InsertResult InsertResult =
- ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
- if (InsertResult.New)
- {
- Info.AttachmentBytesStored.fetch_add(ChunkSize);
- Info.AttachmentsStored.fetch_add(1);
- }
- });
-
+ IterateBlock(std::move(BlockResult.Bytes),
+ [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
+ if (WantedChunks.contains(AttachmentRawHash))
+ {
+ uint64_t ChunkSize = Chunk.GetCompressedSize();
+ CidStore::InsertResult InsertResult =
+ ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(ChunkSize);
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ WantedChunks.erase(AttachmentRawHash);
+ }
+ });
if (!StoreChunksOK)
{
ReportMessage(OptionalContext,
@@ -1958,6 +2338,7 @@ LoadOplog(CidStore& ChunkStore,
{});
return;
}
+ ZEN_ASSERT(WantedChunks.empty());
});
};
@@ -2027,8 +2408,22 @@ LoadOplog(CidStore& ChunkStore,
});
};
- RemoteProjectStore::Result Result =
- SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OptionalContext);
+ std::vector<ChunkedInfo> FilesToDechunk;
+ auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) {
+ if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash))
+ {
+ Oplog.CaptureAddedAttachments(Chunked.ChunkHashes);
+ FilesToDechunk.push_back(Chunked);
+ }
+ };
+
+ RemoteProjectStore::Result Result = SaveOplogContainer(Oplog,
+ LoadContainerResult.ContainerObject,
+ HasAttachment,
+ OnNeedBlock,
+ OnNeedAttachment,
+ OnChunkedAttachment,
+ OptionalContext);
if (Result.ErrorCode != 0)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
@@ -2057,8 +2452,101 @@ LoadOplog(CidStore& ChunkStore,
}
if (Result.ErrorCode == 0)
{
+ if (!FilesToDechunk.empty())
+ {
+ ReportMessage(OptionalContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size()));
+
+ Latch DechunkLatch(1);
+ std::filesystem::path TempFilePath = Oplog.TempPath();
+ for (const ChunkedInfo& Chunked : FilesToDechunk)
+ {
+ std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString();
+ DechunkLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&ChunkStore, &DechunkLatch, TempFileName, &Chunked, &RemoteResult, OptionalContext]() {
+ auto _ = MakeGuard([&DechunkLatch] { DechunkLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ Stopwatch Timer;
+ IoBuffer TmpBuffer;
+ {
+ BasicFile TmpFile;
+ TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate);
+ {
+ BasicFileWriter TmpWriter(TmpFile, 64u * 1024u);
+
+ uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder();
+ BLAKE3Stream HashingStream;
+ for (std::uint32_t SequenceIndex : Chunked.ChunkSequence)
+ {
+ const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex];
+ IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash);
+ if (!Chunk)
+ {
+ RemoteResult.SetError(
+ gsl::narrow<int>(HttpResponseCode::NotFound),
+ "Missing chunk",
+ fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
+ ReportMessage(OptionalContext,
+ fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
+ return;
+ }
+ CompositeBuffer Decompressed =
+ CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite();
+ for (const SharedBuffer& Segment : Decompressed.GetSegments())
+ {
+ MemoryView SegmentData = Segment.GetView();
+ HashingStream.Append(SegmentData);
+ TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset);
+ Offset += SegmentData.GetSize();
+ }
+ }
+ BLAKE3 RawHash = HashingStream.GetHash();
+ ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash));
+ UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash);
+ TmpWriter.Write(Header.GetData(), Header.GetSize(), 0);
+ }
+ TmpFile.Flush();
+ uint64_t TmpFileSize = TmpFile.FileSize();
+ TmpBuffer = IoBuffer(IoBuffer::File, TmpFile.Detach(), 0, TmpFileSize, /*IsWholeFile*/ true);
+ IoHash ValidateRawHash;
+ uint64_t ValidateRawSize = 0;
+ ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(TmpBuffer, ValidateRawHash, ValidateRawSize));
+ ZEN_ASSERT(ValidateRawHash == Chunked.RawHash);
+ ZEN_ASSERT(ValidateRawSize == Chunked.RawSize);
+ }
+ ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace);
+ ZEN_INFO("Dechunked attachment {} ({}) in {}",
+ Chunked.RawHash,
+ NiceBytes(Chunked.RawSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ }
+ DechunkLatch.CountDown();
+
+ while (!DechunkLatch.Wait(1000))
+ {
+ ptrdiff_t Remaining = DechunkLatch.Remaining();
+ if (IsCancelled(OptionalContext))
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ }
+ }
+ ReportProgress(OptionalContext,
+ fmt::format("Dechunking attachments, {} remaining...", Remaining),
+ FilesToDechunk.size(),
+ Remaining);
+ }
+ ReportProgress(OptionalContext, fmt::format("Dechunking attachments, {} remaining...", 0), FilesToDechunk.size(), 0);
+ }
Result = RemoteResult.ConvertResult();
}
+
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
ReportMessage(OptionalContext,