aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/remoteprojectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-02-05 16:52:42 +0100
committerGitHub <[email protected]>2024-02-05 16:52:42 +0100
commit222392fff48e1659ec8bc59e42de09f3625111ad (patch)
tree4e17c75beabd7704f2bca3feb26d0aa9e7de204e /src/zenserver/projectstore/remoteprojectstore.cpp
parent5.4.1-pre0 (diff)
downloadzen-222392fff48e1659ec8bc59e42de09f3625111ad.tar.xz
zen-222392fff48e1659ec8bc59e42de09f3625111ad.zip
compress large attachments on demand (#647)
- Improvement: Speed up oplog export by fetching/compressing big attachments on demand - Improvement: Speed up oplog export by batch-fetching small attachments - Improvement: Speed up oplog import by batching writes of oplog ops - Improvement: Tweak oplog export default block size and embed size limit - Improvement: Add more messaging and progress during oplog import/export
Diffstat (limited to 'src/zenserver/projectstore/remoteprojectstore.cpp')
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp710
1 files changed, 386 insertions, 324 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 445179983..a8aa22526 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -197,17 +197,15 @@ CreateBlock(WorkerThreadPool& WorkerPool,
{
return;
}
- if (!Chunks.empty())
+ ZEN_ASSERT(!Chunks.empty());
+ CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); // Move to callback and return IoHash
+ IoHash BlockHash = CompressedBlock.DecodeRawHash();
{
- CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); // Move to callback and return IoHash
- IoHash BlockHash = CompressedBlock.DecodeRawHash();
- AsyncOnBlock(std::move(CompressedBlock), BlockHash);
- {
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope __(SectionsLock);
- Blocks[BlockIndex].BlockHash = BlockHash;
- }
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope __(SectionsLock);
+ Blocks[BlockIndex].BlockHash = BlockHash;
}
+ AsyncOnBlock(std::move(CompressedBlock), BlockHash);
});
}
@@ -234,30 +232,27 @@ BuildContainer(CidStore& ChunkStore,
const std::vector<Block>& KnownBlocks,
WorkerThreadPool& WorkerPool,
const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
- const std::function<void(const IoHash&)>& OnLargeAttachment,
+ const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
- tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutLooseAttachments,
+ bool EmbedLooseFiles,
JobContext* OptionalContext,
AsyncRemoteResult& RemoteResult)
{
using namespace std::literals;
- std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
- CbObjectWriter SectionOpsWriter;
- SectionOpsWriter.BeginArray("ops"sv);
-
size_t OpCount = 0;
CbObject OplogContainerObject;
{
+ std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
+ std::unordered_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseAttachments;
+
RwLock BlocksLock;
std::vector<Block> Blocks;
CompressedBuffer OpsBuffer;
std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
- size_t BlockSize = 0;
- std::vector<SharedBuffer> ChunksInBlock;
std::unordered_map<IoHash, int, IoHash::Hasher> Attachments;
auto RewriteOp = [&](int LSN, CbObjectView Op, const std::function<void(CbObjectView)>& CB) {
@@ -282,111 +277,69 @@ BuildContainer(CidStore& ChunkStore,
if (DataHash == IoHash::Zero)
{
+ std::string_view ServerPath = View["serverpath"sv].AsString();
+ std::filesystem::path FilePath = Project.RootDir / ServerPath;
+ if (!std::filesystem::is_regular_file(FilePath))
{
- // Read file contents into memory and compress
-
- std::string_view ServerPath = View["serverpath"sv].AsString();
- std::filesystem::path FilePath = Project.RootDir / ServerPath;
- BasicFile DataFile;
- std::error_code Ec;
- DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec);
- if (Ec)
- {
- ExtendableStringBuilder<1024> Sb;
- Sb.Append("Failed to find attachment '");
- Sb.Append(FilePath.string());
- Sb.Append("' for op: \n");
- Op.ToJson(Sb);
-
- ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", FilePath, Sb.ToView()));
- if (IgnoreMissingAttachments)
- {
- continue;
- }
- else
- {
- throw std::system_error(
- Ec,
- fmt::format("failed to open file '{}', mode: {:x}", FilePath, uint32_t(BasicFile::Mode::kRead)));
- }
- }
-
- IoBuffer FileIoBuffer = DataFile.ReadAll();
- DataFile.Close();
-
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer)));
-
- DataHash = Compressed.DecodeRawHash();
- uint64_t PayloadSize = Compressed.GetCompressed().GetSize();
- if (PayloadSize > MaxChunkEmbedSize)
+ ExtendableStringBuilder<1024> Sb;
+ Sb.Append("Failed to find attachment '");
+ Sb.Append(FilePath.string());
+ Sb.Append("' for op: \n");
+ Op.ToJson(Sb);
+
+ ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", FilePath, Sb.ToView()));
+ if (IgnoreMissingAttachments)
{
- // Write it out as a temporary file
- IoBuffer AttachmentBuffer;
- std::filesystem::path AttachmentPath = Oplog.TempPath() / DataHash.ToHexString();
- if (std::filesystem::is_regular_file(AttachmentPath))
- {
- AttachmentBuffer = IoBufferBuilder::MakeFromFile(AttachmentPath);
- if (AttachmentBuffer.GetSize() != PayloadSize)
- {
- AttachmentBuffer = IoBuffer{};
- }
- }
- if (!AttachmentBuffer)
- {
- BasicFile BlockFile;
- uint32_t RetriesLeft = 3;
- BlockFile.Open(AttachmentPath, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) {
- if (RetriesLeft == 0)
- {
- return false;
- }
- ZEN_WARN("Failed to create temp attachment '{}': '{}', retries left: {}.",
- AttachmentPath,
- Ec.message(),
- RetriesLeft);
- Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms
- RetriesLeft--;
- return true;
- });
- uint64_t Offset = 0;
- for (const SharedBuffer& Buffer : Compressed.GetCompressed().GetSegments())
- {
- BlockFile.Write(Buffer.GetView(), Offset);
- Offset += Buffer.GetSize();
- }
- void* FileHandle = BlockFile.Detach();
- AttachmentBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true);
-
- AttachmentBuffer.SetDeleteOnClose(true);
- ZEN_DEBUG("Saved temp attachment {}, {}", DataHash, NiceBytes(PayloadSize));
- }
- OutLooseAttachments->insert_or_assign(DataHash, AttachmentBuffer);
+ continue;
}
else
{
- // If it is small we just hang on to the compressed buffer
- OutLooseAttachments->insert_or_assign(DataHash, Compressed.GetCompressed().Flatten().AsIoBuffer());
+ throw std::runtime_error(fmt::format("failed to open file '{}'", FilePath));
}
}
+ SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath));
+ // Large file, just hash it for now and leave compression for later via callback
+ // We don't want to spend time compressing a large attachment if we don't need to upload it
+ DataHash = IoHash::HashBuffer(DataBuffer);
+ if (DataBuffer.GetSize() > MaxChunkEmbedSize)
+ {
+ LargeChunkHashes.insert(DataHash);
+ }
+ LooseAttachments.insert_or_assign(
+ DataHash,
+ [AttachmentBuffer = std::move(DataBuffer)](const IoHash& DataHash) -> IoBuffer {
+ Stopwatch AttachmentTimer;
+ uint64_t RawSize = AttachmentBuffer.GetSize();
+ CompressedBuffer Compressed =
+ CompressedBuffer::Compress(AttachmentBuffer, OodleCompressor::Mermaid, OodleCompressionLevel::Normal);
+ ZEN_ASSERT(Compressed.DecodeRawHash() == DataHash);
+ uint64_t PayloadSize = Compressed.GetCompressedSize();
+ ZEN_INFO("Compressed loose file attachment {} ({} -> {}) in {}",
+ DataHash,
+ NiceBytes(RawSize),
+ NiceBytes(PayloadSize),
+ NiceTimeSpanMs(static_cast<uint64_t>(AttachmentTimer.GetElapsedTimeMs())));
+ return Compressed.GetCompressed().Flatten().AsIoBuffer();
+ });
+ }
- // Rewrite file array entry with new data reference
- CbObjectWriter Writer;
- RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
- if (Field.GetName() == "data"sv)
- {
- // omit this field as we will write it explicitly ourselves
- return true;
- }
- return false;
- });
- Writer.AddBinaryAttachment("data"sv, DataHash);
+ // Rewrite file array entry with new data reference
+ CbObjectWriter Writer;
+ RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
+ if (Field.GetName() == "data"sv)
+ {
+ // omit this field as we will write it explicitly ourselves
+ return true;
+ }
+ return false;
+ });
+ Writer.AddBinaryAttachment("data"sv, DataHash);
- CbObject RewrittenOp = Writer.Save();
- Cbo.AddObject(std::move(RewrittenOp));
- CopyField = false;
+ CbObject RewrittenOp = Writer.Save();
+ Cbo.AddObject(std::move(RewrittenOp));
+ CopyField = false;
- Attachments.insert_or_assign(DataHash, LSN);
- }
+ Attachments.insert_or_assign(DataHash, LSN);
}
if (CopyField)
@@ -423,41 +376,66 @@ BuildContainer(CidStore& ChunkStore,
ReportMessage(OptionalContext, "Building exported oplog and collecting attachments");
- Stopwatch Timer;
- tsl::robin_map<int, std::string> OpLSNToKey;
+ Stopwatch Timer;
+ tsl::robin_map<int, std::string> OpLSNToKey;
+ CompressedBuffer CompressedOpsSection;
{
- Stopwatch RewriteOplogTimer;
- Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObjectView Op) {
- if (RemoteResult.IsError())
- {
- return;
- }
- std::string_view Key = Op["key"sv].AsString();
- OpLSNToKey.insert({LSN, std::string(Key)});
- Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); });
- if (OutLooseAttachments != nullptr)
- {
- RewriteOp(LSN, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; });
- }
- else
- {
- SectionOpsWriter << Op;
- }
- OpCount++;
+ CbObjectWriter SectionOpsWriter;
+ SectionOpsWriter.BeginArray("ops"sv);
+ {
+ Stopwatch RewriteOplogTimer;
+ Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObjectView Op) {
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ std::string_view Key = Op["key"sv].AsString();
+ OpLSNToKey.insert({LSN, std::string(Key)});
+ Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); });
+ if (EmbedLooseFiles)
+ {
+ RewriteOp(LSN, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; });
+ }
+ else
+ {
+ SectionOpsWriter << Op;
+ }
+ OpCount++;
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ return;
+ }
+ if (OpCount % 100000 == 0)
+ {
+ ReportMessage(OptionalContext, fmt::format("Building oplog, at op {}...", OpCount));
+ }
+ });
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ return {};
}
- if (OpCount % 100000 == 0)
- {
- ReportMessage(OptionalContext, fmt::format("Building oplog, at op {}...", OpCount));
- }
- });
- ReportMessage(OptionalContext,
- fmt::format("Rewrote {} ops to new oplog in {}",
- OpCount,
- NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs()))));
+ ReportMessage(OptionalContext,
+ fmt::format("Rewrote {} ops to new oplog in {}",
+ OpCount,
+ NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs()))));
+ }
+ SectionOpsWriter.EndArray(); // "ops"
+
+ {
+ Stopwatch CompressOpsTimer;
+ CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer(),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::Normal);
+ ReportMessage(OptionalContext,
+ fmt::format("Compressed oplog section {} ({} -> {}) in {}",
+ CompressedOpsSection.DecodeRawHash(),
+ NiceBytes(CompressedOpsSection.DecodeRawSize()),
+ NiceBytes(CompressedOpsSection.GetCompressedSize()),
+ NiceTimeSpanMs(static_cast<uint64_t>(CompressOpsTimer.GetElapsedTimeMs()))));
+ }
}
if (IsCancelled(OptionalContext))
@@ -565,96 +543,205 @@ BuildContainer(CidStore& ChunkStore,
SortedAttachments.size(),
OpLSNToKey.size()));
- auto GetPayload = [&](const IoHash& AttachmentHash) {
- if (OutLooseAttachments != nullptr)
+ for (const IoHash& AttachmentHash : LargeChunkHashes)
+ {
+ if (IsCancelled(OptionalContext))
{
- auto PayloadIt = OutLooseAttachments->find(AttachmentHash);
- if (PayloadIt != OutLooseAttachments->end())
- {
- return PayloadIt->second;
- }
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ return {};
+ }
+
+ if (auto It = LooseAttachments.find(AttachmentHash); It != LooseAttachments.end())
+ {
+ OnLargeAttachment(AttachmentHash, std::move(It->second));
+ LooseAttachments.erase(It);
}
- return ChunkStore.FindChunkByCid(AttachmentHash);
+ else
+ {
+ OnLargeAttachment(AttachmentHash,
+ [&ChunkStore](const IoHash& AttachmentHash) { return ChunkStore.FindChunkByCid(AttachmentHash); });
+ }
+ }
+ size_t LargeAttachmentCount = LargeChunkHashes.size();
+
+ Latch BlockCreateLatch(1);
+ size_t GeneratedBlockCount = 0;
+ size_t BlockSize = 0;
+ std::vector<SharedBuffer> ChunksInBlock;
+ int LastLSNOp = -1;
+ auto GetPayload = [&](const IoHash& AttachmentHash) {
+ if (auto It = LooseAttachments.find(AttachmentHash); It != LooseAttachments.end())
+ {
+ IoBuffer Payload = It->second(AttachmentHash);
+ LooseAttachments.erase(It);
+ return Payload;
+ }
+ return ChunkStore.FindChunkByCid(AttachmentHash);
};
- Latch BlockCreateLatch(1);
- size_t GeneratedBlockCount = 0;
- size_t LargeAttachmentCount = 0;
- int LastLSNOp = -1;
+ uint32_t ResolvedLargeCount = 0;
+ uint32_t ResolvedSmallCount = 0;
+ uint32_t ResolvedFailedCount = 0;
+ uint32_t ComposedBlocks = 0;
- for (const IoHash& AttachmentHash : SortedAttachments)
+ for (auto HashIt = SortedAttachments.begin(); HashIt != SortedAttachments.end();)
{
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- BlockCreateLatch.CountDown();
- while (!BlockCreateLatch.Wait(1000))
+ break;
+ }
+ ReportProgress(OptionalContext,
+ fmt::format("Resolving attachments: {} large, {} small, {} blocks built",
+ ResolvedLargeCount,
+ ResolvedSmallCount,
+ ComposedBlocks),
+ SortedAttachments.size(),
+ SortedAttachments.size() - (ResolvedLargeCount + ResolvedSmallCount + ResolvedFailedCount));
+
+ std::vector<IoHash> SmallChunkHashes;
+ SmallChunkHashes.reserve(8);
+ while (HashIt != SortedAttachments.end() && SmallChunkHashes.size() < 8)
+ {
+ if (LargeChunkHashes.contains(*HashIt))
{
- ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
+ ResolvedLargeCount++;
}
- return {};
+ else
+ {
+ SmallChunkHashes.push_back(*HashIt);
+ }
+ ++HashIt;
}
- auto It = Attachments.find(AttachmentHash);
- ZEN_ASSERT(It != Attachments.end());
- IoBuffer Payload = GetPayload(AttachmentHash);
- if (!Payload)
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> ResolvedSmallChunks;
+ RwLock ResolvedSmallChunksLock;
+ Latch ResolvedSmallChunksLatch(1);
+ for (const IoHash& SmallChunkHash : SmallChunkHashes)
{
- std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second);
- ZEN_ASSERT(Op.has_value());
- ExtendableStringBuilder<1024> Sb;
- Sb.Append("Failed to find attachment '");
- Sb.Append(AttachmentHash.ToHexString());
- Sb.Append("' for op: \n");
- Op.value().ToJson(Sb);
+ ResolvedSmallChunksLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&GetPayload, SmallChunkHash, &ResolvedSmallChunksLatch, &ResolvedSmallChunksLock, &ResolvedSmallChunks]() {
+ auto _ = MakeGuard([&ResolvedSmallChunksLatch] { ResolvedSmallChunksLatch.CountDown(); });
- ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView()));
+ IoBuffer Payload = GetPayload(SmallChunkHash);
+ {
+ RwLock::ExclusiveLockScope __(ResolvedSmallChunksLock);
+ ResolvedSmallChunks.insert_or_assign(SmallChunkHash, std::move(Payload));
+ }
+ });
+ }
+ ResolvedSmallChunksLatch.CountDown();
+ ResolvedSmallChunksLatch.Wait();
- if (IgnoreMissingAttachments)
+ for (const IoHash& AttachmentHash : SmallChunkHashes)
+ {
+ auto ResolvedIt = ResolvedSmallChunks.find(AttachmentHash);
+ ZEN_ASSERT(ResolvedIt != ResolvedSmallChunks.end());
+ IoBuffer Payload = std::move(ResolvedIt->second);
+ auto It = Attachments.find(AttachmentHash);
+ ZEN_ASSERT(It != Attachments.end());
+ if (!Payload)
{
- continue;
+ std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second);
+ ZEN_ASSERT(Op.has_value());
+ ExtendableStringBuilder<1024> Sb;
+ Sb.Append("Failed to find attachment '");
+ Sb.Append(AttachmentHash.ToHexString());
+ Sb.Append("' for op: \n");
+ Op.value().ToJson(Sb);
+
+ ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView()));
+
+ if (IgnoreMissingAttachments)
+ {
+ ResolvedFailedCount++;
+ continue;
+ }
+ else
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
+ }
+ return {};
+ }
}
- else
+
+ uint64_t PayloadSize = Payload.GetSize();
+ if (PayloadSize > MaxChunkEmbedSize)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
- BlockCreateLatch.CountDown();
- while (!BlockCreateLatch.Wait(1000))
+ if (LargeChunkHashes.insert(AttachmentHash).second)
{
- ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
+ OnLargeAttachment(AttachmentHash, [Payload = std::move(Payload)](const IoHash&) { return std::move(Payload); });
+ LargeAttachmentCount++;
}
- return {};
+ ResolvedLargeCount++;
+ continue;
+ }
+ else
+ {
+ ResolvedSmallCount++;
}
- }
- uint64_t PayloadSize = Payload.GetSize();
- if (PayloadSize > MaxChunkEmbedSize)
- {
- if (LargeChunkHashes.insert(AttachmentHash).second)
+ if (!BlockAttachmentHashes.insert(AttachmentHash).second)
{
- OnLargeAttachment(AttachmentHash);
- LargeAttachmentCount++;
+ continue;
}
- continue;
- }
- if (!BlockAttachmentHashes.insert(AttachmentHash).second)
- {
- continue;
- }
+ const int CurrentOpLSN = It->second;
- const int CurrentOpLSN = It->second;
+ BlockSize += PayloadSize;
+ if (BuildBlocks)
+ {
+ ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload)));
+ }
+ else
+ {
+ Payload = {};
+ }
- BlockSize += PayloadSize;
- if (BuildBlocks)
- {
- ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload)));
- }
- else
- {
- Payload = {};
+ if (BlockSize >= MaxBlockSize && (CurrentOpLSN != LastLSNOp))
+ {
+ size_t BlockIndex = AddBlock(BlocksLock, Blocks);
+ if (BuildBlocks)
+ {
+ CreateBlock(WorkerPool,
+ BlockCreateLatch,
+ std::move(ChunksInBlock),
+ BlocksLock,
+ Blocks,
+ BlockIndex,
+ AsyncOnBlock,
+ RemoteResult);
+ ComposedBlocks++;
+ }
+ else
+ {
+ ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
+ OnBlockChunks(BlockAttachmentHashes);
+ }
+ {
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope _(BlocksLock);
+ Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
+ BlockAttachmentHashes.begin(),
+ BlockAttachmentHashes.end());
+ }
+ BlockAttachmentHashes.clear();
+ ChunksInBlock.clear();
+ BlockSize = 0;
+ GeneratedBlockCount++;
+ }
+ LastLSNOp = CurrentOpLSN;
}
+ }
- if (BlockSize >= MaxBlockSize && (CurrentOpLSN != LastLSNOp))
+ if (BlockSize > 0)
+ {
+ if (!IsCancelled(OptionalContext))
{
size_t BlockIndex = AddBlock(BlocksLock, Blocks);
if (BuildBlocks)
@@ -667,6 +754,7 @@ BuildContainer(CidStore& ChunkStore,
BlockIndex,
AsyncOnBlock,
RemoteResult);
+ ComposedBlocks++;
}
else
{
@@ -685,51 +773,15 @@ BuildContainer(CidStore& ChunkStore,
BlockSize = 0;
GeneratedBlockCount++;
}
- LastLSNOp = CurrentOpLSN;
- }
- if (BlockSize > 0)
- {
- size_t BlockIndex = AddBlock(BlocksLock, Blocks);
- if (BuildBlocks)
- {
- CreateBlock(WorkerPool,
- BlockCreateLatch,
- std::move(ChunksInBlock),
- BlocksLock,
- Blocks,
- BlockIndex,
- AsyncOnBlock,
- RemoteResult);
- }
- else
- {
- ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
- OnBlockChunks(BlockAttachmentHashes);
- }
- {
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope _(BlocksLock);
- Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
- BlockAttachmentHashes.begin(),
- BlockAttachmentHashes.end());
- }
- BlockAttachmentHashes.clear();
- ChunksInBlock.clear();
- BlockSize = 0;
- GeneratedBlockCount++;
}
- SectionOpsWriter.EndArray(); // "ops"
+ ReportProgress(OptionalContext,
+ fmt::format("Resolving attachments: {} large, {} small, {} blocks built",
+ ResolvedLargeCount,
+ ResolvedSmallCount,
+ ComposedBlocks),
+ SortedAttachments.size(),
+ 0);
- if (IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- BlockCreateLatch.CountDown();
- while (!BlockCreateLatch.Wait(1000))
- {
- ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
- }
- return {};
- }
ReportMessage(OptionalContext,
fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}",
SortedAttachments.size(),
@@ -738,7 +790,6 @@ BuildContainer(CidStore& ChunkStore,
LargeAttachmentCount,
NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs()))));
- CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer());
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
@@ -754,10 +805,6 @@ BuildContainer(CidStore& ChunkStore,
}
return {};
}
- ReportMessage(OptionalContext,
- fmt::format("Added oplog section {}, {}",
- CompressedOpsSection.DecodeRawHash(),
- NiceBytes(CompressedOpsSection.GetCompressedSize())));
BlockCreateLatch.CountDown();
while (!BlockCreateLatch.Wait(1000))
@@ -856,9 +903,9 @@ BuildContainer(CidStore& ChunkStore,
bool BuildBlocks,
bool IgnoreMissingAttachments,
const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
- const std::function<void(const IoHash&)>& OnLargeAttachment,
+ const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
- tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutOptionalTempAttachments)
+ bool EmbedLooseFiles)
{
WorkerThreadPool& WorkerPool = GetSmallWorkerPool();
@@ -875,7 +922,7 @@ BuildContainer(CidStore& ChunkStore,
AsyncOnBlock,
OnLargeAttachment,
OnBlockChunks,
- OutOptionalTempAttachments,
+ EmbedLooseFiles,
nullptr,
RemoteResult);
return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject};
@@ -891,18 +938,18 @@ struct UploadInfo
};
void
-UploadAttachments(WorkerThreadPool& WorkerPool,
- CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments,
- const std::vector<std::vector<IoHash>>& BlockChunks,
- const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks,
- const tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>& TempAttachments,
- const std::unordered_set<IoHash, IoHash::Hasher>& Needs,
- bool ForceAll,
- UploadInfo& Info,
- AsyncRemoteResult& RemoteResult,
- JobContext* OptionalContext)
+UploadAttachments(WorkerThreadPool& WorkerPool,
+ CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments,
+ const std::vector<std::vector<IoHash>>& BlockChunks,
+ const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks,
+ const tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LooseFileAttachments,
+ const std::unordered_set<IoHash, IoHash::Hasher>& Needs,
+ bool ForceAll,
+ UploadInfo& Info,
+ AsyncRemoteResult& RemoteResult,
+ JobContext* OptionalContext)
{
using namespace std::literals;
@@ -1004,16 +1051,6 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
continue;
}
- IoBuffer Payload;
- if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
- {
- Payload = BlockIt->second;
- }
- else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end())
- {
- Payload = LooseTmpFileIt->second;
- }
-
SaveAttachmentsLatch.AddCount(1);
AttachmentsToSave++;
WorkerPool.ScheduleWork([&ChunkStore,
@@ -1022,7 +1059,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
&RemoteResult,
RawHash,
&CreatedBlocks,
- TempPayload = std::move(Payload),
+ &LooseFileAttachments,
&Info,
OptionalContext]() {
auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
@@ -1030,7 +1067,19 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
{
return;
}
- IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash);
+ IoBuffer Payload;
+ if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
+ {
+ Payload = BlockIt->second;
+ }
+ else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
+ {
+ Payload = LooseTmpFileIt->second(RawHash);
+ }
+ else
+ {
+ Payload = ChunkStore.FindChunkByCid(RawHash);
+ }
if (!Payload)
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
@@ -1258,10 +1307,11 @@ SaveOplog(CidStore& ChunkStore,
CreateDirectories(AttachmentTempPath);
}
- AsyncRemoteResult RemoteResult;
- RwLock AttachmentsLock;
- std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments;
- std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks;
+ AsyncRemoteResult RemoteResult;
+ RwLock AttachmentsLock;
+ std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments;
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks;
+ tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles;
auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock,
const IoHash& BlockHash) {
@@ -1331,10 +1381,12 @@ SaveOplog(CidStore& ChunkStore,
ZEN_DEBUG("Found {} block chunks", Chunks.size());
};
- auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments](const IoHash& AttachmentHash) {
+ auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments, &LooseLargeFiles](const IoHash& AttachmentHash,
+ TGetAttachmentBufferFunc&& GetBufferFunc) {
{
RwLock::ExclusiveLockScope _(AttachmentsLock);
LargeAttachments.insert(AttachmentHash);
+ LooseLargeFiles.insert_or_assign(AttachmentHash, std::move(GetBufferFunc));
}
ZEN_DEBUG("Found attachment {}", AttachmentHash);
};
@@ -1394,8 +1446,7 @@ SaveOplog(CidStore& ChunkStore,
}
}
- tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> TempAttachments;
- CbObject OplogContainerObject = BuildContainer(ChunkStore,
+ CbObject OplogContainerObject = BuildContainer(ChunkStore,
Project,
Oplog,
MaxBlockSize,
@@ -1407,7 +1458,7 @@ SaveOplog(CidStore& ChunkStore,
OnBlock,
OnLargeAttachment,
OnBlockChunks,
- EmbedLooseFiles ? &TempAttachments : nullptr,
+ EmbedLooseFiles,
OptionalContext,
/* out */ RemoteResult);
if (!RemoteResult.IsError())
@@ -1451,7 +1502,7 @@ SaveOplog(CidStore& ChunkStore,
LargeAttachments,
BlockChunks,
CreatedBlocks,
- TempAttachments,
+ LooseLargeFiles,
ContainerSaveResult.Needs,
ForceUpload,
Info,
@@ -1510,7 +1561,7 @@ SaveOplog(CidStore& ChunkStore,
LargeAttachments,
BlockChunks,
CreatedBlocks,
- TempAttachments,
+ LooseLargeFiles,
ContainerFinalizeResult.Needs,
false,
Info,
@@ -1518,7 +1569,7 @@ SaveOplog(CidStore& ChunkStore,
OptionalContext);
}
- TempAttachments.clear();
+ LooseLargeFiles.clear();
CreatedBlocks.clear();
}
RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
@@ -1612,54 +1663,65 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
- CbObject SectionObject = LoadCompactBinaryObject(SectionPayload);
- if (!SectionObject)
- {
- ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type"));
- return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest),
- Timer.GetElapsedTimeMs() / 1000.500,
- "Section has unexpected data type",
- "Failed to save oplog container"};
- }
-
- CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
- const uint64_t OpCount = OpsArray.Num();
- uint64_t OpsLoaded = 0;
- ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpsArray.Num()));
- BinaryWriter Writer;
- for (CbFieldView OpEntry : OpsArray)
{
- CbObjectView Op = OpEntry.AsObjectView();
- Op.CopyTo(Writer);
- CbObjectView TypedOp(Writer.GetData());
- const uint32_t OpLsn = Oplog.AppendNewOplogEntry(TypedOp);
- Writer.Reset();
- if (OpLsn == ProjectStore::Oplog::kInvalidOp)
+ CbObject SectionObject = LoadCompactBinaryObject(SectionPayload);
+ if (!SectionObject)
{
- ReportMessage(OptionalContext, fmt::format("Failed to save op {}", OpsLoaded));
+ ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type"));
return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest),
Timer.GetElapsedTimeMs() / 1000.500,
- "Failed saving op",
+ "Section has unexpected data type",
"Failed to save oplog container"};
}
- ZEN_DEBUG("oplog entry #{}", OpLsn);
- if (OpCount > 100000)
- {
- if (IsCancelled(OptionalContext))
+
+ CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
+ const uint64_t OpCount = OpsArray.Num();
+
+ ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount));
+
+ const size_t OpsBatchSize = 8192;
+ std::vector<uint8_t> OpsData;
+ std::vector<size_t> OpDataOffsets;
+ size_t OpsCompleteCount = 0;
+
+ OpsData.reserve(OpsBatchSize);
+
+ auto AppendBatch = [&]() {
+ std::vector<CbObjectView> Ops;
+ Ops.reserve(OpDataOffsets.size());
+ for (size_t OpDataOffset : OpDataOffsets)
{
- return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::OK),
- Timer.GetElapsedTimeMs() / 1000.500,
- "Operation cancelled",
- ""};
+ Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset]));
}
- if (OpsLoaded % 10000 == 0)
+ std::vector<uint32_t> OpLsns = Oplog.AppendNewOplogEntries(Ops);
+ OpsCompleteCount += OpLsns.size();
+ OpsData.clear();
+ OpDataOffsets.clear();
+ ReportProgress(OptionalContext,
+ fmt::format("Writing oplog, {} remaining...", OpCount - OpsCompleteCount),
+ OpCount,
+ OpCount - OpsCompleteCount);
+ };
+
+ BinaryWriter Writer;
+ for (CbFieldView OpEntry : OpsArray)
+ {
+ CbObjectView Op = OpEntry.AsObjectView();
+ Op.CopyTo(Writer);
+ OpDataOffsets.push_back(OpsData.size());
+ OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize());
+ Writer.Reset();
+
+ if (OpDataOffsets.size() == OpsBatchSize)
{
- ReportProgress(OptionalContext, "Writing oplog", OpCount, OpCount - OpsLoaded);
+ AppendBatch();
}
}
- OpsLoaded++;
+ if (!OpDataOffsets.empty())
+ {
+ AppendBatch();
+ }
}
- ReportProgress(OptionalContext, "Writing oplog", OpCount, 0);
return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500};
}
@@ -1975,7 +2037,7 @@ LoadOplog(CidStore& ChunkStore,
NiceBytes(Info.AttachmentBlockBytesDownloaded.load()),
Info.AttachmentsDownloaded.load(),
NiceBytes(Info.AttachmentBytesDownloaded.load()),
- NiceBytes(Info.AttachmentsStored.load()),
+ Info.AttachmentsStored.load(),
NiceBytes(Info.AttachmentBytesStored.load()),
Info.MissingAttachmentCount.load()));
return Result;