aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp659
1 files changed, 372 insertions, 287 deletions
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index 935af9af0..2cdc76034 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -1197,30 +1197,25 @@ namespace remotestore_impl {
};
void DownloadAndSaveBlockChunks(LoadOplogContext& Context,
- Latch& AttachmentsDownloadLatch,
- Latch& AttachmentsWriteLatch,
- AsyncRemoteResult& RemoteResult,
+ ParallelWork& AttachmentWork,
DownloadInfo& Info,
Stopwatch& LoadAttachmentsTimer,
std::atomic_uint64_t& DownloadStartMS,
ThinChunkBlockDescription&& ThinBlockDescription,
std::vector<uint32_t>&& NeededChunkIndexes)
{
- AttachmentsDownloadLatch.AddCount(1);
- Context.NetworkWorkerPool.ScheduleWork(
+ AttachmentWork.ScheduleWork(
+ Context.NetworkWorkerPool,
[&Context,
- &AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- &RemoteResult,
+ &AttachmentWork,
ThinBlockDescription = std::move(ThinBlockDescription),
NeededChunkIndexes = std::move(NeededChunkIndexes),
&Info,
&LoadAttachmentsTimer,
- &DownloadStartMS]() {
+ &DownloadStartMS](std::atomic<bool>& AbortFlag) {
ZEN_TRACE_CPU("DownloadBlockChunks");
- auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
- if (RemoteResult.IsError())
+ if (AbortFlag)
{
return;
}
@@ -1246,7 +1241,7 @@ namespace remotestore_impl {
Info.MissingAttachmentCount.fetch_add(1);
if (!Context.IgnoreMissingAttachments)
{
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ throw RemoteStoreError(Result.Reason, Result.ErrorCode, Result.Text);
}
return;
}
@@ -1260,70 +1255,60 @@ namespace remotestore_impl {
fmt::format("Loaded {} bulk attachments in {}",
Chunks.size(),
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))));
- if (RemoteResult.IsError())
+ if (AbortFlag)
{
return;
}
- AttachmentsWriteLatch.AddCount(1);
- Context.WorkerPool.ScheduleWork(
- [&AttachmentsWriteLatch, &RemoteResult, &Info, &Context, Chunks = std::move(Result.Chunks)]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
+ AttachmentWork.ScheduleWork(
+ Context.WorkerPool,
+ [&Info, &Context, Chunks = std::move(Result.Chunks)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
{
return;
}
if (!Chunks.empty())
{
- try
- {
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
- WriteAttachmentBuffers.reserve(Chunks.size());
- WriteRawHashes.reserve(Chunks.size());
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+ WriteAttachmentBuffers.reserve(Chunks.size());
+ WriteRawHashes.reserve(Chunks.size());
- for (const auto& It : Chunks)
- {
- WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
- WriteRawHashes.push_back(It.first);
- }
- std::vector<CidStore::InsertResult> InsertResults =
- Context.ChunkStore.AddChunks(WriteAttachmentBuffers,
- WriteRawHashes,
- CidStore::InsertMode::kCopyOnly);
+ for (const auto& It : Chunks)
+ {
+ WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(It.first);
+ }
+ std::vector<CidStore::InsertResult> InsertResults =
+ Context.ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
- for (size_t Index = 0; Index < InsertResults.size(); Index++)
+ for (size_t Index = 0; Index < InsertResults.size(); Index++)
+ {
+ if (InsertResults[Index].New)
{
- if (InsertResults[Index].New)
- {
- Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
- Info.AttachmentsStored.fetch_add(1);
- }
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
}
}
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to bulk save {} attachments", Chunks.size()),
- Ex.what());
- }
}
},
WorkerThreadPool::EMode::EnableBacklog);
}
+ catch (const RemoteStoreError&)
+ {
+ throw;
+ }
catch (const std::exception& Ex)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to bulk load {} attachments", NeededChunkIndexes.size()),
- Ex.what());
+ throw RemoteStoreError(fmt::format("Failed to bulk load {} attachments", NeededChunkIndexes.size()),
+ gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ Ex.what());
}
},
WorkerThreadPool::EMode::EnableBacklog);
};
void DownloadAndSaveBlock(LoadOplogContext& Context,
- Latch& AttachmentsDownloadLatch,
- Latch& AttachmentsWriteLatch,
- AsyncRemoteResult& RemoteResult,
+ ParallelWork& AttachmentWork,
DownloadInfo& Info,
Stopwatch& LoadAttachmentsTimer,
std::atomic_uint64_t& DownloadStartMS,
@@ -1332,23 +1317,20 @@ namespace remotestore_impl {
std::span<std::atomic<bool>> ChunkDownloadedFlags,
uint32_t RetriesLeft)
{
- AttachmentsDownloadLatch.AddCount(1);
- Context.NetworkWorkerPool.ScheduleWork(
- [&AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
+ AttachmentWork.ScheduleWork(
+ Context.NetworkWorkerPool,
+ [&AttachmentWork,
&Context,
- &RemoteResult,
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
RetriesLeft,
BlockHash = IoHash(BlockHash),
&AllNeededPartialChunkHashesLookup,
- ChunkDownloadedFlags]() {
+ ChunkDownloadedFlags](std::atomic<bool>& AbortFlag) {
ZEN_TRACE_CPU("DownloadBlock");
- auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
- if (RemoteResult.IsError())
+ if (AbortFlag)
{
return;
}
@@ -1376,11 +1358,11 @@ namespace remotestore_impl {
Info.MissingAttachmentCount.fetch_add(1);
if (!Context.IgnoreMissingAttachments)
{
- RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
+ throw RemoteStoreError(BlockResult.Reason, BlockResult.ErrorCode, BlockResult.Text);
}
return;
}
- if (RemoteResult.IsError())
+ if (AbortFlag)
{
return;
}
@@ -1401,12 +1383,10 @@ namespace remotestore_impl {
Info.AttachmentBlocksDownloaded.fetch_add(1);
Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
- AttachmentsWriteLatch.AddCount(1);
- Context.WorkerPool.ScheduleWork(
- [&AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
+ AttachmentWork.ScheduleWork(
+ Context.WorkerPool,
+ [&AttachmentWork,
&Context,
- &RemoteResult,
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
@@ -1414,9 +1394,8 @@ namespace remotestore_impl {
BlockHash = IoHash(BlockHash),
&AllNeededPartialChunkHashesLookup,
ChunkDownloadedFlags,
- Bytes = std::move(BlobBuffer)]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
+ Bytes = std::move(BlobBuffer)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
{
return;
}
@@ -1525,9 +1504,7 @@ namespace remotestore_impl {
ReportMessage(Context.OptionalJobContext, fmt::format("{}, retrying download", ErrorString));
return DownloadAndSaveBlock(Context,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
+ AttachmentWork,
Info,
LoadAttachmentsTimer,
DownloadStartMS,
@@ -1539,32 +1516,41 @@ namespace remotestore_impl {
else
{
ReportMessage(Context.OptionalJobContext, ErrorString);
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), ErrorString, {});
- return;
+ throw RemoteStoreError(ErrorString,
+ gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ {});
}
}
}
+ catch (const RemoteStoreError&)
+ {
+ throw;
+ }
catch (const std::exception& Ex)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to save block attachment {}", BlockHash),
- Ex.what());
+ throw RemoteStoreError(fmt::format("Failed to save block attachment {}", BlockHash),
+ gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ Ex.what());
}
},
WorkerThreadPool::EMode::EnableBacklog);
}
+ catch (const RemoteStoreError&)
+ {
+ throw;
+ }
catch (const std::exception& Ex)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to download block attachment {}", BlockHash),
- Ex.what());
+ throw RemoteStoreError(fmt::format("Failed to download block attachment {}", BlockHash),
+ gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ Ex.what());
}
},
WorkerThreadPool::EMode::EnableBacklog);
};
void DownloadPartialBlock(LoadOplogContext& Context,
- AsyncRemoteResult& RemoteResult,
+ std::atomic<bool>& AbortFlag,
DownloadInfo& Info,
double& DownloadTimeSeconds,
const ChunkBlockDescription& BlockDescription,
@@ -1593,7 +1579,7 @@ namespace remotestore_impl {
while (SubRangeCountComplete < SubBlockRangeCount)
{
- if (RemoteResult.IsError())
+ if (AbortFlag.load())
{
break;
}
@@ -1615,7 +1601,7 @@ namespace remotestore_impl {
SubRange.first,
SubRange.second);
DownloadTimeSeconds += CacheTimer.GetElapsedTimeMs() / 1000.0;
- if (RemoteResult.IsError())
+ if (AbortFlag.load())
{
break;
}
@@ -1636,7 +1622,7 @@ namespace remotestore_impl {
BuildStorageCache::BuildBlobRanges RangeBuffers =
Context.OptionalCache->GetBuildBlobRanges(Context.CacheBuildId, BlockDescription.BlockHash, SubRanges);
DownloadTimeSeconds += CacheTimer.GetElapsedTimeMs() / 1000.0;
- if (RemoteResult.IsError())
+ if (AbortFlag.load())
{
break;
}
@@ -1668,7 +1654,7 @@ namespace remotestore_impl {
RemoteProjectStore::LoadAttachmentRangesResult BlockResult =
Context.RemoteStore.LoadAttachmentRanges(BlockDescription.BlockHash, SubRanges);
DownloadTimeSeconds += BlockResult.ElapsedSeconds;
- if (RemoteResult.IsError())
+ if (AbortFlag.load())
{
break;
}
@@ -1683,8 +1669,7 @@ namespace remotestore_impl {
Info.MissingAttachmentCount.fetch_add(1);
if (!Context.IgnoreMissingAttachments)
{
- RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
- break;
+ throw RemoteStoreError(BlockResult.Reason, BlockResult.ErrorCode, BlockResult.Text);
}
}
else
@@ -1700,7 +1685,7 @@ namespace remotestore_impl {
BlockDescription.BlockHash,
ZenContentType::kCompressedBinary,
CompositeBuffer(std::vector<IoBuffer>{BlockResult.Bytes}));
- if (RemoteResult.IsError())
+ if (AbortFlag.load())
{
break;
}
@@ -1714,13 +1699,12 @@ namespace remotestore_impl {
{
if (BlockResult.Ranges.size() != SubRanges.size())
{
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Range response for block {} contains {} ranges, expected {} ranges",
- BlockDescription.BlockHash,
- BlockResult.Ranges.size(),
- SubRanges.size()),
- "");
- break;
+ throw RemoteStoreError(fmt::format("Range response for block {} contains {} ranges, expected {} ranges",
+ BlockDescription.BlockHash,
+ BlockResult.Ranges.size(),
+ SubRanges.size()),
+ gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ "");
}
OnDownloaded(std::move(BlockResult.Bytes), SubRangeStartIndex, BlockResult.Ranges);
}
@@ -1731,9 +1715,7 @@ namespace remotestore_impl {
}
void DownloadAndSavePartialBlock(LoadOplogContext& Context,
- Latch& AttachmentsDownloadLatch,
- Latch& AttachmentsWriteLatch,
- AsyncRemoteResult& RemoteResult,
+ ParallelWork& AttachmentWork,
DownloadInfo& Info,
Stopwatch& LoadAttachmentsTimer,
std::atomic_uint64_t& DownloadStartMS,
@@ -1746,12 +1728,10 @@ namespace remotestore_impl {
std::span<std::atomic<bool>> ChunkDownloadedFlags,
uint32_t RetriesLeft)
{
- AttachmentsDownloadLatch.AddCount(1);
- Context.NetworkWorkerPool.ScheduleWork(
- [&AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
+ AttachmentWork.ScheduleWork(
+ Context.NetworkWorkerPool,
+ [&AttachmentWork,
&Context,
- &RemoteResult,
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
@@ -1762,10 +1742,8 @@ namespace remotestore_impl {
BlockRangeCount,
&AllNeededPartialChunkHashesLookup,
ChunkDownloadedFlags,
- RetriesLeft]() {
+ RetriesLeft](std::atomic<bool>& AbortFlag) {
ZEN_TRACE_CPU("DownloadBlockRanges");
-
- auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
try
{
uint64_t Unset = (std::uint64_t)-1;
@@ -1776,7 +1754,7 @@ namespace remotestore_impl {
DownloadPartialBlock(
Context,
- RemoteResult,
+ AbortFlag,
Info,
DownloadElapsedSeconds,
BlockDescription,
@@ -1793,12 +1771,10 @@ namespace remotestore_impl {
Info.AttachmentBlockRangeBytesDownloaded.fetch_add(BlockPartSize);
Info.AttachmentBlocksRangesDownloaded++;
- AttachmentsWriteLatch.AddCount(1);
- Context.WorkerPool.ScheduleWork(
- [&AttachmentsWriteLatch,
+ AttachmentWork.ScheduleWork(
+ Context.WorkerPool,
+ [&AttachmentWork,
&Context,
- &AttachmentsDownloadLatch,
- &RemoteResult,
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
@@ -1811,8 +1787,8 @@ namespace remotestore_impl {
RetriesLeft,
BlockPayload = std::move(Buffer),
OffsetAndLengths =
- std::vector<std::pair<uint64_t, uint64_t>>(OffsetAndLengths.begin(), OffsetAndLengths.end())]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ std::vector<std::pair<uint64_t, uint64_t>>(OffsetAndLengths.begin(), OffsetAndLengths.end())](
+ std::atomic<bool>& AbortFlag) {
try
{
ZEN_ASSERT(BlockPayload.Size() > 0);
@@ -1820,7 +1796,7 @@ namespace remotestore_impl {
size_t RangeCount = OffsetAndLengths.size();
for (size_t RangeOffset = 0; RangeOffset < RangeCount; RangeOffset++)
{
- if (RemoteResult.IsError())
+ if (AbortFlag)
{
return;
}
@@ -1842,7 +1818,7 @@ namespace remotestore_impl {
ChunkBlockIndex < BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount;
ChunkBlockIndex++)
{
- if (RemoteResult.IsError())
+ if (AbortFlag)
{
break;
}
@@ -1904,9 +1880,7 @@ namespace remotestore_impl {
ReportMessage(Context.OptionalJobContext,
fmt::format("{}, retrying download", ErrorString));
return DownloadAndSavePartialBlock(Context,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
+ AttachmentWork,
Info,
LoadAttachmentsTimer,
DownloadStartMS,
@@ -1924,9 +1898,9 @@ namespace remotestore_impl {
Info.MissingAttachmentCount.fetch_add(1);
if (!Context.IgnoreMissingAttachments)
{
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound),
- "Malformed chunk block",
- ErrorString);
+ throw RemoteStoreError("Malformed chunk block",
+ gsl::narrow<int32_t>(HttpResponseCode::NotFound),
+ ErrorString);
}
}
else
@@ -1978,18 +1952,22 @@ namespace remotestore_impl {
}
}
}
+ catch (const RemoteStoreError&)
+ {
+ throw;
+ }
catch (const std::exception& Ex)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed saving {} ranges from block attachment {}",
- OffsetAndLengths.size(),
- BlockDescription.BlockHash),
- Ex.what());
+ throw RemoteStoreError(fmt::format("Failed saving {} ranges from block attachment {}",
+ OffsetAndLengths.size(),
+ BlockDescription.BlockHash),
+ gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ Ex.what());
}
},
WorkerThreadPool::EMode::EnableBacklog);
});
- if (!RemoteResult.IsError())
+ if (!AbortFlag)
{
ZEN_DEBUG("Loaded {} ranges from block attachment '{}' in {} ({})",
BlockRangeCount,
@@ -1998,39 +1976,33 @@ namespace remotestore_impl {
NiceBytes(DownloadedBytes));
}
}
+ catch (const RemoteStoreError&)
+ {
+ throw;
+ }
catch (const std::exception& Ex)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to download block attachment {} ranges", BlockDescription.BlockHash),
- Ex.what());
+ throw RemoteStoreError(fmt::format("Failed to download block attachment {} ranges", BlockDescription.BlockHash),
+ gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ Ex.what());
}
},
WorkerThreadPool::EMode::EnableBacklog);
};
void DownloadAndSaveAttachment(LoadOplogContext& Context,
- Latch& AttachmentsDownloadLatch,
- Latch& AttachmentsWriteLatch,
- AsyncRemoteResult& RemoteResult,
+ ParallelWork& AttachmentWork,
DownloadInfo& Info,
Stopwatch& LoadAttachmentsTimer,
std::atomic_uint64_t& DownloadStartMS,
const IoHash& RawHash)
{
- AttachmentsDownloadLatch.AddCount(1);
- Context.NetworkWorkerPool.ScheduleWork(
- [&Context,
- &RemoteResult,
- &AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- RawHash,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- &Info]() {
+ AttachmentWork.ScheduleWork(
+ Context.NetworkWorkerPool,
+ [&Context, &AttachmentWork, RawHash, &LoadAttachmentsTimer, &DownloadStartMS, &Info](std::atomic<bool>& AbortFlag) {
ZEN_TRACE_CPU("DownloadAttachment");
- auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
- if (RemoteResult.IsError())
+ if (AbortFlag)
{
return;
}
@@ -2056,7 +2028,7 @@ namespace remotestore_impl {
Info.MissingAttachmentCount.fetch_add(1);
if (!Context.IgnoreMissingAttachments)
{
- RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+ throw RemoteStoreError(AttachmentResult.Reason, AttachmentResult.ErrorCode, AttachmentResult.Text);
}
return;
}
@@ -2074,7 +2046,7 @@ namespace remotestore_impl {
CompositeBuffer(SharedBuffer(BlobBuffer)));
}
}
- if (RemoteResult.IsError())
+ if (AbortFlag)
{
return;
}
@@ -2083,42 +2055,36 @@ namespace remotestore_impl {
Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
ZEN_ASSERT(BlobBuffer);
- AttachmentsWriteLatch.AddCount(1);
- Context.WorkerPool.ScheduleWork(
- [&Context, &AttachmentsWriteLatch, &RemoteResult, &Info, RawHash, AttachmentSize, Bytes = std::move(BlobBuffer)]() {
+ AttachmentWork.ScheduleWork(
+ Context.WorkerPool,
+ [&Context, &Info, RawHash, AttachmentSize, Bytes = std::move(BlobBuffer)](std::atomic<bool>& AbortFlag) {
ZEN_TRACE_CPU("WriteAttachment");
ZEN_ASSERT(Bytes);
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
+ if (AbortFlag)
{
return;
}
- try
- {
- CidStore::InsertResult InsertResult = Context.ChunkStore.AddChunk(Bytes, RawHash);
- if (InsertResult.New)
- {
- Info.AttachmentBytesStored.fetch_add(AttachmentSize);
- Info.AttachmentsStored.fetch_add(1);
- }
- Info.ChunksCompleteCount++;
- }
- catch (const std::exception& Ex)
+ CidStore::InsertResult InsertResult = Context.ChunkStore.AddChunk(Bytes, RawHash);
+ if (InsertResult.New)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Saving attachment {} failed", RawHash),
- Ex.what());
+ Info.AttachmentBytesStored.fetch_add(AttachmentSize);
+ Info.AttachmentsStored.fetch_add(1);
}
+ Info.ChunksCompleteCount++;
},
WorkerThreadPool::EMode::EnableBacklog);
}
+ catch (const RemoteStoreError&)
+ {
+ throw;
+ }
catch (const std::exception& Ex)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Loading attachment {} failed", RawHash),
- Ex.what());
+ throw RemoteStoreError(fmt::format("Loading attachment {} failed", RawHash),
+ gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ Ex.what());
}
},
WorkerThreadPool::EMode::EnableBacklog);
@@ -2342,7 +2308,6 @@ namespace remotestore_impl {
RemoteStore.SaveAttachment(std::move(Payload), RawHash, std::move(Block));
if (Result.ErrorCode)
{
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
ReportMessage(OptionalContext,
fmt::format("Failed to save attachment '{}', {} ({}): {}",
RawHash,
@@ -2456,7 +2421,6 @@ namespace remotestore_impl {
RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
if (Result.ErrorCode)
{
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
ReportMessage(OptionalContext,
fmt::format("Failed to save attachments with {} chunks ({}): {}",
NeededChunks.size(),
@@ -3872,10 +3836,10 @@ LoadOplog(LoadOplogContext&& Context)
NiceBytes(LoadContainerResult.ContainerObject.GetSize())));
Info.OplogSizeBytes = LoadContainerResult.ContainerObject.GetSize();
- remotestore_impl::AsyncRemoteResult RemoteResult;
- Latch AttachmentsDownloadLatch(1);
- Latch AttachmentsWriteLatch(1);
- std::atomic_size_t AttachmentCount = 0;
+ std::atomic<bool> AbortFlag(false);
+ std::atomic<bool> PauseFlag(false);
+ ParallelWork AttachmentWork(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ std::atomic_size_t AttachmentCount = 0;
Stopwatch LoadAttachmentsTimer;
std::atomic_uint64_t DownloadStartMS = (std::uint64_t)-1;
@@ -3901,17 +3865,16 @@ LoadOplog(LoadOplogContext&& Context)
std::vector<NeededBlockDownload> NeededBlockDownloads;
auto OnNeedBlock = [&Context,
- &AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
+ &AttachmentWork,
+ &AbortFlag,
&AttachmentCount,
- &RemoteResult,
&BlockCountToDownload,
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
&NeededBlockDownloads](ThinChunkBlockDescription&& ThinBlockDescription,
std::vector<uint32_t>&& NeededChunkIndexes) {
- if (RemoteResult.IsError())
+ if (AbortFlag.load())
{
return;
}
@@ -3921,9 +3884,7 @@ LoadOplog(LoadOplogContext&& Context)
if (ThinBlockDescription.BlockHash == IoHash::Zero)
{
DownloadAndSaveBlockChunks(Context,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
+ AttachmentWork,
Info,
LoadAttachmentsTimer,
DownloadStartMS,
@@ -3939,12 +3900,12 @@ LoadOplog(LoadOplogContext&& Context)
std::vector<IoHash> AttachmentsToDownload;
- auto OnNeedAttachment = [&AttachmentsToDownload, &RemoteResult, &Attachments, &AttachmentCount](const IoHash& RawHash) {
+ auto OnNeedAttachment = [&AttachmentsToDownload, &AbortFlag, &Attachments, &AttachmentCount](const IoHash& RawHash) {
if (!Attachments.insert(RawHash).second)
{
return;
}
- if (RemoteResult.IsError())
+ if (AbortFlag.load())
{
return;
}
@@ -3961,6 +3922,7 @@ LoadOplog(LoadOplogContext&& Context)
Context.Oplog.EnableUpdateCapture();
auto _ = MakeGuard([&Context]() { Context.Oplog.DisableUpdateCapture(); });
+ RemoteProjectStore::Result LoadResult;
CbObject OplogSection;
RemoteProjectStore::Result Result = ParseOplogContainer(LoadContainerResult.ContainerObject,
OnReferencedAttachments,
@@ -3972,7 +3934,8 @@ LoadOplog(LoadOplogContext&& Context)
Context.OptionalJobContext);
if (Result.ErrorCode != 0)
{
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ AbortFlag = true;
+ LoadResult = {.ErrorCode = Result.ErrorCode, .Reason = Result.Reason, .Text = Result.Text};
}
remotestore_impl::ReportMessage(Context.OptionalJobContext,
fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download",
@@ -4023,9 +3986,7 @@ LoadOplog(LoadOplogContext&& Context)
{
// Fall back to full download as we can't get enough information about the block
DownloadAndSaveBlock(Context,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
+ AttachmentWork,
Info,
LoadAttachmentsTimer,
DownloadStartMS,
@@ -4153,9 +4114,7 @@ LoadOplog(LoadOplogContext&& Context)
for (uint32_t FullBlockIndex : PartialBlocksResult.FullBlockIndexes)
{
DownloadAndSaveBlock(Context,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
+ AttachmentWork,
Info,
LoadAttachmentsTimer,
DownloadStartMS,
@@ -4177,9 +4136,7 @@ LoadOplog(LoadOplogContext&& Context)
}
DownloadAndSavePartialBlock(Context,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
+ AttachmentWork,
Info,
LoadAttachmentsTimer,
DownloadStartMS,
@@ -4197,27 +4154,16 @@ LoadOplog(LoadOplogContext&& Context)
for (const IoHash& AttachmentToDownload : AttachmentsToDownload)
{
- DownloadAndSaveAttachment(Context,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- AttachmentToDownload);
+ DownloadAndSaveAttachment(Context, AttachmentWork, Info, LoadAttachmentsTimer, DownloadStartMS, AttachmentToDownload);
}
uint64_t TotalChunksToDownload = AllNeededChunkHashes.size() + AttachmentsToDownload.size();
- AttachmentsDownloadLatch.CountDown();
+ try
{
- while (!AttachmentsDownloadLatch.Wait(1000))
- {
- if (remotestore_impl::IsCancelled(Context.OptionalJobContext))
+ AttachmentWork.Wait(1000, [&](bool /*IsAborted*/, bool /*IsPaused*/, std::ptrdiff_t /*Pending*/) {
+ if (remotestore_impl::IsCancelled(Context.OptionalJobContext) && !AbortFlag)
{
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- }
+ AbortFlag = true;
}
uint64_t PartialTransferWallTimeMS = TransferWallTimeMS;
if (DownloadStartMS != (uint64_t)-1)
@@ -4246,45 +4192,26 @@ LoadOplog(LoadOplogContext&& Context)
TotalChunksToDownload,
TotalChunksToDownload - CompletedChunkCount,
AttachmentsDownloadProgressTimer.GetElapsedTimeMs());
- }
+ });
}
- if (DownloadStartMS != (uint64_t)-1)
+ catch (const RemoteStoreError& Ex)
{
- TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
+ if (!LoadResult.ErrorCode)
+ {
+ LoadResult = {.ErrorCode = Ex.GetErrorCode(), .Reason = Ex.what(), .Text = std::string(Ex.GetText())};
+ }
}
-
- AttachmentsWriteLatch.CountDown();
+ catch (const std::exception& Ex)
{
- while (!AttachmentsWriteLatch.Wait(1000))
+ if (!LoadResult.ErrorCode)
{
- if (remotestore_impl::IsCancelled(Context.OptionalJobContext))
- {
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- }
- }
-
- uint64_t CompletedChunkCount = Info.ChunksCompleteCount.load();
-
- uint64_t AttachmentsDownloaded =
- Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load();
- uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() +
- Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load();
-
- remotestore_impl::ReportProgress(Context.OptionalJobContext,
- "Loading attachments"sv,
- fmt::format("{}/{} ({}) chunks. {} ({}) blobs downloaded.",
- CompletedChunkCount,
- TotalChunksToDownload,
- NiceBytes(Info.AttachmentBytesStored.load()),
- AttachmentsDownloaded,
- NiceBytes(AttachmentBytesDownloaded)),
- TotalChunksToDownload,
- TotalChunksToDownload - CompletedChunkCount,
- AttachmentsDownloadProgressTimer.GetElapsedTimeMs());
+ LoadResult = {.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError), .Reason = Ex.what()};
}
}
+ if (DownloadStartMS != (uint64_t)-1)
+ {
+ TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
+ }
if (AttachmentCount.load() > 0)
{
@@ -4295,25 +4222,24 @@ LoadOplog(LoadOplogContext&& Context)
0,
AttachmentsDownloadProgressTimer.GetElapsedTimeMs());
}
-
- if (Result.ErrorCode == 0)
+ if (LoadResult.ErrorCode == 0)
{
if (!FilesToDechunk.empty())
{
remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size()));
- Latch DechunkLatch(1);
+ ParallelWork DechunkWork(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
std::filesystem::path TempFilePath = Context.Oplog.TempPath();
for (size_t ChunkedIndex = 0; ChunkedIndex < FilesToDechunk.size(); ChunkedIndex++)
{
const ChunkedInfo& Chunked = FilesToDechunk[ChunkedIndex];
std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString();
- DechunkLatch.AddCount(1);
- Context.WorkerPool.ScheduleWork(
- [&Context, &DechunkLatch, TempFileName, &FilesToDechunk, ChunkedIndex, &RemoteResult, &Info]() {
+ DechunkWork.ScheduleWork(
+ Context.WorkerPool,
+ [&Context, TempFileName, &FilesToDechunk, ChunkedIndex, &Info](std::atomic<bool>& AbortFlag) {
ZEN_TRACE_CPU("DechunkAttachment");
- auto _ = MakeGuard([&DechunkLatch, &TempFileName] {
+ auto _ = MakeGuard([&TempFileName] {
std::error_code Ec;
if (IsFile(TempFileName, Ec))
{
@@ -4323,13 +4249,12 @@ LoadOplog(LoadOplogContext&& Context)
ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message());
}
}
- DechunkLatch.CountDown();
});
const ChunkedInfo& Chunked = FilesToDechunk[ChunkedIndex];
try
{
- if (RemoteResult.IsError())
+ if (AbortFlag.load())
{
return;
}
@@ -4342,12 +4267,11 @@ LoadOplog(LoadOplogContext&& Context)
TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate, Ec);
if (Ec)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- "Write error",
- fmt::format("Failed to open temp file {} for chunked attachment {}",
- TempFileName,
- Chunked.RawHash));
- return;
+ throw RemoteStoreError("Write error",
+ gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to open temp file {} for chunked attachment {}",
+ TempFileName,
+ Chunked.RawHash));
}
else
{
@@ -4357,7 +4281,7 @@ LoadOplog(LoadOplogContext&& Context)
BLAKE3Stream HashingStream;
for (std::uint32_t SequenceIndex : Chunked.ChunkSequence)
{
- if (RemoteResult.IsError())
+ if (AbortFlag.load())
{
return;
}
@@ -4374,9 +4298,9 @@ LoadOplog(LoadOplogContext&& Context)
Info.MissingAttachmentCount.fetch_add(1);
if (!Context.IgnoreMissingAttachments)
{
- RemoteResult.SetError(
- gsl::narrow<int>(HttpResponseCode::NotFound),
+ throw RemoteStoreError(
"Missing chunk",
+ gsl::narrow<int>(HttpResponseCode::NotFound),
fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
}
return;
@@ -4403,9 +4327,9 @@ LoadOplog(LoadOplogContext&& Context)
Info.MissingAttachmentCount.fetch_add(1);
if (!Context.IgnoreMissingAttachments)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- "Missing chunk",
- Message);
+ throw RemoteStoreError("Missing chunk",
+ gsl::narrow<int>(HttpResponseCode::NotFound),
+ Message);
}
return;
}
@@ -4443,9 +4367,9 @@ LoadOplog(LoadOplogContext&& Context)
Info.MissingAttachmentCount.fetch_add(1);
if (!Context.IgnoreMissingAttachments)
{
- RemoteResult.SetError(
- gsl::narrow<int>(HttpResponseCode::NotFound),
+ throw RemoteStoreError(
"Missing chunk",
+ gsl::narrow<int>(HttpResponseCode::NotFound),
fmt::format("Failed to decompress chunk {} for chunked attachment {}",
ChunkHash,
Chunked.RawHash));
@@ -4477,37 +4401,49 @@ LoadOplog(LoadOplogContext&& Context)
NiceBytes(Chunked.RawSize),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
+ catch (const RemoteStoreError&)
+ {
+ throw;
+ }
catch (const std::exception& Ex)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to dechunk file {}", Chunked.RawHash),
- Ex.what());
+ throw RemoteStoreError(fmt::format("Failed to dechunk file {}", Chunked.RawHash),
+ gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ Ex.what());
}
},
WorkerThreadPool::EMode::EnableBacklog);
}
Stopwatch DechunkProgressTimer;
- DechunkLatch.CountDown();
- while (!DechunkLatch.Wait(1000))
+ try
{
- ptrdiff_t Remaining = DechunkLatch.Remaining();
- if (remotestore_impl::IsCancelled(Context.OptionalJobContext))
- {
- if (!RemoteResult.IsError())
+ DechunkWork.Wait(1000, [&](bool /*IsAborted*/, bool /*IsPaused*/, std::ptrdiff_t Remaining) {
+ if (remotestore_impl::IsCancelled(Context.OptionalJobContext) && !AbortFlag)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(
- Context.OptionalJobContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ AbortFlag = true;
}
+ remotestore_impl::ReportProgress(Context.OptionalJobContext,
+ "Dechunking attachments"sv,
+ fmt::format("{} remaining...", Remaining),
+ FilesToDechunk.size(),
+ Remaining,
+ DechunkProgressTimer.GetElapsedTimeMs());
+ });
+ }
+ catch (const RemoteStoreError& Ex)
+ {
+ if (!LoadResult.ErrorCode)
+ {
+ LoadResult = {.ErrorCode = Ex.GetErrorCode(), .Reason = Ex.what(), .Text = std::string(Ex.GetText())};
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ if (!LoadResult.ErrorCode)
+ {
+ LoadResult = {.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError), .Reason = Ex.what()};
}
- remotestore_impl::ReportProgress(Context.OptionalJobContext,
- "Dechunking attachments"sv,
- fmt::format("{} remaining...", Remaining),
- FilesToDechunk.size(),
- Remaining,
- DechunkProgressTimer.GetElapsedTimeMs());
}
remotestore_impl::ReportProgress(Context.OptionalJobContext,
"Dechunking attachments"sv,
@@ -4516,8 +4452,8 @@ LoadOplog(LoadOplogContext&& Context)
0,
DechunkProgressTimer.GetElapsedTimeMs());
}
- Result = RemoteResult.ConvertResult();
}
+ Result = LoadResult;
if (Result.ErrorCode == 0)
{
@@ -4541,7 +4477,6 @@ LoadOplog(LoadOplogContext&& Context)
Result = remotestore_impl::WriteOplogSection(Context.Oplog, OplogSection, Context.OptionalJobContext);
if (Result.ErrorCode)
{
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
remotestore_impl::ReportMessage(Context.OptionalJobContext,
fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason));
}
@@ -6659,6 +6594,156 @@ TEST_CASE("project.store.embed_loose_files_already_resolved")
/*ForceDisableBlocks=*/false);
}
+TEST_CASE("project.store.import.error.missing_attachment")
+{
+ // Export a small oplog (blocks disabled to avoid pre-existing ZEN_ASSERT), delete one
+ // attachment file from the remote store, then verify that LoadOplog returns a non-zero
+ // error code when IgnoreMissingAttachments=false.
+ using namespace projectstore_testutils;
+
+ ScopedTemporaryDirectory TempDir;
+ ScopedTemporaryDirectory ExportDir;
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ std::unique_ptr<ProjectStore> ProjectStoreDummy;
+ Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy);
+
+ Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog_missing_att", {});
+ REQUIRE(Oplog);
+
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{512, 1024})));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{2048, 3000})));
+
+ uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u);
+ uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u;
+ WorkerThreadPool WorkerPool(WorkerCount);
+ WorkerThreadPool NetworkPool(NetworkWorkerCount);
+
+ std::shared_ptr<RemoteProjectStore> RemoteStore;
+ RunSaveOplog(CidStore,
+ *Project,
+ *Oplog,
+ NetworkPool,
+ WorkerPool,
+ ExportDir.Path(),
+ "oplog_missing_att",
+ 64u * 1024u,
+ 1000,
+ 32u * 1024u,
+ /*EmbedLooseFiles=*/false,
+ /*ForceUpload=*/false,
+ /*IgnoreMissingAttachments=*/false,
+ /*OptionalContext=*/nullptr,
+ /*ForceDisableBlocks=*/true,
+ &RemoteStore);
+
+ // Find and delete one .blob attachment file from the remote store directory.
+ std::filesystem::path DeletedBlob;
+ for (const auto& Entry : std::filesystem::recursive_directory_iterator(ExportDir.Path()))
+ {
+ if (Entry.path().extension() == ".blob")
+ {
+ DeletedBlob = Entry.path();
+ break;
+ }
+ }
+ REQUIRE(!DeletedBlob.empty());
+ std::error_code Ec;
+ std::filesystem::remove(DeletedBlob, Ec);
+ REQUIRE(!Ec);
+
+ Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_missing_att_import", {});
+ REQUIRE(ImportOplog);
+
+ CapturingJobContext Ctx;
+ RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ .RemoteStore = *RemoteStore,
+ .Oplog = *ImportOplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = true,
+ .IgnoreMissingAttachments = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed,
+ .OptionalJobContext = &Ctx});
+ CHECK(ImportResult.ErrorCode != 0);
+}
+
+TEST_CASE("project.store.import.ignore_missing_attachment")
+{
+ // Same setup as project.store.import.error.missing_attachment, but with
+ // IgnoreMissingAttachments=true. LoadOplog should succeed (ErrorCode == 0)
+ // despite the missing attachment file.
+ using namespace projectstore_testutils;
+
+ ScopedTemporaryDirectory TempDir;
+ ScopedTemporaryDirectory ExportDir;
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ std::unique_ptr<ProjectStore> ProjectStoreDummy;
+ Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy);
+
+ Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog_ignore_missing", {});
+ REQUIRE(Oplog);
+
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{512, 1024})));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{2048, 3000})));
+
+ uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u);
+ uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u;
+ WorkerThreadPool WorkerPool(WorkerCount);
+ WorkerThreadPool NetworkPool(NetworkWorkerCount);
+
+ std::shared_ptr<RemoteProjectStore> RemoteStore;
+ RunSaveOplog(CidStore,
+ *Project,
+ *Oplog,
+ NetworkPool,
+ WorkerPool,
+ ExportDir.Path(),
+ "oplog_ignore_missing",
+ 64u * 1024u,
+ 1000,
+ 32u * 1024u,
+ /*EmbedLooseFiles=*/false,
+ /*ForceUpload=*/false,
+ /*IgnoreMissingAttachments=*/false,
+ /*OptionalContext=*/nullptr,
+ /*ForceDisableBlocks=*/true,
+ &RemoteStore);
+
+ // Find and delete one .blob attachment file.
+ std::filesystem::path DeletedBlob;
+ for (const auto& Entry : std::filesystem::recursive_directory_iterator(ExportDir.Path()))
+ {
+ if (Entry.path().extension() == ".blob")
+ {
+ DeletedBlob = Entry.path();
+ break;
+ }
+ }
+ REQUIRE(!DeletedBlob.empty());
+ std::error_code Ec;
+ std::filesystem::remove(DeletedBlob, Ec);
+ REQUIRE(!Ec);
+
+ Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_ignore_missing_import", {});
+ REQUIRE(ImportOplog);
+
+ CapturingJobContext Ctx;
+ RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ .RemoteStore = *RemoteStore,
+ .Oplog = *ImportOplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = true,
+ .IgnoreMissingAttachments = true,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed,
+ .OptionalJobContext = &Ctx});
+ CHECK(ImportResult.ErrorCode == 0);
+}
+
TEST_CASE("project.store.blockcomposer.path_a_standalone_block")
{
// Path A: a single op with exactly MaxChunksPerBlock (4) chunks.