aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp345
1 files changed, 117 insertions, 228 deletions
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index 55f40d223..13b2e7b1e 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -69,34 +69,6 @@ namespace zen {
namespace remotestore_impl {
using namespace std::literals;
- ////////////////////////////// AsyncRemoteResult
-
- struct AsyncRemoteResult
- {
- void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText)
- {
- int32_t Expected = 0;
- if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1))
- {
- m_ErrorReason = ErrorReason;
- m_ErrorText = ErrorText;
- }
- }
- bool IsError() const { return m_ErrorCode.load() != 0; }
- int GetError() const { return m_ErrorCode.load(); };
- const std::string& GetErrorReason() const { return m_ErrorReason; };
- const std::string& GetErrorText() const { return m_ErrorText; };
- RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const
- {
- return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText};
- }
-
- private:
- std::atomic<int32_t> m_ErrorCode = 0;
- std::string m_ErrorReason;
- std::string m_ErrorText;
- };
-
void ReportProgress(JobContext* OptionalContext,
std::string_view CurrentOp,
std::string_view Details,
@@ -462,9 +434,7 @@ namespace remotestore_impl {
}
}
IoBuffer TempAttachmentBuffer = IoBufferBuilder::MakeFromFile(AttachmentPath);
- ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress());
CompressedFile.Close();
- ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress());
TempAttachmentBuffer.SetDeleteOnClose(true);
ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress());
return TempAttachmentBuffer;
@@ -1032,7 +1002,7 @@ namespace remotestore_impl {
if (CompressedSize > MaxChunkEmbedSize)
{
- TGetAttachmentBufferFunc FetchFunc = [Data = std::move(TempAttachmentBuffer)](const IoHash&) {
+ TGetAttachmentBufferFunc FetchFunc = [Data = std::move(TempAttachmentBuffer)](const IoHash&) mutable {
return CompositeBuffer(SharedBuffer(std::move(Data)));
};
@@ -2156,7 +2126,6 @@ namespace remotestore_impl {
const std::unordered_set<IoHash, IoHash::Hasher>& Needs,
bool ForceAll,
UploadInfo& Info,
- AsyncRemoteResult& RemoteResult,
JobContext* OptionalContext)
{
using namespace std::literals;
@@ -2217,22 +2186,15 @@ namespace remotestore_impl {
if (!UnknownAttachments.empty())
{
- RemoteResult.SetError(
- gsl::narrow<int>(HttpResponseCode::NotFound),
+ throw RemoteStoreError(
fmt::format("Upload requested of {} missing attachments, the base container referenced blocks that are no longer available",
UnknownAttachments.size()),
+ gsl::narrow<int>(HttpResponseCode::NotFound),
"");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return;
}
if (IsCancelled(OptionalContext))
{
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- }
return;
}
@@ -2245,121 +2207,91 @@ namespace remotestore_impl {
Stopwatch Timer;
- ptrdiff_t AttachmentsToSave(0);
- Latch SaveAttachmentsLatch(1);
+ std::atomic<bool> AbortFlag(false);
+ std::atomic<bool> PauseFlag(false);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ ptrdiff_t AttachmentsToSave(0);
for (const IoHash& RawHash : AttachmentsToUpload)
{
- if (RemoteResult.IsError())
+ if (AbortFlag.load())
{
break;
}
- SaveAttachmentsLatch.AddCount(1);
AttachmentsToSave++;
- WorkerPool.ScheduleWork(
- [&ChunkStore,
- &RemoteStore,
- &SaveAttachmentsLatch,
- &RemoteResult,
- RawHash,
- &CreatedBlocks,
- &LooseFileAttachments,
- &Info,
- OptionalContext]() {
+ Work.ScheduleWork(
+ WorkerPool,
+ [&ChunkStore, &RemoteStore, RawHash, &CreatedBlocks, &LooseFileAttachments, &Info, OptionalContext](
+ std::atomic<bool>& AbortFlag) {
ZEN_TRACE_CPU("UploadAttachment");
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
+ if (AbortFlag.load())
{
return;
}
- try
+ CompositeBuffer Payload;
+ ChunkBlockDescription Block;
+ if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
{
- CompositeBuffer Payload;
- ChunkBlockDescription Block;
- if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
- {
- Payload = BlockIt->second.Payload;
- Block = BlockIt->second.Block;
- }
- else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
- {
- Payload = LooseTmpFileIt->second(RawHash);
- }
- else
- {
- Payload = CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash)));
- }
- if (!Payload)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to find attachment {}", RawHash),
- {});
- ZEN_WARN("Failed to save attachment '{}' ({}): {}",
- RawHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
- return;
- }
- const bool IsBlock = Block.BlockHash == RawHash;
- size_t PayloadSize = Payload.GetSize();
- RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(std::move(Payload), RawHash, std::move(Block));
- if (Result.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachment '{}', {} ({}): {}",
- RawHash,
- NiceBytes(PayloadSize),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- return;
- }
- if (IsBlock)
- {
- Info.AttachmentBlocksUploaded.fetch_add(1);
- Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
- ZEN_INFO("Saved block attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(PayloadSize));
- }
- else
- {
- Info.AttachmentsUploaded.fetch_add(1);
- Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
- ZEN_INFO("Saved large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(PayloadSize));
- }
+ Payload = BlockIt->second.Payload;
+ Block = BlockIt->second.Block;
}
- catch (const std::exception& Ex)
+ else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to upload attachment {}", RawHash),
- Ex.what());
+ Payload = LooseTmpFileIt->second(RawHash);
}
- },
- WorkerThreadPool::EMode::EnableBacklog);
+ else
+ {
+ Payload = CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash)));
+ }
+ if (!Payload)
+ {
+ throw RemoteStoreError(fmt::format("Failed to find attachment {}", RawHash),
+ gsl::narrow<int>(HttpResponseCode::NotFound),
+ {});
+ }
+ const bool IsBlock = Block.BlockHash == RawHash;
+ size_t PayloadSize = Payload.GetSize();
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(std::move(Payload), RawHash, std::move(Block));
+ if (Result.ErrorCode)
+ {
+ throw RemoteStoreError(fmt::format("Failed to save attachment '{}', {}", RawHash, NiceBytes(PayloadSize)),
+ Result.ErrorCode,
+ Result.Text);
+ }
+ if (IsBlock)
+ {
+ Info.AttachmentBlocksUploaded.fetch_add(1);
+ Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
+ ZEN_INFO("Saved block attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(PayloadSize));
+ }
+ else
+ {
+ Info.AttachmentsUploaded.fetch_add(1);
+ Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
+ ZEN_INFO("Saved large attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(PayloadSize));
+ }
+ });
}
if (IsCancelled(OptionalContext))
{
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- }
- return;
+ AbortFlag = true;
}
if (!BulkBlockAttachmentsToUpload.empty())
{
for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& Chunks : BlockChunks)
{
- if (RemoteResult.IsError())
+ if (AbortFlag.load())
{
break;
}
@@ -2379,87 +2311,62 @@ namespace remotestore_impl {
continue;
}
- SaveAttachmentsLatch.AddCount(1);
AttachmentsToSave++;
- WorkerPool.ScheduleWork(
+ Work.ScheduleWork(
+ WorkerPool,
[&RemoteStore,
&ChunkStore,
- &SaveAttachmentsLatch,
- &RemoteResult,
NeededChunks = std::move(NeededChunks),
&BulkBlockAttachmentsToUpload,
&Info,
- OptionalContext]() {
+ OptionalContext](std::atomic<bool>& AbortFlag) {
ZEN_TRACE_CPU("UploadChunk");
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
+ if (AbortFlag.load())
{
return;
}
- try
+ size_t ChunksSize = 0;
+ std::vector<SharedBuffer> ChunkBuffers;
+ ChunkBuffers.reserve(NeededChunks.size());
+ for (const IoHash& Chunk : NeededChunks)
{
- size_t ChunksSize = 0;
- std::vector<SharedBuffer> ChunkBuffers;
- ChunkBuffers.reserve(NeededChunks.size());
- for (const IoHash& Chunk : NeededChunks)
+ auto It = BulkBlockAttachmentsToUpload.find(Chunk);
+ ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
+ CompositeBuffer ChunkPayload = It->second(It->first).second;
+ if (!ChunkPayload)
{
- auto It = BulkBlockAttachmentsToUpload.find(Chunk);
- ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
- CompositeBuffer ChunkPayload = It->second(It->first).second;
- if (!ChunkPayload)
- {
- RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
- fmt::format("Missing chunk {}"sv, Chunk),
- fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
- ChunkBuffers.clear();
- break;
- }
- ChunksSize += ChunkPayload.GetSize();
- ChunkBuffers.emplace_back(SharedBuffer(ChunkPayload.Flatten().AsIoBuffer()));
- }
- RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
- if (Result.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachments with {} chunks ({}): {}",
- NeededChunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- return;
+ throw RemoteStoreError(fmt::format("Missing chunk {}"sv, Chunk),
+ static_cast<int32_t>(HttpResponseCode::NotFound),
+ fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
}
- Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size());
- Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
-
- ZEN_INFO("Saved {} bulk attachments in {} ({})",
- NeededChunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(ChunksSize));
+ ChunksSize += ChunkPayload.GetSize();
+ ChunkBuffers.emplace_back(SharedBuffer(ChunkPayload.Flatten().AsIoBuffer()));
}
- catch (const std::exception& Ex)
+ RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
+ if (Result.ErrorCode)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to bulk upload {} attachments", NeededChunks.size()),
- Ex.what());
+ throw RemoteStoreError(fmt::format("Failed to save attachments with {} chunks", NeededChunks.size()),
+ Result.ErrorCode,
+ Result.Text);
}
- },
- WorkerThreadPool::EMode::EnableBacklog);
+ Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size());
+ Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
+
+ ZEN_INFO("Saved {} bulk attachments in {} ({})",
+ NeededChunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(ChunksSize));
+ });
}
}
Stopwatch SaveAttachmentsProgressTimer;
- SaveAttachmentsLatch.CountDown();
- while (!SaveAttachmentsLatch.Wait(1000))
- {
- ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining();
- if (IsCancelled(OptionalContext))
+ Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t Remaining) {
+ ZEN_UNUSED(IsAborted, IsPaused);
+ if (IsCancelled(OptionalContext) && !AbortFlag.load())
{
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- }
+ AbortFlag = true;
}
uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs();
ReportProgress(OptionalContext,
@@ -2470,7 +2377,7 @@ namespace remotestore_impl {
AttachmentsToSave,
Remaining,
SaveAttachmentsProgressTimer.GetElapsedTimeMs());
- }
+ });
uint64_t ElapsedTimeMS = Timer.GetElapsedTimeMs();
if (AttachmentsToSave > 0)
{
@@ -2732,12 +2639,13 @@ BuildContainer(CidStore& ChunkStore,
}
else
{
+ const size_t TotalAttachmentCount = UploadAttachments.size() + ReusedAttachmentCount;
remotestore_impl::ReportMessage(OptionalContext,
fmt::format("Resolving {} attachments from {} ops ({} ({:.1f}%) found in existing blocks)",
UploadAttachments.size(),
TotalOpCount,
ReusedAttachmentCount,
- (100.f * ReusedAttachmentCount) / UploadAttachments.size()));
+ (100.f * ReusedAttachmentCount) / TotalAttachmentCount));
ResolveAttachments(ChunkStore,
WorkerPool,
@@ -2926,11 +2834,12 @@ BuildContainer(CidStore& ChunkStore,
if (auto It = LooseUploadAttachments.find(AttachmentHash); It != LooseUploadAttachments.end())
{
uint64_t RawSize = It->second.first;
- IoBuffer Payload = It->second.second;
- return [RawSize = RawSize, Payload = Payload](const IoHash& ChunkHash) -> std::pair<uint64_t, CompositeBuffer> {
- ZEN_UNUSED(ChunkHash);
- return {RawSize, CompositeBuffer(SharedBuffer(std::move(Payload)))};
- };
+ IoBuffer Payload = std::move(It->second.second);
+ return
+ [RawSize, Payload = std::move(Payload)](const IoHash& ChunkHash) mutable -> std::pair<uint64_t, CompositeBuffer> {
+ ZEN_UNUSED(ChunkHash);
+ return {RawSize, CompositeBuffer(SharedBuffer(std::move(Payload)))};
+ };
}
else
{
@@ -3433,8 +3342,7 @@ SaveOplog(CidStore& ChunkStore,
}
{
- Stopwatch UploadAttachmentsTimer;
- remotestore_impl::AsyncRemoteResult RemoteResult;
+ Stopwatch UploadAttachmentsTimer;
UploadAttachments(NetworkWorkerPool,
ChunkStore,
RemoteStore,
@@ -3445,17 +3353,8 @@ SaveOplog(CidStore& ChunkStore,
ContainerSaveResult.Needs,
ForceUpload,
Info,
- RemoteResult,
OptionalContext);
TransferWallTimeMS += UploadAttachmentsTimer.GetElapsedTimeMs();
- if (RemoteResult.IsError())
- {
- throw RemoteStoreError(fmt::format("Failed to upload attachments for oplog '{}': {}",
- RemoteStoreInfo.ContainerName,
- RemoteResult.GetErrorReason()),
- RemoteResult.GetError(),
- RemoteResult.GetErrorText());
- }
const uint32_t MaxTries = 8;
uint32_t Try = 0;
@@ -3520,17 +3419,8 @@ SaveOplog(CidStore& ChunkStore,
ContainerFinalizeResult.Needs,
false,
Info,
- RemoteResult,
OptionalContext);
TransferWallTimeMS += RetryUploadAttachmentsTimer.GetElapsedTimeMs();
- if (RemoteResult.IsError())
- {
- throw RemoteStoreError(fmt::format("Failed to upload attachments for oplog '{}': {}",
- RemoteStoreInfo.ContainerName,
- RemoteResult.GetErrorReason()),
- RemoteResult.GetError(),
- RemoteResult.GetErrorText());
- }
}
}
@@ -3922,7 +3812,6 @@ LoadOplog(LoadOplogContext&& Context)
Context.Oplog.EnableUpdateCapture();
auto _ = MakeGuard([&Context]() { Context.Oplog.DisableUpdateCapture(); });
- RemoteProjectStore::Result LoadResult;
CbObject OplogSection;
RemoteProjectStore::Result Result = ParseOplogContainer(LoadContainerResult.ContainerObject,
OnReferencedAttachments,
@@ -3934,8 +3823,9 @@ LoadOplog(LoadOplogContext&& Context)
Context.OptionalJobContext);
if (Result.ErrorCode != 0)
{
- AbortFlag = true;
- LoadResult = {.ErrorCode = Result.ErrorCode, .Reason = Result.Reason, .Text = Result.Text};
+ AbortFlag = true;
+ AttachmentWork.Wait();
+ throw RemoteStoreError(Result.Reason, Result.ErrorCode, Result.Text);
}
remotestore_impl::ReportMessage(Context.OptionalJobContext,
fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download",
@@ -4191,11 +4081,6 @@ LoadOplog(LoadOplogContext&& Context)
AttachmentsDownloadProgressTimer.GetElapsedTimeMs());
});
- if (LoadResult.ErrorCode)
- {
- throw RemoteStoreError(LoadResult.Reason, LoadResult.ErrorCode, LoadResult.Text);
- }
-
if (DownloadStartMS != (uint64_t)-1)
{
TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
@@ -4382,10 +4267,6 @@ LoadOplog(LoadOplogContext&& Context)
NiceBytes(Chunked.RawSize),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
- catch (const RemoteStoreError&)
- {
- throw;
- }
catch (const std::exception& Ex)
{
throw RemoteStoreError(fmt::format("Failed to dechunk file {}", Chunked.RawHash),
@@ -4694,17 +4575,25 @@ namespace projectstore_testutils {
struct CapturingJobContext : public JobContext
{
bool IsCancelled() const override { return false; }
- void ReportMessage(std::string_view Message) override { Messages.emplace_back(Message); }
+ void ReportMessage(std::string_view Message) override
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ Messages.emplace_back(Message);
+ }
void ReportProgress(std::string_view, std::string_view, ptrdiff_t, ptrdiff_t, uint64_t) override {}
bool HasMessage(std::string_view Substr) const
{
+ RwLock::SharedLockScope _(m_Lock);
return std::any_of(Messages.begin(), Messages.end(), [Substr](const std::string& M) {
return M.find(Substr) != std::string::npos;
});
}
std::vector<std::string> Messages;
+
+ private:
+ mutable RwLock m_Lock;
};
// Create a test IoHash with a unique value based on a small index.