aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-12-10 09:09:51 +0100
committerGitHub Enterprise <[email protected]>2024-12-10 09:09:51 +0100
commitf714751342e996697e136dbf027cd12393fad493 (patch)
tree7c43858db1d2f36013960647460ae9237fa0c3a6 /src
parentauth fixes (#260) (diff)
downloadzen-f714751342e996697e136dbf027cd12393fad493.tar.xz
zen-f714751342e996697e136dbf027cd12393fad493.zip
improved payload validation in HttpClient (#259)
* improved payload validation in HttpClient * separate error messages for FromCompressed and Decompress * refactor so we can do retry if decompression of block fails
Diffstat (limited to 'src')
-rw-r--r--src/zenhttp/httpclient.cpp78
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp2691
2 files changed, 1510 insertions, 1259 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index 9af909fcf..d8ce25304 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -375,37 +375,75 @@ ShouldRetry(const cpr::Response& Response)
static bool
ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile>& PayloadFile)
{
- if (IsHttpSuccessCode(Response.status_code))
+ IoBuffer ResponseBuffer = (Response.text.empty() && PayloadFile) ? PayloadFile->BorrowIoBuffer()
+ : IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size());
+
+ if (auto ContentLength = Response.header.find("Content-Length"); ContentLength != Response.header.end())
+ {
+ std::optional<uint64_t> ExpectedContentSize = ParseInt<uint64_t>(ContentLength->second);
+ if (!ExpectedContentSize.has_value())
+ {
+ Response.error =
+ cpr::Error(/*CURLE_READ_ERROR*/ 26, fmt::format("Can not parse Content-Length header. Value: '{}'", ContentLength->second));
+ return false;
+ }
+ if (ExpectedContentSize.value() != ResponseBuffer.GetSize())
+ {
+ Response.error = cpr::Error(
+ /*CURLE_READ_ERROR*/ 26,
+ fmt::format("Payload size {} does not match Content-Length {}", ResponseBuffer.GetSize(), ContentLength->second));
+ return false;
+ }
+ }
+
+ if (auto JupiterHash = Response.header.find("X-Jupiter-IoHash"); JupiterHash != Response.header.end())
{
- if (auto ContentType = Response.header.find("Content-Type");
- ContentType != Response.header.end() && ContentType->second == "application/x-ue-comp")
+ IoHash ExpectedPayloadHash;
+ if (IoHash::TryParse(JupiterHash->second, ExpectedPayloadHash))
+ {
+ IoHash PayloadHash = IoHash::HashBuffer(ResponseBuffer);
+ if (PayloadHash != ExpectedPayloadHash)
+ {
+ Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26,
+ fmt::format("Payload hash {} does not match X-Jupiter-IoHash {}",
+ PayloadHash.ToHexString(),
+ ExpectedPayloadHash.ToHexString()));
+ return false;
+ }
+ }
+ }
+
+ if (auto ContentType = Response.header.find("Content-Type"); ContentType != Response.header.end())
+ {
+ if (ContentType->second == "application/x-ue-comp")
{
- IoBuffer ResponseBuffer = (Response.text.empty() && PayloadFile)
- ? PayloadFile->BorrowIoBuffer()
- : IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size());
IoHash RawHash;
uint64_t RawSize;
- if (!CompressedBuffer::ValidateCompressedHeader(ResponseBuffer, RawHash, RawSize))
+ if (CompressedBuffer::ValidateCompressedHeader(ResponseBuffer, RawHash, RawSize))
+ {
+ return true;
+ }
+ else
{
Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, "Compressed binary failed validation");
return false;
}
}
- else if (auto JupiterHash = Response.header.find("X-Jupiter-IoHash"); JupiterHash != Response.header.end())
+ if (ContentType->second == "application/x-ue-cb")
{
- IoHash ExpectedPayloadHash;
- if (IoHash::TryParse(JupiterHash->second, ExpectedPayloadHash))
+ if (CbValidateError Error = ValidateCompactBinary(ResponseBuffer.GetView(), CbValidateMode::Default);
+ Error == CbValidateError::None)
{
- IoBuffer ResponseBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size());
- IoHash PayloadHash = IoHash::HashBuffer(ResponseBuffer);
- if (PayloadHash != ExpectedPayloadHash)
- {
- Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, "Compressed binary failed validation");
- return false;
- }
+ return true;
+ }
+ else
+ {
+ Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, fmt::format("Compact binary failed validation: {}", ToString(Error)));
+ return false;
}
}
}
+
return true;
}
@@ -433,7 +471,11 @@ DoWithRetry(std::function<cpr::Response()>&& Func, std::unique_ptr<detail::TempP
{
if (!ShouldRetry(Result))
{
- if (Result.error || ValidatePayload(Result, PayloadFile))
+ if (Result.error || !IsHttpSuccessCode(Result.status_code))
+ {
+ break;
+ }
+ if (ValidatePayload(Result, PayloadFile))
{
break;
}
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 137db255c..49403d39c 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -54,94 +54,1086 @@ namespace zen {
uint8_t[chunksize])[ChunkCount]
}
*/
+namespace remotestore_impl {
+ ////////////////////////////// AsyncRemoteResult
-////////////////////////////// AsyncRemoteResult
+ struct AsyncRemoteResult
+ {
+ void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText)
+ {
+ int32_t Expected = 0;
+ if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1))
+ {
+ m_ErrorReason = ErrorReason;
+ m_ErrorText = ErrorText;
+ }
+ }
+ bool IsError() const { return m_ErrorCode.load() != 0; }
+ int GetError() const { return m_ErrorCode.load(); };
+ const std::string& GetErrorReason() const { return m_ErrorReason; };
+ const std::string& GetErrorText() const { return m_ErrorText; };
+ RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const
+ {
+ return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText};
+ }
-struct AsyncRemoteResult
-{
- void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText)
+ private:
+ std::atomic<int32_t> m_ErrorCode = 0;
+ std::string m_ErrorReason;
+ std::string m_ErrorText;
+ };
+
+ void ReportProgress(JobContext* OptionalContext,
+ std::string_view CurrentOp,
+ std::string_view Details,
+ ptrdiff_t Total,
+ ptrdiff_t Remaining)
{
- int32_t Expected = 0;
- if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1))
+ if (OptionalContext)
{
- m_ErrorReason = ErrorReason;
- m_ErrorText = ErrorText;
+ ZEN_ASSERT(Total > 0);
+ OptionalContext->ReportProgress(CurrentOp, Details, Total, Remaining);
}
}
- bool IsError() const { return m_ErrorCode.load() != 0; }
- int GetError() const { return m_ErrorCode.load(); };
- const std::string& GetErrorReason() const { return m_ErrorReason; };
- const std::string& GetErrorText() const { return m_ErrorText; };
- RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const
+
+ void ReportMessage(JobContext* OptionalContext, std::string_view Message)
{
- return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText};
+ if (OptionalContext)
+ {
+ OptionalContext->ReportMessage(Message);
+ }
+ ZEN_INFO("{}", Message);
}
-private:
- std::atomic<int32_t> m_ErrorCode = 0;
- std::string m_ErrorReason;
- std::string m_ErrorText;
-};
+ bool IsCancelled(JobContext* OptionalContext)
+ {
+ if (!OptionalContext)
+ {
+ return false;
+ }
+ return OptionalContext->IsCancelled();
+ }
-void
-ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, std::string_view Details, ptrdiff_t Total, ptrdiff_t Remaining)
-{
- if (OptionalContext)
+ std::string GetStats(const RemoteProjectStore::Stats& Stats, uint64_t ElapsedWallTimeMS)
{
- ZEN_ASSERT(Total > 0);
- OptionalContext->ReportProgress(CurrentOp, Details, Total, Remaining);
+ return fmt::format(
+ "Sent: {} ({}/s) Recv: {} ({}/s)",
+ NiceBytes(Stats.m_SentBytes),
+ NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000) / ElapsedWallTimeMS) : 0u),
+ NiceBytes(Stats.m_ReceivedBytes),
+ NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000) / ElapsedWallTimeMS) : 0u));
}
-}
-void
-ReportMessage(JobContext* OptionalContext, std::string_view Message)
-{
- if (OptionalContext)
+ void LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats)
{
- OptionalContext->ReportMessage(Message);
+ ZEN_INFO("Oplog request count: {}. Average request size: {}. Average request time: {}, Peak request speed: {}",
+ Stats.m_RequestCount,
+ NiceBytes(Stats.m_RequestCount > 0 ? (Stats.m_ReceivedBytes + Stats.m_SentBytes) / Stats.m_RequestCount : 0u),
+ NiceLatencyNs(Stats.m_RequestCount > 0 ? (Stats.m_RequestTimeNS / Stats.m_RequestCount) : 0u),
+ NiceBytes(Stats.m_PeakBytesPerSec));
+ ZEN_INFO(
+ "Oplog sent request avg: {} ({}/s). Peak: {}",
+ NiceBytes(Stats.m_RequestCount > 0u ? static_cast<uint64_t>((Stats.m_SentBytes) / Stats.m_RequestCount) : 0u),
+ NiceBytes(Stats.m_RequestTimeNS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000000000) / Stats.m_RequestTimeNS) : 0u),
+ NiceBytes(Stats.m_PeakSentBytes));
+ ZEN_INFO("Oplog recv request avg: {} ({}/s). Peak: {}",
+ NiceBytes(Stats.m_RequestCount > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes) / Stats.m_RequestCount) : 0u),
+ NiceBytes(Stats.m_RequestTimeNS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000000000) / Stats.m_RequestTimeNS)
+ : 0u),
+ NiceBytes(Stats.m_PeakReceivedBytes));
}
- ZEN_INFO("{}", Message);
-}
-bool
-IsCancelled(JobContext* OptionalContext)
-{
- if (!OptionalContext)
+ struct Block
{
- return false;
+ IoHash BlockHash;
+ std::vector<IoHash> ChunksInBlock;
+ };
+
+ size_t AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks)
+ {
+ size_t BlockIndex;
+ {
+ RwLock::ExclusiveLockScope _(BlocksLock);
+ BlockIndex = Blocks.size();
+ Blocks.resize(BlockIndex + 1);
+ }
+ return BlockIndex;
}
- return OptionalContext->IsCancelled();
-}
-std::string
-GetStats(const RemoteProjectStore::Stats& Stats, uint64_t ElapsedWallTimeMS)
-{
- return fmt::format("Sent: {} ({}/s) Recv: {} ({}/s)",
- NiceBytes(Stats.m_SentBytes),
- NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000) / ElapsedWallTimeMS) : 0u),
- NiceBytes(Stats.m_ReceivedBytes),
- NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000) / ElapsedWallTimeMS) : 0u));
-}
+ IoBuffer WriteToTempFile(CompressedBuffer&& CompressedBuffer, std::filesystem::path Path)
+ {
+ if (std::filesystem::is_regular_file(Path))
+ {
+ IoBuffer ExistingTempFile = IoBuffer(IoBufferBuilder::MakeFromFile(Path));
+ if (ExistingTempFile && ExistingTempFile.GetSize() == CompressedBuffer.GetCompressedSize())
+ {
+ ExistingTempFile.SetDeleteOnClose(true);
+ return ExistingTempFile;
+ }
+ }
+ IoBuffer BlockBuffer;
+ BasicFile BlockFile;
+ uint32_t RetriesLeft = 3;
+ BlockFile.Open(Path, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) {
+ if (RetriesLeft == 0)
+ {
+ return false;
+ }
+ ZEN_WARN("Failed to create temporary oplog block '{}': '{}', retries left: {}.", Path, Ec.message(), RetriesLeft);
+ Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms
+ RetriesLeft--;
+ return true;
+ });
+ uint64_t Offset = 0;
+ {
+ CompositeBuffer Compressed = std::move(CompressedBuffer).GetCompressed();
+ for (const SharedBuffer& Segment : Compressed.GetSegments())
+ {
+ size_t SegmentSize = Segment.GetSize();
+ static const uint64_t BufferingSize = 256u * 1024u;
-void
-LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats)
-{
- ZEN_INFO("Oplog request count: {}. Average request size: {}. Average request time: {}, Peak request speed: {}",
- Stats.m_RequestCount,
- NiceBytes(Stats.m_RequestCount > 0 ? (Stats.m_ReceivedBytes + Stats.m_SentBytes) / Stats.m_RequestCount : 0u),
- NiceLatencyNs(Stats.m_RequestCount > 0 ? (Stats.m_RequestTimeNS / Stats.m_RequestCount) : 0u),
- NiceBytes(Stats.m_PeakBytesPerSec));
- ZEN_INFO("Oplog sent request avg: {} ({}/s). Peak: {}",
- NiceBytes(Stats.m_RequestCount > 0u ? static_cast<uint64_t>((Stats.m_SentBytes) / Stats.m_RequestCount) : 0u),
- NiceBytes(Stats.m_RequestTimeNS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000000000) / Stats.m_RequestTimeNS) : 0u),
- NiceBytes(Stats.m_PeakSentBytes));
- ZEN_INFO(
- "Oplog recv request avg: {} ({}/s). Peak: {}",
- NiceBytes(Stats.m_RequestCount > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes) / Stats.m_RequestCount) : 0u),
- NiceBytes(Stats.m_RequestTimeNS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000000000) / Stats.m_RequestTimeNS) : 0u),
- NiceBytes(Stats.m_PeakReceivedBytes));
-}
+ IoBufferFileReference FileRef;
+ if (SegmentSize >= (BufferingSize + BufferingSize / 2) && Segment.GetFileReference(FileRef))
+ {
+ ScanFile(FileRef.FileHandle,
+ FileRef.FileChunkOffset,
+ FileRef.FileChunkSize,
+ BufferingSize,
+ [&BlockFile, &Offset](const void* Data, size_t Size) {
+ BlockFile.Write(Data, Size, Offset);
+ Offset += Size;
+ });
+ }
+ else
+ {
+ BlockFile.Write(Segment.GetData(), SegmentSize, Offset);
+ Offset += SegmentSize;
+ }
+ }
+ }
+ void* FileHandle = BlockFile.Detach();
+ BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true);
+ BlockBuffer.SetDeleteOnClose(true);
+ return BlockBuffer;
+ }
+
+ RemoteProjectStore::Result WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext)
+ {
+ using namespace std::literals;
+
+ Stopwatch Timer;
+
+ CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
+ const uint64_t OpCount = OpsArray.Num();
+
+ ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount));
+
+ const size_t OpsBatchSize = 8192;
+ std::vector<uint8_t> OpsData;
+ std::vector<size_t> OpDataOffsets;
+ size_t OpsCompleteCount = 0;
+
+ OpsData.reserve(OpsBatchSize);
+
+ auto AppendBatch = [&]() {
+ std::vector<CbObjectView> Ops;
+ Ops.reserve(OpDataOffsets.size());
+ for (size_t OpDataOffset : OpDataOffsets)
+ {
+ Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset]));
+ }
+ std::vector<uint32_t> OpLsns = Oplog.AppendNewOplogEntries(Ops);
+ OpsCompleteCount += OpLsns.size();
+ OpsData.clear();
+ OpDataOffsets.clear();
+ ReportProgress(OptionalContext,
+ "Writing oplog"sv,
+ fmt::format("{} remaining...", OpCount - OpsCompleteCount),
+ OpCount,
+ OpCount - OpsCompleteCount);
+ };
+
+ BinaryWriter Writer;
+ for (CbFieldView OpEntry : OpsArray)
+ {
+ CbObjectView Op = OpEntry.AsObjectView();
+ Op.CopyTo(Writer);
+ OpDataOffsets.push_back(OpsData.size());
+ OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize());
+ Writer.Reset();
+
+ if (OpDataOffsets.size() == OpsBatchSize)
+ {
+ AppendBatch();
+ }
+ }
+ if (!OpDataOffsets.empty())
+ {
+ AppendBatch();
+ }
+
+ ReportProgress(OptionalContext, "Writing oplog"sv, ""sv, OpCount, 0);
+
+ return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0};
+ }
+
+ struct DownloadInfo
+ {
+ uint64_t OplogSizeBytes = 0;
+ std::atomic<uint64_t> AttachmentsDownloaded = 0;
+ std::atomic<uint64_t> AttachmentBlocksDownloaded = 0;
+ std::atomic<uint64_t> AttachmentBytesDownloaded = 0;
+ std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0;
+ std::atomic<uint64_t> AttachmentsStored = 0;
+ std::atomic<uint64_t> AttachmentBytesStored = 0;
+ std::atomic_size_t MissingAttachmentCount = 0;
+ };
+
+ void DownloadAndSaveBlockChunks(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ bool IgnoreMissingAttachments,
+ JobContext* OptionalContext,
+ WorkerThreadPool& NetworkWorkerPool,
+ WorkerThreadPool& WorkerPool,
+ Latch& AttachmentsDownloadLatch,
+ Latch& AttachmentsWriteLatch,
+ AsyncRemoteResult& RemoteResult,
+ DownloadInfo& Info,
+ Stopwatch& LoadAttachmentsTimer,
+ std::atomic_uint64_t& DownloadStartMS,
+ const std::vector<IoHash>& Chunks)
+ {
+ AttachmentsDownloadLatch.AddCount(1);
+ NetworkWorkerPool.ScheduleWork([&RemoteStore,
+ &ChunkStore,
+ &WorkerPool,
+ &AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
+ &RemoteResult,
+ Chunks = Chunks,
+ &Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ IgnoreMissingAttachments,
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ try
+ {
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
+ RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
+ if (Result.ErrorCode)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to load attachments with {} chunks ({}): {}",
+ Chunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ }
+ return;
+ }
+ Info.AttachmentsDownloaded.fetch_add(Chunks.size());
+ ZEN_INFO("Loaded {} bulk attachments in {}",
+ Chunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ if (!Chunks.empty())
+ {
+ try
+ {
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+ WriteAttachmentBuffers.reserve(Chunks.size());
+ WriteRawHashes.reserve(Chunks.size());
+
+ for (const auto& It : Chunks)
+ {
+ uint64_t ChunkSize = It.second.GetCompressedSize();
+ Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(It.first);
+ }
+ std::vector<CidStore::InsertResult> InsertResults =
+ ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
+
+ for (size_t Index = 0; Index < InsertResults.size(); Index++)
+ {
+ if (InsertResults[Index].New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to bulk save {} attachments", Chunks.size()),
+ Ex.what());
+ }
+ }
+ });
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to bulk load {} attachments", Chunks.size()),
+ Ex.what());
+ }
+ });
+ };
+
+ void DownloadAndSaveBlock(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ bool IgnoreMissingAttachments,
+ JobContext* OptionalContext,
+ WorkerThreadPool& NetworkWorkerPool,
+ WorkerThreadPool& WorkerPool,
+ Latch& AttachmentsDownloadLatch,
+ Latch& AttachmentsWriteLatch,
+ AsyncRemoteResult& RemoteResult,
+ DownloadInfo& Info,
+ Stopwatch& LoadAttachmentsTimer,
+ std::atomic_uint64_t& DownloadStartMS,
+ const IoHash& BlockHash,
+ const std::vector<IoHash>& Chunks,
+ uint32_t RetriesLeft)
+ {
+ AttachmentsDownloadLatch.AddCount(1);
+ NetworkWorkerPool.ScheduleWork([&AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
+ &ChunkStore,
+ &RemoteStore,
+ &NetworkWorkerPool,
+ &WorkerPool,
+ BlockHash,
+ &RemoteResult,
+ &Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ RetriesLeft,
+ Chunks = Chunks]() {
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ try
+ {
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
+ RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
+ if (BlockResult.ErrorCode)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to download block attachment {} ({}): {}",
+ BlockHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
+ }
+ return;
+ }
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ uint64_t BlockSize = BlockResult.Bytes.GetSize();
+ Info.AttachmentBlocksDownloaded.fetch_add(1);
+ ZEN_INFO("Loaded block attachment '{}' in {} ({})",
+ BlockHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
+ NiceBytes(BlockSize));
+ Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
+
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
+ &ChunkStore,
+ &RemoteStore,
+ &NetworkWorkerPool,
+ &WorkerPool,
+ BlockHash,
+ &RemoteResult,
+ &Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ RetriesLeft,
+ Chunks = Chunks,
+ Bytes = std::move(BlockResult.Bytes)]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ try
+ {
+ ZEN_ASSERT(Bytes.Size() > 0);
+ std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
+ WantedChunks.reserve(Chunks.size());
+ WantedChunks.insert(Chunks.begin(), Chunks.end());
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize);
+ if (!Compressed)
+ {
+ if (RetriesLeft > 0)
+ {
+ ReportMessage(
+ OptionalContext,
+ fmt::format("Block attachment {} is malformed, can't parse as compressed binary, retrying download",
+ BlockHash));
+ return DownloadAndSaveBlock(ChunkStore,
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ NetworkWorkerPool,
+ WorkerPool,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockHash,
+ std::move(Chunks),
+ RetriesLeft - 1);
+ }
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash));
+ RemoteResult.SetError(
+ gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash),
+ {});
+ return;
+ }
+ SharedBuffer BlockPayload = Compressed.Decompress();
+ if (!BlockPayload)
+ {
+ if (RetriesLeft > 0)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} is malformed, can't decompress payload, retrying download",
+ BlockHash));
+ return DownloadAndSaveBlock(ChunkStore,
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ NetworkWorkerPool,
+ WorkerPool,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockHash,
+ std::move(Chunks),
+ RetriesLeft - 1);
+ }
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash));
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash),
+ {});
+ return;
+ }
+ if (RawHash != BlockHash)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash));
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash),
+ {});
+ return;
+ }
+
+ bool StoreChunksOK = IterateBlock(
+ BlockPayload,
+ [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk,
+ const IoHash& AttachmentRawHash) {
+ if (WantedChunks.contains(AttachmentRawHash))
+ {
+ WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ IoHash RawHash;
+ uint64_t RawSize;
+ ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize));
+ ZEN_ASSERT(RawHash == AttachmentRawHash);
+ WriteRawHashes.emplace_back(AttachmentRawHash);
+ WantedChunks.erase(AttachmentRawHash);
+ }
+ });
+
+ if (!StoreChunksOK)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} has invalid format ({}): {}",
+ BlockHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Invalid format for block {}", BlockHash),
+ {});
+ return;
+ }
+
+ ZEN_ASSERT(WantedChunks.empty());
+
+ if (!WriteAttachmentBuffers.empty())
+ {
+ auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < Results.size(); Index++)
+ {
+ const auto& Result = Results[Index];
+ if (Result.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed save block attachment {}", BlockHash),
+ Ex.what());
+ }
+ });
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to block attachment {}", BlockHash),
+ Ex.what());
+ }
+ });
+ };
+
+ void DownloadAndSaveAttachment(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ bool IgnoreMissingAttachments,
+ JobContext* OptionalContext,
+ WorkerThreadPool& NetworkWorkerPool,
+ WorkerThreadPool& WorkerPool,
+ Latch& AttachmentsDownloadLatch,
+ Latch& AttachmentsWriteLatch,
+ AsyncRemoteResult& RemoteResult,
+ DownloadInfo& Info,
+ Stopwatch& LoadAttachmentsTimer,
+ std::atomic_uint64_t& DownloadStartMS,
+ const IoHash& RawHash)
+ {
+ AttachmentsDownloadLatch.AddCount(1);
+ NetworkWorkerPool.ScheduleWork([&RemoteStore,
+ &ChunkStore,
+ &WorkerPool,
+ &RemoteResult,
+ &AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
+ RawHash,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ &Info,
+ IgnoreMissingAttachments,
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ try
+ {
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
+ RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
+ if (AttachmentResult.ErrorCode)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to download large attachment {}: '{}', error code : {}",
+ RawHash,
+ AttachmentResult.Reason,
+ AttachmentResult.ErrorCode));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+ }
+ return;
+ }
+ uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
+ ZEN_INFO("Loaded large attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
+ NiceBytes(AttachmentSize));
+ Info.AttachmentsDownloaded.fetch_add(1);
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
+
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&AttachmentsWriteLatch,
+ &RemoteResult,
+ &Info,
+ &ChunkStore,
+ RawHash,
+ AttachmentSize,
+ Bytes = std::move(AttachmentResult.Bytes),
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ try
+ {
+ CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(AttachmentSize);
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Saving attachment {} failed", RawHash),
+ Ex.what());
+ }
+ });
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Loading attachment {} failed", RawHash),
+ Ex.what());
+ }
+ });
+ };
+
+ void CreateBlock(WorkerThreadPool& WorkerPool,
+ Latch& OpSectionsLatch,
+ std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock,
+ RwLock& SectionsLock,
+ std::vector<Block>& Blocks,
+ size_t BlockIndex,
+ const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+ AsyncRemoteResult& RemoteResult)
+ {
+ OpSectionsLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&Blocks,
+ &SectionsLock,
+ &OpSectionsLatch,
+ BlockIndex,
+ Chunks = std::move(ChunksInBlock),
+ &AsyncOnBlock,
+ &RemoteResult]() mutable {
+ auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ size_t ChunkCount = Chunks.size();
+ try
+ {
+ ZEN_ASSERT(ChunkCount > 0);
+ Stopwatch Timer;
+ CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks));
+ IoHash BlockHash = CompressedBlock.DecodeRawHash();
+ {
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope __(SectionsLock);
+ Blocks[BlockIndex].BlockHash = BlockHash;
+ }
+ uint64_t BlockSize = CompressedBlock.GetCompressedSize();
+ AsyncOnBlock(std::move(CompressedBlock), BlockHash);
+ ZEN_INFO("Generated block with {} attachments in {} ({})",
+ ChunkCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ NiceBytes(BlockSize));
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount),
+ Ex.what());
+ }
+ });
+ }
+
+ struct UploadInfo
+ {
+ uint64_t OplogSizeBytes = 0;
+ std::atomic<uint64_t> AttachmentsUploaded = 0;
+ std::atomic<uint64_t> AttachmentBlocksUploaded = 0;
+ std::atomic<uint64_t> AttachmentBytesUploaded = 0;
+ std::atomic<uint64_t> AttachmentBlockBytesUploaded = 0;
+ };
+
+ void UploadAttachments(WorkerThreadPool& WorkerPool,
+ CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments,
+ const std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>>& BlockChunks,
+ const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks,
+ const tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LooseFileAttachments,
+ const std::unordered_set<IoHash, IoHash::Hasher>& Needs,
+ bool ForceAll,
+ UploadInfo& Info,
+ AsyncRemoteResult& RemoteResult,
+ JobContext* OptionalContext)
+ {
+ using namespace std::literals;
+
+ if (Needs.empty() && !ForceAll)
+ {
+ return;
+ }
+
+ ReportMessage(OptionalContext, "Filtering needed attachments for upload...");
+
+ std::unordered_set<IoHash, IoHash::Hasher> AttachmentsToUpload;
+ std::unordered_map<IoHash, FetchChunkFunc, IoHash::Hasher> BulkBlockAttachmentsToUpload;
+
+ size_t BlockAttachmentCountToUpload = 0;
+ size_t LargeAttachmentCountToUpload = 0;
+ size_t BulkAttachmentCountToUpload = 0;
+ AttachmentsToUpload.reserve(ForceAll ? CreatedBlocks.size() + LargeAttachments.size() : Needs.size());
+
+ std::unordered_set<IoHash, IoHash::Hasher> UnknownAttachments(Needs);
+
+ for (const auto& CreatedBlock : CreatedBlocks)
+ {
+ if (ForceAll || Needs.contains(CreatedBlock.first))
+ {
+ AttachmentsToUpload.insert(CreatedBlock.first);
+ BlockAttachmentCountToUpload++;
+ UnknownAttachments.erase(CreatedBlock.first);
+ }
+ }
+ for (const IoHash& LargeAttachment : LargeAttachments)
+ {
+ if (ForceAll || Needs.contains(LargeAttachment))
+ {
+ AttachmentsToUpload.insert(LargeAttachment);
+ LargeAttachmentCountToUpload++;
+ UnknownAttachments.erase(LargeAttachment);
+ }
+ }
+ for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& BlockHashes : BlockChunks)
+ {
+ for (const std::pair<IoHash, FetchChunkFunc>& Chunk : BlockHashes)
+ {
+ if (ForceAll || Needs.contains(Chunk.first))
+ {
+ BulkBlockAttachmentsToUpload.insert(std::make_pair(Chunk.first, Chunk.second));
+ BulkAttachmentCountToUpload++;
+ UnknownAttachments.erase(Chunk.first);
+ }
+ }
+ }
+
+ if (AttachmentsToUpload.empty() && BulkBlockAttachmentsToUpload.empty())
+ {
+ ReportMessage(OptionalContext, "No attachments needed");
+ return;
+ }
+
+ if (!UnknownAttachments.empty())
+ {
+ RemoteResult.SetError(
+ gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Upload requested of {} missing attachments, the base container referenced blocks that are no longer available",
+ UnknownAttachments.size()),
+ "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return;
+ }
+
+ if (IsCancelled(OptionalContext))
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ }
+ return;
+ }
+
+ ReportMessage(OptionalContext,
+ fmt::format("Saving {} attachments ({} blocks, {} attachments, {} bulk attachments)",
+ AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(),
+ BlockAttachmentCountToUpload,
+ LargeAttachmentCountToUpload,
+ BulkAttachmentCountToUpload));
+
+ Stopwatch Timer;
+
+ ptrdiff_t AttachmentsToSave(0);
+ Latch SaveAttachmentsLatch(1);
+
+ for (const IoHash& RawHash : AttachmentsToUpload)
+ {
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+
+ SaveAttachmentsLatch.AddCount(1);
+ AttachmentsToSave++;
+ WorkerPool.ScheduleWork([&ChunkStore,
+ &RemoteStore,
+ &SaveAttachmentsLatch,
+ &RemoteResult,
+ RawHash,
+ &CreatedBlocks,
+ &LooseFileAttachments,
+ &Info,
+ OptionalContext]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ try
+ {
+ bool IsBlock = false;
+ IoBuffer Payload;
+ if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
+ {
+ Payload = BlockIt->second;
+ IsBlock = true;
+ }
+ else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
+ {
+ Payload = LooseTmpFileIt->second(RawHash);
+ }
+ else
+ {
+ Payload = ChunkStore.FindChunkByCid(RawHash);
+ }
+ if (!Payload)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to find attachment {}", RawHash),
+ {});
+ ZEN_WARN("Failed to save attachment '{}' ({}): {}",
+ RawHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ size_t PayloadSize = Payload.GetSize();
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to save attachment '{}', {} ({}): {}",
+ RawHash,
+ NiceBytes(PayloadSize),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ return;
+ }
+ if (IsBlock)
+ {
+ Info.AttachmentBlocksUploaded.fetch_add(1);
+ Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
+ ZEN_INFO("Saved block attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(PayloadSize));
+ }
+ else
+ {
+ Info.AttachmentsUploaded.fetch_add(1);
+ Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
+ ZEN_INFO("Saved large attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(PayloadSize));
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to upload attachment {}", RawHash),
+ Ex.what());
+ }
+ });
+ }
+
+ if (IsCancelled(OptionalContext))
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ }
+ return;
+ }
+
+ if (!BulkBlockAttachmentsToUpload.empty())
+ {
+ for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& Chunks : BlockChunks)
+ {
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+
+ std::vector<IoHash> NeededChunks;
+ NeededChunks.reserve(Chunks.size());
+ for (const std::pair<IoHash, FetchChunkFunc>& Chunk : Chunks)
+ {
+ const IoHash& ChunkHash = Chunk.first;
+ if (BulkBlockAttachmentsToUpload.contains(ChunkHash) && !AttachmentsToUpload.contains(ChunkHash))
+ {
+ NeededChunks.push_back(Chunk.first);
+ }
+ }
+ if (NeededChunks.empty())
+ {
+ continue;
+ }
+
+ SaveAttachmentsLatch.AddCount(1);
+ AttachmentsToSave++;
+ WorkerPool.ScheduleWork([&RemoteStore,
+ &ChunkStore,
+ &SaveAttachmentsLatch,
+ &RemoteResult,
+ NeededChunks = std::move(NeededChunks),
+ &BulkBlockAttachmentsToUpload,
+ &Info,
+ OptionalContext]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ try
+ {
+ size_t ChunksSize = 0;
+ std::vector<SharedBuffer> ChunkBuffers;
+ ChunkBuffers.reserve(NeededChunks.size());
+ for (const IoHash& Chunk : NeededChunks)
+ {
+ auto It = BulkBlockAttachmentsToUpload.find(Chunk);
+ ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
+ CompositeBuffer ChunkPayload = It->second(It->first);
+ if (!ChunkPayload)
+ {
+ RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
+ fmt::format("Missing chunk {}"sv, Chunk),
+ fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
+ ChunkBuffers.clear();
+ break;
+ }
+ ChunksSize += ChunkPayload.GetSize();
+ ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer()));
+ }
+ RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to save attachments with {} chunks ({}): {}",
+ NeededChunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ return;
+ }
+ Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size());
+ Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
+
+ ZEN_INFO("Saved {} bulk attachments in {} ({})",
+ NeededChunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(ChunksSize));
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to bulk upload {} attachments", NeededChunks.size()),
+ Ex.what());
+ }
+ });
+ }
+ }
+
+ SaveAttachmentsLatch.CountDown();
+ while (!SaveAttachmentsLatch.Wait(1000))
+ {
+ ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining();
+ if (IsCancelled(OptionalContext))
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ }
+ }
+ uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs();
+ ReportProgress(OptionalContext,
+ "Saving attachments"sv,
+ fmt::format("{} remaining... {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)),
+ AttachmentsToSave,
+ Remaining);
+ }
+ uint64_t ElapsedTimeMS = Timer.GetElapsedTimeMs();
+ if (AttachmentsToSave > 0)
+ {
+ ReportProgress(OptionalContext,
+ "Saving attachments"sv,
+ fmt::format("{}", GetStats(RemoteStore.GetStats(), ElapsedTimeMS)),
+ AttachmentsToSave,
+ 0);
+ }
+ ReportMessage(OptionalContext,
+ fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {} {}",
+ AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(),
+ BlockAttachmentCountToUpload,
+ LargeAttachmentCountToUpload,
+ BulkAttachmentCountToUpload,
+ NiceTimeSpanMs(ElapsedTimeMS),
+ GetStats(RemoteStore.GetStats(), ElapsedTimeMS)));
+ }
+
+} // namespace remotestore_impl
bool
IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
{
@@ -219,128 +1211,6 @@ GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks)
return CompressedBlock;
}
-struct Block
-{
- IoHash BlockHash;
- std::vector<IoHash> ChunksInBlock;
-};
-
-void
-CreateBlock(WorkerThreadPool& WorkerPool,
- Latch& OpSectionsLatch,
- std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock,
- RwLock& SectionsLock,
- std::vector<Block>& Blocks,
- size_t BlockIndex,
- const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
- AsyncRemoteResult& RemoteResult)
-{
- OpSectionsLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&Blocks, &SectionsLock, &OpSectionsLatch, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, &RemoteResult]() mutable {
- auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- size_t ChunkCount = Chunks.size();
- try
- {
- ZEN_ASSERT(ChunkCount > 0);
- Stopwatch Timer;
- CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks));
- IoHash BlockHash = CompressedBlock.DecodeRawHash();
- {
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope __(SectionsLock);
- Blocks[BlockIndex].BlockHash = BlockHash;
- }
- uint64_t BlockSize = CompressedBlock.GetCompressedSize();
- AsyncOnBlock(std::move(CompressedBlock), BlockHash);
- ZEN_INFO("Generated block with {} attachments in {} ({})",
- ChunkCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- NiceBytes(BlockSize));
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount),
- Ex.what());
- }
- });
-}
-
-static size_t
-AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks)
-{
- size_t BlockIndex;
- {
- RwLock::ExclusiveLockScope _(BlocksLock);
- BlockIndex = Blocks.size();
- Blocks.resize(BlockIndex + 1);
- }
- return BlockIndex;
-}
-
-static IoBuffer
-WriteToTempFile(CompressedBuffer&& CompressedBuffer, std::filesystem::path Path)
-{
- if (std::filesystem::is_regular_file(Path))
- {
- IoBuffer ExistingTempFile = IoBuffer(IoBufferBuilder::MakeFromFile(Path));
- if (ExistingTempFile && ExistingTempFile.GetSize() == CompressedBuffer.GetCompressedSize())
- {
- ExistingTempFile.SetDeleteOnClose(true);
- return ExistingTempFile;
- }
- }
- IoBuffer BlockBuffer;
- BasicFile BlockFile;
- uint32_t RetriesLeft = 3;
- BlockFile.Open(Path, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) {
- if (RetriesLeft == 0)
- {
- return false;
- }
- ZEN_WARN("Failed to create temporary oplog block '{}': '{}', retries left: {}.", Path, Ec.message(), RetriesLeft);
- Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms
- RetriesLeft--;
- return true;
- });
- uint64_t Offset = 0;
- {
- CompositeBuffer Compressed = std::move(CompressedBuffer).GetCompressed();
- for (const SharedBuffer& Segment : Compressed.GetSegments())
- {
- size_t SegmentSize = Segment.GetSize();
- static const uint64_t BufferingSize = 256u * 1024u;
-
- IoBufferFileReference FileRef;
- if (SegmentSize >= (BufferingSize + BufferingSize / 2) && Segment.GetFileReference(FileRef))
- {
- ScanFile(FileRef.FileHandle,
- FileRef.FileChunkOffset,
- FileRef.FileChunkSize,
- BufferingSize,
- [&BlockFile, &Offset](const void* Data, size_t Size) {
- BlockFile.Write(Data, Size, Offset);
- Offset += Size;
- });
- }
- else
- {
- BlockFile.Write(Segment.GetData(), SegmentSize, Offset);
- Offset += SegmentSize;
- }
- }
- }
- void* FileHandle = BlockFile.Detach();
- BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true);
- BlockBuffer.SetDeleteOnClose(true);
- return BlockBuffer;
-}
-
CbObject
BuildContainer(CidStore& ChunkStore,
ProjectStore::Project& Project,
@@ -351,14 +1221,14 @@ BuildContainer(CidStore& ChunkStore,
bool BuildBlocks,
bool IgnoreMissingAttachments,
bool AllowChunking,
- const std::vector<Block>& KnownBlocks,
+ const std::vector<remotestore_impl::Block>& KnownBlocks,
WorkerThreadPool& WorkerPool,
const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles,
JobContext* OptionalContext,
- AsyncRemoteResult& RemoteResult)
+ remotestore_impl::AsyncRemoteResult& RemoteResult)
{
using namespace std::literals;
@@ -375,9 +1245,9 @@ BuildContainer(CidStore& ChunkStore,
std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments;
- RwLock BlocksLock;
- std::vector<Block> Blocks;
- CompressedBuffer OpsBuffer;
+ RwLock BlocksLock;
+ std::vector<remotestore_impl::Block> Blocks;
+ CompressedBuffer OpsBuffer;
std::filesystem::path AttachmentTempPath = Oplog.TempPath();
AttachmentTempPath.append(".pending");
@@ -397,11 +1267,12 @@ BuildContainer(CidStore& ChunkStore,
for (CbFieldView& Field : Files)
{
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
CB(Op);
return;
}
@@ -418,8 +1289,9 @@ BuildContainer(CidStore& ChunkStore,
std::filesystem::path FilePath = Project.RootDir / ServerPath;
if (!std::filesystem::is_regular_file(FilePath))
{
- ReportMessage(OptionalContext,
- fmt::format("Missing attachment '{}' for op '{}'", FilePath, View["id"sv].AsObjectId()));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Missing attachment '{}' for op '{}'", FilePath, View["id"sv].AsObjectId()));
if (IgnoreMissingAttachments)
{
continue;
@@ -497,7 +1369,7 @@ BuildContainer(CidStore& ChunkStore,
CB(RewrittenOp);
};
- ReportMessage(OptionalContext, "Building exported oplog and collecting attachments");
+ remotestore_impl::ReportMessage(OptionalContext, "Building exported oplog and collecting attachments");
Stopwatch Timer;
@@ -525,80 +1397,87 @@ BuildContainer(CidStore& ChunkStore,
SectionOpsWriter << Op;
}
OpCount++;
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return;
}
if (OpCount % 1000 == 0)
{
- ReportProgress(OptionalContext,
- "Building oplog"sv,
- fmt::format("{} ops processed", OpCount),
- TotalOpCount,
- TotalOpCount - OpCount);
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Building oplog"sv,
+ fmt::format("{} ops processed", OpCount),
+ TotalOpCount,
+ TotalOpCount - OpCount);
}
});
if (RemoteResult.IsError())
{
return {};
}
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return {};
}
if (TotalOpCount > 0)
{
- ReportProgress(OptionalContext, "Building oplog"sv, fmt::format("{} ops processed", OpCount), TotalOpCount, 0);
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Building oplog"sv,
+ fmt::format("{} ops processed", OpCount),
+ TotalOpCount,
+ 0);
}
}
SectionOpsWriter.EndArray(); // "ops"
- ReportMessage(OptionalContext,
- fmt::format("Rewrote {} ops to new oplog in {}",
- OpCount,
- NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs()))));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Rewrote {} ops to new oplog in {}",
+ OpCount,
+ NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs()))));
{
Stopwatch CompressOpsTimer;
CompressedOpsSection =
CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer(), OodleCompressor::Mermaid, OodleCompressionLevel::Fast);
- ReportMessage(OptionalContext,
- fmt::format("Compressed oplog section {} ({} -> {}) in {}",
- CompressedOpsSection.DecodeRawHash(),
- NiceBytes(CompressedOpsSection.DecodeRawSize()),
- NiceBytes(CompressedOpsSection.GetCompressedSize()),
- NiceTimeSpanMs(static_cast<uint64_t>(CompressOpsTimer.GetElapsedTimeMs()))));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Compressed oplog section {} ({} -> {}) in {}",
+ CompressedOpsSection.DecodeRawHash(),
+ NiceBytes(CompressedOpsSection.DecodeRawSize()),
+ NiceBytes(CompressedOpsSection.GetCompressedSize()),
+ NiceTimeSpanMs(static_cast<uint64_t>(CompressOpsTimer.GetElapsedTimeMs()))));
}
}
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return {};
}
- auto FindReuseBlocks = [](const std::vector<Block>& KnownBlocks,
+ auto FindReuseBlocks = [](const std::vector<remotestore_impl::Block>& KnownBlocks,
const std::unordered_set<IoHash, IoHash::Hasher>& Attachments,
JobContext* OptionalContext) -> std::vector<size_t> {
std::vector<size_t> ReuseBlockIndexes;
if (!Attachments.empty() && !KnownBlocks.empty())
{
- ReportMessage(
+ remotestore_impl::ReportMessage(
OptionalContext,
fmt::format("Checking {} Attachments against {} known blocks for reuse", Attachments.size(), KnownBlocks.size()));
Stopwatch ReuseTimer;
for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++)
{
- const Block& KnownBlock = KnownBlocks[KnownBlockIndex];
- size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size();
+ const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size();
if (BlockAttachmentCount == 0)
{
continue;
@@ -645,7 +1524,7 @@ BuildContainer(CidStore& ChunkStore,
std::vector<size_t> ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext);
for (size_t KnownBlockIndex : ReusedBlockIndexes)
{
- const Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
{
if (UploadAttachments.erase(KnownHash) == 1)
@@ -698,15 +1577,17 @@ BuildContainer(CidStore& ChunkStore,
std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> LooseUploadAttachments;
std::unordered_set<IoHash, IoHash::Hasher> MissingHashes;
- ReportMessage(OptionalContext, fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount));
Latch ResolveAttachmentsLatch(1);
for (auto& It : UploadAttachments)
{
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return {};
}
@@ -732,7 +1613,7 @@ BuildContainer(CidStore& ChunkStore,
&RemoteResult,
OptionalContext]() {
auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); });
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
return;
}
@@ -775,7 +1656,8 @@ BuildContainer(CidStore& ChunkStore,
std::filesystem::path AttachmentPath = AttachmentTempPath;
AttachmentPath.append(RawHash.ToHexString());
- IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed), AttachmentPath);
+ IoBuffer TempAttachmentBuffer =
+ remotestore_impl::WriteToTempFile(std::move(Compressed), AttachmentPath);
ZEN_INFO("Saved temp attachment to '{}', {} ({})",
AttachmentPath,
NiceBytes(RawSize),
@@ -794,7 +1676,7 @@ BuildContainer(CidStore& ChunkStore,
std::filesystem::path AttachmentPath = AttachmentTempPath;
AttachmentPath.append(RawHash.ToHexString());
- IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed), AttachmentPath);
+ IoBuffer TempAttachmentBuffer = remotestore_impl::WriteToTempFile(std::move(Compressed), AttachmentPath);
ZEN_INFO("Saved temp attachment to '{}', {} ({})",
AttachmentPath,
NiceBytes(RawSize),
@@ -910,37 +1792,39 @@ BuildContainer(CidStore& ChunkStore,
while (!ResolveAttachmentsLatch.Wait(1000))
{
ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining();
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
while (!ResolveAttachmentsLatch.Wait(1000))
{
Remaining = ResolveAttachmentsLatch.Remaining();
- ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- fmt::format("Aborting, {} attachments remaining...", Remaining),
- UploadAttachments.size(),
- Remaining);
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ fmt::format("Aborting, {} attachments remaining...", Remaining),
+ UploadAttachments.size(),
+ Remaining);
}
- ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), 0);
+ remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), 0);
return {};
}
- ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- fmt::format("{} remaining...", Remaining),
- UploadAttachments.size(),
- Remaining);
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ fmt::format("{} remaining...", Remaining),
+ UploadAttachments.size(),
+ Remaining);
}
if (UploadAttachments.size() > 0)
{
- ReportProgress(OptionalContext, "Resolving attachments"sv, ""sv, UploadAttachments.size(), 0);
+ remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, ""sv, UploadAttachments.size(), 0);
}
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return {};
}
@@ -953,7 +1837,8 @@ BuildContainer(CidStore& ChunkStore,
if (IgnoreMissingAttachments)
{
- ReportMessage(OptionalContext, fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key));
}
else
{
@@ -980,7 +1865,7 @@ BuildContainer(CidStore& ChunkStore,
std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext);
for (size_t KnownBlockIndex : ReusedBlockIndexes)
{
- const Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ const remotestore_impl::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
{
if (ChunkedHashes.erase(KnownHash) == 1)
@@ -1001,7 +1886,8 @@ BuildContainer(CidStore& ChunkStore,
{
Blocks.push_back(KnownBlocks[*It]);
}
- ReportMessage(OptionalContext, fmt::format("Reused {} attachments from {} blocks", ReusedAttachmentCount, ReuseBlockCount));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Reused {} attachments from {} blocks", ReusedAttachmentCount, ReuseBlockCount));
}
std::vector<std::pair<IoHash, Oid>> SortedUploadAttachments;
@@ -1011,14 +1897,16 @@ BuildContainer(CidStore& ChunkStore,
SortedUploadAttachments.push_back(std::make_pair(It.first, It.second.Key));
}
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return {};
}
- ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", SortedUploadAttachments.size(), TotalOpCount));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Sorting {} attachments from {} ops", SortedUploadAttachments.size(), TotalOpCount));
// Sort attachments so we get predictable blocks for the same oplog upload
std::sort(SortedUploadAttachments.begin(),
@@ -1046,22 +1934,25 @@ BuildContainer(CidStore& ChunkStore,
// SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded
// ChunkedHashes contains all chunked up chunks to be composed into blocks
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return {};
}
- ReportMessage(OptionalContext,
- fmt::format("Assembling {} attachments and {} chunked parts from {} ops into blocks and loose attachments",
- SortedUploadAttachments.size(),
- ChunkedHashes.size(),
- TotalOpCount));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Assembling {} attachments and {} chunked parts from {} ops into blocks and loose attachments",
+ SortedUploadAttachments.size(),
+ ChunkedHashes.size(),
+ TotalOpCount));
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return {};
}
@@ -1070,7 +1961,8 @@ BuildContainer(CidStore& ChunkStore,
size_t ChunkAssembleCount = SortedUploadAttachments.size() + ChunkedHashes.size();
size_t ChunksAssembled = 0;
- ReportMessage(OptionalContext, fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount));
Latch BlockCreateLatch(1);
size_t GeneratedBlockCount = 0;
@@ -1090,14 +1982,14 @@ BuildContainer(CidStore& ChunkStore,
size_t ChunkCount = ChunksInBlock.size();
if (BuildBlocks)
{
- CreateBlock(WorkerPool,
- BlockCreateLatch,
- std::move(ChunksInBlock),
- BlocksLock,
- Blocks,
- BlockIndex,
- AsyncOnBlock,
- RemoteResult);
+ remotestore_impl::CreateBlock(WorkerPool,
+ BlockCreateLatch,
+ std::move(ChunksInBlock),
+ BlocksLock,
+ Blocks,
+ BlockIndex,
+ AsyncOnBlock,
+ RemoteResult);
ComposedBlocks++;
}
else
@@ -1127,20 +2019,22 @@ BuildContainer(CidStore& ChunkStore,
for (auto HashIt = SortedUploadAttachments.begin(); HashIt != SortedUploadAttachments.end(); HashIt++)
{
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
break;
}
if (ChunksAssembled % 1000 == 0)
{
- ReportProgress(OptionalContext,
- "Assembling blocks"sv,
- fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
- ChunkAssembleCount,
- ChunkAssembleCount - ChunksAssembled);
+ remotestore_impl::ReportProgress(
+ OptionalContext,
+ "Assembling blocks"sv,
+ fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
+ ChunkAssembleCount,
+ ChunkAssembleCount - ChunksAssembled);
}
const IoHash& RawHash(HashIt->first);
const Oid CurrentOpKey = HashIt->second;
@@ -1190,20 +2084,22 @@ BuildContainer(CidStore& ChunkStore,
size_t ChunkCount = Chunked.Info.ChunkHashes.size();
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++)
{
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
break;
}
if (ChunksAssembled % 1000 == 0)
{
- ReportProgress(OptionalContext,
- "Assembling blocks"sv,
- fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
- ChunkAssembleCount,
- ChunkAssembleCount - ChunksAssembled);
+ remotestore_impl::ReportProgress(
+ OptionalContext,
+ "Assembling blocks"sv,
+ fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
+ ChunkAssembleCount,
+ ChunkAssembleCount - ChunksAssembled);
}
const IoHash& ChunkHash = ChunkedFile.Chunked.Info.ChunkHashes[ChunkIndex];
if (auto FindIt = ChunkedHashes.find(ChunkHash); FindIt != ChunkedHashes.end())
@@ -1237,7 +2133,7 @@ BuildContainer(CidStore& ChunkStore,
if (BlockSize > 0 && !RemoteResult.IsError())
{
- if (!IsCancelled(OptionalContext))
+ if (!remotestore_impl::IsCancelled(OptionalContext))
{
NewBlock();
}
@@ -1245,41 +2141,43 @@ BuildContainer(CidStore& ChunkStore,
if (ChunkAssembleCount > 0)
{
- ReportProgress(OptionalContext,
- "Assembling blocks"sv,
- fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
- ChunkAssembleCount,
- 0);
+ remotestore_impl::ReportProgress(
+ OptionalContext,
+ "Assembling blocks"sv,
+ fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
+ ChunkAssembleCount,
+ 0);
}
- ReportMessage(OptionalContext,
- fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and in {}",
- ChunkAssembleCount,
- TotalOpCount,
- GeneratedBlockCount,
- NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs()))));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and in {}",
+ ChunkAssembleCount,
+ TotalOpCount,
+ GeneratedBlockCount,
+ NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs()))));
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
BlockCreateLatch.CountDown();
while (!BlockCreateLatch.Wait(1000))
{
ptrdiff_t Remaining = BlockCreateLatch.Remaining();
- ReportProgress(OptionalContext,
- "Assembling blocks"sv,
- fmt::format("Aborting, {} blocks remaining...", Remaining),
- GeneratedBlockCount,
- Remaining);
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Assembling blocks"sv,
+ fmt::format("Aborting, {} blocks remaining...", Remaining),
+ GeneratedBlockCount,
+ Remaining);
}
if (GeneratedBlockCount > 0)
{
- ReportProgress(OptionalContext,
- "Assembling blocks"sv,
- fmt::format("Aborting, {} blocks remaining...", 0),
- GeneratedBlockCount,
- 0);
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Assembling blocks"sv,
+ fmt::format("Aborting, {} blocks remaining...", 0),
+ GeneratedBlockCount,
+ 0);
}
return {};
}
@@ -1298,31 +2196,37 @@ BuildContainer(CidStore& ChunkStore,
while (!BlockCreateLatch.Wait(1000))
{
ptrdiff_t Remaining = BlockCreateLatch.Remaining();
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
while (!BlockCreateLatch.Wait(1000))
{
Remaining = BlockCreateLatch.Remaining();
- ReportProgress(OptionalContext,
- "Creating blocks"sv,
- fmt::format("Aborting, {} blocks remaining...", Remaining),
- GeneratedBlockCount,
- Remaining);
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Creating blocks"sv,
+ fmt::format("Aborting, {} blocks remaining...", Remaining),
+ GeneratedBlockCount,
+ Remaining);
}
- ReportProgress(OptionalContext, "Creating blocks"sv, "Aborted"sv, GeneratedBlockCount, 0);
+ remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, "Aborted"sv, GeneratedBlockCount, 0);
return {};
}
- ReportProgress(OptionalContext, "Creating blocks"sv, fmt::format("{} remaining...", Remaining), GeneratedBlockCount, Remaining);
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Creating blocks"sv,
+ fmt::format("{} remaining...", Remaining),
+ GeneratedBlockCount,
+ Remaining);
}
if (GeneratedBlockCount > 0)
{
uint64_t NowMS = Timer.GetElapsedTimeMs();
- ReportProgress(OptionalContext, "Creating blocks"sv, ""sv, GeneratedBlockCount, 0);
- ReportMessage(OptionalContext,
- fmt::format("Created {} blocks in {}", GeneratedBlockCount, NiceTimeSpanMs(NowMS - CreateBlocksStartMS)));
+ remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, ""sv, GeneratedBlockCount, 0);
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Created {} blocks in {}", GeneratedBlockCount, NiceTimeSpanMs(NowMS - CreateBlocksStartMS)));
}
if (!RemoteResult.IsError())
@@ -1333,7 +2237,7 @@ BuildContainer(CidStore& ChunkStore,
OplogContinerWriter.BeginArray("blocks"sv);
{
- for (const Block& B : Blocks)
+ for (const remotestore_impl::Block& B : Blocks)
{
ZEN_ASSERT(!B.ChunksInBlock.empty());
if (BuildBlocks)
@@ -1434,366 +2338,27 @@ BuildContainer(CidStore& ChunkStore,
{
WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background);
- AsyncRemoteResult RemoteResult;
- CbObject ContainerObject = BuildContainer(ChunkStore,
- Project,
- Oplog,
- MaxBlockSize,
- MaxChunkEmbedSize,
- ChunkFileSizeLimit,
- BuildBlocks,
- IgnoreMissingAttachments,
- AllowChunking,
- {},
- WorkerPool,
- AsyncOnBlock,
- OnLargeAttachment,
- OnBlockChunks,
- EmbedLooseFiles,
- nullptr,
- RemoteResult);
+ remotestore_impl::AsyncRemoteResult RemoteResult;
+ CbObject ContainerObject = BuildContainer(ChunkStore,
+ Project,
+ Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ BuildBlocks,
+ IgnoreMissingAttachments,
+ AllowChunking,
+ {},
+ WorkerPool,
+ AsyncOnBlock,
+ OnLargeAttachment,
+ OnBlockChunks,
+ EmbedLooseFiles,
+ nullptr,
+ RemoteResult);
return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject};
}
-struct UploadInfo
-{
- uint64_t OplogSizeBytes = 0;
- std::atomic<uint64_t> AttachmentsUploaded = 0;
- std::atomic<uint64_t> AttachmentBlocksUploaded = 0;
- std::atomic<uint64_t> AttachmentBytesUploaded = 0;
- std::atomic<uint64_t> AttachmentBlockBytesUploaded = 0;
-};
-
-void
-UploadAttachments(WorkerThreadPool& WorkerPool,
- CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments,
- const std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>>& BlockChunks,
- const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks,
- const tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LooseFileAttachments,
- const std::unordered_set<IoHash, IoHash::Hasher>& Needs,
- bool ForceAll,
- UploadInfo& Info,
- AsyncRemoteResult& RemoteResult,
- JobContext* OptionalContext)
-{
- using namespace std::literals;
-
- if (Needs.empty() && !ForceAll)
- {
- return;
- }
-
- ReportMessage(OptionalContext, "Filtering needed attachments for upload...");
-
- std::unordered_set<IoHash, IoHash::Hasher> AttachmentsToUpload;
- std::unordered_map<IoHash, FetchChunkFunc, IoHash::Hasher> BulkBlockAttachmentsToUpload;
-
- size_t BlockAttachmentCountToUpload = 0;
- size_t LargeAttachmentCountToUpload = 0;
- size_t BulkAttachmentCountToUpload = 0;
- AttachmentsToUpload.reserve(ForceAll ? CreatedBlocks.size() + LargeAttachments.size() : Needs.size());
-
- std::unordered_set<IoHash, IoHash::Hasher> UnknownAttachments(Needs);
-
- for (const auto& CreatedBlock : CreatedBlocks)
- {
- if (ForceAll || Needs.contains(CreatedBlock.first))
- {
- AttachmentsToUpload.insert(CreatedBlock.first);
- BlockAttachmentCountToUpload++;
- UnknownAttachments.erase(CreatedBlock.first);
- }
- }
- for (const IoHash& LargeAttachment : LargeAttachments)
- {
- if (ForceAll || Needs.contains(LargeAttachment))
- {
- AttachmentsToUpload.insert(LargeAttachment);
- LargeAttachmentCountToUpload++;
- UnknownAttachments.erase(LargeAttachment);
- }
- }
- for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& BlockHashes : BlockChunks)
- {
- for (const std::pair<IoHash, FetchChunkFunc>& Chunk : BlockHashes)
- {
- if (ForceAll || Needs.contains(Chunk.first))
- {
- BulkBlockAttachmentsToUpload.insert(std::make_pair(Chunk.first, Chunk.second));
- BulkAttachmentCountToUpload++;
- UnknownAttachments.erase(Chunk.first);
- }
- }
- }
-
- if (AttachmentsToUpload.empty() && BulkBlockAttachmentsToUpload.empty())
- {
- ReportMessage(OptionalContext, "No attachments needed");
- return;
- }
-
- if (!UnknownAttachments.empty())
- {
- RemoteResult.SetError(
- gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Upload requested of {} missing attachments, the base container referenced blocks that are no longer available",
- UnknownAttachments.size()),
- "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return;
- }
-
- if (IsCancelled(OptionalContext))
- {
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- }
- return;
- }
-
- ReportMessage(OptionalContext,
- fmt::format("Saving {} attachments ({} blocks, {} attachments, {} bulk attachments)",
- AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(),
- BlockAttachmentCountToUpload,
- LargeAttachmentCountToUpload,
- BulkAttachmentCountToUpload));
-
- Stopwatch Timer;
-
- ptrdiff_t AttachmentsToSave(0);
- Latch SaveAttachmentsLatch(1);
-
- for (const IoHash& RawHash : AttachmentsToUpload)
- {
- if (RemoteResult.IsError())
- {
- break;
- }
-
- SaveAttachmentsLatch.AddCount(1);
- AttachmentsToSave++;
- WorkerPool.ScheduleWork([&ChunkStore,
- &RemoteStore,
- &SaveAttachmentsLatch,
- &RemoteResult,
- RawHash,
- &CreatedBlocks,
- &LooseFileAttachments,
- &Info,
- OptionalContext]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- bool IsBlock = false;
- IoBuffer Payload;
- if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
- {
- Payload = BlockIt->second;
- IsBlock = true;
- }
- else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
- {
- Payload = LooseTmpFileIt->second(RawHash);
- }
- else
- {
- Payload = ChunkStore.FindChunkByCid(RawHash);
- }
- if (!Payload)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to find attachment {}", RawHash),
- {});
- ZEN_WARN("Failed to save attachment '{}' ({}): {}", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason());
- return;
- }
- size_t PayloadSize = Payload.GetSize();
- RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash);
- if (Result.ErrorCode)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachment '{}', {} ({}): {}",
- RawHash,
- NiceBytes(PayloadSize),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- return;
- }
- if (IsBlock)
- {
- Info.AttachmentBlocksUploaded.fetch_add(1);
- Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
- ZEN_INFO("Saved block attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(PayloadSize));
- }
- else
- {
- Info.AttachmentsUploaded.fetch_add(1);
- Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
- ZEN_INFO("Saved large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(PayloadSize));
- }
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("To upload attachment {}", RawHash),
- Ex.what());
- }
- });
- }
-
- if (IsCancelled(OptionalContext))
- {
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- }
- return;
- }
-
- if (!BulkBlockAttachmentsToUpload.empty())
- {
- for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& Chunks : BlockChunks)
- {
- if (RemoteResult.IsError())
- {
- break;
- }
-
- std::vector<IoHash> NeededChunks;
- NeededChunks.reserve(Chunks.size());
- for (const std::pair<IoHash, FetchChunkFunc>& Chunk : Chunks)
- {
- const IoHash& ChunkHash = Chunk.first;
- if (BulkBlockAttachmentsToUpload.contains(ChunkHash) && !AttachmentsToUpload.contains(ChunkHash))
- {
- NeededChunks.push_back(Chunk.first);
- }
- }
- if (NeededChunks.empty())
- {
- continue;
- }
-
- SaveAttachmentsLatch.AddCount(1);
- AttachmentsToSave++;
- WorkerPool.ScheduleWork([&RemoteStore,
- &ChunkStore,
- &SaveAttachmentsLatch,
- &RemoteResult,
- NeededChunks = std::move(NeededChunks),
- &BulkBlockAttachmentsToUpload,
- &Info,
- OptionalContext]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- size_t ChunksSize = 0;
- std::vector<SharedBuffer> ChunkBuffers;
- ChunkBuffers.reserve(NeededChunks.size());
- for (const IoHash& Chunk : NeededChunks)
- {
- auto It = BulkBlockAttachmentsToUpload.find(Chunk);
- ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
- CompositeBuffer ChunkPayload = It->second(It->first);
- if (!ChunkPayload)
- {
- RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
- fmt::format("Missing chunk {}"sv, Chunk),
- fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
- ChunkBuffers.clear();
- break;
- }
- ChunksSize += ChunkPayload.GetSize();
- ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer()));
- }
- RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
- if (Result.ErrorCode)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachments with {} chunks ({}): {}",
- NeededChunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- return;
- }
- Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size());
- Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
-
- ZEN_INFO("Saved {} bulk attachments in {} ({})",
- NeededChunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(ChunksSize));
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to buck upload {} attachments", NeededChunks.size()),
- Ex.what());
- }
- });
- }
- }
-
- SaveAttachmentsLatch.CountDown();
- while (!SaveAttachmentsLatch.Wait(1000))
- {
- ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining();
- if (IsCancelled(OptionalContext))
- {
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- }
- }
- uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs();
- ReportProgress(OptionalContext,
- "Saving attachments"sv,
- fmt::format("{} remaining... {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)),
- AttachmentsToSave,
- Remaining);
- }
- uint64_t ElapsedTimeMS = Timer.GetElapsedTimeMs();
- if (AttachmentsToSave > 0)
- {
- ReportProgress(OptionalContext,
- "Saving attachments"sv,
- fmt::format("{}", GetStats(RemoteStore.GetStats(), ElapsedTimeMS)),
- AttachmentsToSave,
- 0);
- }
- ReportMessage(OptionalContext,
- fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {} {}",
- AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(),
- BlockAttachmentCountToUpload,
- LargeAttachmentCountToUpload,
- BulkAttachmentCountToUpload,
- NiceTimeSpanMs(ElapsedTimeMS),
- GetStats(RemoteStore.GetStats(), ElapsedTimeMS)));
-}
-
RemoteProjectStore::Result
SaveOplog(CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
@@ -1811,7 +2376,7 @@ SaveOplog(CidStore& ChunkStore,
Stopwatch Timer;
- UploadInfo Info;
+ remotestore_impl::UploadInfo Info;
WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background);
WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(EWorkloadType::Background);
@@ -1826,7 +2391,7 @@ SaveOplog(CidStore& ChunkStore,
CreateDirectories(AttachmentTempPath);
}
- AsyncRemoteResult RemoteResult;
+ remotestore_impl::AsyncRemoteResult RemoteResult;
RwLock AttachmentsLock;
std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments;
std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks;
@@ -1838,7 +2403,7 @@ SaveOplog(CidStore& ChunkStore,
BlockPath.append(BlockHash.ToHexString());
try
{
- IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock), BlockPath);
+ IoBuffer BlockBuffer = remotestore_impl::WriteToTempFile(std::move(CompressedBlock), BlockPath);
RwLock::ExclusiveLockScope __(AttachmentsLock);
CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)});
ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize()));
@@ -1857,8 +2422,9 @@ SaveOplog(CidStore& ChunkStore,
if (Result.ErrorCode)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachment ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Failed to save attachment ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return;
}
Info.AttachmentBlocksUploaded.fetch_add(1);
@@ -1892,13 +2458,14 @@ SaveOplog(CidStore& ChunkStore,
OnBlock = UploadBlock;
}
- std::vector<Block> KnownBlocks;
+ std::vector<remotestore_impl::Block> KnownBlocks;
uint64_t TransferWallTimeMS = 0;
if (RemoteStoreInfo.CreateBlocks && !RemoteStoreInfo.BaseContainerName.empty())
{
- ReportMessage(OptionalContext, fmt::format("Loading oplog base container '{}'", RemoteStoreInfo.BaseContainerName));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Loading oplog base container '{}'", RemoteStoreInfo.BaseContainerName));
Stopwatch LoadBaseContainerTimer;
RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer();
TransferWallTimeMS += LoadBaseContainerTimer.GetElapsedTimeMs();
@@ -1907,17 +2474,18 @@ SaveOplog(CidStore& ChunkStore,
{
if (BaseContainerResult.ErrorCode)
{
- ReportMessage(OptionalContext,
- fmt::format("Failed to load oplog base container '{}' ({}): {}, uploading all attachments",
- RemoteStoreInfo.BaseContainerName,
- BaseContainerResult.ErrorCode,
- BaseContainerResult.Reason));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Failed to load oplog base container '{}' ({}): {}, uploading all attachments",
+ RemoteStoreInfo.BaseContainerName,
+ BaseContainerResult.ErrorCode,
+ BaseContainerResult.Reason));
}
else
{
- ReportMessage(OptionalContext,
- fmt::format("Loaded oplog base container in {}",
- NiceTimeSpanMs(static_cast<uint64_t>(BaseContainerResult.ElapsedSeconds * 1000.0))));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Loaded oplog base container in {}",
+ NiceTimeSpanMs(static_cast<uint64_t>(BaseContainerResult.ElapsedSeconds * 1000.0))));
CbArrayView BlocksArray = BaseContainerResult.ContainerObject["blocks"sv].AsArrayView();
@@ -1933,12 +2501,13 @@ SaveOplog(CidStore& ChunkStore,
RemoteProjectStore::HasAttachmentsResult HasResult = RemoteStore.HasAttachments(BlockHashes);
if (HasResult.ErrorCode == 0)
{
- ReportMessage(OptionalContext,
- fmt::format("Checked the existance of {} block{} in remote store, found {} existing blocks in {}",
- BlockHashes.size(),
- BlockHashes.size() > 1 ? "s"sv : ""sv,
- BlockHashes.size() - HasResult.Needs.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(HasResult.ElapsedSeconds * 1000.0))));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Checked the existance of {} block{} in remote store, found {} existing blocks in {}",
+ BlockHashes.size(),
+ BlockHashes.size() > 1 ? "s"sv : ""sv,
+ BlockHashes.size() - HasResult.Needs.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(HasResult.ElapsedSeconds * 1000.0))));
if (HasResult.Needs.size() < BlocksArray.Num())
{
KnownBlocks.reserve(BlocksArray.Num() - HasResult.Needs.size());
@@ -1970,11 +2539,12 @@ SaveOplog(CidStore& ChunkStore,
}
else
{
- ReportMessage(OptionalContext,
- fmt::format("Unable to determine which blocks in base container exist in remote store, assuming none "
- "does: '{}', error code : {}",
- HasResult.Reason,
- HasResult.ErrorCode));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Unable to determine which blocks in base container exist in remote store, assuming none "
+ "does: '{}', error code : {}",
+ HasResult.Reason,
+ HasResult.ErrorCode));
}
}
}
@@ -2001,37 +2571,40 @@ SaveOplog(CidStore& ChunkStore,
{
Info.OplogSizeBytes = OplogContainerObject.GetSize();
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteProjectStore::Result Result = {.ErrorCode = 0,
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
.Text = "Operation cancelled"};
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return Result;
}
uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num();
uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num();
- ReportMessage(OptionalContext,
- fmt::format("Saving oplog container '{}' with {} attachments and {} blocks...",
- RemoteStoreInfo.ContainerName,
- ChunkCount,
- BlockCount));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Saving oplog container '{}' with {} attachments and {} blocks...",
+ RemoteStoreInfo.ContainerName,
+ ChunkCount,
+ BlockCount));
Stopwatch SaveContainerTimer;
RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer());
TransferWallTimeMS += SaveContainerTimer.GetElapsedTimeMs();
if (ContainerSaveResult.ErrorCode)
{
RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container");
- ReportMessage(OptionalContext,
- fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
}
else
{
- ReportMessage(OptionalContext,
- fmt::format("Saved container '{}' in {}",
- RemoteStoreInfo.ContainerName,
- NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000.0))));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Saved container '{}' in {}",
+ RemoteStoreInfo.ContainerName,
+ NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000.0))));
}
{
@@ -2054,39 +2627,40 @@ SaveOplog(CidStore& ChunkStore,
uint32_t Try = 0;
while (!RemoteResult.IsError())
{
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteProjectStore::Result Result = {.ErrorCode = 0,
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
.Text = "Operation cancelled"};
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Text));
+ remotestore_impl::ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Text));
return Result;
}
- ReportMessage(OptionalContext, "Finalizing oplog container...");
+ remotestore_impl::ReportMessage(OptionalContext, "Finalizing oplog container...");
RemoteProjectStore::FinalizeResult ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash);
if (ContainerFinalizeResult.ErrorCode)
{
RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to finalize oplog container {} ({}): {}",
- ContainerSaveResult.RawHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Failed to finalize oplog container {} ({}): {}",
+ ContainerSaveResult.RawHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
return Result;
}
- ReportMessage(OptionalContext,
- fmt::format("Finalized container '{}' in {}",
- RemoteStoreInfo.ContainerName,
- NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000.0))));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Finalized container '{}' in {}",
+ RemoteStoreInfo.ContainerName,
+ NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000.0))));
if (ContainerFinalizeResult.Needs.empty())
{
break;
}
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteProjectStore::Result Result = {.ErrorCode = 0,
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
@@ -2099,7 +2673,7 @@ SaveOplog(CidStore& ChunkStore,
{
Try++;
- ReportMessage(
+ remotestore_impl::ReportMessage(
OptionalContext,
fmt::format("Finalize of container '{}' reported {} missing attachments. Uploading missing attachements. Try {}",
RemoteStoreInfo.ContainerName,
@@ -2129,11 +2703,11 @@ SaveOplog(CidStore& ChunkStore,
fmt::format("Giving up finalize oplog container {} after {} retries, still getting reports of missing attachments",
ContainerSaveResult.RawHash,
ContainerFinalizeResult.Needs.size()));
- ReportMessage(OptionalContext,
- fmt::format("Failed to finalize oplog container container {} ({}): {}",
- ContainerSaveResult.RawHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Failed to finalize oplog container container {} ({}): {}",
+ ContainerSaveResult.RawHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
break;
}
}
@@ -2144,19 +2718,19 @@ SaveOplog(CidStore& ChunkStore,
RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- LogRemoteStoreStatsDetails(RemoteStore.GetStats());
+ remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats());
- ReportMessage(OptionalContext,
- fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}",
- RemoteStoreInfo.ContainerName,
- RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
- NiceBytes(Info.OplogSizeBytes),
- Info.AttachmentBlocksUploaded.load(),
- NiceBytes(Info.AttachmentBlockBytesUploaded.load()),
- Info.AttachmentsUploaded.load(),
- NiceBytes(Info.AttachmentBytesUploaded.load()),
- GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}",
+ RemoteStoreInfo.ContainerName,
+ RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
+ NiceBytes(Info.OplogSizeBytes),
+ Info.AttachmentBlocksUploaded.load(),
+ NiceBytes(Info.AttachmentBlockBytesUploaded.load()),
+ Info.AttachmentsUploaded.load(),
+ NiceBytes(Info.AttachmentBytesUploaded.load()),
+ remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
return Result;
};
@@ -2183,7 +2757,8 @@ ParseOplogContainer(const CbObject& ContainerObject,
CbObject SectionObject = LoadCompactBinaryObject(SectionPayload);
if (!SectionObject)
{
- ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type"));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type"));
return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest),
Timer.GetElapsedTimeMs() / 1000.0,
"Section has unexpected data type",
@@ -2203,7 +2778,7 @@ ParseOplogContainer(const CbObject& ContainerObject,
std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end());
OnReferencedAttachments(ReferencedAttachments);
}
- ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size()));
+ remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size()));
CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
for (CbFieldView ChunkedFileField : ChunkedFilesArray)
@@ -2283,7 +2858,8 @@ ParseOplogContainer(const CbObject& ContainerObject,
}
}
}
- ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
size_t NeedAttachmentCount = 0;
CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
@@ -2296,68 +2872,8 @@ ParseOplogContainer(const CbObject& ContainerObject,
NeedAttachmentCount++;
}
};
- ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num()));
-
- return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0};
-}
-
-static RemoteProjectStore::Result
-WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext)
-{
- using namespace std::literals;
-
- Stopwatch Timer;
-
- CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
- const uint64_t OpCount = OpsArray.Num();
-
- ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount));
-
- const size_t OpsBatchSize = 8192;
- std::vector<uint8_t> OpsData;
- std::vector<size_t> OpDataOffsets;
- size_t OpsCompleteCount = 0;
-
- OpsData.reserve(OpsBatchSize);
-
- auto AppendBatch = [&]() {
- std::vector<CbObjectView> Ops;
- Ops.reserve(OpDataOffsets.size());
- for (size_t OpDataOffset : OpDataOffsets)
- {
- Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset]));
- }
- std::vector<uint32_t> OpLsns = Oplog.AppendNewOplogEntries(Ops);
- OpsCompleteCount += OpLsns.size();
- OpsData.clear();
- OpDataOffsets.clear();
- ReportProgress(OptionalContext,
- "Writing oplog"sv,
- fmt::format("{} remaining...", OpCount - OpsCompleteCount),
- OpCount,
- OpCount - OpsCompleteCount);
- };
-
- BinaryWriter Writer;
- for (CbFieldView OpEntry : OpsArray)
- {
- CbObjectView Op = OpEntry.AsObjectView();
- Op.CopyTo(Writer);
- OpDataOffsets.push_back(OpsData.size());
- OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize());
- Writer.Reset();
-
- if (OpDataOffsets.size() == OpsBatchSize)
- {
- AppendBatch();
- }
- }
- if (!OpDataOffsets.empty())
- {
- AppendBatch();
- }
-
- ReportProgress(OptionalContext, "Writing oplog"sv, ""sv, OpCount, 0);
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num()));
return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0};
}
@@ -2388,7 +2904,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
{
return Result;
}
- Result = WriteOplogSection(Oplog, OplogSection, OptionalContext);
+ Result = remotestore_impl::WriteOplogSection(Oplog, OplogSection, OptionalContext);
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
@@ -2404,19 +2920,7 @@ LoadOplog(CidStore& ChunkStore,
{
using namespace std::literals;
- struct DownloadInfo
- {
- uint64_t OplogSizeBytes = 0;
- std::atomic<uint64_t> AttachmentsDownloaded = 0;
- std::atomic<uint64_t> AttachmentBlocksDownloaded = 0;
- std::atomic<uint64_t> AttachmentBytesDownloaded = 0;
- std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0;
- std::atomic<uint64_t> AttachmentsStored = 0;
- std::atomic<uint64_t> AttachmentBytesStored = 0;
- std::atomic_size_t MissingAttachmentCount = 0;
- };
-
- DownloadInfo Info;
+ remotestore_impl::DownloadInfo Info;
Stopwatch Timer;
@@ -2427,7 +2931,7 @@ LoadOplog(CidStore& ChunkStore,
uint64_t BlockCountToDownload = 0;
RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo();
- ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName));
+ remotestore_impl::ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName));
uint64_t TransferWallTimeMS = 0;
@@ -2436,7 +2940,7 @@ LoadOplog(CidStore& ChunkStore,
TransferWallTimeMS += LoadContainerTimer.GetElapsedTimeMs();
if (LoadContainerResult.ErrorCode)
{
- ReportMessage(
+ remotestore_impl::ReportMessage(
OptionalContext,
fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode));
return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode,
@@ -2444,16 +2948,16 @@ LoadOplog(CidStore& ChunkStore,
.Reason = LoadContainerResult.Reason,
.Text = LoadContainerResult.Text};
}
- ReportMessage(OptionalContext,
- fmt::format("Loaded container in {} ({})",
- NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)),
- NiceBytes(LoadContainerResult.ContainerObject.GetSize())));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Loaded container in {} ({})",
+ NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)),
+ NiceBytes(LoadContainerResult.ContainerObject.GetSize())));
Info.OplogSizeBytes = LoadContainerResult.ContainerObject.GetSize();
- AsyncRemoteResult RemoteResult;
- Latch AttachmentsDownloadLatch(1);
- Latch AttachmentsWriteLatch(1);
- std::atomic_size_t AttachmentCount = 0;
+ remotestore_impl::AsyncRemoteResult RemoteResult;
+ Latch AttachmentsDownloadLatch(1);
+ Latch AttachmentsWriteLatch(1);
+ std::atomic_size_t AttachmentCount = 0;
Stopwatch LoadAttachmentsTimer;
std::atomic_uint64_t DownloadStartMS = (std::uint64_t)-1;
@@ -2469,6 +2973,7 @@ LoadOplog(CidStore& ChunkStore,
}
return false;
};
+
auto OnNeedBlock = [&RemoteStore,
&ChunkStore,
&NetworkWorkerPool,
@@ -2489,265 +2994,41 @@ LoadOplog(CidStore& ChunkStore,
}
BlockCountToDownload++;
+ AttachmentCount.fetch_add(1);
if (BlockHash == IoHash::Zero)
{
- AttachmentsDownloadLatch.AddCount(1);
- AttachmentCount.fetch_add(1);
- NetworkWorkerPool.ScheduleWork([&RemoteStore,
- &ChunkStore,
- &WorkerPool,
- &AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- &RemoteResult,
- Chunks = std::move(Chunks),
- &Info,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- uint64_t Unset = (std::uint64_t)-1;
- DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
- if (Result.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to load attachments with {} chunks ({}): {}",
- Chunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- Info.MissingAttachmentCount.fetch_add(1);
- if (IgnoreMissingAttachments)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- }
- return;
- }
- Info.AttachmentsDownloaded.fetch_add(Chunks.size());
- ZEN_INFO("Loaded {} bulk attachments in {}",
- Chunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
- if (RemoteResult.IsError())
- {
- return;
- }
- AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- if (!Chunks.empty())
- {
- try
- {
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
- WriteAttachmentBuffers.reserve(Chunks.size());
- WriteRawHashes.reserve(Chunks.size());
-
- for (const auto& It : Chunks)
- {
- uint64_t ChunkSize = It.second.GetCompressedSize();
- Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
- WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
- WriteRawHashes.push_back(It.first);
- }
- std::vector<CidStore::InsertResult> InsertResults =
- ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
-
- for (size_t Index = 0; Index < InsertResults.size(); Index++)
- {
- if (InsertResults[Index].New)
- {
- Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
- Info.AttachmentsStored.fetch_add(1);
- }
- }
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to bulk save {} attachments", Chunks.size()),
- Ex.what());
- }
- }
- });
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to bulk load {} attachments", Chunks.size()),
- Ex.what());
- }
- });
- return;
+ DownloadAndSaveBlockChunks(ChunkStore,
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ NetworkWorkerPool,
+ WorkerPool,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ Chunks);
+ }
+ else
+ {
+ DownloadAndSaveBlock(ChunkStore,
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ NetworkWorkerPool,
+ WorkerPool,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockHash,
+ Chunks,
+ 3);
}
- AttachmentsDownloadLatch.AddCount(1);
- AttachmentCount.fetch_add(1);
- NetworkWorkerPool.ScheduleWork([&AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- &ChunkStore,
- &RemoteStore,
- &WorkerPool,
- BlockHash,
- &RemoteResult,
- Chunks = std::move(Chunks),
- &Info,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- uint64_t Unset = (std::uint64_t)-1;
- DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
- if (BlockResult.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to download block attachment {} ({}): {}",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
- }
- return;
- }
- if (RemoteResult.IsError())
- {
- return;
- }
- uint64_t BlockSize = BlockResult.Bytes.GetSize();
- Info.AttachmentBlocksDownloaded.fetch_add(1);
- ZEN_INFO("Loaded block attachment '{}' in {} ({})",
- BlockHash,
- NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
- NiceBytes(BlockSize));
- Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
-
- AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork([&AttachmentsWriteLatch,
- &RemoteResult,
- &Info,
- &ChunkStore,
- BlockHash,
- Chunks = std::move(Chunks),
- Bytes = std::move(BlockResult.Bytes),
- OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- ZEN_ASSERT(Bytes.Size() > 0);
- std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
- WantedChunks.reserve(Chunks.size());
- WantedChunks.insert(Chunks.begin(), Chunks.end());
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
-
- IoHash RawHash;
- uint64_t RawSize;
- SharedBuffer BlockPayload = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize).Decompress();
- if (!BlockPayload)
- {
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash));
- RemoteResult.SetError(
- gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash),
- {});
- return;
- }
- if (RawHash != BlockHash)
- {
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash));
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash),
- {});
- return;
- }
-
- bool StoreChunksOK = IterateBlock(
- BlockPayload,
- [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk,
- const IoHash& AttachmentRawHash) {
- if (WantedChunks.contains(AttachmentRawHash))
- {
- WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
- IoHash RawHash;
- uint64_t RawSize;
- ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize));
- ZEN_ASSERT(RawHash == AttachmentRawHash);
- WriteRawHashes.emplace_back(AttachmentRawHash);
- WantedChunks.erase(AttachmentRawHash);
- }
- });
-
- if (!StoreChunksOK)
- {
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has invalid format ({}): {}",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Invalid format for block {}", BlockHash),
- {});
- return;
- }
-
- ZEN_ASSERT(WantedChunks.empty());
-
- if (!WriteAttachmentBuffers.empty())
- {
- auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
- for (size_t Index = 0; Index < Results.size(); Index++)
- {
- const auto& Result = Results[Index];
- if (Result.New)
- {
- Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
- Info.AttachmentsStored.fetch_add(1);
- }
- }
- }
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed save block attachment {}", BlockHash),
- Ex.what());
- }
- });
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to block attachment {}", BlockHash),
- Ex.what());
- }
- });
};
auto OnNeedAttachment = [&RemoteStore,
@@ -2773,95 +3054,20 @@ LoadOplog(CidStore& ChunkStore,
{
return;
}
-
- AttachmentsDownloadLatch.AddCount(1);
AttachmentCount.fetch_add(1);
- NetworkWorkerPool.ScheduleWork([&RemoteStore,
- &ChunkStore,
- &WorkerPool,
- &RemoteResult,
- &AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- RawHash,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- &Info,
- IgnoreMissingAttachments,
- OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- uint64_t Unset = (std::uint64_t)-1;
- DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
- if (AttachmentResult.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to download large attachment {}: '{}', error code : {}",
- RawHash,
- AttachmentResult.Reason,
- AttachmentResult.ErrorCode));
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
- }
- return;
- }
- uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
- ZEN_INFO("Loaded large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
- NiceBytes(AttachmentSize));
- Info.AttachmentsDownloaded.fetch_add(1);
- if (RemoteResult.IsError())
- {
- return;
- }
- Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
-
- AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork([&AttachmentsWriteLatch,
- &RemoteResult,
- &Info,
- &ChunkStore,
- RawHash,
- AttachmentSize,
- Bytes = std::move(AttachmentResult.Bytes),
- OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash);
- if (InsertResult.New)
- {
- Info.AttachmentBytesStored.fetch_add(AttachmentSize);
- Info.AttachmentsStored.fetch_add(1);
- }
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Saving attachment {} failed", RawHash),
- Ex.what());
- }
- });
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Loading attachment {} failed", RawHash),
- Ex.what());
- }
- });
+ DownloadAndSaveAttachment(ChunkStore,
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ NetworkWorkerPool,
+ WorkerPool,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ RawHash);
};
std::vector<ChunkedInfo> FilesToDechunk;
@@ -2891,18 +3097,18 @@ LoadOplog(CidStore& ChunkStore,
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
}
- ReportMessage(OptionalContext,
- fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download",
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
- Attachments.size(),
- BlockCountToDownload,
- FilesToDechunk.size()));
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
+ Attachments.size(),
+ BlockCountToDownload,
+ FilesToDechunk.size()));
AttachmentsDownloadLatch.CountDown();
while (!AttachmentsDownloadLatch.Wait(1000))
{
ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining();
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
if (!RemoteResult.IsError())
{
@@ -2914,11 +3120,12 @@ LoadOplog(CidStore& ChunkStore,
{
PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
}
- ReportProgress(OptionalContext,
- "Loading attachments"sv,
- fmt::format("{} remaining. {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)),
- AttachmentCount.load(),
- Remaining);
+ remotestore_impl::ReportProgress(
+ OptionalContext,
+ "Loading attachments"sv,
+ fmt::format("{} remaining. {}", Remaining, remotestore_impl::GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)),
+ AttachmentCount.load(),
+ Remaining);
}
if (DownloadStartMS != (uint64_t)-1)
{
@@ -2927,41 +3134,41 @@ LoadOplog(CidStore& ChunkStore,
if (AttachmentCount.load() > 0)
{
- ReportProgress(OptionalContext,
- "Loading attachments"sv,
- fmt::format("{}", GetStats(RemoteStore.GetStats(), TransferWallTimeMS)),
- AttachmentCount.load(),
- 0);
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Loading attachments"sv,
+ fmt::format("{}", remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)),
+ AttachmentCount.load(),
+ 0);
}
AttachmentsWriteLatch.CountDown();
while (!AttachmentsWriteLatch.Wait(1000))
{
ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining();
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
if (!RemoteResult.IsError())
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
}
}
- ReportProgress(OptionalContext,
- "Writing attachments"sv,
- fmt::format("{} remaining.", Remaining),
- AttachmentCount.load(),
- Remaining);
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Writing attachments"sv,
+ fmt::format("{} remaining.", Remaining),
+ AttachmentCount.load(),
+ Remaining);
}
if (AttachmentCount.load() > 0)
{
- ReportProgress(OptionalContext, "Writing attachments", ""sv, AttachmentCount.load(), 0);
+ remotestore_impl::ReportProgress(OptionalContext, "Writing attachments", ""sv, AttachmentCount.load(), 0);
}
if (Result.ErrorCode == 0)
{
if (!FilesToDechunk.empty())
{
- ReportMessage(OptionalContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size()));
+ remotestore_impl::ReportMessage(OptionalContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size()));
Latch DechunkLatch(1);
std::filesystem::path TempFilePath = Oplog.TempPath();
@@ -3011,7 +3218,7 @@ LoadOplog(CidStore& ChunkStore,
IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash);
if (!Chunk)
{
- ReportMessage(
+ remotestore_impl::ReportMessage(
OptionalContext,
fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
@@ -3070,22 +3277,23 @@ LoadOplog(CidStore& ChunkStore,
while (!DechunkLatch.Wait(1000))
{
ptrdiff_t Remaining = DechunkLatch.Remaining();
- if (IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
if (!RemoteResult.IsError())
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
}
}
- ReportProgress(OptionalContext,
- "Dechunking attachments"sv,
- fmt::format("{} remaining...", Remaining),
- FilesToDechunk.size(),
- Remaining);
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Dechunking attachments"sv,
+ fmt::format("{} remaining...", Remaining),
+ FilesToDechunk.size(),
+ Remaining);
}
- ReportProgress(OptionalContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0);
+ remotestore_impl::ReportProgress(OptionalContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0);
}
Result = RemoteResult.ConvertResult();
}
@@ -3099,33 +3307,34 @@ LoadOplog(CidStore& ChunkStore,
Result = RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError),
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
.Reason = fmt::format("Failed to clean existing oplog '{}'", Oplog.OplogId())};
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason));
+ remotestore_impl::ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason));
}
}
if (Result.ErrorCode == 0)
{
- WriteOplogSection(Oplog, OplogSection, OptionalContext);
+ remotestore_impl::WriteOplogSection(Oplog, OplogSection, OptionalContext);
}
}
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- LogRemoteStoreStatsDetails(RemoteStore.GetStats());
-
- ReportMessage(OptionalContext,
- fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {} {}",
- RemoteStoreInfo.ContainerName,
- Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE",
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
- NiceBytes(Info.OplogSizeBytes),
- Info.AttachmentBlocksDownloaded.load(),
- NiceBytes(Info.AttachmentBlockBytesDownloaded.load()),
- Info.AttachmentsDownloaded.load(),
- NiceBytes(Info.AttachmentBytesDownloaded.load()),
- Info.AttachmentsStored.load(),
- NiceBytes(Info.AttachmentBytesStored.load()),
- Info.MissingAttachmentCount.load(),
- GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
+ remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats());
+
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {} {}",
+ RemoteStoreInfo.ContainerName,
+ Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
+ NiceBytes(Info.OplogSizeBytes),
+ Info.AttachmentBlocksDownloaded.load(),
+ NiceBytes(Info.AttachmentBlockBytesDownloaded.load()),
+ Info.AttachmentsDownloaded.load(),
+ NiceBytes(Info.AttachmentBytesDownloaded.load()),
+ Info.AttachmentsStored.load(),
+ NiceBytes(Info.AttachmentBytesStored.load()),
+ Info.MissingAttachmentCount.load(),
+ remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
return Result;
}