aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-20 13:22:05 +0200
committerGitHub Enterprise <[email protected]>2024-04-20 13:22:05 +0200
commitaa0b0d3cbfc6c4561591df856396703f7177292e (patch)
tree6ff9a4e94559ba62d8ee07076d56dedc7d2e9115 /src/zenserver
parent5.4.5-pre0 (diff)
downloadzen-aa0b0d3cbfc6c4561591df856396703f7177292e.tar.xz
zen-aa0b0d3cbfc6c4561591df856396703f7177292e.zip
import oplog improvements (#54)
* report down/up transfer speed during progress * add disk buffering in http client * offload block decoding and chunk writing form network worker pool threads add block hash verification for blocks recevied at oplog import * separate download-latch from write-latch to get more accurate download speed * check headers when downloading with http client to go directly to file writing for large payloads * we must clear write callback even if we only provide it as an argument to the Download() call * make timeout optional in AddSponsorProcess * check return codes when creating windows threadpool
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/main.cpp5
-rw-r--r--src/zenserver/projectstore/projectstore.cpp33
-rw-r--r--src/zenserver/projectstore/projectstore.h4
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp285
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h4
5 files changed, 202 insertions, 129 deletions
diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp
index 8715f5447..b96118484 100644
--- a/src/zenserver/main.cpp
+++ b/src/zenserver/main.cpp
@@ -123,7 +123,7 @@ ZenEntryPoint::Run()
Entry->Pid.load(),
m_ServerOptions.OwnerPid);
- if (Entry->AddSponsorProcess(m_ServerOptions.OwnerPid))
+ if (Entry->AddSponsorProcess(m_ServerOptions.OwnerPid, 2000))
{
std::exit(0);
}
@@ -193,7 +193,8 @@ ZenEntryPoint::Run()
if (m_ServerOptions.OwnerPid)
{
- Entry->AddSponsorProcess(m_ServerOptions.OwnerPid);
+ // We are adding a sponsor process to our own entry, can't wait for pick since the code is not run until later
+ Entry->AddSponsorProcess(m_ServerOptions.OwnerPid, 0);
}
ZenServer Server;
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index e452c658e..3a7922aaf 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -3210,37 +3210,6 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
return ConvertResult(ContainerResult);
}
-std::pair<HttpResponseCode, std::string>
-ProjectStore::WriteBlock(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload)
-{
- ZEN_TRACE_CPU("Store::WriteBlock");
-
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown project '{}'", ProjectId)};
- }
- Project->TouchProject();
-
- ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
- if (!Oplog)
- {
- return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown oplog '{}/{}'", ProjectId, OplogId)};
- }
- Project->TouchOplog(OplogId);
-
- if (!IterateBlock(std::move(Payload), [this](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
- IoBuffer Compressed = Chunk.GetCompressed().Flatten().AsIoBuffer();
- m_CidStore.AddChunk(Compressed, AttachmentRawHash);
- ZEN_DEBUG("Saved attachment {} from block, size {}", AttachmentRawHash, Compressed.GetSize());
- }))
- {
- return {HttpResponseCode::BadRequest, "Invalid chunk in block"};
- }
-
- return {HttpResponseCode::OK, {}};
-}
-
bool
ProjectStore::Rpc(HttpServerRequest& HttpReq,
const std::string_view ProjectId,
@@ -4736,7 +4705,7 @@ TEST_CASE("project.store.block")
}
CompressedBuffer Block = GenerateBlock(std::move(Chunks));
IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer();
- CHECK(IterateBlock(std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {}));
+ CHECK(IterateBlock(Block.DecodeRawHash(), std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {}));
}
#endif
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 75844f84e..269fe7336 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -369,10 +369,6 @@ public:
const HttpServerRequest::QueryParams& Params,
CbObject& OutResponse);
- std::pair<HttpResponseCode, std::string> WriteBlock(const std::string_view ProjectId,
- const std::string_view OplogId,
- IoBuffer&& Payload);
-
bool Rpc(HttpServerRequest& HttpReq,
const std::string_view ProjectId,
const std::string_view OplogId,
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 65ef099e4..d8402366d 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -113,21 +113,19 @@ IsCancelled(JobContext* OptionalContext)
return OptionalContext->IsCancelled();
}
+std::string
+GetStats(const RemoteProjectStore::Stats& Stats, uint64_t ElapsedWallTimeMS)
+{
+ return fmt::format("Sent: {} ({}/s) Recv: {} ({}/s)",
+ NiceBytes(Stats.m_SentBytes),
+ NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000) / ElapsedWallTimeMS) : 0u),
+ NiceBytes(Stats.m_ReceivedBytes),
+ NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000) / ElapsedWallTimeMS) : 0u));
+}
+
void
-ReportRemoteStoreStats(JobContext* OptionalContext,
- const RemoteProjectStore::RemoteStoreInfo& RemoteStoreInfo,
- const RemoteProjectStore::Stats& Stats,
- uint64_t ElapsedWallTimeMS)
+LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats)
{
- ReportMessage(
- OptionalContext,
- fmt::format("Remote store '{}': "
- "Sent: {} ({}/s) Recv: {} ({}/s)",
- RemoteStoreInfo.Description,
- NiceBytes(Stats.m_SentBytes),
- NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000) / ElapsedWallTimeMS) : 0u),
- NiceBytes(Stats.m_ReceivedBytes),
- NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000) / ElapsedWallTimeMS) : 0u)));
ZEN_INFO("Oplog request count: {}. Average request size: {}. Peak request speed: {}",
Stats.m_RequestCount,
NiceLatencyNs(Stats.m_RequestCount > 0 ? (Stats.m_ReceivedBytes + Stats.m_SentBytes) / Stats.m_RequestCount : 0u),
@@ -144,9 +142,19 @@ ReportRemoteStoreStats(JobContext* OptionalContext,
}
bool
-IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
+IterateBlock(const IoHash& BlockHash,
+ IoBuffer&& CompressedBlock,
+ std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
{
- IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer();
+ IoHash RawHash;
+ uint64_t RawSize;
+ IoBuffer BlockPayload =
+ CompressedBuffer::FromCompressed(SharedBuffer(std::move(CompressedBlock)), RawHash, RawSize).Decompress().AsIoBuffer();
+ if (RawHash != BlockHash)
+ {
+ ZEN_WARN("Header rawhash for downloaded block {} does not match, got {}", BlockHash, RawHash);
+ return false;
+ }
MemoryView BlockView = BlockPayload.GetView();
const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData());
@@ -1711,19 +1719,29 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
}
}
- ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", Remaining), AttachmentsToSave, Remaining);
+ uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs();
+ ReportProgress(
+ OptionalContext,
+ fmt::format("Saving attachments, {} remaining... {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)),
+ AttachmentsToSave,
+ Remaining);
}
+ uint64_t ElapsedTimeMS = Timer.GetElapsedTimeMs();
if (AttachmentsToSave > 0)
{
- ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0);
+ ReportProgress(OptionalContext,
+ fmt::format("Saving attachments, {} remaining. {}", 0, GetStats(RemoteStore.GetStats(), ElapsedTimeMS)),
+ AttachmentsToSave,
+ 0);
}
ReportMessage(OptionalContext,
- fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {}",
+ fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {} {}",
AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(),
BlockAttachmentCountToUpload,
LargeAttachmentCountToUpload,
BulkAttachmentCountToUpload,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs())));
+ NiceTimeSpanMs(ElapsedTimeMS),
+ GetStats(RemoteStore.GetStats(), ElapsedTimeMS)));
}
RemoteProjectStore::Result
@@ -2077,10 +2095,10 @@ SaveOplog(CidStore& ChunkStore,
RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- ReportRemoteStoreStats(OptionalContext, RemoteStoreInfo, RemoteStore.GetStats(), TransferWallTimeMS);
+ LogRemoteStoreStatsDetails(RemoteStore.GetStats());
ReportMessage(OptionalContext,
- fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({})",
+ fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}",
RemoteStoreInfo.ContainerName,
RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
@@ -2088,7 +2106,8 @@ SaveOplog(CidStore& ChunkStore,
Info.AttachmentBlocksUploaded.load(),
NiceBytes(Info.AttachmentBlockBytesUploaded.load()),
Info.AttachmentsUploaded.load(),
- NiceBytes(Info.AttachmentBytesUploaded.load())));
+ NiceBytes(Info.AttachmentBytesUploaded.load()),
+ GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
return Result;
};
@@ -2360,7 +2379,8 @@ LoadOplog(CidStore& ChunkStore,
Info.OplogSizeBytes = LoadContainerResult.ContainerObject.GetSize();
AsyncRemoteResult RemoteResult;
- Latch AttachmentsWorkLatch(1);
+ Latch AttachmentsDownloadLatch(1);
+ Latch AttachmentsWriteLatch(1);
std::atomic_size_t AttachmentCount = 0;
Stopwatch LoadAttachmentsTimer;
@@ -2381,7 +2401,9 @@ LoadOplog(CidStore& ChunkStore,
&Oplog,
&ChunkStore,
&NetworkWorkerPool,
- &AttachmentsWorkLatch,
+ &WorkerPool,
+ &AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
&AttachmentCount,
&RemoteResult,
&BlockCountToDownload,
@@ -2400,11 +2422,13 @@ LoadOplog(CidStore& ChunkStore,
BlockCountToDownload++;
if (BlockHash == IoHash::Zero)
{
- AttachmentsWorkLatch.AddCount(1);
+ AttachmentsDownloadLatch.AddCount(1);
AttachmentCount.fetch_add(1);
NetworkWorkerPool.ScheduleWork([&RemoteStore,
&ChunkStore,
- &AttachmentsWorkLatch,
+ &WorkerPool,
+ &AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
&RemoteResult,
Chunks = std::move(Chunks),
&Info,
@@ -2412,7 +2436,7 @@ LoadOplog(CidStore& ChunkStore,
&DownloadStartMS,
IgnoreMissingAttachments,
OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
return;
@@ -2442,26 +2466,37 @@ LoadOplog(CidStore& ChunkStore,
{
return;
}
- for (const auto& It : Result.Chunks)
- {
- uint64_t ChunkSize = It.second.GetCompressedSize();
- Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
- CidStore::InsertResult InsertResult =
- ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly);
- if (InsertResult.New)
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
{
- Info.AttachmentBytesStored.fetch_add(ChunkSize);
- Info.AttachmentsStored.fetch_add(1);
+ return;
}
- }
+ for (const auto& It : Chunks)
+ {
+ uint64_t ChunkSize = It.second.GetCompressedSize();
+ Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ CidStore::InsertResult InsertResult = ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(),
+ It.first,
+ CidStore::InsertMode::kCopyOnly);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(ChunkSize);
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
+ });
});
return;
}
- AttachmentsWorkLatch.AddCount(1);
+ AttachmentsDownloadLatch.AddCount(1);
AttachmentCount.fetch_add(1);
- NetworkWorkerPool.ScheduleWork([&AttachmentsWorkLatch,
+ NetworkWorkerPool.ScheduleWork([&AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
&ChunkStore,
&RemoteStore,
+ &WorkerPool,
BlockHash,
&RemoteResult,
Chunks = std::move(Chunks),
@@ -2470,7 +2505,7 @@ LoadOplog(CidStore& ChunkStore,
&DownloadStartMS,
IgnoreMissingAttachments,
OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
return;
@@ -2503,38 +2538,56 @@ LoadOplog(CidStore& ChunkStore,
NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
NiceBytes(BlockSize));
Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
- std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
- WantedChunks.reserve(Chunks.size());
- WantedChunks.insert(Chunks.begin(), Chunks.end());
- bool StoreChunksOK =
- IterateBlock(std::move(BlockResult.Bytes),
- [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
- if (WantedChunks.contains(AttachmentRawHash))
- {
- uint64_t ChunkSize = Chunk.GetCompressedSize();
- CidStore::InsertResult InsertResult =
- ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
- if (InsertResult.New)
+
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&AttachmentsWriteLatch,
+ &RemoteResult,
+ &Info,
+ &ChunkStore,
+ BlockHash,
+ Chunks = std::move(Chunks),
+ Bytes = std::move(BlockResult.Bytes),
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ ZEN_ASSERT(Bytes.Size() > 0);
+ std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
+ WantedChunks.reserve(Chunks.size());
+ WantedChunks.insert(Chunks.begin(), Chunks.end());
+ bool StoreChunksOK =
+ IterateBlock(BlockHash,
+ IoBuffer(Bytes),
+ [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
+ if (WantedChunks.contains(AttachmentRawHash))
{
- Info.AttachmentBytesStored.fetch_add(ChunkSize);
- Info.AttachmentsStored.fetch_add(1);
+ uint64_t ChunkSize = Chunk.GetCompressedSize();
+ CidStore::InsertResult InsertResult =
+ ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(ChunkSize);
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ WantedChunks.erase(AttachmentRawHash);
}
- WantedChunks.erase(AttachmentRawHash);
- }
- });
- if (!StoreChunksOK)
- {
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has invalid format ({}): {}",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Invalid format for block {}", BlockHash),
- {});
- return;
- }
- ZEN_ASSERT(WantedChunks.empty());
+ });
+ if (!StoreChunksOK)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} has invalid format ({}): {}",
+ BlockHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Invalid format for block {}", BlockHash),
+ {});
+ return;
+ }
+ ZEN_ASSERT(WantedChunks.empty());
+ });
});
};
@@ -2542,7 +2595,9 @@ LoadOplog(CidStore& ChunkStore,
&Oplog,
&ChunkStore,
&NetworkWorkerPool,
- &AttachmentsWorkLatch,
+ &WorkerPool,
+ &AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
&RemoteResult,
&Attachments,
&AttachmentCount,
@@ -2562,19 +2617,21 @@ LoadOplog(CidStore& ChunkStore,
Oplog.CaptureAddedAttachments(std::vector<IoHash>{RawHash});
- AttachmentsWorkLatch.AddCount(1);
+ AttachmentsDownloadLatch.AddCount(1);
AttachmentCount.fetch_add(1);
NetworkWorkerPool.ScheduleWork([&RemoteStore,
&ChunkStore,
+ &WorkerPool,
&RemoteResult,
- &AttachmentsWorkLatch,
+ &AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
RawHash,
&LoadAttachmentsTimer,
&DownloadStartMS,
&Info,
IgnoreMissingAttachments,
OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
return;
@@ -2607,12 +2664,28 @@ LoadOplog(CidStore& ChunkStore,
return;
}
Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
- CidStore::InsertResult InsertResult = ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
- if (InsertResult.New)
- {
- Info.AttachmentBytesStored.fetch_add(AttachmentSize);
- Info.AttachmentsStored.fetch_add(1);
- }
+
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&AttachmentsWriteLatch,
+ &RemoteResult,
+ &Info,
+ &ChunkStore,
+ RawHash,
+ AttachmentSize,
+ Bytes = std::move(AttachmentResult.Bytes),
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(AttachmentSize);
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ });
});
};
@@ -2646,10 +2719,10 @@ LoadOplog(CidStore& ChunkStore,
BlockCountToDownload,
FilesToDechunk.size()));
- AttachmentsWorkLatch.CountDown();
- while (!AttachmentsWorkLatch.Wait(1000))
+ AttachmentsDownloadLatch.CountDown();
+ while (!AttachmentsDownloadLatch.Wait(1000))
{
- ptrdiff_t Remaining = AttachmentsWorkLatch.Remaining();
+ ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining();
if (IsCancelled(OptionalContext))
{
if (!RemoteResult.IsError())
@@ -2657,16 +2730,47 @@ LoadOplog(CidStore& ChunkStore,
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
}
}
- ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", Remaining), AttachmentCount.load(), Remaining);
+ uint64_t PartialTransferWallTimeMS = TransferWallTimeMS;
+ if (DownloadStartMS != (uint64_t)-1)
+ {
+ PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
+ }
+ ReportProgress(
+ OptionalContext,
+ fmt::format("Loading attachments, {} remaining. {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)),
+ AttachmentCount.load(),
+ Remaining);
+ }
+ if (DownloadStartMS != (uint64_t)-1)
+ {
+ TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
}
+
if (AttachmentCount.load() > 0)
{
- ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", 0), AttachmentCount.load(), 0);
+ ReportProgress(OptionalContext,
+ fmt::format("Loading attachments, {} remaining. {}", 0, GetStats(RemoteStore.GetStats(), TransferWallTimeMS)),
+ AttachmentCount.load(),
+ 0);
}
- if (DownloadStartMS != (uint64_t)-1)
+ AttachmentsWriteLatch.CountDown();
+ while (!AttachmentsWriteLatch.Wait(1000))
{
- TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
+ ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining();
+ if (IsCancelled(OptionalContext))
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
+ }
+ ReportProgress(OptionalContext, fmt::format("Writing attachments, {} remaining.", Remaining), AttachmentCount.load(), Remaining);
+ }
+
+ if (AttachmentCount.load() > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Writing attachments, {} remaining.", 0), AttachmentCount.load(), 0);
}
if (Result.ErrorCode == 0)
@@ -2811,10 +2915,10 @@ LoadOplog(CidStore& ChunkStore,
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- ReportRemoteStoreStats(OptionalContext, RemoteStoreInfo, RemoteStore.GetStats(), TransferWallTimeMS);
+ LogRemoteStoreStatsDetails(RemoteStore.GetStats());
ReportMessage(OptionalContext,
- fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {}",
+ fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {} {}",
RemoteStoreInfo.ContainerName,
Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE",
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
@@ -2825,7 +2929,8 @@ LoadOplog(CidStore& ChunkStore,
NiceBytes(Info.AttachmentBytesDownloaded.load()),
Info.AttachmentsStored.load(),
NiceBytes(Info.AttachmentBytesStored.load()),
- Info.MissingAttachmentCount.load()));
+ Info.MissingAttachmentCount.load(),
+ GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
return Result;
}
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index b00aa231f..d4ccd8c7b 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -162,6 +162,8 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
JobContext* OptionalContext);
CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks);
-bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
+bool IterateBlock(const IoHash& BlockHash,
+ IoBuffer&& CompressedBlock,
+ std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
} // namespace zen