diff options
| author | Dan Engelbrecht <[email protected]> | 2025-09-10 16:38:33 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-09-10 16:38:33 +0200 |
| commit | 339668ac935f781c06225d2d685642e27348772b (patch) | |
| tree | a5552d166eef9b5c72a2f9a6903e584dfc8968d7 /src/zenserver/projectstore | |
| parent | faster oplog entries with referenceset (#488) (diff) | |
| download | zen-339668ac935f781c06225d2d685642e27348772b.tar.xz zen-339668ac935f781c06225d2d685642e27348772b.zip | |
add EMode to WorkerTheadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work begin queued up. UE-305498
Diffstat (limited to 'src/zenserver/projectstore')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 29 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 1543 |
2 files changed, 803 insertions, 769 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 322af5e69..7cb115110 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1704,7 +1704,7 @@ ProjectStore::Oplog::Validate(const std::filesystem::path& ProjectRootDir, std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) @@ -2249,7 +2249,7 @@ ProjectStore::Oplog::IterateChunks(const std::filesystem::path& P { std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) @@ -4122,21 +4122,24 @@ ProjectStore::Flush() WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { for (const Ref<Project>& Project : Projects) { - Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) { - try - { - Project->Flush(); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what()); - } - }); + Work.ScheduleWork( + WorkerPool, + [this, Project](std::atomic<bool>&) { + try + { + Project->Flush(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what()); + } + }, + 0); } } catch (const std::exception& Ex) diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 2a81ee3e3..feafcc810 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -245,101 +245,105 @@ namespace remotestore_impl { const std::vector<IoHash>& Chunks) { AttachmentsDownloadLatch.AddCount(1); - NetworkWorkerPool.ScheduleWork([&RemoteStore, - &ChunkStore, - &WorkerPool, - &AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - &RemoteResult, - Chunks = Chunks, - &Info, - &LoadAttachmentsTimer, - &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext]() { - auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - uint64_t Unset = (std::uint64_t)-1; - DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); - if (Result.ErrorCode) - { - ReportMessage(OptionalContext, - fmt::format("Failed to load attachments with {} chunks ({}): {}", - Chunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - Info.MissingAttachmentCount.fetch_add(1); - if (IgnoreMissingAttachments) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - } - return; - } - Info.AttachmentsDownloaded.fetch_add(Chunks.size()); - ZEN_INFO("Loaded {} bulk attachments in {}", - Chunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + NetworkWorkerPool.ScheduleWork( + [&RemoteStore, + &ChunkStore, + &WorkerPool, + &AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + &RemoteResult, + Chunks = Chunks, + &Info, + &LoadAttachmentsTimer, + &DownloadStartMS, + IgnoreMissingAttachments, + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { return; } - AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork([&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) + try + { + uint64_t Unset = (std::uint64_t)-1; + DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); + RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); + if (Result.ErrorCode) { + ReportMessage(OptionalContext, + fmt::format("Failed to load attachments with {} chunks ({}): {}", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + Info.MissingAttachmentCount.fetch_add(1); + if (IgnoreMissingAttachments) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + } return; } - if (!Chunks.empty()) + Info.AttachmentsDownloaded.fetch_add(Chunks.size()); + ZEN_INFO("Loaded {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + if (RemoteResult.IsError()) { - try - { - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; - WriteAttachmentBuffers.reserve(Chunks.size()); - WriteRawHashes.reserve(Chunks.size()); - - for (const auto& It : Chunks) + return; + } + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) { - uint64_t ChunkSize = It.second.GetCompressedSize(); - Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); - WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); - WriteRawHashes.push_back(It.first); + return; } - std::vector<CidStore::InsertResult> InsertResults = - ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); - - for (size_t Index = 0; Index < InsertResults.size(); Index++) + if (!Chunks.empty()) { - if (InsertResults[Index].New) + try { - Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); - Info.AttachmentsStored.fetch_add(1); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + WriteAttachmentBuffers.reserve(Chunks.size()); + WriteRawHashes.reserve(Chunks.size()); + + for (const auto& It : Chunks) + { + uint64_t ChunkSize = It.second.GetCompressedSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(It.first); + } + std::vector<CidStore::InsertResult> InsertResults = + ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); + + for (size_t Index = 0; Index < InsertResults.size(); Index++) + { + if (InsertResults[Index].New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + } + } + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to bulk save {} attachments", Chunks.size()), + Ex.what()); } } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to bulk save {} attachments", Chunks.size()), - Ex.what()); - } - } - }); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to bulk load {} attachments", Chunks.size()), - Ex.what()); - } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to bulk load {} attachments", Chunks.size()), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); }; void DownloadAndSaveBlock(CidStore& ChunkStore, @@ -359,226 +363,237 @@ namespace remotestore_impl { uint32_t RetriesLeft) { AttachmentsDownloadLatch.AddCount(1); - NetworkWorkerPool.ScheduleWork([&AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - &ChunkStore, - &RemoteStore, - &NetworkWorkerPool, - &WorkerPool, - BlockHash, - &RemoteResult, - &Info, - &LoadAttachmentsTimer, - &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext, - RetriesLeft, - Chunks = Chunks]() { - auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - uint64_t Unset = (std::uint64_t)-1; - DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); - if (BlockResult.ErrorCode) - { - ReportMessage(OptionalContext, - fmt::format("Failed to download block attachment {} ({}): {}", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) - { - RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); - } - return; - } + NetworkWorkerPool.ScheduleWork( + [&AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + &ChunkStore, + &RemoteStore, + &NetworkWorkerPool, + &WorkerPool, + BlockHash, + &RemoteResult, + &Info, + &LoadAttachmentsTimer, + &DownloadStartMS, + IgnoreMissingAttachments, + OptionalContext, + RetriesLeft, + Chunks = Chunks]() { + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { return; } - uint64_t BlockSize = BlockResult.Bytes.GetSize(); - Info.AttachmentBlocksDownloaded.fetch_add(1); - ZEN_INFO("Loaded block attachment '{}' in {} ({})", - BlockHash, - NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), - NiceBytes(BlockSize)); - Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); - - AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork([&AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - &ChunkStore, - &RemoteStore, - &NetworkWorkerPool, - &WorkerPool, - BlockHash, - &RemoteResult, - &Info, - &LoadAttachmentsTimer, - &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext, - RetriesLeft, - Chunks = Chunks, - Bytes = std::move(BlockResult.Bytes)]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) + try + { + uint64_t Unset = (std::uint64_t)-1; + DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); + RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); + if (BlockResult.ErrorCode) { + ReportMessage(OptionalContext, + fmt::format("Failed to download block attachment {} ({}): {}", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); + } return; } - try + if (RemoteResult.IsError()) { - ZEN_ASSERT(Bytes.Size() > 0); - std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; - WantedChunks.reserve(Chunks.size()); - WantedChunks.insert(Chunks.begin(), Chunks.end()); - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; - - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize); - if (!Compressed) - { - if (RetriesLeft > 0) + return; + } + uint64_t BlockSize = BlockResult.Bytes.GetSize(); + Info.AttachmentBlocksDownloaded.fetch_add(1); + ZEN_INFO("Loaded block attachment '{}' in {} ({})", + BlockHash, + NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), + NiceBytes(BlockSize)); + Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); + + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + &ChunkStore, + &RemoteStore, + &NetworkWorkerPool, + &WorkerPool, + BlockHash, + &RemoteResult, + &Info, + &LoadAttachmentsTimer, + &DownloadStartMS, + IgnoreMissingAttachments, + OptionalContext, + RetriesLeft, + Chunks = Chunks, + Bytes = std::move(BlockResult.Bytes)]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) { - ReportMessage( - OptionalContext, - fmt::format("Block attachment {} is malformed, can't parse as compressed binary, retrying download", - BlockHash)); - return DownloadAndSaveBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - BlockHash, - std::move(Chunks), - RetriesLeft - 1); + return; } - ReportMessage(OptionalContext, - fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash)); - RemoteResult.SetError( - gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash), - {}); - return; - } - SharedBuffer BlockPayload = Compressed.Decompress(); - if (!BlockPayload) - { - if (RetriesLeft > 0) + try { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} is malformed, can't decompress payload, retrying download", - BlockHash)); - return DownloadAndSaveBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - BlockHash, - std::move(Chunks), - RetriesLeft - 1); - } - ReportMessage(OptionalContext, - fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash)); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash), - {}); - return; - } - if (RawHash != BlockHash) - { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash)); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash), - {}); - return; - } + ZEN_ASSERT(Bytes.Size() > 0); + std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; + WantedChunks.reserve(Chunks.size()); + WantedChunks.insert(Chunks.begin(), Chunks.end()); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize); + if (!Compressed) + { + if (RetriesLeft > 0) + { + ReportMessage( + OptionalContext, + fmt::format( + "Block attachment {} is malformed, can't parse as compressed binary, retrying download", + BlockHash)); + return DownloadAndSaveBlock(ChunkStore, + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + NetworkWorkerPool, + WorkerPool, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockHash, + std::move(Chunks), + RetriesLeft - 1); + } + ReportMessage( + OptionalContext, + fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash)); + RemoteResult.SetError( + gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash), + {}); + return; + } + SharedBuffer BlockPayload = Compressed.Decompress(); + if (!BlockPayload) + { + if (RetriesLeft > 0) + { + ReportMessage( + OptionalContext, + fmt::format("Block attachment {} is malformed, can't decompress payload, retrying download", + BlockHash)); + return DownloadAndSaveBlock(ChunkStore, + RemoteStore, + IgnoreMissingAttachments, + OptionalContext, + NetworkWorkerPool, + WorkerPool, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockHash, + std::move(Chunks), + RetriesLeft - 1); + } + ReportMessage(OptionalContext, + fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash)); + RemoteResult.SetError( + gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash), + {}); + return; + } + if (RawHash != BlockHash) + { + ReportMessage(OptionalContext, + fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash)); + RemoteResult.SetError( + gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash), + {}); + return; + } - uint64_t BlockHeaderSize = 0; - bool StoreChunksOK = IterateChunkBlock( - BlockPayload, - [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk, - const IoHash& AttachmentRawHash) { - if (WantedChunks.contains(AttachmentRawHash)) - { - WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); - IoHash RawHash; - uint64_t RawSize; - ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize)); - ZEN_ASSERT(RawHash == AttachmentRawHash); - WriteRawHashes.emplace_back(AttachmentRawHash); - WantedChunks.erase(AttachmentRawHash); - } - }, - BlockHeaderSize); - - if (!StoreChunksOK) - { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} has invalid format ({}): {}", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Invalid format for block {}", BlockHash), - {}); - return; - } + uint64_t BlockHeaderSize = 0; + bool StoreChunksOK = IterateChunkBlock( + BlockPayload, + [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk, + const IoHash& AttachmentRawHash) { + if (WantedChunks.contains(AttachmentRawHash)) + { + WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + IoHash RawHash; + uint64_t RawSize; + ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), + RawHash, + RawSize)); + ZEN_ASSERT(RawHash == AttachmentRawHash); + WriteRawHashes.emplace_back(AttachmentRawHash); + WantedChunks.erase(AttachmentRawHash); + } + }, + BlockHeaderSize); + + if (!StoreChunksOK) + { + ReportMessage(OptionalContext, + fmt::format("Block attachment {} has invalid format ({}): {}", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Invalid format for block {}", BlockHash), + {}); + return; + } - ZEN_ASSERT(WantedChunks.empty()); + ZEN_ASSERT(WantedChunks.empty()); - if (!WriteAttachmentBuffers.empty()) - { - auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); - for (size_t Index = 0; Index < Results.size(); Index++) - { - const auto& Result = Results[Index]; - if (Result.New) + if (!WriteAttachmentBuffers.empty()) { - Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); - Info.AttachmentsStored.fetch_add(1); + auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < Results.size(); Index++) + { + const auto& Result = Results[Index]; + if (Result.New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + } + } } } - } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed save block attachment {}", BlockHash), - Ex.what()); - } - }); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to block attachment {}", BlockHash), - Ex.what()); - } - }); + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed save block attachment {}", BlockHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to block attachment {}", BlockHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); }; void DownloadAndSaveAttachment(CidStore& ChunkStore, @@ -596,92 +611,96 @@ namespace remotestore_impl { const IoHash& RawHash) { AttachmentsDownloadLatch.AddCount(1); - NetworkWorkerPool.ScheduleWork([&RemoteStore, - &ChunkStore, - &WorkerPool, - &RemoteResult, - &AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - RawHash, - &LoadAttachmentsTimer, - &DownloadStartMS, - &Info, - IgnoreMissingAttachments, - OptionalContext]() { - auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - uint64_t Unset = (std::uint64_t)-1; - DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); - if (AttachmentResult.ErrorCode) - { - ReportMessage(OptionalContext, - fmt::format("Failed to download large attachment {}: '{}', error code : {}", - RawHash, - AttachmentResult.Reason, - AttachmentResult.ErrorCode)); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) - { - RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); - } - return; - } - uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize(); - ZEN_INFO("Loaded large attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), - NiceBytes(AttachmentSize)); - Info.AttachmentsDownloaded.fetch_add(1); + NetworkWorkerPool.ScheduleWork( + [&RemoteStore, + &ChunkStore, + &WorkerPool, + &RemoteResult, + &AttachmentsDownloadLatch, + &AttachmentsWriteLatch, + RawHash, + &LoadAttachmentsTimer, + &DownloadStartMS, + &Info, + IgnoreMissingAttachments, + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { return; } - Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); - - AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork([&AttachmentsWriteLatch, - &RemoteResult, - &Info, - &ChunkStore, - RawHash, - AttachmentSize, - Bytes = std::move(AttachmentResult.Bytes), - OptionalContext]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try + try + { + uint64_t Unset = (std::uint64_t)-1; + DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); + RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); + if (AttachmentResult.ErrorCode) { - CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash); - if (InsertResult.New) + ReportMessage(OptionalContext, + fmt::format("Failed to download large attachment {}: '{}', error code : {}", + RawHash, + AttachmentResult.Reason, + AttachmentResult.ErrorCode)); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) { - Info.AttachmentBytesStored.fetch_add(AttachmentSize); - Info.AttachmentsStored.fetch_add(1); + RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); } + return; } - catch (const std::exception& Ex) + uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize(); + ZEN_INFO("Loaded large attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), + NiceBytes(AttachmentSize)); + Info.AttachmentsDownloaded.fetch_add(1); + if (RemoteResult.IsError()) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Saving attachment {} failed", RawHash), - Ex.what()); + return; } - }); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Loading attachment {} failed", RawHash), - Ex.what()); - } - }); + Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); + + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&AttachmentsWriteLatch, + &RemoteResult, + &Info, + &ChunkStore, + RawHash, + AttachmentSize, + Bytes = std::move(AttachmentResult.Bytes), + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + try + { + CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(AttachmentSize); + Info.AttachmentsStored.fetch_add(1); + } + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Saving attachment {} failed", RawHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Loading attachment {} failed", RawHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); }; void CreateBlock(WorkerThreadPool& WorkerPool, @@ -694,45 +713,47 @@ namespace remotestore_impl { AsyncRemoteResult& RemoteResult) { OpSectionsLatch.AddCount(1); - WorkerPool.ScheduleWork([&Blocks, - &SectionsLock, - &OpSectionsLatch, - BlockIndex, - Chunks = std::move(ChunksInBlock), - &AsyncOnBlock, - &RemoteResult]() mutable { - auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - size_t ChunkCount = Chunks.size(); - try - { - ZEN_ASSERT(ChunkCount > 0); - Stopwatch Timer; - ChunkBlockDescription Block; - CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block); - IoHash BlockHash = CompressedBlock.DecodeRawHash(); + WorkerPool.ScheduleWork( + [&Blocks, + &SectionsLock, + &OpSectionsLatch, + BlockIndex, + Chunks = std::move(ChunksInBlock), + &AsyncOnBlock, + &RemoteResult]() mutable { + auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); }); + if (RemoteResult.IsError()) { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - RwLock::SharedLockScope __(SectionsLock); - Blocks[BlockIndex] = Block; + return; } - uint64_t BlockSize = CompressedBlock.GetCompressedSize(); - AsyncOnBlock(std::move(CompressedBlock), std::move(Block)); - ZEN_INFO("Generated block with {} attachments in {} ({})", - ChunkCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - NiceBytes(BlockSize)); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount), - Ex.what()); - } - }); + size_t ChunkCount = Chunks.size(); + try + { + ZEN_ASSERT(ChunkCount > 0); + Stopwatch Timer; + ChunkBlockDescription Block; + CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block); + IoHash BlockHash = CompressedBlock.DecodeRawHash(); + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope __(SectionsLock); + Blocks[BlockIndex] = Block; + } + uint64_t BlockSize = CompressedBlock.GetCompressedSize(); + AsyncOnBlock(std::move(CompressedBlock), std::move(Block)); + ZEN_INFO("Generated block with {} attachments in {} ({})", + ChunkCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + NiceBytes(BlockSize)); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); } struct UploadInfo @@ -861,89 +882,91 @@ namespace remotestore_impl { SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; - WorkerPool.ScheduleWork([&ChunkStore, - &RemoteStore, - &SaveAttachmentsLatch, - &RemoteResult, - RawHash, - &CreatedBlocks, - &LooseFileAttachments, - &Info, - OptionalContext]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - IoBuffer Payload; - ChunkBlockDescription Block; - if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) - { - Payload = BlockIt->second.Payload; - Block = BlockIt->second.Block; - } - else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end()) - { - Payload = LooseTmpFileIt->second(RawHash); - } - else - { - Payload = ChunkStore.FindChunkByCid(RawHash); - } - if (!Payload) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to find attachment {}", RawHash), - {}); - ZEN_WARN("Failed to save attachment '{}' ({}): {}", - RawHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - const bool IsBlock = Block.BlockHash == RawHash; - size_t PayloadSize = Payload.GetSize(); - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash, std::move(Block)); - if (Result.ErrorCode) + WorkerPool.ScheduleWork( + [&ChunkStore, + &RemoteStore, + &SaveAttachmentsLatch, + &RemoteResult, + RawHash, + &CreatedBlocks, + &LooseFileAttachments, + &Info, + OptionalContext]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ReportMessage(OptionalContext, - fmt::format("Failed to save attachment '{}', {} ({}): {}", - RawHash, - NiceBytes(PayloadSize), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); return; } - if (IsBlock) + try { - Info.AttachmentBlocksUploaded.fetch_add(1); - Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize); - ZEN_INFO("Saved block attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(PayloadSize)); + IoBuffer Payload; + ChunkBlockDescription Block; + if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) + { + Payload = BlockIt->second.Payload; + Block = BlockIt->second.Block; + } + else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end()) + { + Payload = LooseTmpFileIt->second(RawHash); + } + else + { + Payload = ChunkStore.FindChunkByCid(RawHash); + } + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {}", RawHash), + {}); + ZEN_WARN("Failed to save attachment '{}' ({}): {}", + RawHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + const bool IsBlock = Block.BlockHash == RawHash; + size_t PayloadSize = Payload.GetSize(); + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash, std::move(Block)); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachment '{}', {} ({}): {}", + RawHash, + NiceBytes(PayloadSize), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + return; + } + if (IsBlock) + { + Info.AttachmentBlocksUploaded.fetch_add(1); + Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize); + ZEN_INFO("Saved block attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(PayloadSize)); + } + else + { + Info.AttachmentsUploaded.fetch_add(1); + Info.AttachmentBytesUploaded.fetch_add(PayloadSize); + ZEN_INFO("Saved large attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(PayloadSize)); + } } - else + catch (const std::exception& Ex) { - Info.AttachmentsUploaded.fetch_add(1); - Info.AttachmentBytesUploaded.fetch_add(PayloadSize); - ZEN_INFO("Saved large attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(PayloadSize)); + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("To upload attachment {}", RawHash), + Ex.what()); } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("To upload attachment {}", RawHash), - Ex.what()); - } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } if (IsCancelled(OptionalContext)) @@ -982,66 +1005,68 @@ namespace remotestore_impl { SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; - WorkerPool.ScheduleWork([&RemoteStore, - &ChunkStore, - &SaveAttachmentsLatch, - &RemoteResult, - NeededChunks = std::move(NeededChunks), - &BulkBlockAttachmentsToUpload, - &Info, - OptionalContext]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - size_t ChunksSize = 0; - std::vector<SharedBuffer> ChunkBuffers; - ChunkBuffers.reserve(NeededChunks.size()); - for (const IoHash& Chunk : NeededChunks) + WorkerPool.ScheduleWork( + [&RemoteStore, + &ChunkStore, + &SaveAttachmentsLatch, + &RemoteResult, + NeededChunks = std::move(NeededChunks), + &BulkBlockAttachmentsToUpload, + &Info, + OptionalContext]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) { - auto It = BulkBlockAttachmentsToUpload.find(Chunk); - ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); - CompressedBuffer ChunkPayload = It->second(It->first).second; - if (!ChunkPayload) + return; + } + try + { + size_t ChunksSize = 0; + std::vector<SharedBuffer> ChunkBuffers; + ChunkBuffers.reserve(NeededChunks.size()); + for (const IoHash& Chunk : NeededChunks) + { + auto It = BulkBlockAttachmentsToUpload.find(Chunk); + ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); + CompressedBuffer ChunkPayload = It->second(It->first).second; + if (!ChunkPayload) + { + RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), + fmt::format("Missing chunk {}"sv, Chunk), + fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); + ChunkBuffers.clear(); + break; + } + ChunksSize += ChunkPayload.GetCompressedSize(); + ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer())); + } + RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); + if (Result.ErrorCode) { - RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), - fmt::format("Missing chunk {}"sv, Chunk), - fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); - ChunkBuffers.clear(); - break; + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachments with {} chunks ({}): {}", + NeededChunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + return; } - ChunksSize += ChunkPayload.GetCompressedSize(); - ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer())); + Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size()); + Info.AttachmentBytesUploaded.fetch_add(ChunksSize); + + ZEN_INFO("Saved {} bulk attachments in {} ({})", + NeededChunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(ChunksSize)); } - RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); - if (Result.ErrorCode) + catch (const std::exception& Ex) { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ReportMessage(OptionalContext, - fmt::format("Failed to save attachments with {} chunks ({}): {}", - NeededChunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - return; + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to buck upload {} attachments", NeededChunks.size()), + Ex.what()); } - Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size()); - Info.AttachmentBytesUploaded.fetch_add(ChunksSize); - - ZEN_INFO("Saved {} bulk attachments in {} ({})", - NeededChunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(ChunksSize)); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to buck upload {} attachments", NeededChunks.size()), - Ex.what()); - } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } } @@ -1516,148 +1541,152 @@ BuildContainer(CidStore& ChunkStore, ResolveAttachmentsLatch.AddCount(1); - WorkerPool.ScheduleWork([&ChunkStore, - UploadAttachment = &It.second, - RawHash = It.first, - &ResolveAttachmentsLatch, - &ResolveLock, - &ChunkedHashes, - &LargeChunkHashes, - &ChunkedUploadAttachments, - &LooseUploadAttachments, - &MissingHashes, - &OnLargeAttachment, - &AttachmentTempPath, - &ChunkFile, - &ChunkedFiles, - MaxChunkEmbedSize, - ChunkFileSizeLimit, - AllowChunking, - &RemoteResult, - OptionalContext]() { - auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); }); - if (remotestore_impl::IsCancelled(OptionalContext)) - { - return; - } + WorkerPool.ScheduleWork( + [&ChunkStore, + UploadAttachment = &It.second, + RawHash = It.first, + &ResolveAttachmentsLatch, + &ResolveLock, + &ChunkedHashes, + &LargeChunkHashes, + &ChunkedUploadAttachments, + &LooseUploadAttachments, + &MissingHashes, + &OnLargeAttachment, + &AttachmentTempPath, + &ChunkFile, + &ChunkedFiles, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + AllowChunking, + &RemoteResult, + OptionalContext]() { + auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); }); + if (remotestore_impl::IsCancelled(OptionalContext)) + { + return; + } - try - { - if (!UploadAttachment->RawPath.empty()) + try { - const std::filesystem::path& FilePath = UploadAttachment->RawPath; - IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath); - if (RawData) + if (!UploadAttachment->RawPath.empty()) { - if (AllowChunking && RawData.GetSize() > ChunkFileSizeLimit) + const std::filesystem::path& FilePath = UploadAttachment->RawPath; + IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath); + if (RawData) { - IoBufferFileReference FileRef; - (void)RawData.GetFileReference(FileRef); - - ChunkedFile Chunked = ChunkFile(RawHash, RawData, FileRef, OptionalContext); - ResolveLock.WithExclusiveLock( - [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() { - ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size()); - ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size()); - for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) - { - ChunkedHashes.insert(ChunkHash); - } - ChunkedFiles.emplace_back(std::move(Chunked)); + if (AllowChunking && RawData.GetSize() > ChunkFileSizeLimit) + { + IoBufferFileReference FileRef; + (void)RawData.GetFileReference(FileRef); + + ChunkedFile Chunked = ChunkFile(RawHash, RawData, FileRef, OptionalContext); + ResolveLock.WithExclusiveLock( + [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() { + ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size()); + ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size()); + for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) + { + ChunkedHashes.insert(ChunkHash); + } + ChunkedFiles.emplace_back(std::move(Chunked)); + }); + } + else if (RawData.GetSize() > (MaxChunkEmbedSize * 2)) + { + // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't + // it will be a loose attachment instead of going into a block + OnLargeAttachment(RawHash, [RawData = std::move(RawData), AttachmentTempPath](const IoHash& RawHash) { + size_t RawSize = RawData.GetSize(); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), + OodleCompressor::Mermaid, + OodleCompressionLevel::VeryFast); + + std::filesystem::path AttachmentPath = AttachmentTempPath; + AttachmentPath.append(RawHash.ToHexString()); + IoBuffer TempAttachmentBuffer = + WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath); + ZEN_INFO("Saved temp attachment to '{}', {} ({})", + AttachmentPath, + NiceBytes(RawSize), + NiceBytes(TempAttachmentBuffer.GetSize())); + return TempAttachmentBuffer; }); - } - else if (RawData.GetSize() > (MaxChunkEmbedSize * 2)) - { - // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't - // it will be a loose attachment instead of going into a block - OnLargeAttachment(RawHash, [RawData = std::move(RawData), AttachmentTempPath](const IoHash& RawHash) { - size_t RawSize = RawData.GetSize(); - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), + ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + } + else + { + uint64_t RawSize = RawData.GetSize(); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData), OodleCompressor::Mermaid, OodleCompressionLevel::VeryFast); std::filesystem::path AttachmentPath = AttachmentTempPath; AttachmentPath.append(RawHash.ToHexString()); + + uint64_t CompressedSize = Compressed.GetCompressedSize(); IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath); ZEN_INFO("Saved temp attachment to '{}', {} ({})", AttachmentPath, NiceBytes(RawSize), NiceBytes(TempAttachmentBuffer.GetSize())); - return TempAttachmentBuffer; - }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + + if (CompressedSize > MaxChunkEmbedSize) + { + OnLargeAttachment(RawHash, + [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; }); + ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + } + else + { + UploadAttachment->Size = CompressedSize; + ResolveLock.WithExclusiveLock( + [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() { + LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data))); + }); + } + } } else { - uint64_t RawSize = RawData.GetSize(); - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData), - OodleCompressor::Mermaid, - OodleCompressionLevel::VeryFast); - - std::filesystem::path AttachmentPath = AttachmentTempPath; - AttachmentPath.append(RawHash.ToHexString()); - - uint64_t CompressedSize = Compressed.GetCompressedSize(); - IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath); - ZEN_INFO("Saved temp attachment to '{}', {} ({})", - AttachmentPath, - NiceBytes(RawSize), - NiceBytes(TempAttachmentBuffer.GetSize())); - - if (CompressedSize > MaxChunkEmbedSize) - { - OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); - } - else - { - UploadAttachment->Size = CompressedSize; - ResolveLock.WithExclusiveLock( - [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() { - LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data))); - }); - } + ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); } } else { - ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); - } - } - else - { - IoBuffer Data = ChunkStore.FindChunkByCid(RawHash); - if (Data) - { - auto GetForChunking = - [](size_t ChunkFileSizeLimit, const IoBuffer& Data, IoBufferFileReference& OutFileRef) -> bool { - if (Data.IsWholeFile()) - { - IoHash VerifyRawHash; - uint64_t VerifyRawSize; - CompressedBuffer Compressed = - CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize); - if (Compressed) + IoBuffer Data = ChunkStore.FindChunkByCid(RawHash); + if (Data) + { + auto GetForChunking = + [](size_t ChunkFileSizeLimit, const IoBuffer& Data, IoBufferFileReference& OutFileRef) -> bool { + if (Data.IsWholeFile()) { - if (VerifyRawSize > ChunkFileSizeLimit) + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize); + if (Compressed) { - OodleCompressor Compressor; - OodleCompressionLevel CompressionLevel; - uint64_t BlockSize; - if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + if (VerifyRawSize > ChunkFileSizeLimit) { - if (CompressionLevel == OodleCompressionLevel::None) + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + uint64_t BlockSize; + if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) { - CompositeBuffer Decompressed = Compressed.DecompressToComposite(); - if (Decompressed) + if (CompressionLevel == OodleCompressionLevel::None) { - std::span<const SharedBuffer> Segments = Decompressed.GetSegments(); - if (Segments.size() == 1) + CompositeBuffer Decompressed = Compressed.DecompressToComposite(); + if (Decompressed) { - IoBuffer DecompressedData = Segments[0].AsIoBuffer(); - if (DecompressedData.GetFileReference(OutFileRef)) + std::span<const SharedBuffer> Segments = Decompressed.GetSegments(); + if (Segments.size() == 1) { - return true; + IoBuffer DecompressedData = Segments[0].AsIoBuffer(); + if (DecompressedData.GetFileReference(OutFileRef)) + { + return true; + } } } } @@ -1665,49 +1694,49 @@ BuildContainer(CidStore& ChunkStore, } } } - } - return false; - }; + return false; + }; - IoBufferFileReference FileRef; - if (AllowChunking && GetForChunking(ChunkFileSizeLimit, Data, FileRef)) - { - ChunkedFile Chunked = ChunkFile(RawHash, Data, FileRef, OptionalContext); - ResolveLock.WithExclusiveLock( - [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() { - ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size()); - ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size()); - for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) - { - ChunkedHashes.insert(ChunkHash); - } - ChunkedFiles.emplace_back(std::move(Chunked)); - }); - } - else if (Data.GetSize() > MaxChunkEmbedSize) - { - OnLargeAttachment(RawHash, - [&ChunkStore](const IoHash& RawHash) { return ChunkStore.FindChunkByCid(RawHash); }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + IoBufferFileReference FileRef; + if (AllowChunking && GetForChunking(ChunkFileSizeLimit, Data, FileRef)) + { + ChunkedFile Chunked = ChunkFile(RawHash, Data, FileRef, OptionalContext); + ResolveLock.WithExclusiveLock( + [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() { + ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size()); + ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size()); + for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) + { + ChunkedHashes.insert(ChunkHash); + } + ChunkedFiles.emplace_back(std::move(Chunked)); + }); + } + else if (Data.GetSize() > MaxChunkEmbedSize) + { + OnLargeAttachment(RawHash, + [&ChunkStore](const IoHash& RawHash) { return ChunkStore.FindChunkByCid(RawHash); }); + ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + } + else + { + UploadAttachment->Size = Data.GetSize(); + } } else { - UploadAttachment->Size = Data.GetSize(); + ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); } } - else - { - ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); - } } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to resolve attachment {}", RawHash), - Ex.what()); - } - }); + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to resolve attachment {}", RawHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); } ResolveAttachmentsLatch.CountDown(); @@ -3077,101 +3106,103 @@ LoadOplog(CidStore& ChunkStore, { std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString(); DechunkLatch.AddCount(1); - WorkerPool.ScheduleWork([&ChunkStore, - &DechunkLatch, - TempFileName, - &Chunked, - &RemoteResult, - IgnoreMissingAttachments, - &Info, - OptionalContext]() { - auto _ = MakeGuard([&DechunkLatch, &TempFileName] { - std::error_code Ec; - if (IsFile(TempFileName, Ec)) - { - RemoveFile(TempFileName, Ec); - if (Ec) + WorkerPool.ScheduleWork( + [&ChunkStore, + &DechunkLatch, + TempFileName, + &Chunked, + &RemoteResult, + IgnoreMissingAttachments, + &Info, + OptionalContext]() { + auto _ = MakeGuard([&DechunkLatch, &TempFileName] { + std::error_code Ec; + if (IsFile(TempFileName, Ec)) { - ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message()); + RemoveFile(TempFileName, Ec); + if (Ec) + { + ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message()); + } } - } - DechunkLatch.CountDown(); - }); - try - { - if (RemoteResult.IsError()) - { - return; - } - Stopwatch Timer; - IoBuffer TmpBuffer; + DechunkLatch.CountDown(); + }); + try { - BasicFile TmpFile; - TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate); + if (RemoteResult.IsError()) { - BasicFileWriter TmpWriter(TmpFile, 64u * 1024u); - - uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); - BLAKE3Stream HashingStream; - for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) + return; + } + Stopwatch Timer; + IoBuffer TmpBuffer; + { + BasicFile TmpFile; + TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate); { - const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex]; - IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash); - if (!Chunk) - { - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); + BasicFileWriter TmpWriter(TmpFile, 64u * 1024u); - // We only add 1 as the resulting missing count will be 1 for the dechunked file - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) + uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); + BLAKE3Stream HashingStream; + for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) + { + const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex]; + IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash); + if (!Chunk) { - RemoteResult.SetError( - gsl::narrow<int>(HttpResponseCode::NotFound), - "Missing chunk", + remotestore_impl::ReportMessage( + OptionalContext, fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); + + // We only add 1 as the resulting missing count will be 1 for the dechunked file + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError( + gsl::narrow<int>(HttpResponseCode::NotFound), + "Missing chunk", + fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); + } + return; + } + CompositeBuffer Decompressed = + CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite(); + for (const SharedBuffer& Segment : Decompressed.GetSegments()) + { + MemoryView SegmentData = Segment.GetView(); + HashingStream.Append(SegmentData); + TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset); + Offset += SegmentData.GetSize(); } - return; - } - CompositeBuffer Decompressed = - CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite(); - for (const SharedBuffer& Segment : Decompressed.GetSegments()) - { - MemoryView SegmentData = Segment.GetView(); - HashingStream.Append(SegmentData); - TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset); - Offset += SegmentData.GetSize(); } + BLAKE3 RawHash = HashingStream.GetHash(); + ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash)); + UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash); + TmpWriter.Write(Header.GetData(), Header.GetSize(), 0); } - BLAKE3 RawHash = HashingStream.GetHash(); - ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash)); - UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash); - TmpWriter.Write(Header.GetData(), Header.GetSize(), 0); + TmpFile.Close(); + TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName); } - TmpFile.Close(); - TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName); + CidStore::InsertResult InsertResult = + ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize()); + Info.AttachmentsStored.fetch_add(1); + } + + ZEN_INFO("Dechunked attachment {} ({}) in {}", + Chunked.RawHash, + NiceBytes(Chunked.RawSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } - CidStore::InsertResult InsertResult = - ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); - if (InsertResult.New) + catch (const std::exception& Ex) { - Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize()); - Info.AttachmentsStored.fetch_add(1); + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to dechunck file {}", Chunked.RawHash), + Ex.what()); } - - ZEN_INFO("Dechunked attachment {} ({}) in {}", - Chunked.RawHash, - NiceBytes(Chunked.RawSize), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to dechunck file {}", Chunked.RawHash), - Ex.what()); - } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } DechunkLatch.CountDown(); |