aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp2335
1 files changed, 1184 insertions, 1151 deletions
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index 406229dff..fcd5fc9c0 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -291,12 +291,12 @@ namespace remotestore_impl {
while (!CurrentOpChunkSizes.empty())
{
+ size_t CurrentOpAttachmentCount = CurrentOpChunkSizes.size();
+
ZEN_ASSERT(CurrentOpRawHashes.size() == CurrentOpChunkSizes.size());
ZEN_ASSERT(CurrentOpAttachmentsSize <= m_UsableBlockSize);
ZEN_ASSERT(CurrentOpAttachmentCount <= m_Config.MaxChunksPerBlock);
- size_t CurrentOpAttachmentCount = CurrentOpChunkSizes.size();
-
// Path A: gathered chunks exactly fill one block -- emit as a standalone block immediately.
if (CurrentOpFillFullBlock)
{
@@ -409,6 +409,401 @@ namespace remotestore_impl {
const uint64_t m_UsableBlockSize = 0;
};
+ struct FoundAttachment
+ {
+ std::filesystem::path RawPath; // If not stored in cid
+ uint64_t Size = 0;
+ Oid Key = Oid::Zero;
+ };
+
+ CbObject RewriteOplog(
+ ProjectStore::Project& Project,
+ ProjectStore::Oplog& Oplog,
+ bool IgnoreMissingAttachments,
+ bool EmbedLooseFiles,
+ const std::filesystem::path& AttachmentTempPath,
+ std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher>& UploadAttachments, // TODO: Rename to OutUploadAttachments
+ remotestore_impl::AsyncRemoteResult& RemoteResult,
+ JobContext* OptionalContext)
+ {
+ size_t OpCount = 0;
+ CreateDirectories(AttachmentTempPath);
+
+ auto RewriteOp = [&](const Oid& Key, CbObjectView Op, const std::function<void(CbObjectView)>& CB) {
+ bool OpRewritten = false;
+ CbArrayView Files = Op["files"sv].AsArrayView();
+ if (Files.Num() == 0)
+ {
+ CB(Op);
+ return;
+ }
+
+ CbWriter Cbo;
+ Cbo.BeginArray("files"sv);
+
+ for (CbFieldView& Field : Files)
+ {
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ CB(Op);
+ return;
+ }
+
+ bool CopyField = true;
+
+ if (CbObjectView View = Field.AsObjectView())
+ {
+ IoHash DataHash = View["data"sv].AsHash();
+
+ if (DataHash == IoHash::Zero)
+ {
+ std::string_view ServerPath = View["serverpath"sv].AsString();
+ std::filesystem::path FilePath = (Project.RootDir / ServerPath).make_preferred();
+ if (!IsFile(FilePath))
+ {
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Missing attachment '{}' for op '{}'", FilePath, View["id"sv].AsObjectId()));
+ if (IgnoreMissingAttachments)
+ {
+ continue;
+ }
+ else
+ {
+ ExtendableStringBuilder<1024> Sb;
+ Sb.Append("Failed to find attachment '");
+ Sb.Append(FilePath.string());
+ Sb.Append("' for op: \n");
+ View.ToJson(Sb);
+ throw std::runtime_error(Sb.ToString());
+ }
+ }
+
+ {
+ Stopwatch HashTimer;
+ SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath));
+ DataHash = IoHash::HashBuffer(CompositeBuffer(DataBuffer));
+ ZEN_INFO("Hashed loose file '{}' {}: {} in {}",
+ FilePath,
+ NiceBytes(DataBuffer.GetSize()),
+ DataHash,
+ NiceTimeSpanMs(HashTimer.GetElapsedTimeMs()));
+ }
+
+ // Rewrite file array entry with new data reference
+ CbObjectWriter Writer;
+ RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
+ if (Field.GetName() == "data"sv)
+ {
+ // omit this field as we will write it explicitly ourselves
+ return true;
+ }
+ return false;
+ });
+ Writer.AddBinaryAttachment("data"sv, DataHash);
+ UploadAttachments.insert_or_assign(DataHash, FoundAttachment{.RawPath = FilePath, .Key = Key});
+
+ CbObject RewrittenOp = Writer.Save();
+ Cbo.AddObject(std::move(RewrittenOp));
+ CopyField = false;
+ }
+ }
+
+ if (CopyField)
+ {
+ Cbo.AddField(Field);
+ }
+ else
+ {
+ OpRewritten = true;
+ }
+ }
+
+ if (!OpRewritten)
+ {
+ CB(Op);
+ return;
+ }
+
+ Cbo.EndArray();
+ CbArray FilesArray = Cbo.Save().AsArray();
+
+ CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool {
+ if (Field.GetName() == "files"sv)
+ {
+ NewWriter.AddArray("files"sv, FilesArray);
+
+ return true;
+ }
+
+ return false;
+ });
+ CB(RewrittenOp);
+ };
+
+ remotestore_impl::ReportMessage(OptionalContext, "Building exported oplog and collecting attachments");
+
+ Stopwatch Timer;
+
+ size_t TotalOpCount = Oplog.GetOplogEntryCount();
+ Stopwatch RewriteOplogTimer;
+ CbObjectWriter SectionOpsWriter;
+ SectionOpsWriter.BeginArray("ops"sv);
+ {
+ Stopwatch BuildingOplogProgressTimer;
+ Oplog.IterateOplogWithKey([&](int, const Oid& Key, CbObjectView Op) {
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ Op.IterateAttachments([&](CbFieldView FieldView) {
+ UploadAttachments.insert_or_assign(FieldView.AsAttachment(), FoundAttachment{.Key = Key});
+ });
+ if (EmbedLooseFiles)
+ {
+ RewriteOp(Key, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; });
+ }
+ else
+ {
+ SectionOpsWriter << Op;
+ }
+ OpCount++;
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return;
+ }
+ if (OpCount % 1000 == 0)
+ {
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Building oplog"sv,
+ fmt::format("{} ops processed", OpCount),
+ TotalOpCount,
+ TotalOpCount - OpCount,
+ BuildingOplogProgressTimer.GetElapsedTimeMs());
+ }
+ });
+ if (RemoteResult.IsError())
+ {
+ return {};
+ }
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
+ if (TotalOpCount > 0)
+ {
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Building oplog"sv,
+ fmt::format("{} ops processed", OpCount),
+ TotalOpCount,
+ 0,
+ BuildingOplogProgressTimer.GetElapsedTimeMs());
+ }
+ }
+ SectionOpsWriter.EndArray(); // "ops"
+
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Rewrote {} ops to new oplog in {}",
+ OpCount,
+ NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs()))));
+
+ return SectionOpsWriter.Save();
+ }
+
+ struct FoundChunkedFile
+ {
+ IoHash RawHash = IoHash::Zero;
+ IoBuffer Source;
+ uint64_t Offset = 0;
+ uint64_t Size = 0;
+ };
+
+ void FindChunkSizes(CidStore& ChunkStore,
+ WorkerThreadPool& WorkerPool,
+ size_t MaxChunkEmbedSize,
+ size_t ChunkFileSizeLimit,
+ bool AllowChunking,
+ const std::filesystem::path& AttachmentTempPath,
+ std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher>& UploadAttachments,
+ std::unordered_set<IoHash, IoHash::Hasher>& MissingHashes,
+ std::vector<FoundChunkedFile>& AttachmentsToChunk,
+ JobContext* OptionalContext,
+ remotestore_impl::AsyncRemoteResult& RemoteResult)
+ {
+ {
+ RwLock FindChunkSizesLock;
+ Latch FindChunkSizesLatch(1);
+ for (auto& It : UploadAttachments)
+ {
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ break;
+ }
+
+ FindChunkSizesLatch.AddCount(1);
+
+ WorkerPool.ScheduleWork(
+ [&ChunkStore,
+ UploadAttachment = &It.second,
+ RawHash = It.first,
+ &FindChunkSizesLatch,
+ &FindChunkSizesLock,
+ &MissingHashes,
+ AttachmentTempPath,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ AllowChunking,
+ &RemoteResult,
+ &AttachmentsToChunk,
+ OptionalContext]() {
+ ZEN_TRACE_CPU("PrepareChunk");
+
+ auto _ = MakeGuard([&FindChunkSizesLatch] { FindChunkSizesLatch.CountDown(); });
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ return;
+ }
+
+ try
+ {
+ if (!UploadAttachment->RawPath.empty())
+ {
+ const std::filesystem::path& FilePath = UploadAttachment->RawPath;
+ IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath);
+ if (RawData)
+ {
+ UploadAttachment->Size = RawData.GetSize();
+ if (AllowChunking && UploadAttachment->Size > ChunkFileSizeLimit)
+ {
+ FindChunkSizesLock.WithExclusiveLock([&]() {
+ AttachmentsToChunk.push_back(FoundChunkedFile{.RawHash = RawHash,
+ .Source = RawData,
+ .Offset = 0,
+ .Size = RawData.GetSize()});
+ });
+ }
+ }
+ else
+ {
+ FindChunkSizesLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
+ }
+ }
+ else
+ {
+ IoBuffer Data = ChunkStore.FindChunkByCid(RawHash);
+ if (Data)
+ {
+ UploadAttachment->Size = Data.GetSize();
+ if (AllowChunking && Data.IsWholeFile())
+ {
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize);
+ if (Compressed)
+ {
+ if (VerifyRawSize > ChunkFileSizeLimit)
+ {
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ uint64_t BlockSize;
+ if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+ {
+ if (CompressionLevel == OodleCompressionLevel::None)
+ {
+ CompositeBuffer Decompressed = Compressed.DecompressToComposite();
+ if (Decompressed)
+ {
+ std::span<const SharedBuffer> Segments = Decompressed.GetSegments();
+ if (Segments.size() == 1)
+ {
+ IoBuffer DecompressedData = Segments[0].AsIoBuffer();
+ IoBufferFileReference DecompressedFileRef;
+ if (DecompressedData.GetFileReference(DecompressedFileRef))
+ {
+ // Are we still pointing to disk?
+ FindChunkSizesLock.WithExclusiveLock([&]() {
+ AttachmentsToChunk.push_back(
+ FoundChunkedFile{.RawHash = RawHash,
+ .Source = Data,
+ .Offset = DecompressedFileRef.FileChunkOffset,
+ .Size = DecompressedFileRef.FileChunkSize});
+ });
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ else
+ {
+ FindChunkSizesLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to resolve attachment {}", RawHash),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ FindChunkSizesLatch.CountDown();
+
+ Stopwatch ResolveAttachmentsProgressTimer;
+ while (!FindChunkSizesLatch.Wait(1000))
+ {
+ ptrdiff_t Remaining = FindChunkSizesLatch.Remaining();
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ while (!FindChunkSizesLatch.Wait(1000))
+ {
+ Remaining = FindChunkSizesLatch.Remaining();
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ fmt::format("Aborting, {} attachments remaining...", Remaining),
+ UploadAttachments.size(),
+ Remaining,
+ ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
+ }
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ "Aborted"sv,
+ UploadAttachments.size(),
+ 0,
+ ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
+ return;
+ }
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ fmt::format("{} remaining...", Remaining),
+ UploadAttachments.size(),
+ Remaining,
+ ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
+ }
+ }
+ }
+
RemoteProjectStore::Result WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext)
{
using namespace std::literals;
@@ -2003,1353 +2398,993 @@ BuildContainer(CidStore& ChunkStore,
std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(OptionalContext));
- size_t OpCount = 0;
+ Stopwatch Timer;
- CbObject OplogContainerObject;
+ CbObject OplogContainerObject;
+ CompressedBuffer CompressedOpsSection;
+ std::unordered_map<IoHash, remotestore_impl::FoundAttachment, IoHash::Hasher> UploadAttachments;
+ std::filesystem::path AttachmentTempPath = Oplog.TempPath();
+ AttachmentTempPath.append(".pending");
+
+ CbObject SectionOps = remotestore_impl::RewriteOplog(Project,
+ Oplog,
+ IgnoreMissingAttachments,
+ EmbedLooseFiles,
+ AttachmentTempPath,
+ UploadAttachments,
+ RemoteResult,
+ OptionalContext);
+
+ size_t TotalOpCount = Oplog.GetOplogEntryCount();
{
- struct FoundAttachment
- {
- std::filesystem::path RawPath; // If not stored in cid
- uint64_t Size = 0;
- Oid Key = Oid::Zero;
- };
-
- std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments;
-
- CompressedBuffer OpsBuffer;
-
- std::filesystem::path AttachmentTempPath = Oplog.TempPath();
- AttachmentTempPath.append(".pending");
- CreateDirectories(AttachmentTempPath);
-
- auto RewriteOp = [&](const Oid& Key, CbObjectView Op, const std::function<void(CbObjectView)>& CB) {
- bool OpRewritten = false;
- CbArrayView Files = Op["files"sv].AsArrayView();
- if (Files.Num() == 0)
- {
- CB(Op);
- return;
- }
-
- CbWriter Cbo;
- Cbo.BeginArray("files"sv);
-
- for (CbFieldView& Field : Files)
- {
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- CB(Op);
- return;
- }
-
- bool CopyField = true;
-
- if (CbObjectView View = Field.AsObjectView())
- {
- IoHash DataHash = View["data"sv].AsHash();
-
- if (DataHash == IoHash::Zero)
- {
- std::string_view ServerPath = View["serverpath"sv].AsString();
- std::filesystem::path FilePath = (Project.RootDir / ServerPath).make_preferred();
- if (!IsFile(FilePath))
- {
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Missing attachment '{}' for op '{}'", FilePath, View["id"sv].AsObjectId()));
- if (IgnoreMissingAttachments)
- {
- continue;
- }
- else
- {
- ExtendableStringBuilder<1024> Sb;
- Sb.Append("Failed to find attachment '");
- Sb.Append(FilePath.string());
- Sb.Append("' for op: \n");
- View.ToJson(Sb);
- throw std::runtime_error(Sb.ToString());
- }
- }
-
- {
- Stopwatch HashTimer;
- SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath));
- DataHash = IoHash::HashBuffer(CompositeBuffer(DataBuffer));
- ZEN_INFO("Hashed loose file '{}' {}: {} in {}",
- FilePath,
- NiceBytes(DataBuffer.GetSize()),
- DataHash,
- NiceTimeSpanMs(HashTimer.GetElapsedTimeMs()));
- }
-
- // Rewrite file array entry with new data reference
- CbObjectWriter Writer;
- RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
- if (Field.GetName() == "data"sv)
- {
- // omit this field as we will write it explicitly ourselves
- return true;
- }
- return false;
- });
- Writer.AddBinaryAttachment("data"sv, DataHash);
- UploadAttachments.insert_or_assign(DataHash, FoundAttachment{.RawPath = FilePath, .Key = Key});
-
- CbObject RewrittenOp = Writer.Save();
- Cbo.AddObject(std::move(RewrittenOp));
- CopyField = false;
- }
- }
-
- if (CopyField)
- {
- Cbo.AddField(Field);
- }
- else
- {
- OpRewritten = true;
- }
- }
-
- if (!OpRewritten)
- {
- CB(Op);
- return;
- }
-
- Cbo.EndArray();
- CbArray FilesArray = Cbo.Save().AsArray();
+ Stopwatch CompressOpsTimer;
+ CompressedOpsSection = CompressedBuffer::Compress(SectionOps.GetBuffer(), OodleCompressor::Mermaid, OodleCompressionLevel::Fast);
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Compressed oplog section {} ({} -> {}) in {}",
+ CompressedOpsSection.DecodeRawHash(),
+ NiceBytes(CompressedOpsSection.DecodeRawSize()),
+ NiceBytes(CompressedOpsSection.GetCompressedSize()),
+ NiceTimeSpanMs(static_cast<uint64_t>(CompressOpsTimer.GetElapsedTimeMs()))));
+ }
- CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool {
- if (Field.GetName() == "files"sv)
- {
- NewWriter.AddArray("files"sv, FilesArray);
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
- return true;
- }
+ std::unordered_set<IoHash, IoHash::Hasher> FoundHashes;
+ FoundHashes.reserve(UploadAttachments.size());
+ for (const auto& It : UploadAttachments)
+ {
+ FoundHashes.insert(It.first);
+ }
- return false;
- });
- CB(RewrittenOp);
- };
+ struct ChunkedFile
+ {
+ IoBuffer Source;
+ ChunkedInfoWithSource Chunked;
+ };
- remotestore_impl::ReportMessage(OptionalContext, "Building exported oplog and collecting attachments");
+ std::unordered_set<IoHash, IoHash::Hasher> MissingHashes;
+ std::vector<remotestore_impl::FoundChunkedFile> AttachmentsToChunk;
+
+ remotestore_impl::FindChunkSizes(ChunkStore,
+ WorkerPool,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ AllowChunking,
+ AttachmentTempPath,
+ UploadAttachments,
+ MissingHashes,
+ AttachmentsToChunk,
+ OptionalContext,
+ RemoteResult);
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
- Stopwatch Timer;
+ for (const IoHash& AttachmentHash : MissingHashes)
+ {
+ auto It = UploadAttachments.find(AttachmentHash);
+ ZEN_ASSERT(It != UploadAttachments.end());
+ std::optional<CbObject> Op = Oplog.GetOpByKey(It->second.Key);
+ ZEN_ASSERT(Op.has_value());
- size_t TotalOpCount = Oplog.GetOplogEntryCount();
- CompressedBuffer CompressedOpsSection;
+ if (IgnoreMissingAttachments)
{
- Stopwatch RewriteOplogTimer;
- CbObjectWriter SectionOpsWriter;
- SectionOpsWriter.BeginArray("ops"sv);
- {
- Stopwatch BuildingOplogProgressTimer;
- Oplog.IterateOplogWithKey([&](int, const Oid& Key, CbObjectView Op) {
- if (RemoteResult.IsError())
- {
- return;
- }
- Op.IterateAttachments([&](CbFieldView FieldView) {
- UploadAttachments.insert_or_assign(FieldView.AsAttachment(), FoundAttachment{.Key = Key});
- });
- if (EmbedLooseFiles)
- {
- RewriteOp(Key, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; });
- }
- else
- {
- SectionOpsWriter << Op;
- }
- OpCount++;
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return;
- }
- if (OpCount % 1000 == 0)
- {
- remotestore_impl::ReportProgress(OptionalContext,
- "Building oplog"sv,
- fmt::format("{} ops processed", OpCount),
- TotalOpCount,
- TotalOpCount - OpCount,
- BuildingOplogProgressTimer.GetElapsedTimeMs());
- }
- });
- if (RemoteResult.IsError())
- {
- return {};
- }
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return {};
- }
- if (TotalOpCount > 0)
- {
- remotestore_impl::ReportProgress(OptionalContext,
- "Building oplog"sv,
- fmt::format("{} ops processed", OpCount),
- TotalOpCount,
- 0,
- BuildingOplogProgressTimer.GetElapsedTimeMs());
- }
- }
- SectionOpsWriter.EndArray(); // "ops"
-
remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Rewrote {} ops to new oplog in {}",
- OpCount,
- NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs()))));
-
- {
- Stopwatch CompressOpsTimer;
- CompressedOpsSection =
- CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer(), OodleCompressor::Mermaid, OodleCompressionLevel::Fast);
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Compressed oplog section {} ({} -> {}) in {}",
- CompressedOpsSection.DecodeRawHash(),
- NiceBytes(CompressedOpsSection.DecodeRawSize()),
- NiceBytes(CompressedOpsSection.GetCompressedSize()),
- NiceTimeSpanMs(static_cast<uint64_t>(CompressOpsTimer.GetElapsedTimeMs()))));
- }
+ fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key));
}
-
- if (remotestore_impl::IsCancelled(OptionalContext))
+ else
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ ExtendableStringBuilder<1024> Sb;
+ Sb.Append("Failed to find attachment '");
+ Sb.Append(AttachmentHash.ToHexString());
+ Sb.Append("' for op: \n");
+ Op.value().ToJson(Sb);
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
return {};
}
+ UploadAttachments.erase(AttachmentHash);
+ }
- std::unordered_set<IoHash, IoHash::Hasher> FoundHashes;
- FoundHashes.reserve(UploadAttachments.size());
- for (const auto& It : UploadAttachments)
- {
- FoundHashes.insert(It.first);
- }
-
- struct ChunkedFile
- {
- IoBuffer Source;
-
- ChunkedInfoWithSource Chunked;
- };
-
- std::unordered_set<IoHash, IoHash::Hasher> MissingHashes;
+ std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes;
- struct FoundChunkedFile
- {
- IoHash RawHash = IoHash::Zero;
- IoBuffer Source;
- uint64_t Offset = 0;
- uint64_t Size = 0;
- };
+ std::vector<ChunkedFile> ChunkedFiles(AttachmentsToChunk.size());
- std::vector<FoundChunkedFile> AttachmentsToChunk;
+ {
+ Latch ChunkFilesLatch(1);
+ for (size_t ChunkFileIndexToChunk = 0; ChunkFileIndexToChunk < AttachmentsToChunk.size(); ChunkFileIndexToChunk++)
{
- RwLock FindChunkSizesLock;
- Latch FindChunkSizesLatch(1);
- for (auto& It : UploadAttachments)
- {
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return {};
- }
+ ChunkFilesLatch.AddCount(1);
- FindChunkSizesLatch.AddCount(1);
-
- WorkerPool.ScheduleWork(
- [&ChunkStore,
- UploadAttachment = &It.second,
- RawHash = It.first,
- &FindChunkSizesLatch,
- &FindChunkSizesLock,
- // &LooseUploadAttachments,
- &MissingHashes,
- &OnLargeAttachment,
- &AttachmentTempPath,
- MaxChunkEmbedSize,
- ChunkFileSizeLimit,
- AllowChunking,
- &RemoteResult,
- &AttachmentsToChunk,
- OptionalContext]() {
- ZEN_TRACE_CPU("PrepareChunk");
-
- auto _ = MakeGuard([&FindChunkSizesLatch] { FindChunkSizesLatch.CountDown(); });
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- return;
- }
-
- try
- {
- if (!UploadAttachment->RawPath.empty())
- {
- const std::filesystem::path& FilePath = UploadAttachment->RawPath;
- IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath);
- if (RawData)
- {
- UploadAttachment->Size = RawData.GetSize();
- if (AllowChunking && UploadAttachment->Size > ChunkFileSizeLimit)
- {
- FindChunkSizesLock.WithExclusiveLock([&]() {
- AttachmentsToChunk.push_back(FoundChunkedFile{.RawHash = RawHash,
- .Source = RawData,
- .Offset = 0,
- .Size = RawData.GetSize()});
- });
- }
- }
- else
- {
- FindChunkSizesLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
- }
- }
- else
- {
- IoBuffer Data = ChunkStore.FindChunkByCid(RawHash);
- if (Data)
- {
- UploadAttachment->Size = Data.GetSize();
- if (AllowChunking && Data.IsWholeFile())
- {
- IoHash VerifyRawHash;
- uint64_t VerifyRawSize;
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize);
- if (Compressed)
- {
- if (VerifyRawSize > ChunkFileSizeLimit)
- {
- OodleCompressor Compressor;
- OodleCompressionLevel CompressionLevel;
- uint64_t BlockSize;
- if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
- {
- if (CompressionLevel == OodleCompressionLevel::None)
- {
- CompositeBuffer Decompressed = Compressed.DecompressToComposite();
- if (Decompressed)
- {
- std::span<const SharedBuffer> Segments = Decompressed.GetSegments();
- if (Segments.size() == 1)
- {
- IoBuffer DecompressedData = Segments[0].AsIoBuffer();
- IoBufferFileReference DecompressedFileRef;
- if (DecompressedData.GetFileReference(DecompressedFileRef))
- {
- // Are we still pointing to disk?
- FindChunkSizesLock.WithExclusiveLock([&]() {
- AttachmentsToChunk.push_back(
- FoundChunkedFile{.RawHash = RawHash,
- .Source = Data,
- .Offset = DecompressedFileRef.FileChunkOffset,
- .Size = DecompressedFileRef.FileChunkSize});
- });
- }
- }
- }
- }
- }
- }
- }
- }
- }
- else
- {
- FindChunkSizesLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
- }
- }
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to resolve attachment {}", RawHash),
- Ex.what());
- }
- },
- WorkerThreadPool::EMode::DisableBacklog);
- }
- FindChunkSizesLatch.CountDown();
+ WorkerPool.ScheduleWork(
+ [&ChunkFilesLatch, &AttachmentsToChunk, ChunkFileIndexToChunk, &ChunkedFiles, OptionalContext]() {
+ ZEN_TRACE_CPU("ChunkFileAsync");
- Stopwatch ResolveAttachmentsProgressTimer;
- while (!FindChunkSizesLatch.Wait(1000))
- {
- ptrdiff_t Remaining = FindChunkSizesLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- while (!FindChunkSizesLatch.Wait(1000))
+ auto _ = MakeGuard([&ChunkFilesLatch] { ChunkFilesLatch.CountDown(); });
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
- Remaining = FindChunkSizesLatch.Remaining();
- remotestore_impl::ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- fmt::format("Aborting, {} attachments remaining...", Remaining),
- UploadAttachments.size(),
- Remaining,
- ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
+ return;
}
- remotestore_impl::ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- "Aborted"sv,
- UploadAttachments.size(),
- 0,
- ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
- return {};
- }
- remotestore_impl::ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- fmt::format("{} remaining...", Remaining),
- UploadAttachments.size(),
- Remaining,
- ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
- }
- }
- for (const IoHash& AttachmentHash : MissingHashes)
- {
- auto It = UploadAttachments.find(AttachmentHash);
- ZEN_ASSERT(It != UploadAttachments.end());
- std::optional<CbObject> Op = Oplog.GetOpByKey(It->second.Key);
- ZEN_ASSERT(Op.has_value());
+ const remotestore_impl::FoundChunkedFile& AttachmentToChunk = AttachmentsToChunk[ChunkFileIndexToChunk];
+ const IoHash& RawHash = AttachmentToChunk.RawHash;
- if (IgnoreMissingAttachments)
- {
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key));
- }
- else
- {
- ExtendableStringBuilder<1024> Sb;
- Sb.Append("Failed to find attachment '");
- Sb.Append(AttachmentHash.ToHexString());
- Sb.Append("' for op: \n");
- Op.value().ToJson(Sb);
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
- return {};
- }
- UploadAttachments.erase(AttachmentHash);
- }
+ const IoBuffer& Buffer = AttachmentToChunk.Source;
+ IoBufferFileReference FileRef;
+ bool IsFile = Buffer.GetFileReference(FileRef);
+ ZEN_ASSERT(IsFile);
- std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes;
+ Stopwatch ChunkOneTimer;
- std::vector<ChunkedFile> ChunkedFiles(AttachmentsToChunk.size());
-
- {
- Latch ChunkFilesLatch(1);
+ uint64_t Offset = AttachmentToChunk.Offset;
+ uint64_t Size = AttachmentToChunk.Size;
- for (size_t ChunkFileIndexToChunk = 0; ChunkFileIndexToChunk < AttachmentsToChunk.size(); ChunkFileIndexToChunk++)
- {
- ChunkFilesLatch.AddCount(1);
+ BasicFile SourceFile;
+ SourceFile.Attach(FileRef.FileHandle);
+ auto __ = MakeGuard([&SourceFile]() { SourceFile.Detach(); });
- WorkerPool.ScheduleWork(
- [&ChunkFilesLatch, &AttachmentsToChunk, ChunkFileIndexToChunk, &ChunkedFiles, OptionalContext]() {
- ZEN_TRACE_CPU("ChunkFileAsync");
+ ChunkedFile& Chunked = ChunkedFiles[ChunkFileIndexToChunk];
+ Chunked.Source = Buffer;
+ Chunked.Chunked = ChunkData(SourceFile, Offset, Size, UShaderByteCodeParams);
+ ZEN_ASSERT(Chunked.Chunked.Info.RawHash == RawHash);
- auto _ = MakeGuard([&ChunkFilesLatch] { ChunkFilesLatch.CountDown(); });
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- return;
- }
-
- const FoundChunkedFile& AttachmentToChunk = AttachmentsToChunk[ChunkFileIndexToChunk];
- const IoHash& RawHash = AttachmentToChunk.RawHash;
-
- const IoBuffer& Buffer = AttachmentToChunk.Source;
- IoBufferFileReference FileRef;
- bool IsFile = Buffer.GetFileReference(FileRef);
- ZEN_ASSERT(IsFile);
-
- Stopwatch ChunkOneTimer;
-
- uint64_t Offset = AttachmentToChunk.Offset;
- uint64_t Size = AttachmentToChunk.Size;
-
- BasicFile SourceFile;
- SourceFile.Attach(FileRef.FileHandle);
- auto __ = MakeGuard([&SourceFile]() { SourceFile.Detach(); });
-
- ChunkedFile& Chunked = ChunkedFiles[ChunkFileIndexToChunk];
- Chunked.Source = Buffer;
- Chunked.Chunked = ChunkData(SourceFile, Offset, Size, UShaderByteCodeParams);
- ZEN_ASSERT(Chunked.Chunked.Info.RawHash == RawHash);
-
- ZEN_INFO("Chunked large attachment '{}' {} into {} chunks in {}",
- RawHash,
- NiceBytes(Chunked.Chunked.Info.RawSize),
- Chunked.Chunked.Info.ChunkHashes.size(),
- NiceTimeSpanMs(ChunkOneTimer.GetElapsedTimeMs()));
- },
- WorkerThreadPool::EMode::DisableBacklog);
- }
- ChunkFilesLatch.CountDown();
- ChunkFilesLatch.Wait();
- // TODO: Progress (change WorkerThreadPool::EMode::DisableBacklog)
+ ZEN_INFO("Chunked large attachment '{}' {} into {} chunks in {}",
+ RawHash,
+ NiceBytes(Chunked.Chunked.Info.RawSize),
+ Chunked.Chunked.Info.ChunkHashes.size(),
+ NiceTimeSpanMs(ChunkOneTimer.GetElapsedTimeMs()));
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
+ ChunkFilesLatch.CountDown();
+ ChunkFilesLatch.Wait();
+ // TODO: Progress (change WorkerThreadPool::EMode::DisableBacklog)
+ }
- for (const ChunkedFile& Chunked : ChunkedFiles)
+ for (const ChunkedFile& Chunked : ChunkedFiles)
+ {
+ UploadAttachments.erase(Chunked.Chunked.Info.RawHash);
+ for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
{
- UploadAttachments.erase(Chunked.Chunked.Info.RawHash);
- for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
- {
- UploadAttachments.erase(ChunkHash);
- }
+ UploadAttachments.erase(ChunkHash);
}
+ }
- size_t ChunkedChunkCount =
- std::accumulate(ChunkedFiles.begin(), ChunkedFiles.end(), size_t(0), [](size_t Current, const ChunkedFile& Value) {
- return Current + Value.Chunked.Info.ChunkHashes.size();
- });
+ size_t ChunkedChunkCount =
+ std::accumulate(ChunkedFiles.begin(), ChunkedFiles.end(), size_t(0), [](size_t Current, const ChunkedFile& Value) {
+ return Current + Value.Chunked.Info.ChunkHashes.size();
+ });
- size_t ReusedAttachmentCount = 0;
- std::vector<size_t> ReusedBlockIndexes;
- {
- std::unordered_set<IoHash, IoHash::Hasher> UniqueChunkHashes;
- UniqueChunkHashes.reserve(FoundHashes.size() + ChunkedChunkCount);
+ size_t ReusedAttachmentCount = 0;
+ std::vector<size_t> ReusedBlockIndexes;
+ {
+ std::unordered_set<IoHash, IoHash::Hasher> UniqueChunkHashes;
+ UniqueChunkHashes.reserve(FoundHashes.size() + ChunkedChunkCount);
- UniqueChunkHashes.insert(FoundHashes.begin(), FoundHashes.end());
+ UniqueChunkHashes.insert(FoundHashes.begin(), FoundHashes.end());
- for (ChunkedFile& Chunked : ChunkedFiles)
- {
- UniqueChunkHashes.insert(Chunked.Chunked.Info.ChunkHashes.begin(), Chunked.Chunked.Info.ChunkHashes.end());
- }
- std::vector<IoHash> ChunkHashes(UniqueChunkHashes.begin(), UniqueChunkHashes.end());
-
- std::vector<uint32_t> ChunkIndexes;
- ChunkIndexes.resize(ChunkHashes.size());
- std::iota(ChunkIndexes.begin(), ChunkIndexes.end(), 0);
-
- std::vector<uint32_t> UnusedChunkIndexes;
- ReuseBlocksStatistics ReuseBlocksStats;
-
- ReusedBlockIndexes = FindReuseBlocks(*LogOutput,
- /*BlockReuseMinPercentLimit*/ 80,
- /*IsVerbose*/ false,
- ReuseBlocksStats,
- KnownBlocks,
- ChunkHashes,
- ChunkIndexes,
- UnusedChunkIndexes);
- for (size_t KnownBlockIndex : ReusedBlockIndexes)
+ for (ChunkedFile& Chunked : ChunkedFiles)
+ {
+ UniqueChunkHashes.insert(Chunked.Chunked.Info.ChunkHashes.begin(), Chunked.Chunked.Info.ChunkHashes.end());
+ }
+ std::vector<IoHash> ChunkHashes(UniqueChunkHashes.begin(), UniqueChunkHashes.end());
+
+ std::vector<uint32_t> ChunkIndexes;
+ ChunkIndexes.resize(ChunkHashes.size());
+ std::iota(ChunkIndexes.begin(), ChunkIndexes.end(), 0);
+
+ std::vector<uint32_t> UnusedChunkIndexes;
+ ReuseBlocksStatistics ReuseBlocksStats;
+
+ ReusedBlockIndexes = FindReuseBlocks(*LogOutput,
+ /*BlockReuseMinPercentLimit*/ 80,
+ /*IsVerbose*/ false,
+ ReuseBlocksStats,
+ KnownBlocks,
+ ChunkHashes,
+ ChunkIndexes,
+ UnusedChunkIndexes);
+ for (size_t KnownBlockIndex : ReusedBlockIndexes)
+ {
+ const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
+ for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes)
{
- const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
- for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes)
+ if (UploadAttachments.erase(KnownHash) == 1)
{
- if (UploadAttachments.erase(KnownHash) == 1)
- {
- ReusedAttachmentCount++;
- }
+ ReusedAttachmentCount++;
}
}
}
+ }
- RwLock ResolveLock;
- std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
- std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher> LooseUploadAttachments;
+ RwLock ResolveLock;
+ std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
+ std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher> LooseUploadAttachments;
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount));
- Stopwatch ResolveAttachmentsProgressTimer;
- Latch ResolveAttachmentsLatch(1);
- for (auto& It : UploadAttachments)
+ Stopwatch ResolveAttachmentsProgressTimer;
+ Latch ResolveAttachmentsLatch(1);
+ for (auto& It : UploadAttachments)
+ {
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return {};
- }
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
- ResolveAttachmentsLatch.AddCount(1);
+ ResolveAttachmentsLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&ChunkStore,
- UploadAttachment = &It.second,
- RawHash = It.first,
- &ResolveAttachmentsLatch,
- &ResolveLock,
- &ChunkedHashes,
- &LargeChunkHashes,
- &LooseUploadAttachments,
- &MissingHashes,
- &OnLargeAttachment,
- &AttachmentTempPath,
- MaxChunkEmbedSize,
- ChunkFileSizeLimit,
- AllowChunking,
- &RemoteResult,
- OptionalContext]() {
- ZEN_TRACE_CPU("PrepareChunk");
+ WorkerPool.ScheduleWork(
+ [&ChunkStore,
+ UploadAttachment = &It.second,
+ RawHash = It.first,
+ &ResolveAttachmentsLatch,
+ &ResolveLock,
+ &ChunkedHashes,
+ &LargeChunkHashes,
+ &LooseUploadAttachments,
+ &MissingHashes,
+ &OnLargeAttachment,
+ &AttachmentTempPath,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ AllowChunking,
+ &RemoteResult,
+ OptionalContext]() {
+ ZEN_TRACE_CPU("PrepareChunk");
- auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); });
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- return;
- }
+ auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); });
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ return;
+ }
- try
+ try
+ {
+ ZEN_ASSERT(UploadAttachment->Size != 0);
+ if (!UploadAttachment->RawPath.empty())
{
- ZEN_ASSERT(UploadAttachment->Size != 0);
- if (!UploadAttachment->RawPath.empty())
+ if (UploadAttachment->Size > (MaxChunkEmbedSize * 2))
{
- if (UploadAttachment->Size > (MaxChunkEmbedSize * 2))
- {
- // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't
- // it will be a loose attachment instead of going into a block
- OnLargeAttachment(
- RawHash,
- [RawPath = UploadAttachment->RawPath, AttachmentTempPath, UploadAttachment](
- const IoHash& RawHash) -> CompositeBuffer {
- IoBuffer RawData = IoBufferBuilder::MakeFromFile(RawPath);
- if (!RawData)
- {
- throw std::runtime_error(
- fmt::format("Failed to read source file for blob {} from '{}'", RawHash, RawPath));
- }
-
- std::filesystem::path AttachmentPath = AttachmentTempPath;
- AttachmentPath.append(RawHash.ToHexString());
-
- IoBuffer TempAttachmentBuffer =
- remotestore_impl::CompressToTempFile(RawHash,
- RawData,
- AttachmentPath,
- OodleCompressor::Mermaid,
- OodleCompressionLevel::VeryFast);
- TempAttachmentBuffer.SetDeleteOnClose(true);
-
-#if ZEN_BUILD_DEBUG
- IoHash VerifyRawHash;
- uint64_t VerifyRawSize;
- ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer),
- VerifyRawHash,
- VerifyRawSize));
- ZEN_ASSERT_SLOW(VerifyRawHash == RawHash);
- ZEN_ASSERT_SLOW(VerifyRawSize == RawData.GetSize());
-#endif // ZEN_BUILD_DEBUG
-
- ZEN_INFO("Saved temp attachment to '{}', {} ({})",
- AttachmentPath,
- NiceBytes(UploadAttachment->Size),
- NiceBytes(TempAttachmentBuffer.GetSize()));
- return CompositeBuffer(SharedBuffer(std::move(TempAttachmentBuffer)));
- });
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
- }
- else
- {
- // Compress inline - check compressed size to see if it should go into a block or not
- IoBuffer RawData = IoBufferBuilder::MakeFromFile(UploadAttachment->RawPath);
- if (!RawData)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to read attachment {}", UploadAttachment->RawPath),
- "");
- return;
- }
+ // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't
+ // it will be a loose attachment instead of going into a block
+ OnLargeAttachment(
+ RawHash,
+ [RawPath = UploadAttachment->RawPath, AttachmentTempPath, UploadAttachment](
+ const IoHash& RawHash) -> CompositeBuffer {
+ IoBuffer RawData = IoBufferBuilder::MakeFromFile(RawPath);
+ if (!RawData)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed to read source file for blob {} from '{}'", RawHash, RawPath));
+ }
- std::filesystem::path TempFilePath = AttachmentTempPath;
- TempFilePath.append(RawHash.ToHexString());
+ std::filesystem::path AttachmentPath = AttachmentTempPath;
+ AttachmentPath.append(RawHash.ToHexString());
- try
- {
IoBuffer TempAttachmentBuffer = remotestore_impl::CompressToTempFile(RawHash,
RawData,
- TempFilePath,
+ AttachmentPath,
OodleCompressor::Mermaid,
OodleCompressionLevel::VeryFast);
TempAttachmentBuffer.SetDeleteOnClose(true);
+
#if ZEN_BUILD_DEBUG
IoHash VerifyRawHash;
uint64_t VerifyRawSize;
ZEN_ASSERT_SLOW(
CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer), VerifyRawHash, VerifyRawSize));
- ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer))
- .CompressedBuffer::Decompress());
ZEN_ASSERT_SLOW(VerifyRawHash == RawHash);
ZEN_ASSERT_SLOW(VerifyRawSize == RawData.GetSize());
#endif // ZEN_BUILD_DEBUG
- uint64_t CompressedSize = TempAttachmentBuffer.GetSize();
-
ZEN_INFO("Saved temp attachment to '{}', {} ({})",
- TempFilePath,
+ AttachmentPath,
NiceBytes(UploadAttachment->Size),
- NiceBytes(CompressedSize));
+ NiceBytes(TempAttachmentBuffer.GetSize()));
+ return CompositeBuffer(SharedBuffer(std::move(TempAttachmentBuffer)));
+ });
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ }
+ else
+ {
+ // Compress inline - check compressed size to see if it should go into a block or not
+ IoBuffer RawData = IoBufferBuilder::MakeFromFile(UploadAttachment->RawPath);
+ if (!RawData)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to read attachment {}", UploadAttachment->RawPath),
+ "");
+ return;
+ }
- if (CompressedSize > MaxChunkEmbedSize)
- {
- OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) {
- return CompositeBuffer(SharedBuffer(std::move(Data)));
- });
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
- }
- else
- {
- UploadAttachment->Size = CompressedSize;
- {
- IoHash VerifyRawHash2;
- uint64_t VerifyRawSize2;
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer),
- VerifyRawHash2,
- VerifyRawSize2);
- ZEN_ASSERT_SLOW(Compressed.Decompress());
- ZEN_ASSERT_SLOW(VerifyRawHash2 == RawHash);
- ZEN_ASSERT_SLOW(VerifyRawSize2 == RawData.GetSize());
- }
+ std::filesystem::path TempFilePath = AttachmentTempPath;
+ TempFilePath.append(RawHash.ToHexString());
- ResolveLock.WithExclusiveLock([RawHash,
- RawSize = RawData.GetSize(),
- &LooseUploadAttachments,
- Data = std::move(TempAttachmentBuffer)]() {
- LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data)));
- });
- }
- }
- catch (const std::system_error& SysEx)
+ try
+ {
+ IoBuffer TempAttachmentBuffer = remotestore_impl::CompressToTempFile(RawHash,
+ RawData,
+ TempFilePath,
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::VeryFast);
+ TempAttachmentBuffer.SetDeleteOnClose(true);
+#if ZEN_BUILD_DEBUG
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ ZEN_ASSERT_SLOW(
+ CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer), VerifyRawHash, VerifyRawSize));
+ ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer))
+ .CompressedBuffer::Decompress());
+ ZEN_ASSERT_SLOW(VerifyRawHash == RawHash);
+ ZEN_ASSERT_SLOW(VerifyRawSize == RawData.GetSize());
+#endif // ZEN_BUILD_DEBUG
+
+ uint64_t CompressedSize = TempAttachmentBuffer.GetSize();
+
+ ZEN_INFO("Saved temp attachment to '{}', {} ({})",
+ TempFilePath,
+ NiceBytes(UploadAttachment->Size),
+ NiceBytes(CompressedSize));
+
+ if (CompressedSize > MaxChunkEmbedSize)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to compress blob {} to temporary file '{}', reason: ({}) {}",
- RawHash,
- TempFilePath,
- SysEx.code().value(),
- SysEx.what()),
- "");
+ OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) {
+ return CompositeBuffer(SharedBuffer(std::move(Data)));
+ });
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
}
- catch (const std::exception& Ex)
+ else
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to compress blob {} to temporary file '{}', reason: {}",
- RawHash,
- TempFilePath,
- Ex.what()),
- "");
+ UploadAttachment->Size = CompressedSize;
+ {
+ IoHash VerifyRawHash2;
+ uint64_t VerifyRawSize2;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer),
+ VerifyRawHash2,
+ VerifyRawSize2);
+ ZEN_ASSERT_SLOW(Compressed.Decompress());
+ ZEN_ASSERT_SLOW(VerifyRawHash2 == RawHash);
+ ZEN_ASSERT_SLOW(VerifyRawSize2 == RawData.GetSize());
+ }
+
+ ResolveLock.WithExclusiveLock([RawHash,
+ RawSize = RawData.GetSize(),
+ &LooseUploadAttachments,
+ Data = std::move(TempAttachmentBuffer)]() {
+ LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data)));
+ });
}
}
- }
- else
- {
- if (UploadAttachment->Size > MaxChunkEmbedSize)
+ catch (const std::system_error& SysEx)
{
- OnLargeAttachment(RawHash, [&ChunkStore](const IoHash& RawHash) {
- return CompositeBuffer(SharedBuffer(std::move(ChunkStore.FindChunkByCid(RawHash))));
- });
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to compress blob {} to temporary file '{}', reason: ({}) {}",
+ RawHash,
+ TempFilePath,
+ SysEx.code().value(),
+ SysEx.what()),
+ "");
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to compress blob {} to temporary file '{}', reason: {}",
+ RawHash,
+ TempFilePath,
+ Ex.what()),
+ "");
}
}
}
- catch (const std::exception& Ex)
+ else
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to resolve attachment {}", RawHash),
- Ex.what());
+ if (UploadAttachment->Size > MaxChunkEmbedSize)
+ {
+ OnLargeAttachment(RawHash, [&ChunkStore](const IoHash& RawHash) {
+ return CompositeBuffer(SharedBuffer(std::move(ChunkStore.FindChunkByCid(RawHash))));
+ });
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ }
}
- },
- WorkerThreadPool::EMode::EnableBacklog);
- }
- ResolveAttachmentsLatch.CountDown();
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to resolve attachment {}", RawHash),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ ResolveAttachmentsLatch.CountDown();
+ {
+ while (!ResolveAttachmentsLatch.Wait(1000))
{
- while (!ResolveAttachmentsLatch.Wait(1000))
+ ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining();
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
- ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ while (!ResolveAttachmentsLatch.Wait(1000))
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- while (!ResolveAttachmentsLatch.Wait(1000))
- {
- Remaining = ResolveAttachmentsLatch.Remaining();
- remotestore_impl::ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- "Aborted"sv,
- UploadAttachments.size(),
- Remaining,
- ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
- }
+ Remaining = ResolveAttachmentsLatch.Remaining();
remotestore_impl::ReportProgress(OptionalContext,
"Resolving attachments"sv,
"Aborted"sv,
UploadAttachments.size(),
- 0,
+ Remaining,
ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
- return {};
}
remotestore_impl::ReportProgress(OptionalContext,
"Resolving attachments"sv,
- fmt::format("{} remaining...", Remaining),
- UploadAttachments.size(),
- Remaining,
- ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
- }
- if (UploadAttachments.size() > 0)
- {
- remotestore_impl::ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- ""sv,
+ "Aborted"sv,
UploadAttachments.size(),
0,
ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
+ return {};
}
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ fmt::format("{} remaining...", Remaining),
+ UploadAttachments.size(),
+ Remaining,
+ ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
}
-
- if (remotestore_impl::IsCancelled(OptionalContext))
+ if (UploadAttachments.size() > 0)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return {};
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ ""sv,
+ UploadAttachments.size(),
+ 0,
+ ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
}
+ }
- for (const auto& It : LargeChunkHashes)
- {
- UploadAttachments.erase(It);
- }
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
- RwLock BlocksLock;
- std::vector<ChunkBlockDescription> Blocks;
+ for (const auto& It : LargeChunkHashes)
+ {
+ UploadAttachments.erase(It);
+ }
- std::vector<std::pair<IoHash, Oid>> SortedUploadAttachments;
- SortedUploadAttachments.reserve(UploadAttachments.size());
- for (const auto& It : UploadAttachments)
- {
- SortedUploadAttachments.push_back(std::make_pair(It.first, It.second.Key));
- }
+ RwLock BlocksLock;
+ std::vector<ChunkBlockDescription> Blocks;
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return {};
- }
+ std::vector<std::pair<IoHash, Oid>> SortedUploadAttachments;
+ SortedUploadAttachments.reserve(UploadAttachments.size());
+ for (const auto& It : UploadAttachments)
+ {
+ SortedUploadAttachments.push_back(std::make_pair(It.first, It.second.Key));
+ }
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Sorting {} attachments from {} ops", SortedUploadAttachments.size(), TotalOpCount));
-
- // Sort attachments so we get predictable blocks for the same oplog upload
- std::sort(SortedUploadAttachments.begin(),
- SortedUploadAttachments.end(),
- [](const std::pair<IoHash, Oid>& Lhs, const std::pair<IoHash, Oid>& Rhs) {
- if (Lhs.second == Rhs.second)
- {
- // Same key, sort by raw hash
- return Lhs.first < Rhs.first;
- }
- // Sort by key
- return Lhs.second < Rhs.second;
- });
-
- std::vector<size_t> ChunkedFilesOrder;
- ChunkedFilesOrder.reserve(ChunkedFiles.size());
- for (size_t Index = 0; Index < ChunkedFiles.size(); Index++)
- {
- ChunkedFilesOrder.push_back(Index);
- }
- std::sort(ChunkedFilesOrder.begin(), ChunkedFilesOrder.end(), [&ChunkedFiles](size_t Lhs, size_t Rhs) {
- return ChunkedFiles[Lhs].Chunked.Info.RawHash < ChunkedFiles[Rhs].Chunked.Info.RawHash;
- });
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return {};
- }
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Assembling {} attachments and {} chunked parts from {} ops into blocks and loose attachments",
- SortedUploadAttachments.size(),
- ChunkedHashes.size(),
- TotalOpCount));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Sorting {} attachments from {} ops", SortedUploadAttachments.size(), TotalOpCount));
+
+ // Sort attachments so we get predictable blocks for the same oplog upload
+ std::sort(SortedUploadAttachments.begin(),
+ SortedUploadAttachments.end(),
+ [](const std::pair<IoHash, Oid>& Lhs, const std::pair<IoHash, Oid>& Rhs) {
+ if (Lhs.second == Rhs.second)
+ {
+ // Same key, sort by raw hash
+ return Lhs.first < Rhs.first;
+ }
+ // Sort by key
+ return Lhs.second < Rhs.second;
+ });
+
+ std::vector<size_t> ChunkedFilesOrder;
+ ChunkedFilesOrder.reserve(ChunkedFiles.size());
+ for (size_t Index = 0; Index < ChunkedFiles.size(); Index++)
+ {
+ ChunkedFilesOrder.push_back(Index);
+ }
+ std::sort(ChunkedFilesOrder.begin(), ChunkedFilesOrder.end(), [&ChunkedFiles](size_t Lhs, size_t Rhs) {
+ return ChunkedFiles[Lhs].Chunked.Info.RawHash < ChunkedFiles[Rhs].Chunked.Info.RawHash;
+ });
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return {};
- }
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Assembling {} attachments and {} chunked parts from {} ops into blocks and loose attachments",
+ SortedUploadAttachments.size(),
+ ChunkedHashes.size(),
+ TotalOpCount));
- size_t ChunkAssembleCount = SortedUploadAttachments.size() + ChunkedHashes.size();
- size_t ChunksAssembled = 0;
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount));
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
- Latch BlockCreateLatch(1);
- uint32_t ComposedBlocks = 0;
+ size_t ChunkAssembleCount = SortedUploadAttachments.size() + ChunkedHashes.size();
+ size_t ChunksAssembled = 0;
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount));
- uint64_t CreateBlocksStartMS = Timer.GetElapsedTimeMs();
- try
- {
- Stopwatch AssembleBlocksProgressTimer;
- remotestore_impl::BlockComposer Composer(remotestore_impl::BlockComposer::Configuration{
- .MaxBlockSize = MaxBlockSize,
- .MaxChunksPerBlock = MaxChunksPerBlock,
- .MaxChunkEmbedSize = MaxChunkEmbedSize,
- .IsCancelledFunc = [OptionalContext]() { return remotestore_impl::IsCancelled(OptionalContext); }});
-
- auto OnNewBlock = [&WorkerPool,
- BuildBlocks,
- &AssembleBlocksProgressTimer,
- &BlockCreateLatch,
- &BlocksLock,
- &Blocks,
- &AsyncOnBlock,
- &OnBlockChunks,
- ChunkAssembleCount,
- &ChunksAssembled,
- &ComposedBlocks,
- OptionalContext,
- &RemoteResult](std::vector<IoHash>&& ChunkRawHashes,
- const std::function<FetchChunkFunc(const IoHash& AttachmentHash)>& FetchAttachmentResolver) {
- size_t ChunkCount = ChunkRawHashes.size();
- std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock;
- ChunksInBlock.reserve(ChunkCount);
-
- for (const IoHash& AttachmentHash : ChunkRawHashes)
- {
- ChunksInBlock.emplace_back(std::make_pair(AttachmentHash, FetchAttachmentResolver(AttachmentHash)));
- }
+ Latch BlockCreateLatch(1);
+ uint32_t ComposedBlocks = 0;
- size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks);
- if (BuildBlocks)
- {
- remotestore_impl::AsyncCreateBlock(WorkerPool,
- BlockCreateLatch,
- std::move(ChunksInBlock),
- BlocksLock,
- Blocks,
- BlockIndex,
- AsyncOnBlock,
- RemoteResult);
- }
- else
- {
- ZEN_INFO("Bulk group {} attachments", ChunkCount);
+ uint64_t CreateBlocksStartMS = Timer.GetElapsedTimeMs();
+ try
+ {
+ Stopwatch AssembleBlocksProgressTimer;
+ remotestore_impl::BlockComposer Composer(remotestore_impl::BlockComposer::Configuration{
+ .MaxBlockSize = MaxBlockSize,
+ .MaxChunksPerBlock = MaxChunksPerBlock,
+ .MaxChunkEmbedSize = MaxChunkEmbedSize,
+ .IsCancelledFunc = [OptionalContext]() { return remotestore_impl::IsCancelled(OptionalContext); }});
+
+ auto OnNewBlock = [&WorkerPool,
+ BuildBlocks,
+ &AssembleBlocksProgressTimer,
+ &BlockCreateLatch,
+ &BlocksLock,
+ &Blocks,
+ &AsyncOnBlock,
+ &OnBlockChunks,
+ ChunkAssembleCount,
+ &ChunksAssembled,
+ &ComposedBlocks,
+ OptionalContext,
+ &RemoteResult](std::vector<IoHash>&& ChunkRawHashes,
+ const std::function<FetchChunkFunc(const IoHash& AttachmentHash)>& FetchAttachmentResolver) {
+ size_t ChunkCount = ChunkRawHashes.size();
+ std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock;
+ ChunksInBlock.reserve(ChunkCount);
+
+ for (const IoHash& AttachmentHash : ChunkRawHashes)
+ {
+ ChunksInBlock.emplace_back(std::make_pair(AttachmentHash, FetchAttachmentResolver(AttachmentHash)));
+ }
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope _(BlocksLock);
- Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes);
- OnBlockChunks(std::move(ChunksInBlock));
- }
+ size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks);
+ if (BuildBlocks)
+ {
+ remotestore_impl::AsyncCreateBlock(WorkerPool,
+ BlockCreateLatch,
+ std::move(ChunksInBlock),
+ BlocksLock,
+ Blocks,
+ BlockIndex,
+ AsyncOnBlock,
+ RemoteResult);
+ }
+ else
+ {
+ ZEN_INFO("Bulk group {} attachments", ChunkCount);
- ChunksAssembled += ChunkCount;
- ComposedBlocks++;
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope _(BlocksLock);
+ Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes);
+ OnBlockChunks(std::move(ChunksInBlock));
+ }
- if (ChunksAssembled % 1000 == 0)
- {
- remotestore_impl::ReportProgress(
- OptionalContext,
- "Assembling blocks"sv,
- fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
- ChunkAssembleCount,
- ChunkAssembleCount - ChunksAssembled,
- AssembleBlocksProgressTimer.GetElapsedTimeMs());
- }
- };
+ ChunksAssembled += ChunkCount;
+ ComposedBlocks++;
+
+ if (ChunksAssembled % 1000 == 0)
+ {
+ remotestore_impl::ReportProgress(
+ OptionalContext,
+ "Assembling blocks"sv,
+ fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
+ ChunkAssembleCount,
+ ChunkAssembleCount - ChunksAssembled,
+ AssembleBlocksProgressTimer.GetElapsedTimeMs());
+ }
+ };
+ {
+ std::vector<IoHash> AttachmentHashes;
+ AttachmentHashes.reserve(SortedUploadAttachments.size());
+ std::vector<uint64_t> AttachmentSizes;
+ AttachmentSizes.reserve(SortedUploadAttachments.size());
+ std::vector<Oid> AttachmentKeys;
+ AttachmentKeys.reserve(SortedUploadAttachments.size());
+
+ for (const std::pair<IoHash, Oid>& Attachment : SortedUploadAttachments)
{
- std::vector<IoHash> AttachmentHashes;
- AttachmentHashes.reserve(SortedUploadAttachments.size());
- std::vector<uint64_t> AttachmentSizes;
- AttachmentSizes.reserve(SortedUploadAttachments.size());
- std::vector<Oid> AttachmentKeys;
- AttachmentKeys.reserve(SortedUploadAttachments.size());
-
- for (const std::pair<IoHash, Oid>& Attachment : SortedUploadAttachments)
+ AttachmentHashes.push_back(Attachment.first);
+ if (auto It = UploadAttachments.find(Attachment.first); It != UploadAttachments.end())
{
- AttachmentHashes.push_back(Attachment.first);
- if (auto It = UploadAttachments.find(Attachment.first); It != UploadAttachments.end())
- {
- AttachmentSizes.push_back(It->second.Size);
- }
- else
- {
- RemoteResult.SetError(
- gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Attachment to upload state inconsistent, could not find attachment {}", Attachment.first),
- "");
- return {};
- }
- AttachmentKeys.push_back(Attachment.second);
+ AttachmentSizes.push_back(It->second.Size);
}
+ else
+ {
+ RemoteResult.SetError(
+ gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Attachment to upload state inconsistent, could not find attachment {}", Attachment.first),
+ "");
+ return {};
+ }
+ AttachmentKeys.push_back(Attachment.second);
+ }
- auto FetchWholeAttachmentResolver = [&LooseUploadAttachments, &ChunkStore](const IoHash& AttachmentHash) -> FetchChunkFunc {
- if (auto It = LooseUploadAttachments.find(AttachmentHash); It != LooseUploadAttachments.end())
- {
- uint64_t RawSize = It->second.first;
- IoBuffer Payload = It->second.second;
-
+ auto FetchWholeAttachmentResolver = [&LooseUploadAttachments, &ChunkStore](const IoHash& AttachmentHash) -> FetchChunkFunc {
+ if (auto It = LooseUploadAttachments.find(AttachmentHash); It != LooseUploadAttachments.end())
+ {
+ uint64_t RawSize = It->second.first;
+ IoBuffer Payload = It->second.second;
+
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), VerifyRawHash, VerifyRawSize);
+ ZEN_ASSERT_SLOW(Compressed.Decompress());
+ ZEN_ASSERT_SLOW(VerifyRawHash == AttachmentHash);
+ ZEN_ASSERT_SLOW(VerifyRawSize == RawSize);
+
+ LooseUploadAttachments.erase(It);
+ return [RawSize = RawSize,
+ Payload = SharedBuffer(Payload)](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> {
+ ZEN_UNUSED(ChunkHash);
+ // CompressedBuffer Compressed =
+ // CompressedBuffer::FromCompressedNoValidate(Payload.AsIoBuffer());
IoHash VerifyRawHash;
uint64_t VerifyRawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), VerifyRawHash, VerifyRawSize);
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Payload, VerifyRawHash, VerifyRawSize);
ZEN_ASSERT_SLOW(Compressed.Decompress());
- ZEN_ASSERT_SLOW(VerifyRawHash == AttachmentHash);
+ ZEN_ASSERT_SLOW(VerifyRawHash == ChunkHash);
ZEN_ASSERT_SLOW(VerifyRawSize == RawSize);
+ return {RawSize, Compressed};
+ };
+ }
+ else
+ {
+ ZEN_ASSERT_SLOW(ChunkStore.ContainsChunk(AttachmentHash));
+ return [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> {
+ IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash);
+ if (!Chunk)
+ {
+ throw std::runtime_error(fmt::format("Failed to find chunk {} in cid store", RawHash));
+ }
+ IoHash ValidateRawHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), ValidateRawHash, RawSize);
+ if (!Compressed)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} in cid store is malformed (not a compressed buffer)", RawHash));
+ }
+ if (RawHash != ValidateRawHash)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} in cid store is malformed (mismatching raw hash)", RawHash));
+ }
+ return {RawSize, Compressed};
+ };
+ }
+ };
- LooseUploadAttachments.erase(It);
- return [RawSize = RawSize,
- Payload = SharedBuffer(Payload)](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> {
- ZEN_UNUSED(ChunkHash);
- // CompressedBuffer Compressed =
- // CompressedBuffer::FromCompressedNoValidate(Payload.AsIoBuffer());
- IoHash VerifyRawHash;
- uint64_t VerifyRawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Payload, VerifyRawHash, VerifyRawSize);
- ZEN_ASSERT_SLOW(Compressed.Decompress());
- ZEN_ASSERT_SLOW(VerifyRawHash == ChunkHash);
- ZEN_ASSERT_SLOW(VerifyRawSize == RawSize);
- return {RawSize, Compressed};
- };
- }
- else
- {
- ZEN_ASSERT_SLOW(ChunkStore.ContainsChunk(AttachmentHash));
- return [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> {
- IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash);
- if (!Chunk)
- {
- throw std::runtime_error(fmt::format("Failed to find chunk {} in cid store", RawHash));
- }
- IoHash ValidateRawHash;
- uint64_t RawSize = 0;
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), ValidateRawHash, RawSize);
- if (!Compressed)
- {
- throw std::runtime_error(
- fmt::format("Chunk {} in cid store is malformed (not a compressed buffer)", RawHash));
- }
- if (RawHash != ValidateRawHash)
- {
- throw std::runtime_error(fmt::format("Chunk {} in cid store is malformed (mismatching raw hash)", RawHash));
- }
- return {RawSize, Compressed};
- };
- }
- };
-
- Composer.Compose(AttachmentHashes,
- AttachmentSizes,
- AttachmentKeys,
- [&OnNewBlock, &FetchWholeAttachmentResolver](std::vector<IoHash>&& ChunkRawHashes) {
- OnNewBlock(std::move(ChunkRawHashes), FetchWholeAttachmentResolver);
- });
- }
+ Composer.Compose(AttachmentHashes,
+ AttachmentSizes,
+ AttachmentKeys,
+ [&OnNewBlock, &FetchWholeAttachmentResolver](std::vector<IoHash>&& ChunkRawHashes) {
+ OnNewBlock(std::move(ChunkRawHashes), FetchWholeAttachmentResolver);
+ });
+ }
- {
- std::vector<IoHash> AttachmentHashes;
- AttachmentHashes.reserve(ChunkedChunkCount);
- std::vector<uint64_t> AttachmentSizes;
- AttachmentSizes.reserve(ChunkedChunkCount);
- std::vector<Oid> AttachmentKeys;
- AttachmentKeys.reserve(ChunkedChunkCount);
+ {
+ std::vector<IoHash> AttachmentHashes;
+ AttachmentHashes.reserve(ChunkedChunkCount);
+ std::vector<uint64_t> AttachmentSizes;
+ AttachmentSizes.reserve(ChunkedChunkCount);
+ std::vector<Oid> AttachmentKeys;
+ AttachmentKeys.reserve(ChunkedChunkCount);
- tsl::robin_map<IoHash, std::pair<size_t, size_t>, IoHash::Hasher> ChunkHashToChunkFileIndexAndChunkIndex;
+ tsl::robin_map<IoHash, std::pair<size_t, size_t>, IoHash::Hasher> ChunkHashToChunkFileIndexAndChunkIndex;
- for (size_t ChunkedFileIndex : ChunkedFilesOrder)
+ for (size_t ChunkedFileIndex : ChunkedFilesOrder)
+ {
+ const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex];
+ const ChunkedInfoWithSource& Chunked = ChunkedFile.Chunked;
+ size_t ChunkCount = Chunked.Info.ChunkHashes.size();
+ Oid ChunkedFileOid = Oid::NewOid();
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++)
{
- const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex];
- const ChunkedInfoWithSource& Chunked = ChunkedFile.Chunked;
- size_t ChunkCount = Chunked.Info.ChunkHashes.size();
- Oid ChunkedFileOid = Oid::NewOid();
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++)
+ const IoHash& ChunkHash = Chunked.Info.ChunkHashes[ChunkIndex];
+ uint64_t ChunkSize = Chunked.ChunkSources[ChunkIndex].Size;
{
- const IoHash& ChunkHash = Chunked.Info.ChunkHashes[ChunkIndex];
- uint64_t ChunkSize = Chunked.ChunkSources[ChunkIndex].Size;
+ if (ChunkHashToChunkFileIndexAndChunkIndex
+ .insert(std::make_pair(ChunkHash, std::make_pair(ChunkedFileIndex, ChunkIndex)))
+ .second)
{
- if (ChunkHashToChunkFileIndexAndChunkIndex
- .insert(std::make_pair(ChunkHash, std::make_pair(ChunkedFileIndex, ChunkIndex)))
- .second)
+ if (ChunkSize > MaxChunkEmbedSize)
{
- if (ChunkSize > MaxChunkEmbedSize)
- {
- OnLargeAttachment(ChunkHash,
- [SourceBuffer = ChunkedFile.Source,
- ChunkSource = Chunked.ChunkSources[ChunkIndex],
- ChunkHash](const IoHash& RawHash) -> CompositeBuffer {
- ZEN_ASSERT(RawHash == ChunkHash);
- CompressedBuffer Compressed = CompressedBuffer::Compress(
- SharedBuffer(IoBuffer(SourceBuffer, ChunkSource.Offset, ChunkSource.Size)),
- OodleCompressor::Mermaid,
- OodleCompressionLevel::None);
- return Compressed.GetCompressed();
- });
-
- ResolveLock.WithExclusiveLock([ChunkHash, &LargeChunkHashes]() { LargeChunkHashes.insert(ChunkHash); });
- }
- else
- {
- AttachmentHashes.push_back(ChunkHash);
- AttachmentSizes.push_back(ChunkSize);
- AttachmentKeys.push_back(ChunkedFileOid);
- }
+ OnLargeAttachment(ChunkHash,
+ [SourceBuffer = ChunkedFile.Source,
+ ChunkSource = Chunked.ChunkSources[ChunkIndex],
+ ChunkHash](const IoHash& RawHash) -> CompositeBuffer {
+ ZEN_ASSERT(RawHash == ChunkHash);
+ CompressedBuffer Compressed = CompressedBuffer::Compress(
+ SharedBuffer(IoBuffer(SourceBuffer, ChunkSource.Offset, ChunkSource.Size)),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::None);
+ return Compressed.GetCompressed();
+ });
+
+ ResolveLock.WithExclusiveLock([ChunkHash, &LargeChunkHashes]() { LargeChunkHashes.insert(ChunkHash); });
+ }
+ else
+ {
+ AttachmentHashes.push_back(ChunkHash);
+ AttachmentSizes.push_back(ChunkSize);
+ AttachmentKeys.push_back(ChunkedFileOid);
}
}
}
}
-
- auto ChunkedFileAttachmentResolver = [&ChunkHashToChunkFileIndexAndChunkIndex,
- &ChunkedFiles](const IoHash& AttachmentHash) -> FetchChunkFunc {
- if (auto It = ChunkHashToChunkFileIndexAndChunkIndex.find(AttachmentHash);
- It != ChunkHashToChunkFileIndexAndChunkIndex.end())
- {
- const std::pair<size_t, size_t>& ChunkFileIndexAndChunkIndex = It->second;
- size_t ChunkedFileIndex = ChunkFileIndexAndChunkIndex.first;
- size_t ChunkIndex = ChunkFileIndexAndChunkIndex.second;
- const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex];
-
- const ChunkSource& Source = ChunkedFile.Chunked.ChunkSources[ChunkIndex];
- ZEN_ASSERT(Source.Offset + Source.Size <= ChunkedFile.Source.GetSize());
-
- return [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](
- const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
- return {Size,
- CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)),
- OodleCompressor::Mermaid,
- OodleCompressionLevel::None)};
- };
- }
- else
- {
- ZEN_ASSERT(false);
- }
- };
-
- Composer.Compose(AttachmentHashes,
- AttachmentSizes,
- AttachmentKeys,
- [&OnNewBlock, &ChunkedFileAttachmentResolver](std::vector<IoHash>&& ChunkRawHashes) {
- OnNewBlock(std::move(ChunkRawHashes), ChunkedFileAttachmentResolver);
- });
}
- if (ChunkAssembleCount > 0)
- {
- remotestore_impl::ReportProgress(
- OptionalContext,
- "Assembling blocks"sv,
- fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
- ChunkAssembleCount,
- 0,
- AssembleBlocksProgressTimer.GetElapsedTimeMs());
- }
-
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}",
- ChunkAssembleCount,
- TotalOpCount,
- ComposedBlocks,
- LargeChunkHashes.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs()))));
-
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- BlockCreateLatch.CountDown();
- while (!BlockCreateLatch.Wait(1000))
+ auto ChunkedFileAttachmentResolver = [&ChunkHashToChunkFileIndexAndChunkIndex,
+ &ChunkedFiles](const IoHash& AttachmentHash) -> FetchChunkFunc {
+ if (auto It = ChunkHashToChunkFileIndexAndChunkIndex.find(AttachmentHash);
+ It != ChunkHashToChunkFileIndexAndChunkIndex.end())
{
- ptrdiff_t Remaining = BlockCreateLatch.Remaining();
- remotestore_impl::ReportProgress(OptionalContext,
- "Assembling blocks"sv,
- fmt::format("Aborting, {} blocks remaining...", Remaining),
- ComposedBlocks,
- Remaining,
- AssembleBlocksProgressTimer.GetElapsedTimeMs());
+ const std::pair<size_t, size_t>& ChunkFileIndexAndChunkIndex = It->second;
+ size_t ChunkedFileIndex = ChunkFileIndexAndChunkIndex.first;
+ size_t ChunkIndex = ChunkFileIndexAndChunkIndex.second;
+ const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex];
+
+ const ChunkSource& Source = ChunkedFile.Chunked.ChunkSources[ChunkIndex];
+ ZEN_ASSERT(Source.Offset + Source.Size <= ChunkedFile.Source.GetSize());
+
+ return [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](
+ const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
+ return {Size,
+ CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::None)};
+ };
}
- if (ComposedBlocks > 0)
+ else
{
- remotestore_impl::ReportProgress(OptionalContext,
- "Assembling blocks"sv,
- fmt::format("Aborting, {} blocks remaining...", 0),
- ComposedBlocks,
- 0,
- AssembleBlocksProgressTimer.GetElapsedTimeMs());
+ ZEN_ASSERT(false);
}
- return {};
- }
+ };
+
+ Composer.Compose(AttachmentHashes,
+ AttachmentSizes,
+ AttachmentKeys,
+ [&OnNewBlock, &ChunkedFileAttachmentResolver](std::vector<IoHash>&& ChunkRawHashes) {
+ OnNewBlock(std::move(ChunkRawHashes), ChunkedFileAttachmentResolver);
+ });
}
- catch (const std::exception& Ex)
+
+ if (ChunkAssembleCount > 0)
{
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Assembling blocks"sv,
+ fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
+ ChunkAssembleCount,
+ 0,
+ AssembleBlocksProgressTimer.GetElapsedTimeMs());
+ }
+
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}",
+ ChunkAssembleCount,
+ TotalOpCount,
+ ComposedBlocks,
+ LargeChunkHashes.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs()))));
+
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
BlockCreateLatch.CountDown();
while (!BlockCreateLatch.Wait(1000))
{
+ ptrdiff_t Remaining = BlockCreateLatch.Remaining();
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Assembling blocks"sv,
+ fmt::format("Aborting, {} blocks remaining...", Remaining),
+ ComposedBlocks,
+ Remaining,
+ AssembleBlocksProgressTimer.GetElapsedTimeMs());
+ }
+ if (ComposedBlocks > 0)
+ {
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Assembling blocks"sv,
+ fmt::format("Aborting, {} blocks remaining...", 0),
+ ComposedBlocks,
+ 0,
+ AssembleBlocksProgressTimer.GetElapsedTimeMs());
}
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), "Block creation failed", Ex.what());
- throw;
+ return {};
}
-
- Stopwatch BlockCreateProgressTimer;
+ }
+ catch (const std::exception& Ex)
+ {
BlockCreateLatch.CountDown();
while (!BlockCreateLatch.Wait(1000))
{
- ptrdiff_t Remaining = BlockCreateLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
+ }
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), "Block creation failed", Ex.what());
+ throw;
+ }
+
+ Stopwatch BlockCreateProgressTimer;
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ptrdiff_t Remaining = BlockCreateLatch.Remaining();
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ while (!BlockCreateLatch.Wait(1000))
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- while (!BlockCreateLatch.Wait(1000))
- {
- Remaining = BlockCreateLatch.Remaining();
- remotestore_impl::ReportProgress(OptionalContext,
- "Creating blocks"sv,
- fmt::format("Aborting, {} blocks remaining...", Remaining),
- ComposedBlocks,
- Remaining,
- BlockCreateProgressTimer.GetElapsedTimeMs());
- }
+ Remaining = BlockCreateLatch.Remaining();
remotestore_impl::ReportProgress(OptionalContext,
"Creating blocks"sv,
- "Aborted"sv,
+ fmt::format("Aborting, {} blocks remaining...", Remaining),
ComposedBlocks,
- 0,
+ Remaining,
BlockCreateProgressTimer.GetElapsedTimeMs());
- return {};
}
remotestore_impl::ReportProgress(OptionalContext,
"Creating blocks"sv,
- fmt::format("{} remaining...", Remaining),
- ComposedBlocks,
- Remaining,
- BlockCreateProgressTimer.GetElapsedTimeMs());
- }
-
- if (ComposedBlocks > 0)
- {
- uint64_t NowMS = Timer.GetElapsedTimeMs();
- remotestore_impl::ReportProgress(OptionalContext,
- "Creating blocks"sv,
- ""sv,
+ "Aborted"sv,
ComposedBlocks,
0,
BlockCreateProgressTimer.GetElapsedTimeMs());
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Created {} blocks in {}", ComposedBlocks, NiceTimeSpanMs(NowMS - CreateBlocksStartMS)));
+ return {};
}
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Creating blocks"sv,
+ fmt::format("{} remaining...", Remaining),
+ ComposedBlocks,
+ Remaining,
+ BlockCreateProgressTimer.GetElapsedTimeMs());
+ }
- if (!RemoteResult.IsError())
+ if (ComposedBlocks > 0)
+ {
+ uint64_t NowMS = Timer.GetElapsedTimeMs();
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Creating blocks"sv,
+ ""sv,
+ ComposedBlocks,
+ 0,
+ BlockCreateProgressTimer.GetElapsedTimeMs());
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Created {} blocks in {}", ComposedBlocks, NiceTimeSpanMs(NowMS - CreateBlocksStartMS)));
+ }
+
+ if (!RemoteResult.IsError())
+ {
+ CbObjectWriter OplogContinerWriter;
+ RwLock::SharedLockScope _(BlocksLock);
+ OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer());
+ OplogContinerWriter.BeginArray("blocks"sv);
{
- CbObjectWriter OplogContinerWriter;
- RwLock::SharedLockScope _(BlocksLock);
- OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer());
- OplogContinerWriter.BeginArray("blocks"sv);
+ for (const ChunkBlockDescription& B : Blocks)
{
- for (const ChunkBlockDescription& B : Blocks)
+ ZEN_ASSERT(!B.ChunkRawHashes.empty());
+ if (BuildBlocks)
{
- ZEN_ASSERT(!B.ChunkRawHashes.empty());
- if (BuildBlocks)
- {
- ZEN_ASSERT(B.BlockHash != IoHash::Zero);
+ ZEN_ASSERT(B.BlockHash != IoHash::Zero);
- OplogContinerWriter.BeginObject();
- {
- OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash);
- OplogContinerWriter.BeginArray("chunks"sv);
- {
- for (const IoHash& RawHash : B.ChunkRawHashes)
- {
- OplogContinerWriter.AddHash(RawHash);
- }
- }
- OplogContinerWriter.EndArray(); // "chunks"
- }
- OplogContinerWriter.EndObject();
- continue;
- }
-
- ZEN_ASSERT(B.BlockHash == IoHash::Zero);
OplogContinerWriter.BeginObject();
{
+ OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash);
OplogContinerWriter.BeginArray("chunks"sv);
{
for (const IoHash& RawHash : B.ChunkRawHashes)
{
- OplogContinerWriter.AddBinaryAttachment(RawHash);
+ OplogContinerWriter.AddHash(RawHash);
}
}
- OplogContinerWriter.EndArray();
+ OplogContinerWriter.EndArray(); // "chunks"
}
OplogContinerWriter.EndObject();
+ continue;
}
+
+ ZEN_ASSERT(B.BlockHash == IoHash::Zero);
+ OplogContinerWriter.BeginObject();
+ {
+ OplogContinerWriter.BeginArray("chunks"sv);
+ {
+ for (const IoHash& RawHash : B.ChunkRawHashes)
+ {
+ OplogContinerWriter.AddBinaryAttachment(RawHash);
+ }
+ }
+ OplogContinerWriter.EndArray();
+ }
+ OplogContinerWriter.EndObject();
}
- OplogContinerWriter.EndArray(); // "blocks"sv
- OplogContinerWriter.BeginArray("chunkedfiles"sv);
+ }
+ OplogContinerWriter.EndArray(); // "blocks"sv
+ OplogContinerWriter.BeginArray("chunkedfiles"sv);
+ {
+ for (const ChunkedFile& F : ChunkedFiles)
{
- for (const ChunkedFile& F : ChunkedFiles)
+ OplogContinerWriter.BeginObject();
{
- OplogContinerWriter.BeginObject();
+ OplogContinerWriter.AddHash("rawhash"sv, F.Chunked.Info.RawHash);
+ OplogContinerWriter.AddInteger("rawsize"sv, F.Chunked.Info.RawSize);
+ OplogContinerWriter.BeginArray("chunks"sv);
{
- OplogContinerWriter.AddHash("rawhash"sv, F.Chunked.Info.RawHash);
- OplogContinerWriter.AddInteger("rawsize"sv, F.Chunked.Info.RawSize);
- OplogContinerWriter.BeginArray("chunks"sv);
+ for (const IoHash& RawHash : F.Chunked.Info.ChunkHashes)
{
- for (const IoHash& RawHash : F.Chunked.Info.ChunkHashes)
- {
- OplogContinerWriter.AddHash(RawHash);
- }
+ OplogContinerWriter.AddHash(RawHash);
}
- OplogContinerWriter.EndArray(); // "chunks"
- OplogContinerWriter.BeginArray("sequence"sv);
+ }
+ OplogContinerWriter.EndArray(); // "chunks"
+ OplogContinerWriter.BeginArray("sequence"sv);
+ {
+ for (uint32_t ChunkIndex : F.Chunked.Info.ChunkSequence)
{
- for (uint32_t ChunkIndex : F.Chunked.Info.ChunkSequence)
- {
- OplogContinerWriter.AddInteger(ChunkIndex);
- }
+ OplogContinerWriter.AddInteger(ChunkIndex);
}
- OplogContinerWriter.EndArray(); // "sequence"
}
- OplogContinerWriter.EndObject();
+ OplogContinerWriter.EndArray(); // "sequence"
}
+ OplogContinerWriter.EndObject();
}
- OplogContinerWriter.EndArray(); // "chunkedfiles"sv
+ }
+ OplogContinerWriter.EndArray(); // "chunkedfiles"sv
- OplogContinerWriter.BeginArray("chunks"sv);
+ OplogContinerWriter.BeginArray("chunks"sv);
+ {
+ for (const IoHash& AttachmentHash : LargeChunkHashes)
{
- for (const IoHash& AttachmentHash : LargeChunkHashes)
- {
- OplogContinerWriter.AddBinaryAttachment(AttachmentHash);
- }
+ OplogContinerWriter.AddBinaryAttachment(AttachmentHash);
}
- OplogContinerWriter.EndArray(); // "chunks"
-
- OplogContainerObject = OplogContinerWriter.Save();
}
+ OplogContinerWriter.EndArray(); // "chunks"
+
+ OplogContainerObject = OplogContinerWriter.Save();
}
+
return OplogContainerObject;
}
@@ -3370,8 +3405,6 @@ BuildContainer(CidStore& ChunkStore,
const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles)
{
- // WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background);
-
remotestore_impl::AsyncRemoteResult RemoteResult;
CbObject ContainerObject = BuildContainer(ChunkStore,
Project,