aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-20 13:22:05 +0200
committerGitHub Enterprise <[email protected]>2024-04-20 13:22:05 +0200
commitaa0b0d3cbfc6c4561591df856396703f7177292e (patch)
tree6ff9a4e94559ba62d8ee07076d56dedc7d2e9115 /src
parent5.4.5-pre0 (diff)
downloadzen-aa0b0d3cbfc6c4561591df856396703f7177292e.tar.xz
zen-aa0b0d3cbfc6c4561591df856396703f7177292e.zip
import oplog improvements (#54)
* report down/up transfer speed during progress * add disk buffering in http client * offload block decoding and chunk writing form network worker pool threads add block hash verification for blocks recevied at oplog import * separate download-latch from write-latch to get more accurate download speed * check headers when downloading with http client to go directly to file writing for large payloads * we must clear write callback even if we only provide it as an argument to the Download() call * make timeout optional in AddSponsorProcess * check return codes when creating windows threadpool
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/up_cmd.cpp2
-rw-r--r--src/zencore/workthreadpool.cpp18
-rw-r--r--src/zenhttp/httpclient.cpp228
-rw-r--r--src/zenserver/main.cpp5
-rw-r--r--src/zenserver/projectstore/projectstore.cpp33
-rw-r--r--src/zenserver/projectstore/projectstore.h4
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp285
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h4
-rw-r--r--src/zenutil/include/zenutil/zenserverprocess.h2
-rw-r--r--src/zenutil/zenserverprocess.cpp8
10 files changed, 384 insertions, 205 deletions
diff --git a/src/zen/cmds/up_cmd.cpp b/src/zen/cmds/up_cmd.cpp
index 0db5afb3b..5344a078d 100644
--- a/src/zen/cmds/up_cmd.cpp
+++ b/src/zen/cmds/up_cmd.cpp
@@ -152,7 +152,7 @@ AttachCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return 1;
}
- if (!Entry->AddSponsorProcess(m_OwnerPid))
+ if (!Entry->AddSponsorProcess(m_OwnerPid, 2000))
{
ZEN_WARN("unable to add sponsor process to running zen server instance");
return 1;
diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp
index f41c13bf6..d15fb2e83 100644
--- a/src/zencore/workthreadpool.cpp
+++ b/src/zencore/workthreadpool.cpp
@@ -3,6 +3,7 @@
#include <zencore/workthreadpool.h>
#include <zencore/blockingqueue.h>
+#include <zencore/except.h>
#include <zencore/logging.h>
#include <zencore/string.h>
#include <zencore/testing.h>
@@ -56,18 +57,33 @@ struct WorkerThreadPool::Impl
// Thread pool setup
m_ThreadPool = CreateThreadpool(NULL);
+ if (m_ThreadPool == NULL)
+ {
+ ThrowLastError("CreateThreadpool failed");
+ }
- SetThreadpoolThreadMinimum(m_ThreadPool, InThreadCount);
+ if (!SetThreadpoolThreadMinimum(m_ThreadPool, InThreadCount))
+ {
+ ThrowLastError("SetThreadpoolThreadMinimum failed");
+ }
SetThreadpoolThreadMaximum(m_ThreadPool, InThreadCount * 2);
InitializeThreadpoolEnvironment(&m_CallbackEnvironment);
m_CleanupGroup = CreateThreadpoolCleanupGroup();
+ if (m_CleanupGroup == NULL)
+ {
+ ThrowLastError("CreateThreadpoolCleanupGroup failed");
+ }
SetThreadpoolCallbackPool(&m_CallbackEnvironment, m_ThreadPool);
SetThreadpoolCallbackCleanupGroup(&m_CallbackEnvironment, m_CleanupGroup, NULL);
m_Work = CreateThreadpoolWork(&WorkCallback, this, &m_CallbackEnvironment);
+ if (m_Work == NULL)
+ {
+ ThrowLastError("CreateThreadpoolWork failed");
+ }
}
~Impl()
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index 262785a0a..81c9064f6 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -192,6 +192,7 @@ struct HttpClient::Impl : public RefCounted
cpr::Response Result = CprSession->Download(Write);
ZEN_TRACE("GET {}", Result);
CprSession->SetHeaderCallback({});
+ CprSession->SetWriteCallback({});
return Result;
}
inline cpr::Response Head()
@@ -431,10 +432,76 @@ public:
std::error_code Write(std::string_view DataString)
{
+ const uint8_t* DataPtr = (const uint8_t*)DataString.data();
+ size_t DataSize = DataString.size();
+ if (DataSize >= CacheBufferSize)
+ {
+ std::error_code Ec = Flush();
+ if (Ec)
+ {
+ return Ec;
+ }
+ return AppendData(DataPtr, DataSize);
+ }
+ size_t CopySize = Min(DataSize, CacheBufferSize - m_CacheBufferOffset);
+ memcpy(&m_CacheBuffer[m_CacheBufferOffset], DataPtr, CopySize);
+ m_CacheBufferOffset += CopySize;
+ DataSize -= CopySize;
+ if (m_CacheBufferOffset == CacheBufferSize)
+ {
+ AppendData(m_CacheBuffer, CacheBufferSize);
+ if (DataSize > 0)
+ {
+ ZEN_ASSERT(DataSize < CacheBufferSize);
+ memcpy(m_CacheBuffer, DataPtr + CopySize, DataSize);
+ }
+ m_CacheBufferOffset = DataSize;
+ }
+ else
+ {
+ ZEN_ASSERT(DataSize == 0);
+ }
+ return {};
+ }
+
+ IoBuffer DetachToIoBuffer()
+ {
+ if (std::error_code Ec = Flush(); Ec)
+ {
+ ThrowSystemError(Ec.value(), Ec.message());
+ }
+ ZEN_ASSERT(m_FileHandle != nullptr);
+ void* FileHandle = m_FileHandle;
+ IoBuffer Buffer(IoBuffer::File, FileHandle, 0, m_WriteOffset, /*IsWholeFile*/ true);
+ Buffer.SetDeleteOnClose(true);
+ m_FileHandle = 0;
+ m_WriteOffset = 0;
+ return Buffer;
+ }
+
+ uint64_t GetSize() const { return m_WriteOffset; }
+ void ResetWritePos(uint64_t WriteOffset)
+ {
+ Flush();
+ m_WriteOffset = WriteOffset;
+ }
+
+private:
+ std::error_code Flush()
+ {
+ if (m_CacheBufferOffset == 0)
+ {
+ return {};
+ }
+ std::error_code Res = AppendData(m_CacheBuffer, m_CacheBufferOffset);
+ m_CacheBufferOffset = 0;
+ return Res;
+ }
+
+ std::error_code AppendData(const void* Data, uint64_t Size)
+ {
ZEN_ASSERT(m_FileHandle != nullptr);
const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024;
- const void* Data = DataString.data();
- std::size_t Size = DataString.size();
while (Size)
{
@@ -476,23 +543,11 @@ public:
return {};
}
- IoBuffer DetachToIoBuffer()
- {
- ZEN_ASSERT(m_FileHandle != nullptr);
- void* FileHandle = m_FileHandle;
- IoBuffer Buffer(IoBuffer::File, FileHandle, 0, m_WriteOffset, /*IsWholeFile*/ true);
- Buffer.SetDeleteOnClose(true);
- m_FileHandle = 0;
- m_WriteOffset = 0;
- return Buffer;
- }
-
- uint64_t GetSize() const { return m_WriteOffset; }
- void ResetWritePos(uint64_t WriteOffset) { m_WriteOffset = WriteOffset; }
-
-private:
- void* m_FileHandle;
- std::uint64_t m_WriteOffset;
+ void* m_FileHandle;
+ std::uint64_t m_WriteOffset;
+ static constexpr uint64_t CacheBufferSize = 512u * 1024u;
+ uint8_t m_CacheBuffer[CacheBufferSize];
+ std::uint64_t m_CacheBufferOffset = 0;
};
//////////////////////////////////////////////////////////////////////////
@@ -851,23 +906,28 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
std::unique_ptr<TempPayloadFile> PayloadFile;
cpr::Response Response = DoWithRetry(
[&]() {
- auto DownloadCallback = [&](std::string data, intptr_t) {
- if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024))
+ auto GetHeader = [&](std::string header) -> std::pair<std::string, std::string> {
+ size_t DelimiterPos = header.find(':');
+ if (DelimiterPos != std::string::npos)
{
- PayloadFile = std::make_unique<TempPayloadFile>();
- std::error_code Ec = PayloadFile->Open(TempFolderPath);
- if (Ec)
- {
- ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}",
- TempFolderPath.string(),
- Ec.message());
- return false;
- }
- PayloadFile->Write(PayloadString);
- PayloadString.clear();
+ std::string Key = header.substr(0, DelimiterPos);
+ constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n");
+ Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters);
+ Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters);
+
+ std::string Value = header.substr(DelimiterPos + 1);
+ Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters);
+ Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters);
+
+ return std::make_pair(Key, Value);
}
+ return std::make_pair(header, "");
+ };
+
+ auto DownloadCallback = [&](std::string data, intptr_t) {
if (PayloadFile)
{
+ ZEN_ASSERT(PayloadString.empty());
std::error_code Ec = PayloadFile->Write(data);
if (Ec)
{
@@ -886,9 +946,46 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
cpr::Response Response;
{
+ std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
+ auto HeaderCallback = [&](std::string header, intptr_t) {
+ std::pair<std::string, std::string> Header = GetHeader(header);
+ if (Header.first == "Content-Length"sv)
+ {
+ std::optional<size_t> ContentSize = ParseInt<size_t>(Header.second);
+ if (ContentSize.has_value())
+ {
+ if (ContentSize.value() > 1024 * 1024)
+ {
+ PayloadFile = std::make_unique<TempPayloadFile>();
+ std::error_code Ec = PayloadFile->Open(TempFolderPath);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}",
+ TempFolderPath.string(),
+ Ec.message());
+ PayloadFile.reset();
+ }
+ }
+ else
+ {
+ PayloadString.reserve(ContentSize.value());
+ }
+ }
+ }
+ if (!Header.first.empty())
+ {
+ ReceivedHeaders.emplace_back(std::move(Header));
+ }
+ return 1;
+ };
+
Impl::Session Sess =
m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
- Response = Sess.Download(cpr::WriteCallback{DownloadCallback});
+ Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback});
+ for (const std::pair<std::string, std::string>& H : ReceivedHeaders)
+ {
+ Response.header.insert_or_assign(H.first, H.second);
+ }
}
if (m_ConnectionSettings.AllowResume)
{
@@ -899,7 +996,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
}
if (auto It = Response.header.find("Accept-Ranges"); It != Response.header.end())
{
- return It->second == "bytes";
+ return It->second == "bytes"sv;
}
return false;
};
@@ -923,53 +1020,44 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
auto HeaderCallback = [&](std::string header, intptr_t) {
- size_t DelimiterPos = header.find(':');
- if (DelimiterPos != std::string::npos)
+ std::pair<std::string, std::string> Header = GetHeader(header);
+ if (!Header.first.empty())
{
- std::string Key = header.substr(0, DelimiterPos);
- constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n");
- Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters);
- Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters);
-
- std::string Value = header.substr(DelimiterPos + 1);
- Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters);
- Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters);
-
- ReceivedHeaders.push_back({Key, Value});
+ ReceivedHeaders.emplace_back(std::move(Header));
+ }
- if (Key == "Content-Range"sv)
+ if (Header.first == "Content-Range"sv)
+ {
+ if (Header.second.starts_with("bytes "sv))
{
- if (Value.starts_with("bytes "))
+ size_t RangeStartEnd = Header.second.find('-', 6);
+ if (RangeStartEnd != std::string::npos)
{
- size_t RangeStartEnd = Value.find('-', 6);
- if (RangeStartEnd != std::string::npos)
+ const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6));
+ if (Start)
{
- const auto Start = ParseInt<uint64_t>(Value.substr(6, RangeStartEnd - 6));
- if (Start)
+ uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
+ if (Start.value() == DownloadedSize)
{
- uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
- if (Start.value() == DownloadedSize)
- {
- return 1;
- }
- else if (Start.value() > DownloadedSize)
- {
- return 0;
- }
- if (PayloadFile)
- {
- PayloadFile->ResetWritePos(Start.value());
- }
- else
- {
- PayloadString = PayloadString.substr(0, Start.value());
- }
return 1;
}
+ else if (Start.value() > DownloadedSize)
+ {
+ return 0;
+ }
+ if (PayloadFile)
+ {
+ PayloadFile->ResetWritePos(Start.value());
+ }
+ else
+ {
+ PayloadString = PayloadString.substr(0, Start.value());
+ }
+ return 1;
}
}
- return 0;
}
+ return 0;
}
return 1;
};
diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp
index 8715f5447..b96118484 100644
--- a/src/zenserver/main.cpp
+++ b/src/zenserver/main.cpp
@@ -123,7 +123,7 @@ ZenEntryPoint::Run()
Entry->Pid.load(),
m_ServerOptions.OwnerPid);
- if (Entry->AddSponsorProcess(m_ServerOptions.OwnerPid))
+ if (Entry->AddSponsorProcess(m_ServerOptions.OwnerPid, 2000))
{
std::exit(0);
}
@@ -193,7 +193,8 @@ ZenEntryPoint::Run()
if (m_ServerOptions.OwnerPid)
{
- Entry->AddSponsorProcess(m_ServerOptions.OwnerPid);
+ // We are adding a sponsor process to our own entry, can't wait for pick since the code is not run until later
+ Entry->AddSponsorProcess(m_ServerOptions.OwnerPid, 0);
}
ZenServer Server;
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index e452c658e..3a7922aaf 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -3210,37 +3210,6 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
return ConvertResult(ContainerResult);
}
-std::pair<HttpResponseCode, std::string>
-ProjectStore::WriteBlock(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload)
-{
- ZEN_TRACE_CPU("Store::WriteBlock");
-
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown project '{}'", ProjectId)};
- }
- Project->TouchProject();
-
- ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
- if (!Oplog)
- {
- return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown oplog '{}/{}'", ProjectId, OplogId)};
- }
- Project->TouchOplog(OplogId);
-
- if (!IterateBlock(std::move(Payload), [this](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
- IoBuffer Compressed = Chunk.GetCompressed().Flatten().AsIoBuffer();
- m_CidStore.AddChunk(Compressed, AttachmentRawHash);
- ZEN_DEBUG("Saved attachment {} from block, size {}", AttachmentRawHash, Compressed.GetSize());
- }))
- {
- return {HttpResponseCode::BadRequest, "Invalid chunk in block"};
- }
-
- return {HttpResponseCode::OK, {}};
-}
-
bool
ProjectStore::Rpc(HttpServerRequest& HttpReq,
const std::string_view ProjectId,
@@ -4736,7 +4705,7 @@ TEST_CASE("project.store.block")
}
CompressedBuffer Block = GenerateBlock(std::move(Chunks));
IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer();
- CHECK(IterateBlock(std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {}));
+ CHECK(IterateBlock(Block.DecodeRawHash(), std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {}));
}
#endif
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 75844f84e..269fe7336 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -369,10 +369,6 @@ public:
const HttpServerRequest::QueryParams& Params,
CbObject& OutResponse);
- std::pair<HttpResponseCode, std::string> WriteBlock(const std::string_view ProjectId,
- const std::string_view OplogId,
- IoBuffer&& Payload);
-
bool Rpc(HttpServerRequest& HttpReq,
const std::string_view ProjectId,
const std::string_view OplogId,
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 65ef099e4..d8402366d 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -113,21 +113,19 @@ IsCancelled(JobContext* OptionalContext)
return OptionalContext->IsCancelled();
}
+std::string
+GetStats(const RemoteProjectStore::Stats& Stats, uint64_t ElapsedWallTimeMS)
+{
+ return fmt::format("Sent: {} ({}/s) Recv: {} ({}/s)",
+ NiceBytes(Stats.m_SentBytes),
+ NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000) / ElapsedWallTimeMS) : 0u),
+ NiceBytes(Stats.m_ReceivedBytes),
+ NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000) / ElapsedWallTimeMS) : 0u));
+}
+
void
-ReportRemoteStoreStats(JobContext* OptionalContext,
- const RemoteProjectStore::RemoteStoreInfo& RemoteStoreInfo,
- const RemoteProjectStore::Stats& Stats,
- uint64_t ElapsedWallTimeMS)
+LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats)
{
- ReportMessage(
- OptionalContext,
- fmt::format("Remote store '{}': "
- "Sent: {} ({}/s) Recv: {} ({}/s)",
- RemoteStoreInfo.Description,
- NiceBytes(Stats.m_SentBytes),
- NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000) / ElapsedWallTimeMS) : 0u),
- NiceBytes(Stats.m_ReceivedBytes),
- NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000) / ElapsedWallTimeMS) : 0u)));
ZEN_INFO("Oplog request count: {}. Average request size: {}. Peak request speed: {}",
Stats.m_RequestCount,
NiceLatencyNs(Stats.m_RequestCount > 0 ? (Stats.m_ReceivedBytes + Stats.m_SentBytes) / Stats.m_RequestCount : 0u),
@@ -144,9 +142,19 @@ ReportRemoteStoreStats(JobContext* OptionalContext,
}
bool
-IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
+IterateBlock(const IoHash& BlockHash,
+ IoBuffer&& CompressedBlock,
+ std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
{
- IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer();
+ IoHash RawHash;
+ uint64_t RawSize;
+ IoBuffer BlockPayload =
+ CompressedBuffer::FromCompressed(SharedBuffer(std::move(CompressedBlock)), RawHash, RawSize).Decompress().AsIoBuffer();
+ if (RawHash != BlockHash)
+ {
+ ZEN_WARN("Header rawhash for downloaded block {} does not match, got {}", BlockHash, RawHash);
+ return false;
+ }
MemoryView BlockView = BlockPayload.GetView();
const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData());
@@ -1711,19 +1719,29 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
}
}
- ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", Remaining), AttachmentsToSave, Remaining);
+ uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs();
+ ReportProgress(
+ OptionalContext,
+ fmt::format("Saving attachments, {} remaining... {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)),
+ AttachmentsToSave,
+ Remaining);
}
+ uint64_t ElapsedTimeMS = Timer.GetElapsedTimeMs();
if (AttachmentsToSave > 0)
{
- ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0);
+ ReportProgress(OptionalContext,
+ fmt::format("Saving attachments, {} remaining. {}", 0, GetStats(RemoteStore.GetStats(), ElapsedTimeMS)),
+ AttachmentsToSave,
+ 0);
}
ReportMessage(OptionalContext,
- fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {}",
+ fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {} {}",
AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(),
BlockAttachmentCountToUpload,
LargeAttachmentCountToUpload,
BulkAttachmentCountToUpload,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs())));
+ NiceTimeSpanMs(ElapsedTimeMS),
+ GetStats(RemoteStore.GetStats(), ElapsedTimeMS)));
}
RemoteProjectStore::Result
@@ -2077,10 +2095,10 @@ SaveOplog(CidStore& ChunkStore,
RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- ReportRemoteStoreStats(OptionalContext, RemoteStoreInfo, RemoteStore.GetStats(), TransferWallTimeMS);
+ LogRemoteStoreStatsDetails(RemoteStore.GetStats());
ReportMessage(OptionalContext,
- fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({})",
+ fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}",
RemoteStoreInfo.ContainerName,
RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
@@ -2088,7 +2106,8 @@ SaveOplog(CidStore& ChunkStore,
Info.AttachmentBlocksUploaded.load(),
NiceBytes(Info.AttachmentBlockBytesUploaded.load()),
Info.AttachmentsUploaded.load(),
- NiceBytes(Info.AttachmentBytesUploaded.load())));
+ NiceBytes(Info.AttachmentBytesUploaded.load()),
+ GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
return Result;
};
@@ -2360,7 +2379,8 @@ LoadOplog(CidStore& ChunkStore,
Info.OplogSizeBytes = LoadContainerResult.ContainerObject.GetSize();
AsyncRemoteResult RemoteResult;
- Latch AttachmentsWorkLatch(1);
+ Latch AttachmentsDownloadLatch(1);
+ Latch AttachmentsWriteLatch(1);
std::atomic_size_t AttachmentCount = 0;
Stopwatch LoadAttachmentsTimer;
@@ -2381,7 +2401,9 @@ LoadOplog(CidStore& ChunkStore,
&Oplog,
&ChunkStore,
&NetworkWorkerPool,
- &AttachmentsWorkLatch,
+ &WorkerPool,
+ &AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
&AttachmentCount,
&RemoteResult,
&BlockCountToDownload,
@@ -2400,11 +2422,13 @@ LoadOplog(CidStore& ChunkStore,
BlockCountToDownload++;
if (BlockHash == IoHash::Zero)
{
- AttachmentsWorkLatch.AddCount(1);
+ AttachmentsDownloadLatch.AddCount(1);
AttachmentCount.fetch_add(1);
NetworkWorkerPool.ScheduleWork([&RemoteStore,
&ChunkStore,
- &AttachmentsWorkLatch,
+ &WorkerPool,
+ &AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
&RemoteResult,
Chunks = std::move(Chunks),
&Info,
@@ -2412,7 +2436,7 @@ LoadOplog(CidStore& ChunkStore,
&DownloadStartMS,
IgnoreMissingAttachments,
OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
return;
@@ -2442,26 +2466,37 @@ LoadOplog(CidStore& ChunkStore,
{
return;
}
- for (const auto& It : Result.Chunks)
- {
- uint64_t ChunkSize = It.second.GetCompressedSize();
- Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
- CidStore::InsertResult InsertResult =
- ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly);
- if (InsertResult.New)
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
{
- Info.AttachmentBytesStored.fetch_add(ChunkSize);
- Info.AttachmentsStored.fetch_add(1);
+ return;
}
- }
+ for (const auto& It : Chunks)
+ {
+ uint64_t ChunkSize = It.second.GetCompressedSize();
+ Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ CidStore::InsertResult InsertResult = ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(),
+ It.first,
+ CidStore::InsertMode::kCopyOnly);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(ChunkSize);
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
+ });
});
return;
}
- AttachmentsWorkLatch.AddCount(1);
+ AttachmentsDownloadLatch.AddCount(1);
AttachmentCount.fetch_add(1);
- NetworkWorkerPool.ScheduleWork([&AttachmentsWorkLatch,
+ NetworkWorkerPool.ScheduleWork([&AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
&ChunkStore,
&RemoteStore,
+ &WorkerPool,
BlockHash,
&RemoteResult,
Chunks = std::move(Chunks),
@@ -2470,7 +2505,7 @@ LoadOplog(CidStore& ChunkStore,
&DownloadStartMS,
IgnoreMissingAttachments,
OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
return;
@@ -2503,38 +2538,56 @@ LoadOplog(CidStore& ChunkStore,
NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
NiceBytes(BlockSize));
Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
- std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
- WantedChunks.reserve(Chunks.size());
- WantedChunks.insert(Chunks.begin(), Chunks.end());
- bool StoreChunksOK =
- IterateBlock(std::move(BlockResult.Bytes),
- [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
- if (WantedChunks.contains(AttachmentRawHash))
- {
- uint64_t ChunkSize = Chunk.GetCompressedSize();
- CidStore::InsertResult InsertResult =
- ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
- if (InsertResult.New)
+
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&AttachmentsWriteLatch,
+ &RemoteResult,
+ &Info,
+ &ChunkStore,
+ BlockHash,
+ Chunks = std::move(Chunks),
+ Bytes = std::move(BlockResult.Bytes),
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ ZEN_ASSERT(Bytes.Size() > 0);
+ std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
+ WantedChunks.reserve(Chunks.size());
+ WantedChunks.insert(Chunks.begin(), Chunks.end());
+ bool StoreChunksOK =
+ IterateBlock(BlockHash,
+ IoBuffer(Bytes),
+ [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
+ if (WantedChunks.contains(AttachmentRawHash))
{
- Info.AttachmentBytesStored.fetch_add(ChunkSize);
- Info.AttachmentsStored.fetch_add(1);
+ uint64_t ChunkSize = Chunk.GetCompressedSize();
+ CidStore::InsertResult InsertResult =
+ ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(ChunkSize);
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ WantedChunks.erase(AttachmentRawHash);
}
- WantedChunks.erase(AttachmentRawHash);
- }
- });
- if (!StoreChunksOK)
- {
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has invalid format ({}): {}",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Invalid format for block {}", BlockHash),
- {});
- return;
- }
- ZEN_ASSERT(WantedChunks.empty());
+ });
+ if (!StoreChunksOK)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} has invalid format ({}): {}",
+ BlockHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Invalid format for block {}", BlockHash),
+ {});
+ return;
+ }
+ ZEN_ASSERT(WantedChunks.empty());
+ });
});
};
@@ -2542,7 +2595,9 @@ LoadOplog(CidStore& ChunkStore,
&Oplog,
&ChunkStore,
&NetworkWorkerPool,
- &AttachmentsWorkLatch,
+ &WorkerPool,
+ &AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
&RemoteResult,
&Attachments,
&AttachmentCount,
@@ -2562,19 +2617,21 @@ LoadOplog(CidStore& ChunkStore,
Oplog.CaptureAddedAttachments(std::vector<IoHash>{RawHash});
- AttachmentsWorkLatch.AddCount(1);
+ AttachmentsDownloadLatch.AddCount(1);
AttachmentCount.fetch_add(1);
NetworkWorkerPool.ScheduleWork([&RemoteStore,
&ChunkStore,
+ &WorkerPool,
&RemoteResult,
- &AttachmentsWorkLatch,
+ &AttachmentsDownloadLatch,
+ &AttachmentsWriteLatch,
RawHash,
&LoadAttachmentsTimer,
&DownloadStartMS,
&Info,
IgnoreMissingAttachments,
OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
return;
@@ -2607,12 +2664,28 @@ LoadOplog(CidStore& ChunkStore,
return;
}
Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
- CidStore::InsertResult InsertResult = ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
- if (InsertResult.New)
- {
- Info.AttachmentBytesStored.fetch_add(AttachmentSize);
- Info.AttachmentsStored.fetch_add(1);
- }
+
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&AttachmentsWriteLatch,
+ &RemoteResult,
+ &Info,
+ &ChunkStore,
+ RawHash,
+ AttachmentSize,
+ Bytes = std::move(AttachmentResult.Bytes),
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(AttachmentSize);
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ });
});
};
@@ -2646,10 +2719,10 @@ LoadOplog(CidStore& ChunkStore,
BlockCountToDownload,
FilesToDechunk.size()));
- AttachmentsWorkLatch.CountDown();
- while (!AttachmentsWorkLatch.Wait(1000))
+ AttachmentsDownloadLatch.CountDown();
+ while (!AttachmentsDownloadLatch.Wait(1000))
{
- ptrdiff_t Remaining = AttachmentsWorkLatch.Remaining();
+ ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining();
if (IsCancelled(OptionalContext))
{
if (!RemoteResult.IsError())
@@ -2657,16 +2730,47 @@ LoadOplog(CidStore& ChunkStore,
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
}
}
- ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", Remaining), AttachmentCount.load(), Remaining);
+ uint64_t PartialTransferWallTimeMS = TransferWallTimeMS;
+ if (DownloadStartMS != (uint64_t)-1)
+ {
+ PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
+ }
+ ReportProgress(
+ OptionalContext,
+ fmt::format("Loading attachments, {} remaining. {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)),
+ AttachmentCount.load(),
+ Remaining);
+ }
+ if (DownloadStartMS != (uint64_t)-1)
+ {
+ TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
}
+
if (AttachmentCount.load() > 0)
{
- ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", 0), AttachmentCount.load(), 0);
+ ReportProgress(OptionalContext,
+ fmt::format("Loading attachments, {} remaining. {}", 0, GetStats(RemoteStore.GetStats(), TransferWallTimeMS)),
+ AttachmentCount.load(),
+ 0);
}
- if (DownloadStartMS != (uint64_t)-1)
+ AttachmentsWriteLatch.CountDown();
+ while (!AttachmentsWriteLatch.Wait(1000))
{
- TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
+ ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining();
+ if (IsCancelled(OptionalContext))
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
+ }
+ ReportProgress(OptionalContext, fmt::format("Writing attachments, {} remaining.", Remaining), AttachmentCount.load(), Remaining);
+ }
+
+ if (AttachmentCount.load() > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Writing attachments, {} remaining.", 0), AttachmentCount.load(), 0);
}
if (Result.ErrorCode == 0)
@@ -2811,10 +2915,10 @@ LoadOplog(CidStore& ChunkStore,
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- ReportRemoteStoreStats(OptionalContext, RemoteStoreInfo, RemoteStore.GetStats(), TransferWallTimeMS);
+ LogRemoteStoreStatsDetails(RemoteStore.GetStats());
ReportMessage(OptionalContext,
- fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {}",
+ fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {} {}",
RemoteStoreInfo.ContainerName,
Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE",
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
@@ -2825,7 +2929,8 @@ LoadOplog(CidStore& ChunkStore,
NiceBytes(Info.AttachmentBytesDownloaded.load()),
Info.AttachmentsStored.load(),
NiceBytes(Info.AttachmentBytesStored.load()),
- Info.MissingAttachmentCount.load()));
+ Info.MissingAttachmentCount.load(),
+ GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
return Result;
}
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index b00aa231f..d4ccd8c7b 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -162,6 +162,8 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
JobContext* OptionalContext);
CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks);
-bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
+bool IterateBlock(const IoHash& BlockHash,
+ IoBuffer&& CompressedBlock,
+ std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
} // namespace zen
diff --git a/src/zenutil/include/zenutil/zenserverprocess.h b/src/zenutil/include/zenutil/zenserverprocess.h
index 1bd00acb7..f7204fb43 100644
--- a/src/zenutil/include/zenutil/zenserverprocess.h
+++ b/src/zenutil/include/zenutil/zenserverprocess.h
@@ -168,7 +168,7 @@ public:
bool IsShutdownRequested() const;
void SignalReady();
bool IsReady() const;
- bool AddSponsorProcess(uint32_t Pid);
+ bool AddSponsorProcess(uint32_t Pid, uint64_t Timeout = 0);
};
static_assert(sizeof(ZenServerEntry) == 64);
diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp
index f5bc088a5..34eec9790 100644
--- a/src/zenutil/zenserverprocess.cpp
+++ b/src/zenutil/zenserverprocess.cpp
@@ -403,7 +403,7 @@ ZenServerState::ZenServerEntry::IsReady() const
}
bool
-ZenServerState::ZenServerEntry::AddSponsorProcess(uint32_t PidToAdd)
+ZenServerState::ZenServerEntry::AddSponsorProcess(uint32_t PidToAdd, uint64_t Timeout)
{
uint32_t ServerPid = Pid.load();
auto WaitForPickup = [&](uint32_t AddedSlotIndex) {
@@ -427,13 +427,13 @@ ZenServerState::ZenServerEntry::AddSponsorProcess(uint32_t PidToAdd)
{
if (SponsorPids[SponsorIndex].load(std::memory_order_relaxed) == PidToAdd)
{
- return WaitForPickup(SponsorIndex);
+ return Timeout == 0 ? true : WaitForPickup(SponsorIndex);
}
uint32_t Expected = 0;
if (SponsorPids[SponsorIndex].compare_exchange_strong(Expected, PidToAdd))
{
// Success!
- return WaitForPickup(SponsorIndex);
+ return Timeout == 0 ? true : WaitForPickup(SponsorIndex);
}
}
@@ -865,6 +865,8 @@ ZenServerInstance::OnServerReady()
m_BasePort = Entry->EffectiveListenPort;
CreateShutdownEvent(m_BasePort);
+
+ ZEN_DEBUG("Server '{}' is ready on port {}", m_Name, m_BasePort);
}
std::string