aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-04-22 14:59:28 +0200
committerGitHub Enterprise <[email protected]>2026-04-22 14:59:28 +0200
commitd9f113ec0f1c18bfcef91e8420650d99e6670a43 (patch)
tree7afaa761f6be0f3831728b05691f6d3d2ac4353f
parentfix consul test timeout (#1010) (diff)
downloadarchived-zen-d9f113ec0f1c18bfcef91e8420650d99e6670a43.tar.xz
archived-zen-d9f113ec0f1c18bfcef91e8420650d99e6670a43.zip
hub execution stats (#1011)
- Improvement: Hub hydration and dehydration completion logs now include per-phase wall time, bytes transferred, bits/s throughput, number of unique worker threads used, and the storage source/target URI - Improvement: Hub storage server instance lifecycle logs now report elapsed time for spawn and shutdown - Improvement: Hub deprovisioning now logs GC completion status and elapsed time; a GC that does not complete within the 5s deadline is logged as a warning and shutdown proceeds anyway
-rw-r--r--CHANGELOG.md3
-rw-r--r--src/zenserver/hub/hub.cpp20
-rw-r--r--src/zenserver/hub/hydration.cpp686
-rw-r--r--src/zenserver/hub/storageserverinstance.cpp45
-rw-r--r--src/zenserver/hub/storageserverinstance.h1
5 files changed, 491 insertions, 264 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 732c19774..598d5ba29 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,9 @@
- Improvement: Dashboard banner displays the zenserver version next to the wordmark
- Improvement: Zen log messages are forwarded to trace as typed `ZenLog.*` events that preserve structured `fmt` args end-to-end, so the zen trace analyzer and other downstream consumers can re-render messages with full formatter support (nested widths, chrono specs, etc.). `--trace=log` now routes to the zen log channel (the upstream UE `log` channel is unused since zen no longer emits `Logging.*` events)
- Improvement: Hub Consul client HTTP timeout defaults raised to 1s connect / 2s total so transient latency to a slow Consul agent no longer fails registration calls
+- Improvement: Hub hydration and dehydration completion logs now include per-phase wall time, bytes transferred, bits/s throughput, number of unique worker threads used, and the storage source/target URI
+- Improvement: Hub storage server instance lifecycle logs now report elapsed time for spawn and shutdown
+- Improvement: Hub deprovisioning now logs GC completion status and elapsed time; a GC that does not complete within the 5s deadline is logged as a warning and shutdown proceeds anyway
- Bugfix: `zen builds ls` no longer fails against cloud build storage (`--host`/`--url`) when `--storage-path` is not supplied
- Bugfix: `NamedEvent` construction on Linux/macOS no longer races against a concurrent destructor's `unlink()` of the backing file; the IPC key is now derived via `fstat()` on the open fd instead of `ftok()` re-stat'ing the path
- Bugfix: Hub provision requests now return 202 Accepted when the module is `Recovering` or `Waking` instead of rejecting
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index 978814b46..c03c1a9a0 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -9,6 +9,7 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
+#include <zencore/string.h>
#include <zencore/timer.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpclient.h>
@@ -878,15 +879,17 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, si
if (OldState == HubInstanceState::Provisioned)
{
ZEN_INFO("Triggering GC for module {}", ModuleId);
+ Stopwatch GcTimer;
HttpClient GcClient(fmt::format("http://localhost:{}", Port));
HttpClient::KeyValueMap Params;
Params.Entries.insert({"smallobjects", "true"});
Params.Entries.insert({"skipcid", "false"});
- HttpClient::Response Response = GcClient.Post("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject), Params);
- Stopwatch Timer;
- while (Response && Timer.GetElapsedTimeMs() < 5000)
+ HttpClient::Response Response = GcClient.Post("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject), Params);
+ bool GcCompleted = false;
+ Stopwatch DeadlineTimer;
+ while (Response && DeadlineTimer.GetElapsedTimeMs() < 5000)
{
Response = GcClient.Get("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject));
if (Response)
@@ -894,11 +897,22 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, si
bool Complete = Response.AsObject()["Status"].AsString() != "Running";
if (Complete)
{
+ GcCompleted = true;
break;
}
Sleep(50);
}
}
+ if (GcCompleted)
+ {
+ ZEN_INFO("GC for module {} completed in {}", ModuleId, NiceLatencyNs(GcTimer.GetElapsedTimeUs() * 1000));
+ }
+ else
+ {
+ ZEN_WARN("GC for module {} did not complete after {}, proceeding with shutdown",
+ ModuleId,
+ NiceLatencyNs(GcTimer.GetElapsedTimeUs() * 1000));
+ }
}
Instance.Deprovision();
}
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp
index 2f224c357..c7f25bab6 100644
--- a/src/zenserver/hub/hydration.cpp
+++ b/src/zenserver/hub/hydration.cpp
@@ -14,6 +14,7 @@
#include <zencore/parallelwork.h>
#include <zencore/stream.h>
#include <zencore/system.h>
+#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zenutil/cloud/imdscredentials.h>
#include <zenutil/cloud/s3client.h>
@@ -26,7 +27,6 @@
#if ZEN_WITH_TESTS
# include <zencore/testing.h>
# include <zencore/testutils.h>
-# include <zencore/thread.h>
# include <zencore/workthreadpool.h>
# include <zenutil/cloud/minioprocess.h>
# include <cstring>
@@ -80,6 +80,68 @@ namespace hydration_impl {
}
///////////////////////////////////////////////////////////////////////
+ // Hydration / dehydration statistics. Atomics so they are safe to update
+ // from parallel worker lambdas. Summary is emitted once after the operation
+ // completes (success or failure).
+
+ struct PhaseStats
+ {
+ std::atomic<uint64_t> Files{0}; // host-side: count of work scheduled in this phase
+ std::atomic<uint64_t> Bytes{0}; // lambda-side: bytes transferred on successful completion
+ std::atomic<uint64_t> ElapsedUs{0}; // wall time around Work.Wait()
+
+ RwLock ThreadIdsLock;
+ std::unordered_set<int> ThreadIds;
+
+ void RecordThread()
+ {
+ int Tid = zen::GetCurrentThreadId();
+ ThreadIdsLock.WithExclusiveLock([&] { ThreadIds.insert(Tid); });
+ }
+ };
+
+ struct DehydrateStatistics
+ {
+ PhaseStats Hash;
+ PhaseStats Upload;
+ PhaseStats Touch; // Touch shares Upload's ParallelWork / ElapsedUs
+
+ std::atomic<uint64_t> LoadStateUs{0};
+ std::atomic<uint64_t> DirScanUs{0};
+ std::atomic<uint64_t> ListExistingUs{0};
+ std::atomic<uint64_t> MetadataSaveUs{0};
+ std::atomic<uint64_t> CleanUs{0};
+
+ std::atomic<uint64_t> TotalFiles{0};
+ std::atomic<uint64_t> TotalBytes{0};
+ std::atomic<uint64_t> TotalUs{0};
+ };
+
+ struct HydrateStatistics
+ {
+ PhaseStats Download;
+
+ std::atomic<uint64_t> LoadMetadataUs{0};
+ std::atomic<uint64_t> CleanUs{0};
+ std::atomic<uint64_t> RenameOrCopyUs{0};
+ std::atomic<uint64_t> VerifyScanUs{0};
+
+ std::atomic<uint64_t> TotalFiles{0};
+ std::atomic<uint64_t> TotalBytes{0};
+ std::atomic<uint64_t> TotalUs{0};
+ };
+
+ // Bits-per-second rate computed at microsecond precision. Zero-safe.
+ inline uint64_t BitsPerSecond(uint64_t Bytes, uint64_t ElapsedUs)
+ {
+ if (ElapsedUs == 0)
+ {
+ return 0;
+ }
+ return Bytes * 8 * 1'000'000ull / ElapsedUs;
+ }
+
+ ///////////////////////////////////////////////////////////////////////
// Per-module storage interface driven by IncrementalHydrator.
class StorageBase
@@ -87,23 +149,26 @@ namespace hydration_impl {
public:
virtual ~StorageBase() = default;
- virtual void SaveMetadata(const CbObject& Data) = 0;
- virtual CbObject LoadMetadata() = 0;
- virtual CbObject GetSettings() = 0;
- virtual void ParseSettings(const CbObjectView& Settings) = 0;
- virtual std::vector<IoHash> List() = 0;
+ virtual std::string Describe() const = 0;
+ virtual void SaveMetadata(const CbObject& Data) = 0;
+ virtual CbObject LoadMetadata() = 0;
+ virtual CbObject GetSettings() = 0;
+ virtual void ParseSettings(const CbObjectView& Settings) = 0;
+ virtual std::vector<IoHash> List() = 0;
virtual void Put(ParallelWork& Work,
WorkerThreadPool& WorkerPool,
const IoHash& Hash,
uint64_t Size,
- const std::filesystem::path& SourcePath) = 0;
+ const std::filesystem::path& SourcePath,
+ PhaseStats& Stats) = 0;
virtual void Get(ParallelWork& Work,
WorkerThreadPool& WorkerPool,
const IoHash& Hash,
uint64_t Size,
- const std::filesystem::path& DestinationPath) = 0;
- virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) = 0;
- virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) = 0;
+ const std::filesystem::path& DestinationPath,
+ PhaseStats& Stats) = 0;
+ virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats) = 0;
+ virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) = 0;
};
class FileStorage : public StorageBase
@@ -114,6 +179,7 @@ namespace hydration_impl {
explicit FileStorage(std::filesystem::path ModulePath);
+ virtual std::string Describe() const override { return fmt::format("file://{}", m_StoragePath.generic_string()); }
virtual void SaveMetadata(const CbObject& Data) override;
virtual CbObject LoadMetadata() override;
virtual CbObject GetSettings() override { return {}; }
@@ -123,13 +189,15 @@ namespace hydration_impl {
WorkerThreadPool& WorkerPool,
const IoHash& Hash,
uint64_t Size,
- const std::filesystem::path& SourcePath) override;
+ const std::filesystem::path& SourcePath,
+ PhaseStats& Stats) override;
virtual void Get(ParallelWork& Work,
WorkerThreadPool& WorkerPool,
const IoHash& Hash,
uint64_t Size,
- const std::filesystem::path& DestinationPath) override;
- virtual void Touch(ParallelWork&, WorkerThreadPool&, const IoHash&) override {}
+ const std::filesystem::path& DestinationPath,
+ PhaseStats& Stats) override;
+ virtual void Touch(ParallelWork&, WorkerThreadPool&, const IoHash&, PhaseStats&) override {}
virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override;
private:
@@ -147,6 +215,7 @@ namespace hydration_impl {
S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize);
+ virtual std::string Describe() const override { return fmt::format("s3://{}/{}", m_Client.BucketName(), m_KeyPrefix); }
virtual void SaveMetadata(const CbObject& Data) override;
virtual CbObject LoadMetadata() override;
virtual CbObject GetSettings() override;
@@ -156,13 +225,15 @@ namespace hydration_impl {
WorkerThreadPool& WorkerPool,
const IoHash& Hash,
uint64_t Size,
- const std::filesystem::path& SourcePath) override;
+ const std::filesystem::path& SourcePath,
+ PhaseStats& Stats) override;
virtual void Get(ParallelWork& Work,
WorkerThreadPool& WorkerPool,
const IoHash& Hash,
uint64_t Size,
- const std::filesystem::path& DestinationPath) override;
- virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash) override;
+ const std::filesystem::path& DestinationPath,
+ PhaseStats& Stats) override;
+ virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats) override;
virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override;
private:
@@ -232,43 +303,48 @@ namespace hydration_impl {
WorkerThreadPool& WorkerPool,
const IoHash& Hash,
uint64_t Size,
- const std::filesystem::path& SourcePath)
+ const std::filesystem::path& SourcePath,
+ PhaseStats& Stats)
{
- ZEN_UNUSED(Size);
- Work.ScheduleWork(WorkerPool,
- [this, Hash = IoHash(Hash), SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) {
- if (!AbortFlag.load())
- {
- std::filesystem::path DestPath = m_CASPath / fmt::format("{}", Hash);
- if (std::error_code Ec = CopyFile(SourcePath, DestPath, CopyFileOptions{.EnableClone = true}); Ec)
- {
- throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestPath));
- }
- }
- });
- }
-
- void FileStorage::Get(ParallelWork& Work,
- WorkerThreadPool& WorkerPool,
- const IoHash& Hash,
- uint64_t Size,
- const std::filesystem::path& DestinationPath)
- {
- ZEN_UNUSED(Size);
Work.ScheduleWork(
WorkerPool,
- [this, Hash = IoHash(Hash), DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) {
+ [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), &Stats](std::atomic<bool>& AbortFlag) {
+ Stats.RecordThread();
if (!AbortFlag.load())
{
- std::filesystem::path SourcePath = m_CASPath / fmt::format("{}", Hash);
- if (std::error_code Ec = CopyFile(SourcePath, DestinationPath, CopyFileOptions{.EnableClone = true}); Ec)
+ std::filesystem::path DestPath = m_CASPath / fmt::format("{}", Hash);
+ if (std::error_code Ec = CopyFile(SourcePath, DestPath, CopyFileOptions{.EnableClone = true}); Ec)
{
- throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestinationPath));
+ throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestPath));
}
+ Stats.Bytes.fetch_add(Size, std::memory_order_relaxed);
}
});
}
+ void FileStorage::Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath,
+ PhaseStats& Stats)
+ {
+ Work.ScheduleWork(WorkerPool,
+ [this, Hash = IoHash(Hash), Size, DestinationPath = std::filesystem::path(DestinationPath), &Stats](
+ std::atomic<bool>& AbortFlag) {
+ Stats.RecordThread();
+ if (!AbortFlag.load())
+ {
+ std::filesystem::path SourcePath = m_CASPath / fmt::format("{}", Hash);
+ if (std::error_code Ec = CopyFile(SourcePath, DestinationPath, CopyFileOptions{.EnableClone = true}); Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestinationPath));
+ }
+ Stats.Bytes.fetch_add(Size, std::memory_order_relaxed);
+ }
+ });
+ }
+
void FileStorage::Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool)
{
ZEN_UNUSED(Work);
@@ -366,47 +442,52 @@ namespace hydration_impl {
WorkerThreadPool& WorkerPool,
const IoHash& Hash,
uint64_t Size,
- const std::filesystem::path& SourcePath)
+ const std::filesystem::path& SourcePath,
+ PhaseStats& Stats)
{
- Work.ScheduleWork(WorkerPool,
- [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath)](std::atomic<bool>& AbortFlag) {
- if (AbortFlag.load())
- {
- return;
- }
- S3Client& Client = m_Client;
- std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
+ Work.ScheduleWork(
+ WorkerPool,
+ [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), &Stats](std::atomic<bool>& AbortFlag) {
+ Stats.RecordThread();
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ S3Client& Client = m_Client;
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
- if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
- {
- BasicFile File(SourcePath, BasicFile::Mode::kRead);
- S3Result Result = Client.PutObjectMultipart(
- Key,
- Size,
- [&File](uint64_t Offset, uint64_t ChunkSize) { return File.ReadRange(Offset, ChunkSize); },
- m_MultipartChunkSize);
- if (!Result.IsSuccess())
- {
- throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
- }
- }
- else
- {
- BasicFile File(SourcePath, BasicFile::Mode::kRead);
- S3Result Result = Client.PutObject(Key, File.ReadAll());
- if (!Result.IsSuccess())
- {
- throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
- }
- }
- });
+ if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
+ {
+ BasicFile File(SourcePath, BasicFile::Mode::kRead);
+ S3Result Result = Client.PutObjectMultipart(
+ Key,
+ Size,
+ [&File](uint64_t Offset, uint64_t ChunkSize) { return File.ReadRange(Offset, ChunkSize); },
+ m_MultipartChunkSize);
+ if (!Result.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
+ }
+ }
+ else
+ {
+ BasicFile File(SourcePath, BasicFile::Mode::kRead);
+ S3Result Result = Client.PutObject(Key, File.ReadAll());
+ if (!Result.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
+ }
+ }
+ Stats.Bytes.fetch_add(Size, std::memory_order_relaxed);
+ });
}
void S3Storage::Get(ParallelWork& Work,
WorkerThreadPool& WorkerPool,
const IoHash& Hash,
uint64_t Size,
- const std::filesystem::path& DestinationPath)
+ const std::filesystem::path& DestinationPath,
+ PhaseStats& Stats)
{
std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
@@ -433,7 +514,8 @@ namespace hydration_impl {
{
uint64_t ChunkSize = std::min<uint64_t>(m_MultipartChunkSize, Size - Offset);
- Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data](std::atomic<bool>& AbortFlag) {
+ Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data, &Stats](std::atomic<bool>& AbortFlag) {
+ Stats.RecordThread();
if (AbortFlag)
{
return;
@@ -449,56 +531,61 @@ namespace hydration_impl {
}
Data->Write(Chunk.Content.GetData(), Chunk.Content.GetSize(), Offset);
+ Stats.Bytes.fetch_add(ChunkSize, std::memory_order_relaxed);
});
Offset += ChunkSize;
}
}
else
{
- Work.ScheduleWork(WorkerPool,
- [this, Key = Key, DestinationPath = std::filesystem::path(DestinationPath)](std::atomic<bool>& AbortFlag) {
- if (AbortFlag)
- {
- return;
- }
- S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir);
- if (!Chunk.IsSuccess())
- {
- throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error);
- }
+ Work.ScheduleWork(
+ WorkerPool,
+ [this, Key = Key, Size, DestinationPath = std::filesystem::path(DestinationPath), &Stats](std::atomic<bool>& AbortFlag) {
+ Stats.RecordThread();
+ if (AbortFlag)
+ {
+ return;
+ }
+ S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir);
+ if (!Chunk.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error);
+ }
- if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef))
- {
- std::error_code Ec;
- std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (Ec)
- {
- WriteFile(DestinationPath, Chunk.Content);
- }
- else
- {
- Chunk.Content.SetDeleteOnClose(false);
- Chunk.Content = {};
- RenameFile(ChunkPath, DestinationPath, Ec);
- if (Ec)
- {
- Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath);
- Chunk.Content.SetDeleteOnClose(true);
- WriteFile(DestinationPath, Chunk.Content);
- }
- }
- }
- else
- {
- WriteFile(DestinationPath, Chunk.Content);
- }
- });
+ if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef))
+ {
+ std::error_code Ec;
+ std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (Ec)
+ {
+ WriteFile(DestinationPath, Chunk.Content);
+ }
+ else
+ {
+ Chunk.Content.SetDeleteOnClose(false);
+ Chunk.Content = {};
+ RenameFile(ChunkPath, DestinationPath, Ec);
+ if (Ec)
+ {
+ Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath);
+ Chunk.Content.SetDeleteOnClose(true);
+ WriteFile(DestinationPath, Chunk.Content);
+ }
+ }
+ }
+ else
+ {
+ WriteFile(DestinationPath, Chunk.Content);
+ }
+ Stats.Bytes.fetch_add(Size, std::memory_order_relaxed);
+ });
}
}
- void S3Storage::Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash)
+ void S3Storage::Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats)
{
- Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash)](std::atomic<bool>& AbortFlag) {
+ Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash), &Stats](std::atomic<bool>& AbortFlag) {
+ Stats.RecordThread();
if (AbortFlag.load())
{
return;
@@ -538,6 +625,91 @@ namespace hydration_impl {
///////////////////////////////////////////////////////////////////////
// IncrementalHydrator: the only HydrationStrategyBase implementation.
+ // Summary emission for hydrate/dehydrate operations.
+
+ void LogDehydrateSummary(std::string_view Prefix,
+ const DehydrateStatistics& Stats,
+ std::string_view ModuleId,
+ const std::filesystem::path& Source,
+ std::string_view Target)
+ {
+ const uint64_t HashUs = Stats.Hash.ElapsedUs.load();
+ const uint64_t UploadUs = Stats.Upload.ElapsedUs.load();
+ ZEN_INFO(
+ "{} module '{}': {} files ({}) in {}\n"
+ " Source: {}\n"
+ " Target: {}\n"
+ " Load state: {}\n"
+ " Dir scan: {}\n"
+ " Hash phase: {} {}/{} ({}) hashed, {}bits/s, {} threads\n"
+ " List existing: {}\n"
+ " Upload phase: {} {}/{} ({}) uploaded, {} ({}) touched, {}bits/s, {} threads\n"
+ " Metadata save: {}\n"
+ " Clean: {}",
+ Prefix,
+ ModuleId,
+ ThousandsNum(Stats.TotalFiles.load()),
+ NiceBytes(Stats.TotalBytes.load()),
+ NiceLatencyNs(Stats.TotalUs.load() * 1000),
+ Source.generic_string(),
+ Target,
+ NiceLatencyNs(Stats.LoadStateUs.load() * 1000),
+ NiceLatencyNs(Stats.DirScanUs.load() * 1000),
+ NiceLatencyNs(HashUs * 1000),
+ ThousandsNum(Stats.Hash.Files.load()),
+ ThousandsNum(Stats.TotalFiles.load()),
+ NiceBytes(Stats.Hash.Bytes.load()),
+ NiceNum(BitsPerSecond(Stats.Hash.Bytes.load(), HashUs)),
+ Stats.Hash.ThreadIds.size(),
+ NiceLatencyNs(Stats.ListExistingUs.load() * 1000),
+ NiceLatencyNs(UploadUs * 1000),
+ ThousandsNum(Stats.Upload.Files.load()),
+ ThousandsNum(Stats.TotalFiles.load()),
+ NiceBytes(Stats.Upload.Bytes.load()),
+ ThousandsNum(Stats.Touch.Files.load()),
+ NiceBytes(Stats.Touch.Bytes.load()),
+ NiceNum(BitsPerSecond(Stats.Upload.Bytes.load(), UploadUs)),
+ Stats.Upload.ThreadIds.size(),
+ NiceLatencyNs(Stats.MetadataSaveUs.load() * 1000),
+ NiceLatencyNs(Stats.CleanUs.load() * 1000));
+ }
+
+ void LogHydrateSummary(std::string_view Prefix,
+ const HydrateStatistics& Stats,
+ std::string_view ModuleId,
+ std::string_view Source,
+ const std::filesystem::path& Target)
+ {
+ const uint64_t DownloadUs = Stats.Download.ElapsedUs.load();
+ ZEN_INFO(
+ "{} module '{}': {} files ({}) in {}\n"
+ " Source: {}\n"
+ " Target: {}\n"
+ " Load metadata: {}\n"
+ " Download phase: {} {}/{} ({}) downloaded, {}bits/s, {} threads\n"
+ " Clean: {}\n"
+ " Rename/copy: {}\n"
+ " Verify scan: {}",
+ Prefix,
+ ModuleId,
+ ThousandsNum(Stats.TotalFiles.load()),
+ NiceBytes(Stats.TotalBytes.load()),
+ NiceLatencyNs(Stats.TotalUs.load() * 1000),
+ Source,
+ Target.generic_string(),
+ NiceLatencyNs(Stats.LoadMetadataUs.load() * 1000),
+ NiceLatencyNs(DownloadUs * 1000),
+ ThousandsNum(Stats.Download.Files.load()),
+ ThousandsNum(Stats.TotalFiles.load()),
+ NiceBytes(Stats.Download.Bytes.load()),
+ NiceNum(BitsPerSecond(Stats.Download.Bytes.load(), DownloadUs)),
+ Stats.Download.ThreadIds.size(),
+ NiceLatencyNs(Stats.CleanUs.load() * 1000),
+ NiceLatencyNs(Stats.RenameOrCopyUs.load() * 1000),
+ NiceLatencyNs(Stats.VerifyScanUs.load() * 1000));
+ }
+
+ ///////////////////////////////////////////////////////////////////////
// Holds a per-module StorageBase and threading context; drives the
// hydrate/dehydrate algorithm.
@@ -588,31 +760,41 @@ namespace hydration_impl {
void IncrementalHydrator::Dehydrate(const CbObject& CachedState)
{
- Stopwatch Timer;
+ Stopwatch TotalTimer;
+ DehydrateStatistics Stats;
+ const std::string StorageTarget = m_Storage->Describe();
const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
try
{
std::unordered_map<std::string, size_t> StateEntryLookup;
std::vector<Entry> StateEntries;
- for (CbFieldView FieldView : CachedState["Files"].AsArrayView())
{
- CbObjectView EntryView = FieldView.AsObjectView();
- std::filesystem::path RelativePath(EntryView["Path"].AsString());
- uint64_t Size = EntryView["Size"].AsUInt64();
- uint64_t ModTick = EntryView["ModTick"].AsUInt64();
- IoHash Hash = EntryView["Hash"].AsHash();
-
- StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size());
- StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash});
+ Stopwatch LoadStateTimer;
+ for (CbFieldView FieldView : CachedState["Files"].AsArrayView())
+ {
+ CbObjectView EntryView = FieldView.AsObjectView();
+ std::filesystem::path RelativePath(EntryView["Path"].AsString());
+ uint64_t Size = EntryView["Size"].AsUInt64();
+ uint64_t ModTick = EntryView["ModTick"].AsUInt64();
+ IoHash Hash = EntryView["Hash"].AsHash();
+
+ StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size());
+ StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash});
+ }
+ Stats.LoadStateUs = LoadStateTimer.GetElapsedTimeUs();
}
DirectoryContent DirContent;
- GetDirectoryContent(*m_Threading.WorkerPool,
- ServerStateDir,
- DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
- DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
- DirContent);
+ {
+ Stopwatch DirScanTimer;
+ GetDirectoryContent(*m_Threading.WorkerPool,
+ ServerStateDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
+ DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
+ DirContent);
+ Stats.DirScanUs = DirScanTimer.GetElapsedTimeUs();
+ }
ZEN_INFO("Dehydrating module '{}' from folder '{}'. {} ({}) files",
m_Config.ModuleId,
@@ -623,14 +805,13 @@ namespace hydration_impl {
std::vector<Entry> Entries;
Entries.resize(DirContent.Files.size());
- uint64_t TotalBytes = 0;
- uint64_t TotalFiles = 0;
- uint64_t HashedFiles = 0;
- uint64_t HashedBytes = 0;
+ uint64_t TotalBytes = 0;
+ uint64_t TotalFiles = 0;
std::unordered_set<IoHash> ExistsLookup;
{
+ Stopwatch HashTimer;
ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
@@ -669,7 +850,8 @@ namespace hydration_impl {
if (!FoundHash)
{
Work.ScheduleWork(*m_Threading.WorkerPool,
- [AbsPath, EntryIndex = TotalFiles, &Entries](std::atomic<bool>& AbortFlag) {
+ [AbsPath, EntryIndex = TotalFiles, &Entries, &Stats](std::atomic<bool>& AbortFlag) {
+ Stats.Hash.RecordThread();
if (AbortFlag)
{
return;
@@ -708,28 +890,33 @@ namespace hydration_impl {
{
CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath));
}
+ Stats.Hash.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed);
});
- HashedFiles++;
- HashedBytes += CurrentEntry.Size;
+ Stats.Hash.Files.fetch_add(1, std::memory_order_relaxed);
}
TotalFiles++;
TotalBytes += CurrentEntry.Size;
}
- std::vector<IoHash> ExistingEntries = m_Storage->List();
- ExistsLookup.insert(ExistingEntries.begin(), ExistingEntries.end());
+ {
+ Stopwatch ListTimer;
+ std::vector<IoHash> ExistingEntries = m_Storage->List();
+ ExistsLookup.insert(ExistingEntries.begin(), ExistingEntries.end());
+ Stats.ListExistingUs = ListTimer.GetElapsedTimeUs();
+ }
Work.Wait();
Entries.resize(TotalFiles);
+ Stats.Hash.ElapsedUs = HashTimer.GetElapsedTimeUs();
+ Stats.TotalFiles = TotalFiles;
+ Stats.TotalBytes = TotalBytes;
}
- uint64_t UploadedFiles = 0;
- uint64_t UploadedBytes = 0;
- uint64_t TouchedFiles = 0;
- uint64_t TouchedBytes = 0;
+ uint64_t UploadDurationMs = 0;
{
- ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
+ Stopwatch UploadTimer;
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (const Entry& CurrentEntry : Entries)
{
@@ -739,23 +926,25 @@ namespace hydration_impl {
*m_Threading.WorkerPool,
CurrentEntry.Hash,
CurrentEntry.Size,
- MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath));
- UploadedFiles++;
- UploadedBytes += CurrentEntry.Size;
+ MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath),
+ Stats.Upload);
+ Stats.Upload.Files.fetch_add(1, std::memory_order_relaxed);
}
else
{
// Refresh the backend's modification time so lifecycle-expiration policies
// do not evict CAS entries that are still referenced by this module.
- m_Storage->Touch(Work, *m_Threading.WorkerPool, CurrentEntry.Hash);
- TouchedFiles++;
- TouchedBytes += CurrentEntry.Size;
+ m_Storage->Touch(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, Stats.Touch);
+ Stats.Touch.Files.fetch_add(1, std::memory_order_relaxed);
+ Stats.Touch.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed);
}
}
Work.Wait();
- uint64_t UploadTimeMs = Timer.GetElapsedTimeMs();
+ Stats.Upload.ElapsedUs = UploadTimer.GetElapsedTimeUs();
+ UploadDurationMs = TotalTimer.GetElapsedTimeMs();
+ Stopwatch MetadataTimer;
UtcTime Now = UtcTime::Now();
std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z",
Now.Tm.tm_year + 1900,
@@ -771,7 +960,7 @@ namespace hydration_impl {
Meta << "ModuleId" << m_Config.ModuleId;
Meta << "HostName" << GetMachineName();
Meta << "UploadTimeUtc" << UploadTimeUtc;
- Meta << "UploadDurationMs" << UploadTimeMs;
+ Meta << "UploadDurationMs" << UploadDurationMs;
Meta << "TotalSizeBytes" << TotalBytes;
Meta << "StorageSettings" << m_Storage->GetSettings();
@@ -790,25 +979,18 @@ namespace hydration_impl {
Meta.EndArray();
m_Storage->SaveMetadata(Meta.Save());
+ Stats.MetadataSaveUs = MetadataTimer.GetElapsedTimeUs();
}
ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
+ {
+ Stopwatch CleanTimer;
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
+ Stats.CleanUs = CleanTimer.GetElapsedTimeUs();
+ }
- ZEN_INFO(
- "Dehydration of module '{}' completed from folder '{}'. Hashed {} ({}). Uploaded {} ({}). Touched {} ({}). Total {} ({}) "
- "in {}",
- m_Config.ModuleId,
- m_Config.ServerStateDir,
- HashedFiles,
- NiceBytes(HashedBytes),
- UploadedFiles,
- NiceBytes(UploadedBytes),
- TouchedFiles,
- NiceBytes(TouchedBytes),
- TotalFiles,
- NiceBytes(TotalBytes),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ Stats.TotalUs = TotalTimer.GetElapsedTimeUs();
+ LogDehydrateSummary("Dehydration complete", Stats, m_Config.ModuleId, ServerStateDir, StorageTarget);
}
catch (const std::exception& Ex)
{
@@ -816,18 +998,27 @@ namespace hydration_impl {
m_Config.ModuleId,
Ex.what(),
m_Config.ServerStateDir);
+ Stats.TotalUs = TotalTimer.GetElapsedTimeUs();
+ LogDehydrateSummary("Dehydration failed", Stats, m_Config.ModuleId, ServerStateDir, StorageTarget);
}
}
CbObject IncrementalHydrator::Hydrate()
{
- Stopwatch Timer;
+ Stopwatch TotalTimer;
+ HydrateStatistics Stats;
+ const std::string StorageSource = m_Storage->Describe();
const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
try
{
- CbObject Meta = m_Storage->LoadMetadata();
+ CbObject Meta;
+ {
+ Stopwatch LoadTimer;
+ Meta = m_Storage->LoadMetadata();
+ Stats.LoadMetadataUs = LoadTimer.GetElapsedTimeUs();
+ }
if (!Meta)
{
ZEN_INFO("No dehydrated state for module {} found, cleaning server state: '{}'",
@@ -856,6 +1047,9 @@ namespace hydration_impl {
}
}
+ Stats.TotalFiles = Entries.size();
+ Stats.TotalBytes = TotalSize;
+
ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files",
m_Config.ModuleId,
m_Config.ServerStateDir,
@@ -864,111 +1058,117 @@ namespace hydration_impl {
m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView());
- uint64_t DownloadedBytes = 0;
- uint64_t DownloadedFiles = 0;
-
{
+ Stopwatch DownloadTimer;
ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
for (const Entry& CurrentEntry : Entries)
{
std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath);
CreateDirectories(Path.parent_path());
- m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path);
- DownloadedBytes += CurrentEntry.Size;
- DownloadedFiles++;
+ m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path, Stats.Download);
+ Stats.Download.Files.fetch_add(1, std::memory_order_relaxed);
}
Work.Wait();
+ Stats.Download.ElapsedUs = DownloadTimer.GetElapsedTimeUs();
}
// Downloaded successfully - swap into ServerStateDir
ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
-
- // If the two paths share at least one common component they are on the same drive/volume
- // and atomic renames will succeed. Otherwise fall back to a full copy.
- auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end());
- if (ItTmp != TempDir.begin())
{
- DirectoryContent DirContent;
- GetDirectoryContent(*m_Threading.WorkerPool,
- TempDir,
- DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs,
- DirContent);
+ Stopwatch CleanTimer;
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
+ Stats.CleanUs = CleanTimer.GetElapsedTimeUs();
+ }
- for (const std::filesystem::path& AbsPath : DirContent.Directories)
+ {
+ Stopwatch RenameTimer;
+ // If the two paths share at least one common component they are on the same drive/volume
+ // and atomic renames will succeed. Otherwise fall back to a full copy.
+ auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end());
+ if (ItTmp != TempDir.begin())
{
- std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
- std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest);
- if (Ec)
+ DirectoryContent DirContent;
+ GetDirectoryContent(*m_Threading.WorkerPool,
+ TempDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs,
+ DirContent);
+
+ for (const std::filesystem::path& AbsPath : DirContent.Directories)
{
- throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest));
+ std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
+ std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest));
+ }
}
- }
- for (const std::filesystem::path& AbsPath : DirContent.Files)
- {
- std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
- std::error_code Ec = RenameFileWithRetry(AbsPath, Dest);
- if (Ec)
+ for (const std::filesystem::path& AbsPath : DirContent.Files)
{
- throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest));
+ std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
+ std::error_code Ec = RenameFileWithRetry(AbsPath, Dest);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest));
+ }
}
- }
- ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
- }
- else
- {
- // Slow path: TempDir and ServerStateDir are on different filesystems, so rename
- // would fail. Copy the tree instead and clean up the temp files afterwards.
- ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree");
- CopyTree(TempDir, ServerStateDir, {.EnableClone = true});
- ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ }
+ else
+ {
+ // Slow path: TempDir and ServerStateDir are on different filesystems, so rename
+ // would fail. Copy the tree instead and clean up the temp files afterwards.
+ ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree");
+ CopyTree(TempDir, ServerStateDir, {.EnableClone = true});
+ ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ }
+ Stats.RenameOrCopyUs = RenameTimer.GetElapsedTimeUs();
}
- // TODO: This could perhaps be done more efficently, but ok for now
- DirectoryContent DirContent;
- GetDirectoryContent(*m_Threading.WorkerPool,
- ServerStateDir,
- DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
- DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
- DirContent);
-
- CbObjectWriter HydrateState;
- HydrateState.BeginArray("Files");
- for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
+ CbObject StateObject;
{
- std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
+ Stopwatch VerifyTimer;
+ DirectoryContent DirContent;
+ GetDirectoryContent(*m_Threading.WorkerPool,
+ ServerStateDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
+ DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
+ DirContent);
- if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end())
+ CbObjectWriter HydrateState;
+ HydrateState.BeginArray("Files");
+ for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
{
- HydrateState.BeginObject();
+ std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
+
+ if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end())
+ {
+ HydrateState.BeginObject();
+ {
+ HydrateState << "Path" << RelativePath.generic_string();
+ HydrateState << "Size" << DirContent.FileSizes[FileIndex];
+ HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex];
+ HydrateState << "Hash" << Entries[It->second].Hash;
+ }
+ HydrateState.EndObject();
+ }
+ else
{
- HydrateState << "Path" << RelativePath.generic_string();
- HydrateState << "Size" << DirContent.FileSizes[FileIndex];
- HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex];
- HydrateState << "Hash" << Entries[It->second].Hash;
+ ZEN_ASSERT(false);
}
- HydrateState.EndObject();
- }
- else
- {
- ZEN_ASSERT(false);
}
- }
- HydrateState.EndArray();
+ HydrateState.EndArray();
- CbObject StateObject = HydrateState.Save();
+ StateObject = HydrateState.Save();
+ Stats.VerifyScanUs = VerifyTimer.GetElapsedTimeUs();
+ }
- ZEN_INFO("Hydration of module '{}' complete to folder '{}'. {} ({}) files in {}",
- m_Config.ModuleId,
- m_Config.ServerStateDir,
- DownloadedFiles,
- NiceBytes(DownloadedBytes),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ Stats.TotalUs = TotalTimer.GetElapsedTimeUs();
+ LogHydrateSummary("Hydration complete", Stats, m_Config.ModuleId, StorageSource, ServerStateDir);
return StateObject;
}
@@ -981,6 +1181,8 @@ namespace hydration_impl {
CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ Stats.TotalUs = TotalTimer.GetElapsedTimeUs();
+ LogHydrateSummary("Hydration failed", Stats, m_Config.ModuleId, StorageSource, ServerStateDir);
return {};
}
}
diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp
index 97edc5223..9d477fb10 100644
--- a/src/zenserver/hub/storageserverinstance.cpp
+++ b/src/zenserver/hub/storageserverinstance.cpp
@@ -8,6 +8,8 @@
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/string.h>
+#include <zencore/timer.h>
namespace zen {
@@ -29,6 +31,8 @@ StorageServerInstance::~StorageServerInstance()
void
StorageServerInstance::SpawnServerProcess()
{
+ Stopwatch SpawnTimer;
+
ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId);
m_ServerInstance.ResetDeadProcess();
@@ -84,11 +88,29 @@ StorageServerInstance::SpawnServerProcess()
}
m_ServerInstance.SpawnServerAndWaitUntilReady(m_Config.BasePort, AdditionalOptions.ToView());
- ZEN_DEBUG("Storage server instance for module '{}' started, listening on port {}", m_ModuleId, m_Config.BasePort);
+ ZEN_INFO("Storage server instance for module '{}' started, listening on port {}, spawn took {}",
+ m_ModuleId,
+ m_Config.BasePort,
+ NiceLatencyNs(SpawnTimer.GetElapsedTimeUs() * 1000));
m_ServerInstance.EnableShutdownOnDestroy();
}
+void
+StorageServerInstance::ShutdownServerProcess()
+{
+ if (!m_ServerInstance.IsRunning())
+ {
+ return;
+ }
+ Stopwatch ShutdownTimer;
+ // m_ServerInstance.Shutdown() never throws.
+ m_ServerInstance.Shutdown();
+ ZEN_INFO("Storage server instance for module '{}' shut down, took {}",
+ m_ModuleId,
+ NiceLatencyNs(ShutdownTimer.GetElapsedTimeUs() * 1000));
+}
+
ProcessMetrics
StorageServerInstance::GetProcessMetrics() const
{
@@ -128,11 +150,7 @@ StorageServerInstance::ProvisionLocked()
void
StorageServerInstance::DeprovisionLocked()
{
- if (m_ServerInstance.IsRunning())
- {
- // m_ServerInstance.Shutdown() never throws.
- m_ServerInstance.Shutdown();
- }
+ ShutdownServerProcess();
// Crashed or Hibernated: process already dead; skip Shutdown.
// Dehydrate preserves instance state for future re-provisioning. Failure means saved state
@@ -151,11 +169,7 @@ StorageServerInstance::DeprovisionLocked()
void
StorageServerInstance::ObliterateLocked()
{
- if (m_ServerInstance.IsRunning())
- {
- // m_ServerInstance.Shutdown() never throws.
- m_ServerInstance.Shutdown();
- }
+ ShutdownServerProcess();
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
@@ -168,14 +182,7 @@ void
StorageServerInstance::HibernateLocked()
{
// Signal server to shut down, but keep data around for later wake
-
- if (!m_ServerInstance.IsRunning())
- {
- return;
- }
-
- // m_ServerInstance.Shutdown() never throws.
- m_ServerInstance.Shutdown();
+ ShutdownServerProcess();
}
void
diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h
index c5917afc9..21ac1ada3 100644
--- a/src/zenserver/hub/storageserverinstance.h
+++ b/src/zenserver/hub/storageserverinstance.h
@@ -153,6 +153,7 @@ private:
#endif
void SpawnServerProcess();
+ void ShutdownServerProcess();
void Hydrate();
void Dehydrate();