diff options
| author | Dan Engelbrecht <[email protected]> | 2024-04-20 13:22:05 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-04-20 13:22:05 +0200 |
| commit | aa0b0d3cbfc6c4561591df856396703f7177292e (patch) | |
| tree | 6ff9a4e94559ba62d8ee07076d56dedc7d2e9115 /src | |
| parent | 5.4.5-pre0 (diff) | |
| download | zen-aa0b0d3cbfc6c4561591df856396703f7177292e.tar.xz zen-aa0b0d3cbfc6c4561591df856396703f7177292e.zip | |
import oplog improvements (#54)
* report down/up transfer speed during progress
* add disk buffering in http client
* offload block decoding and chunk writing form network worker pool threads
add block hash verification for blocks recevied at oplog import
* separate download-latch from write-latch to get more accurate download speed
* check headers when downloading with http client to go directly to file writing for large payloads
* we must clear write callback even if we only provide it as an argument to the Download() call
* make timeout optional in AddSponsorProcess
* check return codes when creating windows threadpool
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/up_cmd.cpp | 2 | ||||
| -rw-r--r-- | src/zencore/workthreadpool.cpp | 18 | ||||
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 228 | ||||
| -rw-r--r-- | src/zenserver/main.cpp | 5 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 33 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 4 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 285 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 4 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/zenserverprocess.h | 2 | ||||
| -rw-r--r-- | src/zenutil/zenserverprocess.cpp | 8 |
10 files changed, 384 insertions, 205 deletions
diff --git a/src/zen/cmds/up_cmd.cpp b/src/zen/cmds/up_cmd.cpp index 0db5afb3b..5344a078d 100644 --- a/src/zen/cmds/up_cmd.cpp +++ b/src/zen/cmds/up_cmd.cpp @@ -152,7 +152,7 @@ AttachCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return 1; } - if (!Entry->AddSponsorProcess(m_OwnerPid)) + if (!Entry->AddSponsorProcess(m_OwnerPid, 2000)) { ZEN_WARN("unable to add sponsor process to running zen server instance"); return 1; diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index f41c13bf6..d15fb2e83 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -3,6 +3,7 @@ #include <zencore/workthreadpool.h> #include <zencore/blockingqueue.h> +#include <zencore/except.h> #include <zencore/logging.h> #include <zencore/string.h> #include <zencore/testing.h> @@ -56,18 +57,33 @@ struct WorkerThreadPool::Impl // Thread pool setup m_ThreadPool = CreateThreadpool(NULL); + if (m_ThreadPool == NULL) + { + ThrowLastError("CreateThreadpool failed"); + } - SetThreadpoolThreadMinimum(m_ThreadPool, InThreadCount); + if (!SetThreadpoolThreadMinimum(m_ThreadPool, InThreadCount)) + { + ThrowLastError("SetThreadpoolThreadMinimum failed"); + } SetThreadpoolThreadMaximum(m_ThreadPool, InThreadCount * 2); InitializeThreadpoolEnvironment(&m_CallbackEnvironment); m_CleanupGroup = CreateThreadpoolCleanupGroup(); + if (m_CleanupGroup == NULL) + { + ThrowLastError("CreateThreadpoolCleanupGroup failed"); + } SetThreadpoolCallbackPool(&m_CallbackEnvironment, m_ThreadPool); SetThreadpoolCallbackCleanupGroup(&m_CallbackEnvironment, m_CleanupGroup, NULL); m_Work = CreateThreadpoolWork(&WorkCallback, this, &m_CallbackEnvironment); + if (m_Work == NULL) + { + ThrowLastError("CreateThreadpoolWork failed"); + } } ~Impl() diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index 262785a0a..81c9064f6 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -192,6 +192,7 @@ struct HttpClient::Impl : public RefCounted cpr::Response Result = CprSession->Download(Write); ZEN_TRACE("GET {}", Result); CprSession->SetHeaderCallback({}); + CprSession->SetWriteCallback({}); return Result; } inline cpr::Response Head() @@ -431,10 +432,76 @@ public: std::error_code Write(std::string_view DataString) { + const uint8_t* DataPtr = (const uint8_t*)DataString.data(); + size_t DataSize = DataString.size(); + if (DataSize >= CacheBufferSize) + { + std::error_code Ec = Flush(); + if (Ec) + { + return Ec; + } + return AppendData(DataPtr, DataSize); + } + size_t CopySize = Min(DataSize, CacheBufferSize - m_CacheBufferOffset); + memcpy(&m_CacheBuffer[m_CacheBufferOffset], DataPtr, CopySize); + m_CacheBufferOffset += CopySize; + DataSize -= CopySize; + if (m_CacheBufferOffset == CacheBufferSize) + { + AppendData(m_CacheBuffer, CacheBufferSize); + if (DataSize > 0) + { + ZEN_ASSERT(DataSize < CacheBufferSize); + memcpy(m_CacheBuffer, DataPtr + CopySize, DataSize); + } + m_CacheBufferOffset = DataSize; + } + else + { + ZEN_ASSERT(DataSize == 0); + } + return {}; + } + + IoBuffer DetachToIoBuffer() + { + if (std::error_code Ec = Flush(); Ec) + { + ThrowSystemError(Ec.value(), Ec.message()); + } + ZEN_ASSERT(m_FileHandle != nullptr); + void* FileHandle = m_FileHandle; + IoBuffer Buffer(IoBuffer::File, FileHandle, 0, m_WriteOffset, /*IsWholeFile*/ true); + Buffer.SetDeleteOnClose(true); + m_FileHandle = 0; + m_WriteOffset = 0; + return Buffer; + } + + uint64_t GetSize() const { return m_WriteOffset; } + void ResetWritePos(uint64_t WriteOffset) + { + Flush(); + m_WriteOffset = WriteOffset; + } + +private: + std::error_code Flush() + { + if (m_CacheBufferOffset == 0) + { + return {}; + } + std::error_code Res = AppendData(m_CacheBuffer, m_CacheBufferOffset); + m_CacheBufferOffset = 0; + return Res; + } + + std::error_code AppendData(const void* Data, uint64_t Size) + { ZEN_ASSERT(m_FileHandle != nullptr); const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; - const void* Data = DataString.data(); - std::size_t Size = DataString.size(); while (Size) { @@ -476,23 +543,11 @@ public: return {}; } - IoBuffer DetachToIoBuffer() - { - ZEN_ASSERT(m_FileHandle != nullptr); - void* FileHandle = m_FileHandle; - IoBuffer Buffer(IoBuffer::File, FileHandle, 0, m_WriteOffset, /*IsWholeFile*/ true); - Buffer.SetDeleteOnClose(true); - m_FileHandle = 0; - m_WriteOffset = 0; - return Buffer; - } - - uint64_t GetSize() const { return m_WriteOffset; } - void ResetWritePos(uint64_t WriteOffset) { m_WriteOffset = WriteOffset; } - -private: - void* m_FileHandle; - std::uint64_t m_WriteOffset; + void* m_FileHandle; + std::uint64_t m_WriteOffset; + static constexpr uint64_t CacheBufferSize = 512u * 1024u; + uint8_t m_CacheBuffer[CacheBufferSize]; + std::uint64_t m_CacheBufferOffset = 0; }; ////////////////////////////////////////////////////////////////////////// @@ -851,23 +906,28 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold std::unique_ptr<TempPayloadFile> PayloadFile; cpr::Response Response = DoWithRetry( [&]() { - auto DownloadCallback = [&](std::string data, intptr_t) { - if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024)) + auto GetHeader = [&](std::string header) -> std::pair<std::string, std::string> { + size_t DelimiterPos = header.find(':'); + if (DelimiterPos != std::string::npos) { - PayloadFile = std::make_unique<TempPayloadFile>(); - std::error_code Ec = PayloadFile->Open(TempFolderPath); - if (Ec) - { - ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", - TempFolderPath.string(), - Ec.message()); - return false; - } - PayloadFile->Write(PayloadString); - PayloadString.clear(); + std::string Key = header.substr(0, DelimiterPos); + constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n"); + Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters); + Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters); + + std::string Value = header.substr(DelimiterPos + 1); + Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters); + Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters); + + return std::make_pair(Key, Value); } + return std::make_pair(header, ""); + }; + + auto DownloadCallback = [&](std::string data, intptr_t) { if (PayloadFile) { + ZEN_ASSERT(PayloadString.empty()); std::error_code Ec = PayloadFile->Write(data); if (Ec) { @@ -886,9 +946,46 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold cpr::Response Response; { + std::vector<std::pair<std::string, std::string>> ReceivedHeaders; + auto HeaderCallback = [&](std::string header, intptr_t) { + std::pair<std::string, std::string> Header = GetHeader(header); + if (Header.first == "Content-Length"sv) + { + std::optional<size_t> ContentSize = ParseInt<size_t>(Header.second); + if (ContentSize.has_value()) + { + if (ContentSize.value() > 1024 * 1024) + { + PayloadFile = std::make_unique<TempPayloadFile>(); + std::error_code Ec = PayloadFile->Open(TempFolderPath); + if (Ec) + { + ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", + TempFolderPath.string(), + Ec.message()); + PayloadFile.reset(); + } + } + else + { + PayloadString.reserve(ContentSize.value()); + } + } + } + if (!Header.first.empty()) + { + ReceivedHeaders.emplace_back(std::move(Header)); + } + return 1; + }; + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Response = Sess.Download(cpr::WriteCallback{DownloadCallback}); + Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback}); + for (const std::pair<std::string, std::string>& H : ReceivedHeaders) + { + Response.header.insert_or_assign(H.first, H.second); + } } if (m_ConnectionSettings.AllowResume) { @@ -899,7 +996,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold } if (auto It = Response.header.find("Accept-Ranges"); It != Response.header.end()) { - return It->second == "bytes"; + return It->second == "bytes"sv; } return false; }; @@ -923,53 +1020,44 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold std::vector<std::pair<std::string, std::string>> ReceivedHeaders; auto HeaderCallback = [&](std::string header, intptr_t) { - size_t DelimiterPos = header.find(':'); - if (DelimiterPos != std::string::npos) + std::pair<std::string, std::string> Header = GetHeader(header); + if (!Header.first.empty()) { - std::string Key = header.substr(0, DelimiterPos); - constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n"); - Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters); - Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters); - - std::string Value = header.substr(DelimiterPos + 1); - Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters); - Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters); - - ReceivedHeaders.push_back({Key, Value}); + ReceivedHeaders.emplace_back(std::move(Header)); + } - if (Key == "Content-Range"sv) + if (Header.first == "Content-Range"sv) + { + if (Header.second.starts_with("bytes "sv)) { - if (Value.starts_with("bytes ")) + size_t RangeStartEnd = Header.second.find('-', 6); + if (RangeStartEnd != std::string::npos) { - size_t RangeStartEnd = Value.find('-', 6); - if (RangeStartEnd != std::string::npos) + const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6)); + if (Start) { - const auto Start = ParseInt<uint64_t>(Value.substr(6, RangeStartEnd - 6)); - if (Start) + uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); + if (Start.value() == DownloadedSize) { - uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); - if (Start.value() == DownloadedSize) - { - return 1; - } - else if (Start.value() > DownloadedSize) - { - return 0; - } - if (PayloadFile) - { - PayloadFile->ResetWritePos(Start.value()); - } - else - { - PayloadString = PayloadString.substr(0, Start.value()); - } return 1; } + else if (Start.value() > DownloadedSize) + { + return 0; + } + if (PayloadFile) + { + PayloadFile->ResetWritePos(Start.value()); + } + else + { + PayloadString = PayloadString.substr(0, Start.value()); + } + return 1; } } - return 0; } + return 0; } return 1; }; diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp index 8715f5447..b96118484 100644 --- a/src/zenserver/main.cpp +++ b/src/zenserver/main.cpp @@ -123,7 +123,7 @@ ZenEntryPoint::Run() Entry->Pid.load(), m_ServerOptions.OwnerPid); - if (Entry->AddSponsorProcess(m_ServerOptions.OwnerPid)) + if (Entry->AddSponsorProcess(m_ServerOptions.OwnerPid, 2000)) { std::exit(0); } @@ -193,7 +193,8 @@ ZenEntryPoint::Run() if (m_ServerOptions.OwnerPid) { - Entry->AddSponsorProcess(m_ServerOptions.OwnerPid); + // We are adding a sponsor process to our own entry, can't wait for pick since the code is not run until later + Entry->AddSponsorProcess(m_ServerOptions.OwnerPid, 0); } ZenServer Server; diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index e452c658e..3a7922aaf 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -3210,37 +3210,6 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, return ConvertResult(ContainerResult); } -std::pair<HttpResponseCode, std::string> -ProjectStore::WriteBlock(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload) -{ - ZEN_TRACE_CPU("Store::WriteBlock"); - - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); - if (!Project) - { - return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown project '{}'", ProjectId)}; - } - Project->TouchProject(); - - ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); - if (!Oplog) - { - return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown oplog '{}/{}'", ProjectId, OplogId)}; - } - Project->TouchOplog(OplogId); - - if (!IterateBlock(std::move(Payload), [this](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { - IoBuffer Compressed = Chunk.GetCompressed().Flatten().AsIoBuffer(); - m_CidStore.AddChunk(Compressed, AttachmentRawHash); - ZEN_DEBUG("Saved attachment {} from block, size {}", AttachmentRawHash, Compressed.GetSize()); - })) - { - return {HttpResponseCode::BadRequest, "Invalid chunk in block"}; - } - - return {HttpResponseCode::OK, {}}; -} - bool ProjectStore::Rpc(HttpServerRequest& HttpReq, const std::string_view ProjectId, @@ -4736,7 +4705,7 @@ TEST_CASE("project.store.block") } CompressedBuffer Block = GenerateBlock(std::move(Chunks)); IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer(); - CHECK(IterateBlock(std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {})); + CHECK(IterateBlock(Block.DecodeRawHash(), std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {})); } #endif diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 75844f84e..269fe7336 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -369,10 +369,6 @@ public: const HttpServerRequest::QueryParams& Params, CbObject& OutResponse); - std::pair<HttpResponseCode, std::string> WriteBlock(const std::string_view ProjectId, - const std::string_view OplogId, - IoBuffer&& Payload); - bool Rpc(HttpServerRequest& HttpReq, const std::string_view ProjectId, const std::string_view OplogId, diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 65ef099e4..d8402366d 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -113,21 +113,19 @@ IsCancelled(JobContext* OptionalContext) return OptionalContext->IsCancelled(); } +std::string +GetStats(const RemoteProjectStore::Stats& Stats, uint64_t ElapsedWallTimeMS) +{ + return fmt::format("Sent: {} ({}/s) Recv: {} ({}/s)", + NiceBytes(Stats.m_SentBytes), + NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000) / ElapsedWallTimeMS) : 0u), + NiceBytes(Stats.m_ReceivedBytes), + NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000) / ElapsedWallTimeMS) : 0u)); +} + void -ReportRemoteStoreStats(JobContext* OptionalContext, - const RemoteProjectStore::RemoteStoreInfo& RemoteStoreInfo, - const RemoteProjectStore::Stats& Stats, - uint64_t ElapsedWallTimeMS) +LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats) { - ReportMessage( - OptionalContext, - fmt::format("Remote store '{}': " - "Sent: {} ({}/s) Recv: {} ({}/s)", - RemoteStoreInfo.Description, - NiceBytes(Stats.m_SentBytes), - NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000) / ElapsedWallTimeMS) : 0u), - NiceBytes(Stats.m_ReceivedBytes), - NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000) / ElapsedWallTimeMS) : 0u))); ZEN_INFO("Oplog request count: {}. Average request size: {}. Peak request speed: {}", Stats.m_RequestCount, NiceLatencyNs(Stats.m_RequestCount > 0 ? (Stats.m_ReceivedBytes + Stats.m_SentBytes) / Stats.m_RequestCount : 0u), @@ -144,9 +142,19 @@ ReportRemoteStoreStats(JobContext* OptionalContext, } bool -IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) +IterateBlock(const IoHash& BlockHash, + IoBuffer&& CompressedBlock, + std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) { - IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer(); + IoHash RawHash; + uint64_t RawSize; + IoBuffer BlockPayload = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(CompressedBlock)), RawHash, RawSize).Decompress().AsIoBuffer(); + if (RawHash != BlockHash) + { + ZEN_WARN("Header rawhash for downloaded block {} does not match, got {}", BlockHash, RawHash); + return false; + } MemoryView BlockView = BlockPayload.GetView(); const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData()); @@ -1711,19 +1719,29 @@ UploadAttachments(WorkerThreadPool& WorkerPool, ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); } } - ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", Remaining), AttachmentsToSave, Remaining); + uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs(); + ReportProgress( + OptionalContext, + fmt::format("Saving attachments, {} remaining... {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)), + AttachmentsToSave, + Remaining); } + uint64_t ElapsedTimeMS = Timer.GetElapsedTimeMs(); if (AttachmentsToSave > 0) { - ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0); + ReportProgress(OptionalContext, + fmt::format("Saving attachments, {} remaining. {}", 0, GetStats(RemoteStore.GetStats(), ElapsedTimeMS)), + AttachmentsToSave, + 0); } ReportMessage(OptionalContext, - fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {}", + fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {} {}", AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(), BlockAttachmentCountToUpload, LargeAttachmentCountToUpload, BulkAttachmentCountToUpload, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()))); + NiceTimeSpanMs(ElapsedTimeMS), + GetStats(RemoteStore.GetStats(), ElapsedTimeMS))); } RemoteProjectStore::Result @@ -2077,10 +2095,10 @@ SaveOplog(CidStore& ChunkStore, RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - ReportRemoteStoreStats(OptionalContext, RemoteStoreInfo, RemoteStore.GetStats(), TransferWallTimeMS); + LogRemoteStoreStatsDetails(RemoteStore.GetStats()); ReportMessage(OptionalContext, - fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({})", + fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}", RemoteStoreInfo.ContainerName, RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), @@ -2088,7 +2106,8 @@ SaveOplog(CidStore& ChunkStore, Info.AttachmentBlocksUploaded.load(), NiceBytes(Info.AttachmentBlockBytesUploaded.load()), Info.AttachmentsUploaded.load(), - NiceBytes(Info.AttachmentBytesUploaded.load()))); + NiceBytes(Info.AttachmentBytesUploaded.load()), + GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); return Result; }; @@ -2360,7 +2379,8 @@ LoadOplog(CidStore& ChunkStore, Info.OplogSizeBytes = LoadContainerResult.ContainerObject.GetSize(); AsyncRemoteResult RemoteResult; - Latch AttachmentsWorkLatch(1); + Latch AttachmentsDownloadLatch(1); + Latch AttachmentsWriteLatch(1); std::atomic_size_t AttachmentCount = 0; Stopwatch LoadAttachmentsTimer; @@ -2381,7 +2401,9 @@ LoadOplog(CidStore& ChunkStore, &Oplog, &ChunkStore, &NetworkWorkerPool, - &AttachmentsWorkLatch, + &WorkerPool, + &AttachmentsDownloadLatch, + &AttachmentsWriteLatch, &AttachmentCount, &RemoteResult, &BlockCountToDownload, @@ -2400,11 +2422,13 @@ LoadOplog(CidStore& ChunkStore, BlockCountToDownload++; if (BlockHash == IoHash::Zero) { - AttachmentsWorkLatch.AddCount(1); + AttachmentsDownloadLatch.AddCount(1); AttachmentCount.fetch_add(1); NetworkWorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, - &AttachmentsWorkLatch, + &WorkerPool, + &AttachmentsDownloadLatch, + &AttachmentsWriteLatch, &RemoteResult, Chunks = std::move(Chunks), &Info, @@ -2412,7 +2436,7 @@ LoadOplog(CidStore& ChunkStore, &DownloadStartMS, IgnoreMissingAttachments, OptionalContext]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { return; @@ -2442,26 +2466,37 @@ LoadOplog(CidStore& ChunkStore, { return; } - for (const auto& It : Result.Chunks) - { - uint64_t ChunkSize = It.second.GetCompressedSize(); - Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); - CidStore::InsertResult InsertResult = - ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly); - if (InsertResult.New) + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork([&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) { - Info.AttachmentBytesStored.fetch_add(ChunkSize); - Info.AttachmentsStored.fetch_add(1); + return; } - } + for (const auto& It : Chunks) + { + uint64_t ChunkSize = It.second.GetCompressedSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + CidStore::InsertResult InsertResult = ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), + It.first, + CidStore::InsertMode::kCopyOnly); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(ChunkSize); + Info.AttachmentsStored.fetch_add(1); + } + } + }); }); return; } - AttachmentsWorkLatch.AddCount(1); + AttachmentsDownloadLatch.AddCount(1); AttachmentCount.fetch_add(1); - NetworkWorkerPool.ScheduleWork([&AttachmentsWorkLatch, + NetworkWorkerPool.ScheduleWork([&AttachmentsDownloadLatch, + &AttachmentsWriteLatch, &ChunkStore, &RemoteStore, + &WorkerPool, BlockHash, &RemoteResult, Chunks = std::move(Chunks), @@ -2470,7 +2505,7 @@ LoadOplog(CidStore& ChunkStore, &DownloadStartMS, IgnoreMissingAttachments, OptionalContext]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { return; @@ -2503,38 +2538,56 @@ LoadOplog(CidStore& ChunkStore, NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), NiceBytes(BlockSize)); Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); - std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; - WantedChunks.reserve(Chunks.size()); - WantedChunks.insert(Chunks.begin(), Chunks.end()); - bool StoreChunksOK = - IterateBlock(std::move(BlockResult.Bytes), - [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { - if (WantedChunks.contains(AttachmentRawHash)) - { - uint64_t ChunkSize = Chunk.GetCompressedSize(); - CidStore::InsertResult InsertResult = - ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); - if (InsertResult.New) + + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork([&AttachmentsWriteLatch, + &RemoteResult, + &Info, + &ChunkStore, + BlockHash, + Chunks = std::move(Chunks), + Bytes = std::move(BlockResult.Bytes), + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + ZEN_ASSERT(Bytes.Size() > 0); + std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; + WantedChunks.reserve(Chunks.size()); + WantedChunks.insert(Chunks.begin(), Chunks.end()); + bool StoreChunksOK = + IterateBlock(BlockHash, + IoBuffer(Bytes), + [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { + if (WantedChunks.contains(AttachmentRawHash)) { - Info.AttachmentBytesStored.fetch_add(ChunkSize); - Info.AttachmentsStored.fetch_add(1); + uint64_t ChunkSize = Chunk.GetCompressedSize(); + CidStore::InsertResult InsertResult = + ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(ChunkSize); + Info.AttachmentsStored.fetch_add(1); + } + WantedChunks.erase(AttachmentRawHash); } - WantedChunks.erase(AttachmentRawHash); - } - }); - if (!StoreChunksOK) - { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} has invalid format ({}): {}", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Invalid format for block {}", BlockHash), - {}); - return; - } - ZEN_ASSERT(WantedChunks.empty()); + }); + if (!StoreChunksOK) + { + ReportMessage(OptionalContext, + fmt::format("Block attachment {} has invalid format ({}): {}", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Invalid format for block {}", BlockHash), + {}); + return; + } + ZEN_ASSERT(WantedChunks.empty()); + }); }); }; @@ -2542,7 +2595,9 @@ LoadOplog(CidStore& ChunkStore, &Oplog, &ChunkStore, &NetworkWorkerPool, - &AttachmentsWorkLatch, + &WorkerPool, + &AttachmentsDownloadLatch, + &AttachmentsWriteLatch, &RemoteResult, &Attachments, &AttachmentCount, @@ -2562,19 +2617,21 @@ LoadOplog(CidStore& ChunkStore, Oplog.CaptureAddedAttachments(std::vector<IoHash>{RawHash}); - AttachmentsWorkLatch.AddCount(1); + AttachmentsDownloadLatch.AddCount(1); AttachmentCount.fetch_add(1); NetworkWorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, + &WorkerPool, &RemoteResult, - &AttachmentsWorkLatch, + &AttachmentsDownloadLatch, + &AttachmentsWriteLatch, RawHash, &LoadAttachmentsTimer, &DownloadStartMS, &Info, IgnoreMissingAttachments, OptionalContext]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { return; @@ -2607,12 +2664,28 @@ LoadOplog(CidStore& ChunkStore, return; } Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); - CidStore::InsertResult InsertResult = ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); - if (InsertResult.New) - { - Info.AttachmentBytesStored.fetch_add(AttachmentSize); - Info.AttachmentsStored.fetch_add(1); - } + + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork([&AttachmentsWriteLatch, + &RemoteResult, + &Info, + &ChunkStore, + RawHash, + AttachmentSize, + Bytes = std::move(AttachmentResult.Bytes), + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(AttachmentSize); + Info.AttachmentsStored.fetch_add(1); + } + }); }); }; @@ -2646,10 +2719,10 @@ LoadOplog(CidStore& ChunkStore, BlockCountToDownload, FilesToDechunk.size())); - AttachmentsWorkLatch.CountDown(); - while (!AttachmentsWorkLatch.Wait(1000)) + AttachmentsDownloadLatch.CountDown(); + while (!AttachmentsDownloadLatch.Wait(1000)) { - ptrdiff_t Remaining = AttachmentsWorkLatch.Remaining(); + ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining(); if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) @@ -2657,16 +2730,47 @@ LoadOplog(CidStore& ChunkStore, RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); } } - ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", Remaining), AttachmentCount.load(), Remaining); + uint64_t PartialTransferWallTimeMS = TransferWallTimeMS; + if (DownloadStartMS != (uint64_t)-1) + { + PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); + } + ReportProgress( + OptionalContext, + fmt::format("Loading attachments, {} remaining. {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)), + AttachmentCount.load(), + Remaining); + } + if (DownloadStartMS != (uint64_t)-1) + { + TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); } + if (AttachmentCount.load() > 0) { - ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", 0), AttachmentCount.load(), 0); + ReportProgress(OptionalContext, + fmt::format("Loading attachments, {} remaining. {}", 0, GetStats(RemoteStore.GetStats(), TransferWallTimeMS)), + AttachmentCount.load(), + 0); } - if (DownloadStartMS != (uint64_t)-1) + AttachmentsWriteLatch.CountDown(); + while (!AttachmentsWriteLatch.Wait(1000)) { - TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); + ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining(); + if (IsCancelled(OptionalContext)) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } + } + ReportProgress(OptionalContext, fmt::format("Writing attachments, {} remaining.", Remaining), AttachmentCount.load(), Remaining); + } + + if (AttachmentCount.load() > 0) + { + ReportProgress(OptionalContext, fmt::format("Writing attachments, {} remaining.", 0), AttachmentCount.load(), 0); } if (Result.ErrorCode == 0) @@ -2811,10 +2915,10 @@ LoadOplog(CidStore& ChunkStore, Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - ReportRemoteStoreStats(OptionalContext, RemoteStoreInfo, RemoteStore.GetStats(), TransferWallTimeMS); + LogRemoteStoreStatsDetails(RemoteStore.GetStats()); ReportMessage(OptionalContext, - fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {}", + fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {} {}", RemoteStoreInfo.ContainerName, Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE", NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), @@ -2825,7 +2929,8 @@ LoadOplog(CidStore& ChunkStore, NiceBytes(Info.AttachmentBytesDownloaded.load()), Info.AttachmentsStored.load(), NiceBytes(Info.AttachmentBytesStored.load()), - Info.MissingAttachmentCount.load())); + Info.MissingAttachmentCount.load(), + GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); return Result; } diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index b00aa231f..d4ccd8c7b 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -162,6 +162,8 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, JobContext* OptionalContext); CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks); -bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); +bool IterateBlock(const IoHash& BlockHash, + IoBuffer&& CompressedBlock, + std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); } // namespace zen diff --git a/src/zenutil/include/zenutil/zenserverprocess.h b/src/zenutil/include/zenutil/zenserverprocess.h index 1bd00acb7..f7204fb43 100644 --- a/src/zenutil/include/zenutil/zenserverprocess.h +++ b/src/zenutil/include/zenutil/zenserverprocess.h @@ -168,7 +168,7 @@ public: bool IsShutdownRequested() const; void SignalReady(); bool IsReady() const; - bool AddSponsorProcess(uint32_t Pid); + bool AddSponsorProcess(uint32_t Pid, uint64_t Timeout = 0); }; static_assert(sizeof(ZenServerEntry) == 64); diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp index f5bc088a5..34eec9790 100644 --- a/src/zenutil/zenserverprocess.cpp +++ b/src/zenutil/zenserverprocess.cpp @@ -403,7 +403,7 @@ ZenServerState::ZenServerEntry::IsReady() const } bool -ZenServerState::ZenServerEntry::AddSponsorProcess(uint32_t PidToAdd) +ZenServerState::ZenServerEntry::AddSponsorProcess(uint32_t PidToAdd, uint64_t Timeout) { uint32_t ServerPid = Pid.load(); auto WaitForPickup = [&](uint32_t AddedSlotIndex) { @@ -427,13 +427,13 @@ ZenServerState::ZenServerEntry::AddSponsorProcess(uint32_t PidToAdd) { if (SponsorPids[SponsorIndex].load(std::memory_order_relaxed) == PidToAdd) { - return WaitForPickup(SponsorIndex); + return Timeout == 0 ? true : WaitForPickup(SponsorIndex); } uint32_t Expected = 0; if (SponsorPids[SponsorIndex].compare_exchange_strong(Expected, PidToAdd)) { // Success! - return WaitForPickup(SponsorIndex); + return Timeout == 0 ? true : WaitForPickup(SponsorIndex); } } @@ -865,6 +865,8 @@ ZenServerInstance::OnServerReady() m_BasePort = Entry->EffectiveListenPort; CreateShutdownEvent(m_BasePort); + + ZEN_DEBUG("Server '{}' is ready on port {}", m_Name, m_BasePort); } std::string |