aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-09-10 16:38:33 +0200
committerGitHub Enterprise <[email protected]>2025-09-10 16:38:33 +0200
commit339668ac935f781c06225d2d685642e27348772b (patch)
treea5552d166eef9b5c72a2f9a6903e584dfc8968d7 /src/zenserver/projectstore
parentfaster oplog entries with referenceset (#488) (diff)
downloadzen-339668ac935f781c06225d2d685642e27348772b.tar.xz
zen-339668ac935f781c06225d2d685642e27348772b.zip
add EMode to WorkerTheadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work begin queued up. UE-305498
Diffstat (limited to 'src/zenserver/projectstore')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp29
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp1543
2 files changed, 803 insertions, 769 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 322af5e69..7cb115110 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1704,7 +1704,7 @@ ProjectStore::Oplog::Validate(const std::filesystem::path& ProjectRootDir,
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++)
@@ -2249,7 +2249,7 @@ ProjectStore::Oplog::IterateChunks(const std::filesystem::path& P
{
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++)
@@ -4122,21 +4122,24 @@ ProjectStore::Flush()
WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst);
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
for (const Ref<Project>& Project : Projects)
{
- Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) {
- try
- {
- Project->Flush();
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what());
- }
- });
+ Work.ScheduleWork(
+ WorkerPool,
+ [this, Project](std::atomic<bool>&) {
+ try
+ {
+ Project->Flush();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what());
+ }
+ },
+ 0);
}
}
catch (const std::exception& Ex)
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 2a81ee3e3..feafcc810 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -245,101 +245,105 @@ namespace remotestore_impl {
const std::vector<IoHash>& Chunks)
{
AttachmentsDownloadLatch.AddCount(1);
- NetworkWorkerPool.ScheduleWork([&RemoteStore,
- &ChunkStore,
- &WorkerPool,
- &AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- &RemoteResult,
- Chunks = Chunks,
- &Info,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- uint64_t Unset = (std::uint64_t)-1;
- DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
- if (Result.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to load attachments with {} chunks ({}): {}",
- Chunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- Info.MissingAttachmentCount.fetch_add(1);
- if (IgnoreMissingAttachments)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- }
- return;
- }
- Info.AttachmentsDownloaded.fetch_add(Chunks.size());
- ZEN_INFO("Loaded {} bulk attachments in {}",
- Chunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ NetworkWorkerPool.ScheduleWork(
+ [&RemoteStore,
+ &ChunkStore,
+ &WorkerPool,
+ &AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
+ &RemoteResult,
+ Chunks = Chunks,
+ &Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ IgnoreMissingAttachments,
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
return;
}
- AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork([&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
+ try
+ {
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
+ RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
+ if (Result.ErrorCode)
{
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to load attachments with {} chunks ({}): {}",
+ Chunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ }
return;
}
- if (!Chunks.empty())
+ Info.AttachmentsDownloaded.fetch_add(Chunks.size());
+ ZEN_INFO("Loaded {} bulk attachments in {}",
+ Chunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ if (RemoteResult.IsError())
{
- try
- {
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
- WriteAttachmentBuffers.reserve(Chunks.size());
- WriteRawHashes.reserve(Chunks.size());
-
- for (const auto& It : Chunks)
+ return;
+ }
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
{
- uint64_t ChunkSize = It.second.GetCompressedSize();
- Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
- WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
- WriteRawHashes.push_back(It.first);
+ return;
}
- std::vector<CidStore::InsertResult> InsertResults =
- ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
-
- for (size_t Index = 0; Index < InsertResults.size(); Index++)
+ if (!Chunks.empty())
{
- if (InsertResults[Index].New)
+ try
{
- Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
- Info.AttachmentsStored.fetch_add(1);
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+ WriteAttachmentBuffers.reserve(Chunks.size());
+ WriteRawHashes.reserve(Chunks.size());
+
+ for (const auto& It : Chunks)
+ {
+ uint64_t ChunkSize = It.second.GetCompressedSize();
+ Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(It.first);
+ }
+ std::vector<CidStore::InsertResult> InsertResults =
+ ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
+
+ for (size_t Index = 0; Index < InsertResults.size(); Index++)
+ {
+ if (InsertResults[Index].New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to bulk save {} attachments", Chunks.size()),
+ Ex.what());
}
}
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to bulk save {} attachments", Chunks.size()),
- Ex.what());
- }
- }
- });
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to bulk load {} attachments", Chunks.size()),
- Ex.what());
- }
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to bulk load {} attachments", Chunks.size()),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
};
void DownloadAndSaveBlock(CidStore& ChunkStore,
@@ -359,226 +363,237 @@ namespace remotestore_impl {
uint32_t RetriesLeft)
{
AttachmentsDownloadLatch.AddCount(1);
- NetworkWorkerPool.ScheduleWork([&AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- &ChunkStore,
- &RemoteStore,
- &NetworkWorkerPool,
- &WorkerPool,
- BlockHash,
- &RemoteResult,
- &Info,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext,
- RetriesLeft,
- Chunks = Chunks]() {
- auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- uint64_t Unset = (std::uint64_t)-1;
- DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
- if (BlockResult.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to download block attachment {} ({}): {}",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
- }
- return;
- }
+ NetworkWorkerPool.ScheduleWork(
+ [&AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
+ &ChunkStore,
+ &RemoteStore,
+ &NetworkWorkerPool,
+ &WorkerPool,
+ BlockHash,
+ &RemoteResult,
+ &Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ RetriesLeft,
+ Chunks = Chunks]() {
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
return;
}
- uint64_t BlockSize = BlockResult.Bytes.GetSize();
- Info.AttachmentBlocksDownloaded.fetch_add(1);
- ZEN_INFO("Loaded block attachment '{}' in {} ({})",
- BlockHash,
- NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
- NiceBytes(BlockSize));
- Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
-
- AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork([&AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- &ChunkStore,
- &RemoteStore,
- &NetworkWorkerPool,
- &WorkerPool,
- BlockHash,
- &RemoteResult,
- &Info,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext,
- RetriesLeft,
- Chunks = Chunks,
- Bytes = std::move(BlockResult.Bytes)]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
+ try
+ {
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
+ RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
+ if (BlockResult.ErrorCode)
{
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to download block attachment {} ({}): {}",
+ BlockHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
+ }
return;
}
- try
+ if (RemoteResult.IsError())
{
- ZEN_ASSERT(Bytes.Size() > 0);
- std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
- WantedChunks.reserve(Chunks.size());
- WantedChunks.insert(Chunks.begin(), Chunks.end());
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
-
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize);
- if (!Compressed)
- {
- if (RetriesLeft > 0)
+ return;
+ }
+ uint64_t BlockSize = BlockResult.Bytes.GetSize();
+ Info.AttachmentBlocksDownloaded.fetch_add(1);
+ ZEN_INFO("Loaded block attachment '{}' in {} ({})",
+ BlockHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
+ NiceBytes(BlockSize));
+ Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
+
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
+ &ChunkStore,
+ &RemoteStore,
+ &NetworkWorkerPool,
+ &WorkerPool,
+ BlockHash,
+ &RemoteResult,
+ &Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ RetriesLeft,
+ Chunks = Chunks,
+ Bytes = std::move(BlockResult.Bytes)]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
{
- ReportMessage(
- OptionalContext,
- fmt::format("Block attachment {} is malformed, can't parse as compressed binary, retrying download",
- BlockHash));
- return DownloadAndSaveBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- BlockHash,
- std::move(Chunks),
- RetriesLeft - 1);
+ return;
}
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash));
- RemoteResult.SetError(
- gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash),
- {});
- return;
- }
- SharedBuffer BlockPayload = Compressed.Decompress();
- if (!BlockPayload)
- {
- if (RetriesLeft > 0)
+ try
{
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} is malformed, can't decompress payload, retrying download",
- BlockHash));
- return DownloadAndSaveBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- BlockHash,
- std::move(Chunks),
- RetriesLeft - 1);
- }
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash));
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash),
- {});
- return;
- }
- if (RawHash != BlockHash)
- {
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash));
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash),
- {});
- return;
- }
+ ZEN_ASSERT(Bytes.Size() > 0);
+ std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
+ WantedChunks.reserve(Chunks.size());
+ WantedChunks.insert(Chunks.begin(), Chunks.end());
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize);
+ if (!Compressed)
+ {
+ if (RetriesLeft > 0)
+ {
+ ReportMessage(
+ OptionalContext,
+ fmt::format(
+ "Block attachment {} is malformed, can't parse as compressed binary, retrying download",
+ BlockHash));
+ return DownloadAndSaveBlock(ChunkStore,
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ NetworkWorkerPool,
+ WorkerPool,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockHash,
+ std::move(Chunks),
+ RetriesLeft - 1);
+ }
+ ReportMessage(
+ OptionalContext,
+ fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash));
+ RemoteResult.SetError(
+ gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash),
+ {});
+ return;
+ }
+ SharedBuffer BlockPayload = Compressed.Decompress();
+ if (!BlockPayload)
+ {
+ if (RetriesLeft > 0)
+ {
+ ReportMessage(
+ OptionalContext,
+ fmt::format("Block attachment {} is malformed, can't decompress payload, retrying download",
+ BlockHash));
+ return DownloadAndSaveBlock(ChunkStore,
+ RemoteStore,
+ IgnoreMissingAttachments,
+ OptionalContext,
+ NetworkWorkerPool,
+ WorkerPool,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockHash,
+ std::move(Chunks),
+ RetriesLeft - 1);
+ }
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash));
+ RemoteResult.SetError(
+ gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash),
+ {});
+ return;
+ }
+ if (RawHash != BlockHash)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash));
+ RemoteResult.SetError(
+ gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash),
+ {});
+ return;
+ }
- uint64_t BlockHeaderSize = 0;
- bool StoreChunksOK = IterateChunkBlock(
- BlockPayload,
- [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk,
- const IoHash& AttachmentRawHash) {
- if (WantedChunks.contains(AttachmentRawHash))
- {
- WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
- IoHash RawHash;
- uint64_t RawSize;
- ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize));
- ZEN_ASSERT(RawHash == AttachmentRawHash);
- WriteRawHashes.emplace_back(AttachmentRawHash);
- WantedChunks.erase(AttachmentRawHash);
- }
- },
- BlockHeaderSize);
-
- if (!StoreChunksOK)
- {
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has invalid format ({}): {}",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Invalid format for block {}", BlockHash),
- {});
- return;
- }
+ uint64_t BlockHeaderSize = 0;
+ bool StoreChunksOK = IterateChunkBlock(
+ BlockPayload,
+ [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk,
+ const IoHash& AttachmentRawHash) {
+ if (WantedChunks.contains(AttachmentRawHash))
+ {
+ WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ IoHash RawHash;
+ uint64_t RawSize;
+ ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(),
+ RawHash,
+ RawSize));
+ ZEN_ASSERT(RawHash == AttachmentRawHash);
+ WriteRawHashes.emplace_back(AttachmentRawHash);
+ WantedChunks.erase(AttachmentRawHash);
+ }
+ },
+ BlockHeaderSize);
+
+ if (!StoreChunksOK)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} has invalid format ({}): {}",
+ BlockHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Invalid format for block {}", BlockHash),
+ {});
+ return;
+ }
- ZEN_ASSERT(WantedChunks.empty());
+ ZEN_ASSERT(WantedChunks.empty());
- if (!WriteAttachmentBuffers.empty())
- {
- auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
- for (size_t Index = 0; Index < Results.size(); Index++)
- {
- const auto& Result = Results[Index];
- if (Result.New)
+ if (!WriteAttachmentBuffers.empty())
{
- Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
- Info.AttachmentsStored.fetch_add(1);
+ auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < Results.size(); Index++)
+ {
+ const auto& Result = Results[Index];
+ if (Result.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
}
}
- }
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed save block attachment {}", BlockHash),
- Ex.what());
- }
- });
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to block attachment {}", BlockHash),
- Ex.what());
- }
- });
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed save block attachment {}", BlockHash),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to block attachment {}", BlockHash),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
};
void DownloadAndSaveAttachment(CidStore& ChunkStore,
@@ -596,92 +611,96 @@ namespace remotestore_impl {
const IoHash& RawHash)
{
AttachmentsDownloadLatch.AddCount(1);
- NetworkWorkerPool.ScheduleWork([&RemoteStore,
- &ChunkStore,
- &WorkerPool,
- &RemoteResult,
- &AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- RawHash,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- &Info,
- IgnoreMissingAttachments,
- OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- uint64_t Unset = (std::uint64_t)-1;
- DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
- if (AttachmentResult.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to download large attachment {}: '{}', error code : {}",
- RawHash,
- AttachmentResult.Reason,
- AttachmentResult.ErrorCode));
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
- }
- return;
- }
- uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
- ZEN_INFO("Loaded large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
- NiceBytes(AttachmentSize));
- Info.AttachmentsDownloaded.fetch_add(1);
+ NetworkWorkerPool.ScheduleWork(
+ [&RemoteStore,
+ &ChunkStore,
+ &WorkerPool,
+ &RemoteResult,
+ &AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
+ RawHash,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ &Info,
+ IgnoreMissingAttachments,
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
return;
}
- Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
-
- AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork([&AttachmentsWriteLatch,
- &RemoteResult,
- &Info,
- &ChunkStore,
- RawHash,
- AttachmentSize,
- Bytes = std::move(AttachmentResult.Bytes),
- OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
+ try
+ {
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
+ RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
+ if (AttachmentResult.ErrorCode)
{
- CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash);
- if (InsertResult.New)
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to download large attachment {}: '{}', error code : {}",
+ RawHash,
+ AttachmentResult.Reason,
+ AttachmentResult.ErrorCode));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
{
- Info.AttachmentBytesStored.fetch_add(AttachmentSize);
- Info.AttachmentsStored.fetch_add(1);
+ RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
}
+ return;
}
- catch (const std::exception& Ex)
+ uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
+ ZEN_INFO("Loaded large attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
+ NiceBytes(AttachmentSize));
+ Info.AttachmentsDownloaded.fetch_add(1);
+ if (RemoteResult.IsError())
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Saving attachment {} failed", RawHash),
- Ex.what());
+ return;
}
- });
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Loading attachment {} failed", RawHash),
- Ex.what());
- }
- });
+ Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
+
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&AttachmentsWriteLatch,
+ &RemoteResult,
+ &Info,
+ &ChunkStore,
+ RawHash,
+ AttachmentSize,
+ Bytes = std::move(AttachmentResult.Bytes),
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ try
+ {
+ CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(AttachmentSize);
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Saving attachment {} failed", RawHash),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Loading attachment {} failed", RawHash),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
};
void CreateBlock(WorkerThreadPool& WorkerPool,
@@ -694,45 +713,47 @@ namespace remotestore_impl {
AsyncRemoteResult& RemoteResult)
{
OpSectionsLatch.AddCount(1);
- WorkerPool.ScheduleWork([&Blocks,
- &SectionsLock,
- &OpSectionsLatch,
- BlockIndex,
- Chunks = std::move(ChunksInBlock),
- &AsyncOnBlock,
- &RemoteResult]() mutable {
- auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- size_t ChunkCount = Chunks.size();
- try
- {
- ZEN_ASSERT(ChunkCount > 0);
- Stopwatch Timer;
- ChunkBlockDescription Block;
- CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block);
- IoHash BlockHash = CompressedBlock.DecodeRawHash();
+ WorkerPool.ScheduleWork(
+ [&Blocks,
+ &SectionsLock,
+ &OpSectionsLatch,
+ BlockIndex,
+ Chunks = std::move(ChunksInBlock),
+ &AsyncOnBlock,
+ &RemoteResult]() mutable {
+ auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); });
+ if (RemoteResult.IsError())
{
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope __(SectionsLock);
- Blocks[BlockIndex] = Block;
+ return;
}
- uint64_t BlockSize = CompressedBlock.GetCompressedSize();
- AsyncOnBlock(std::move(CompressedBlock), std::move(Block));
- ZEN_INFO("Generated block with {} attachments in {} ({})",
- ChunkCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- NiceBytes(BlockSize));
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount),
- Ex.what());
- }
- });
+ size_t ChunkCount = Chunks.size();
+ try
+ {
+ ZEN_ASSERT(ChunkCount > 0);
+ Stopwatch Timer;
+ ChunkBlockDescription Block;
+ CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block);
+ IoHash BlockHash = CompressedBlock.DecodeRawHash();
+ {
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope __(SectionsLock);
+ Blocks[BlockIndex] = Block;
+ }
+ uint64_t BlockSize = CompressedBlock.GetCompressedSize();
+ AsyncOnBlock(std::move(CompressedBlock), std::move(Block));
+ ZEN_INFO("Generated block with {} attachments in {} ({})",
+ ChunkCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ NiceBytes(BlockSize));
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
struct UploadInfo
@@ -861,89 +882,91 @@ namespace remotestore_impl {
SaveAttachmentsLatch.AddCount(1);
AttachmentsToSave++;
- WorkerPool.ScheduleWork([&ChunkStore,
- &RemoteStore,
- &SaveAttachmentsLatch,
- &RemoteResult,
- RawHash,
- &CreatedBlocks,
- &LooseFileAttachments,
- &Info,
- OptionalContext]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- IoBuffer Payload;
- ChunkBlockDescription Block;
- if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
- {
- Payload = BlockIt->second.Payload;
- Block = BlockIt->second.Block;
- }
- else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
- {
- Payload = LooseTmpFileIt->second(RawHash);
- }
- else
- {
- Payload = ChunkStore.FindChunkByCid(RawHash);
- }
- if (!Payload)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to find attachment {}", RawHash),
- {});
- ZEN_WARN("Failed to save attachment '{}' ({}): {}",
- RawHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
- return;
- }
- const bool IsBlock = Block.BlockHash == RawHash;
- size_t PayloadSize = Payload.GetSize();
- RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash, std::move(Block));
- if (Result.ErrorCode)
+ WorkerPool.ScheduleWork(
+ [&ChunkStore,
+ &RemoteStore,
+ &SaveAttachmentsLatch,
+ &RemoteResult,
+ RawHash,
+ &CreatedBlocks,
+ &LooseFileAttachments,
+ &Info,
+ OptionalContext]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
{
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachment '{}', {} ({}): {}",
- RawHash,
- NiceBytes(PayloadSize),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
return;
}
- if (IsBlock)
+ try
{
- Info.AttachmentBlocksUploaded.fetch_add(1);
- Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
- ZEN_INFO("Saved block attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(PayloadSize));
+ IoBuffer Payload;
+ ChunkBlockDescription Block;
+ if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
+ {
+ Payload = BlockIt->second.Payload;
+ Block = BlockIt->second.Block;
+ }
+ else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
+ {
+ Payload = LooseTmpFileIt->second(RawHash);
+ }
+ else
+ {
+ Payload = ChunkStore.FindChunkByCid(RawHash);
+ }
+ if (!Payload)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to find attachment {}", RawHash),
+ {});
+ ZEN_WARN("Failed to save attachment '{}' ({}): {}",
+ RawHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ const bool IsBlock = Block.BlockHash == RawHash;
+ size_t PayloadSize = Payload.GetSize();
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash, std::move(Block));
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to save attachment '{}', {} ({}): {}",
+ RawHash,
+ NiceBytes(PayloadSize),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ return;
+ }
+ if (IsBlock)
+ {
+ Info.AttachmentBlocksUploaded.fetch_add(1);
+ Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
+ ZEN_INFO("Saved block attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(PayloadSize));
+ }
+ else
+ {
+ Info.AttachmentsUploaded.fetch_add(1);
+ Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
+ ZEN_INFO("Saved large attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(PayloadSize));
+ }
}
- else
+ catch (const std::exception& Ex)
{
- Info.AttachmentsUploaded.fetch_add(1);
- Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
- ZEN_INFO("Saved large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(PayloadSize));
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("To upload attachment {}", RawHash),
+ Ex.what());
}
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("To upload attachment {}", RawHash),
- Ex.what());
- }
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
if (IsCancelled(OptionalContext))
@@ -982,66 +1005,68 @@ namespace remotestore_impl {
SaveAttachmentsLatch.AddCount(1);
AttachmentsToSave++;
- WorkerPool.ScheduleWork([&RemoteStore,
- &ChunkStore,
- &SaveAttachmentsLatch,
- &RemoteResult,
- NeededChunks = std::move(NeededChunks),
- &BulkBlockAttachmentsToUpload,
- &Info,
- OptionalContext]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- size_t ChunksSize = 0;
- std::vector<SharedBuffer> ChunkBuffers;
- ChunkBuffers.reserve(NeededChunks.size());
- for (const IoHash& Chunk : NeededChunks)
+ WorkerPool.ScheduleWork(
+ [&RemoteStore,
+ &ChunkStore,
+ &SaveAttachmentsLatch,
+ &RemoteResult,
+ NeededChunks = std::move(NeededChunks),
+ &BulkBlockAttachmentsToUpload,
+ &Info,
+ OptionalContext]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
{
- auto It = BulkBlockAttachmentsToUpload.find(Chunk);
- ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
- CompressedBuffer ChunkPayload = It->second(It->first).second;
- if (!ChunkPayload)
+ return;
+ }
+ try
+ {
+ size_t ChunksSize = 0;
+ std::vector<SharedBuffer> ChunkBuffers;
+ ChunkBuffers.reserve(NeededChunks.size());
+ for (const IoHash& Chunk : NeededChunks)
+ {
+ auto It = BulkBlockAttachmentsToUpload.find(Chunk);
+ ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
+ CompressedBuffer ChunkPayload = It->second(It->first).second;
+ if (!ChunkPayload)
+ {
+ RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
+ fmt::format("Missing chunk {}"sv, Chunk),
+ fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
+ ChunkBuffers.clear();
+ break;
+ }
+ ChunksSize += ChunkPayload.GetCompressedSize();
+ ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer()));
+ }
+ RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
+ if (Result.ErrorCode)
{
- RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
- fmt::format("Missing chunk {}"sv, Chunk),
- fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
- ChunkBuffers.clear();
- break;
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to save attachments with {} chunks ({}): {}",
+ NeededChunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ return;
}
- ChunksSize += ChunkPayload.GetCompressedSize();
- ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer()));
+ Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size());
+ Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
+
+ ZEN_INFO("Saved {} bulk attachments in {} ({})",
+ NeededChunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(ChunksSize));
}
- RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
- if (Result.ErrorCode)
+ catch (const std::exception& Ex)
{
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachments with {} chunks ({}): {}",
- NeededChunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- return;
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to buck upload {} attachments", NeededChunks.size()),
+ Ex.what());
}
- Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size());
- Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
-
- ZEN_INFO("Saved {} bulk attachments in {} ({})",
- NeededChunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(ChunksSize));
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to buck upload {} attachments", NeededChunks.size()),
- Ex.what());
- }
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
}
@@ -1516,148 +1541,152 @@ BuildContainer(CidStore& ChunkStore,
ResolveAttachmentsLatch.AddCount(1);
- WorkerPool.ScheduleWork([&ChunkStore,
- UploadAttachment = &It.second,
- RawHash = It.first,
- &ResolveAttachmentsLatch,
- &ResolveLock,
- &ChunkedHashes,
- &LargeChunkHashes,
- &ChunkedUploadAttachments,
- &LooseUploadAttachments,
- &MissingHashes,
- &OnLargeAttachment,
- &AttachmentTempPath,
- &ChunkFile,
- &ChunkedFiles,
- MaxChunkEmbedSize,
- ChunkFileSizeLimit,
- AllowChunking,
- &RemoteResult,
- OptionalContext]() {
- auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); });
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- return;
- }
+ WorkerPool.ScheduleWork(
+ [&ChunkStore,
+ UploadAttachment = &It.second,
+ RawHash = It.first,
+ &ResolveAttachmentsLatch,
+ &ResolveLock,
+ &ChunkedHashes,
+ &LargeChunkHashes,
+ &ChunkedUploadAttachments,
+ &LooseUploadAttachments,
+ &MissingHashes,
+ &OnLargeAttachment,
+ &AttachmentTempPath,
+ &ChunkFile,
+ &ChunkedFiles,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ AllowChunking,
+ &RemoteResult,
+ OptionalContext]() {
+ auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); });
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ return;
+ }
- try
- {
- if (!UploadAttachment->RawPath.empty())
+ try
{
- const std::filesystem::path& FilePath = UploadAttachment->RawPath;
- IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath);
- if (RawData)
+ if (!UploadAttachment->RawPath.empty())
{
- if (AllowChunking && RawData.GetSize() > ChunkFileSizeLimit)
+ const std::filesystem::path& FilePath = UploadAttachment->RawPath;
+ IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath);
+ if (RawData)
{
- IoBufferFileReference FileRef;
- (void)RawData.GetFileReference(FileRef);
-
- ChunkedFile Chunked = ChunkFile(RawHash, RawData, FileRef, OptionalContext);
- ResolveLock.WithExclusiveLock(
- [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() {
- ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size());
- ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size());
- for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
- {
- ChunkedHashes.insert(ChunkHash);
- }
- ChunkedFiles.emplace_back(std::move(Chunked));
+ if (AllowChunking && RawData.GetSize() > ChunkFileSizeLimit)
+ {
+ IoBufferFileReference FileRef;
+ (void)RawData.GetFileReference(FileRef);
+
+ ChunkedFile Chunked = ChunkFile(RawHash, RawData, FileRef, OptionalContext);
+ ResolveLock.WithExclusiveLock(
+ [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() {
+ ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size());
+ ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size());
+ for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
+ {
+ ChunkedHashes.insert(ChunkHash);
+ }
+ ChunkedFiles.emplace_back(std::move(Chunked));
+ });
+ }
+ else if (RawData.GetSize() > (MaxChunkEmbedSize * 2))
+ {
+ // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't
+ // it will be a loose attachment instead of going into a block
+ OnLargeAttachment(RawHash, [RawData = std::move(RawData), AttachmentTempPath](const IoHash& RawHash) {
+ size_t RawSize = RawData.GetSize();
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::VeryFast);
+
+ std::filesystem::path AttachmentPath = AttachmentTempPath;
+ AttachmentPath.append(RawHash.ToHexString());
+ IoBuffer TempAttachmentBuffer =
+ WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath);
+ ZEN_INFO("Saved temp attachment to '{}', {} ({})",
+ AttachmentPath,
+ NiceBytes(RawSize),
+ NiceBytes(TempAttachmentBuffer.GetSize()));
+ return TempAttachmentBuffer;
});
- }
- else if (RawData.GetSize() > (MaxChunkEmbedSize * 2))
- {
- // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't
- // it will be a loose attachment instead of going into a block
- OnLargeAttachment(RawHash, [RawData = std::move(RawData), AttachmentTempPath](const IoHash& RawHash) {
- size_t RawSize = RawData.GetSize();
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)),
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ }
+ else
+ {
+ uint64_t RawSize = RawData.GetSize();
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData),
OodleCompressor::Mermaid,
OodleCompressionLevel::VeryFast);
std::filesystem::path AttachmentPath = AttachmentTempPath;
AttachmentPath.append(RawHash.ToHexString());
+
+ uint64_t CompressedSize = Compressed.GetCompressedSize();
IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath);
ZEN_INFO("Saved temp attachment to '{}', {} ({})",
AttachmentPath,
NiceBytes(RawSize),
NiceBytes(TempAttachmentBuffer.GetSize()));
- return TempAttachmentBuffer;
- });
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+
+ if (CompressedSize > MaxChunkEmbedSize)
+ {
+ OnLargeAttachment(RawHash,
+ [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; });
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ }
+ else
+ {
+ UploadAttachment->Size = CompressedSize;
+ ResolveLock.WithExclusiveLock(
+ [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() {
+ LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data)));
+ });
+ }
+ }
}
else
{
- uint64_t RawSize = RawData.GetSize();
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData),
- OodleCompressor::Mermaid,
- OodleCompressionLevel::VeryFast);
-
- std::filesystem::path AttachmentPath = AttachmentTempPath;
- AttachmentPath.append(RawHash.ToHexString());
-
- uint64_t CompressedSize = Compressed.GetCompressedSize();
- IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath);
- ZEN_INFO("Saved temp attachment to '{}', {} ({})",
- AttachmentPath,
- NiceBytes(RawSize),
- NiceBytes(TempAttachmentBuffer.GetSize()));
-
- if (CompressedSize > MaxChunkEmbedSize)
- {
- OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; });
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
- }
- else
- {
- UploadAttachment->Size = CompressedSize;
- ResolveLock.WithExclusiveLock(
- [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() {
- LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data)));
- });
- }
+ ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
}
}
else
{
- ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
- }
- }
- else
- {
- IoBuffer Data = ChunkStore.FindChunkByCid(RawHash);
- if (Data)
- {
- auto GetForChunking =
- [](size_t ChunkFileSizeLimit, const IoBuffer& Data, IoBufferFileReference& OutFileRef) -> bool {
- if (Data.IsWholeFile())
- {
- IoHash VerifyRawHash;
- uint64_t VerifyRawSize;
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize);
- if (Compressed)
+ IoBuffer Data = ChunkStore.FindChunkByCid(RawHash);
+ if (Data)
+ {
+ auto GetForChunking =
+ [](size_t ChunkFileSizeLimit, const IoBuffer& Data, IoBufferFileReference& OutFileRef) -> bool {
+ if (Data.IsWholeFile())
{
- if (VerifyRawSize > ChunkFileSizeLimit)
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize);
+ if (Compressed)
{
- OodleCompressor Compressor;
- OodleCompressionLevel CompressionLevel;
- uint64_t BlockSize;
- if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+ if (VerifyRawSize > ChunkFileSizeLimit)
{
- if (CompressionLevel == OodleCompressionLevel::None)
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ uint64_t BlockSize;
+ if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
{
- CompositeBuffer Decompressed = Compressed.DecompressToComposite();
- if (Decompressed)
+ if (CompressionLevel == OodleCompressionLevel::None)
{
- std::span<const SharedBuffer> Segments = Decompressed.GetSegments();
- if (Segments.size() == 1)
+ CompositeBuffer Decompressed = Compressed.DecompressToComposite();
+ if (Decompressed)
{
- IoBuffer DecompressedData = Segments[0].AsIoBuffer();
- if (DecompressedData.GetFileReference(OutFileRef))
+ std::span<const SharedBuffer> Segments = Decompressed.GetSegments();
+ if (Segments.size() == 1)
{
- return true;
+ IoBuffer DecompressedData = Segments[0].AsIoBuffer();
+ if (DecompressedData.GetFileReference(OutFileRef))
+ {
+ return true;
+ }
}
}
}
@@ -1665,49 +1694,49 @@ BuildContainer(CidStore& ChunkStore,
}
}
}
- }
- return false;
- };
+ return false;
+ };
- IoBufferFileReference FileRef;
- if (AllowChunking && GetForChunking(ChunkFileSizeLimit, Data, FileRef))
- {
- ChunkedFile Chunked = ChunkFile(RawHash, Data, FileRef, OptionalContext);
- ResolveLock.WithExclusiveLock(
- [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() {
- ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size());
- ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size());
- for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
- {
- ChunkedHashes.insert(ChunkHash);
- }
- ChunkedFiles.emplace_back(std::move(Chunked));
- });
- }
- else if (Data.GetSize() > MaxChunkEmbedSize)
- {
- OnLargeAttachment(RawHash,
- [&ChunkStore](const IoHash& RawHash) { return ChunkStore.FindChunkByCid(RawHash); });
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ IoBufferFileReference FileRef;
+ if (AllowChunking && GetForChunking(ChunkFileSizeLimit, Data, FileRef))
+ {
+ ChunkedFile Chunked = ChunkFile(RawHash, Data, FileRef, OptionalContext);
+ ResolveLock.WithExclusiveLock(
+ [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() {
+ ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size());
+ ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size());
+ for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
+ {
+ ChunkedHashes.insert(ChunkHash);
+ }
+ ChunkedFiles.emplace_back(std::move(Chunked));
+ });
+ }
+ else if (Data.GetSize() > MaxChunkEmbedSize)
+ {
+ OnLargeAttachment(RawHash,
+ [&ChunkStore](const IoHash& RawHash) { return ChunkStore.FindChunkByCid(RawHash); });
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ }
+ else
+ {
+ UploadAttachment->Size = Data.GetSize();
+ }
}
else
{
- UploadAttachment->Size = Data.GetSize();
+ ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
}
}
- else
- {
- ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
- }
}
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to resolve attachment {}", RawHash),
- Ex.what());
- }
- });
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to resolve attachment {}", RawHash),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
ResolveAttachmentsLatch.CountDown();
@@ -3077,101 +3106,103 @@ LoadOplog(CidStore& ChunkStore,
{
std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString();
DechunkLatch.AddCount(1);
- WorkerPool.ScheduleWork([&ChunkStore,
- &DechunkLatch,
- TempFileName,
- &Chunked,
- &RemoteResult,
- IgnoreMissingAttachments,
- &Info,
- OptionalContext]() {
- auto _ = MakeGuard([&DechunkLatch, &TempFileName] {
- std::error_code Ec;
- if (IsFile(TempFileName, Ec))
- {
- RemoveFile(TempFileName, Ec);
- if (Ec)
+ WorkerPool.ScheduleWork(
+ [&ChunkStore,
+ &DechunkLatch,
+ TempFileName,
+ &Chunked,
+ &RemoteResult,
+ IgnoreMissingAttachments,
+ &Info,
+ OptionalContext]() {
+ auto _ = MakeGuard([&DechunkLatch, &TempFileName] {
+ std::error_code Ec;
+ if (IsFile(TempFileName, Ec))
{
- ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message());
+ RemoveFile(TempFileName, Ec);
+ if (Ec)
+ {
+ ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message());
+ }
}
- }
- DechunkLatch.CountDown();
- });
- try
- {
- if (RemoteResult.IsError())
- {
- return;
- }
- Stopwatch Timer;
- IoBuffer TmpBuffer;
+ DechunkLatch.CountDown();
+ });
+ try
{
- BasicFile TmpFile;
- TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate);
+ if (RemoteResult.IsError())
{
- BasicFileWriter TmpWriter(TmpFile, 64u * 1024u);
-
- uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder();
- BLAKE3Stream HashingStream;
- for (std::uint32_t SequenceIndex : Chunked.ChunkSequence)
+ return;
+ }
+ Stopwatch Timer;
+ IoBuffer TmpBuffer;
+ {
+ BasicFile TmpFile;
+ TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate);
{
- const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex];
- IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash);
- if (!Chunk)
- {
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
+ BasicFileWriter TmpWriter(TmpFile, 64u * 1024u);
- // We only add 1 as the resulting missing count will be 1 for the dechunked file
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
+ uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder();
+ BLAKE3Stream HashingStream;
+ for (std::uint32_t SequenceIndex : Chunked.ChunkSequence)
+ {
+ const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex];
+ IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash);
+ if (!Chunk)
{
- RemoteResult.SetError(
- gsl::narrow<int>(HttpResponseCode::NotFound),
- "Missing chunk",
+ remotestore_impl::ReportMessage(
+ OptionalContext,
fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
+
+ // We only add 1 as the resulting missing count will be 1 for the dechunked file
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(
+ gsl::narrow<int>(HttpResponseCode::NotFound),
+ "Missing chunk",
+ fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
+ }
+ return;
+ }
+ CompositeBuffer Decompressed =
+ CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite();
+ for (const SharedBuffer& Segment : Decompressed.GetSegments())
+ {
+ MemoryView SegmentData = Segment.GetView();
+ HashingStream.Append(SegmentData);
+ TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset);
+ Offset += SegmentData.GetSize();
}
- return;
- }
- CompositeBuffer Decompressed =
- CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite();
- for (const SharedBuffer& Segment : Decompressed.GetSegments())
- {
- MemoryView SegmentData = Segment.GetView();
- HashingStream.Append(SegmentData);
- TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset);
- Offset += SegmentData.GetSize();
}
+ BLAKE3 RawHash = HashingStream.GetHash();
+ ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash));
+ UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash);
+ TmpWriter.Write(Header.GetData(), Header.GetSize(), 0);
}
- BLAKE3 RawHash = HashingStream.GetHash();
- ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash));
- UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash);
- TmpWriter.Write(Header.GetData(), Header.GetSize(), 0);
+ TmpFile.Close();
+ TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName);
}
- TmpFile.Close();
- TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName);
+ CidStore::InsertResult InsertResult =
+ ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
+
+ ZEN_INFO("Dechunked attachment {} ({}) in {}",
+ Chunked.RawHash,
+ NiceBytes(Chunked.RawSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
- CidStore::InsertResult InsertResult =
- ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace);
- if (InsertResult.New)
+ catch (const std::exception& Ex)
{
- Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize());
- Info.AttachmentsStored.fetch_add(1);
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to dechunck file {}", Chunked.RawHash),
+ Ex.what());
}
-
- ZEN_INFO("Dechunked attachment {} ({}) in {}",
- Chunked.RawHash,
- NiceBytes(Chunked.RawSize),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to dechunck file {}", Chunked.RawHash),
- Ex.what());
- }
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
DechunkLatch.CountDown();