diff options
| author | Stefan Boberg <[email protected]> | 2026-03-10 18:44:45 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-10 18:44:45 +0100 |
| commit | da0af02b6f4adf4592168667cd6a68c16030eb87 (patch) | |
| tree | 61375bdeed179a353c9bf318701968fd2e740894 /src | |
| parent | HttpClient using libcurl, Unix Sockets for HTTP. HTTPS support (#770) (diff) | |
| download | zen-da0af02b6f4adf4592168667cd6a68c16030eb87.tar.xz zen-da0af02b6f4adf4592168667cd6a68c16030eb87.zip | |
minor zenstore/blockstore fixes (#821)
- Fix clang-format error accidentally introduced by recent PR
- Fix `FileSize()` CAS race that repeatedly invalidated the cache when concurrent callers both missed; remove `store(0)` on CAS failure
- Fix `WriteChunks` not accounting for initial alignment padding in `m_TotalSize`, causing drift vs `WriteChunk`'s correct accounting
- Fix Create retry sleep computing negative values (100 - N*100 instead of 100 + N*100), matching the Open retry pattern
- Fix `~BlockStore` error log missing format placeholder for `Ex.what()`
- Fix `GetFreeBlockIndex` infinite loop when all indexes have orphan files on disk but aren't in `m_ChunkBlocks`; bound probe to `m_MaxBlockCount`
- Fix `IterateBlock` ignoring `SmallSizeCallback` return value for single out-of-bounds chunks, preventing early termination
- Fix `BlockStoreCompactState::IterateBlocks` iterating map by value instead of const reference
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/bench_cmd.cpp | 108 | ||||
| -rw-r--r-- | src/zen/cmds/bench_cmd.h | 10 | ||||
| -rw-r--r-- | src/zen/zen.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 24 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 2 |
5 files changed, 63 insertions, 83 deletions
diff --git a/src/zen/cmds/bench_cmd.cpp b/src/zen/cmds/bench_cmd.cpp index 658b42da6..908d62257 100644 --- a/src/zen/cmds/bench_cmd.cpp +++ b/src/zen/cmds/bench_cmd.cpp @@ -28,12 +28,9 @@ namespace zen { ////////////////////////////////////////////////////////////////////////// // BenchPurgeSubCmd -BenchPurgeSubCmd::BenchPurgeSubCmd() -: ZenSubCmdBase("purge", "Purge standby memory (system cache)") +BenchPurgeSubCmd::BenchPurgeSubCmd() : ZenSubCmdBase("purge", "Purge standby memory (system cache)") { - SubOptions().add_options()("single", - "Do not spawn child processes", - cxxopts::value<bool>(m_SingleProcess)->default_value("false")); + SubOptions().add_options()("single", "Do not spawn child processes", cxxopts::value<bool>(m_SingleProcess)->default_value("false")); } void @@ -104,17 +101,10 @@ BenchPurgeSubCmd::Run(const ZenCliOptions& GlobalOptions) ////////////////////////////////////////////////////////////////////////// // BenchHttpSubCmd - -BenchHttpSubCmd::BenchHttpSubCmd() -: ZenSubCmdBase("http", "Benchmark an HTTP server") +BenchHttpSubCmd::BenchHttpSubCmd() : ZenSubCmdBase("http", "Benchmark an HTTP server") { SubOptions().add_option("", "u", "url", "URL to benchmark", cxxopts::value<std::string>(m_Url), "<url>"); - SubOptions().add_option("", - "n", - "count", - "Number of requests to send", - cxxopts::value<int>(m_Count)->default_value("100"), - "<count>"); + SubOptions().add_option("", "n", "count", "Number of requests to send", cxxopts::value<int>(m_Count)->default_value("100"), "<count>"); SubOptions().add_option("", "c", "concurrency", @@ -127,12 +117,8 @@ BenchHttpSubCmd::BenchHttpSubCmd() "HTTP method to use (GET, HEAD)", cxxopts::value<std::string>(m_Method)->default_value("GET"), "<method>"); - SubOptions().add_option("", - "", - "unix-socket", - "Unix domain socket path (overrides TCP)", - cxxopts::value<std::string>(m_SocketPath), - "<path>"); + SubOptions() + .add_option("", "", "unix-socket", "Unix domain socket path (overrides TCP)", cxxopts::value<std::string>(m_SocketPath), "<path>"); SubOptions().add_options()("no-keepalive", "Close connection after each request (disables keep-alive)", cxxopts::value<bool>(m_NoKeepAlive)->default_value("false")); @@ -179,8 +165,7 @@ BenchHttpSubCmd::Run(const ZenCliOptions& GlobalOptions) if (m_Method != "GET" && m_Method != "HEAD") { - throw OptionParseException(fmt::format("Unsupported HTTP method '{}'. Supported: GET, HEAD", m_Method), - SubOptions().help()); + throw OptionParseException(fmt::format("Unsupported HTTP method '{}'. Supported: GET, HEAD", m_Method), SubOptions().help()); } auto [BaseUri, Path] = SplitUrl(m_Url); @@ -201,16 +186,16 @@ BenchHttpSubCmd::Run(const ZenCliOptions& GlobalOptions) // (non-zenserver, timeout, unreachable) is silently ignored. try { - HttpClientSettings ProbeSettings{.ConnectTimeout = std::chrono::milliseconds(2000), - .Timeout = std::chrono::milliseconds(2000), - .UnixSocketPath = m_SocketPath}; - HttpClient ProbeHttp(BaseUri, ProbeSettings); + HttpClientSettings ProbeSettings{.ConnectTimeout = std::chrono::milliseconds(2000), + .Timeout = std::chrono::milliseconds(2000), + .UnixSocketPath = m_SocketPath}; + HttpClient ProbeHttp(BaseUri, ProbeSettings); HttpClient::Response ProbeResp = ProbeHttp.Get("/health/info"); if (ProbeResp.IsSuccess()) { - CbObject Info = ProbeResp.AsObject(); - std::string_view BuildVersion = Info["BuildVersion"].AsString(); + CbObject Info = ProbeResp.AsObject(); + std::string_view BuildVersion = Info["BuildVersion"].AsString(); if (!BuildVersion.empty()) { @@ -222,15 +207,15 @@ BenchHttpSubCmd::Run(const ZenCliOptions& GlobalOptions) std::string_view OS = Info["OS"].AsString(); std::string_view Arch = Info["Arch"].AsString(); - CbObjectView System = Info["System"].AsObjectView(); - int64_t LpCount = System["lp_count"].AsInt64(); - int64_t TotalMemMiB = System["total_memory_mb"].AsInt64(); + CbObjectView System = Info["System"].AsObjectView(); + int64_t LpCount = System["lp_count"].AsInt64(); + int64_t TotalMemMiB = System["total_memory_mb"].AsInt64(); ZEN_CONSOLE(" : {}, {}, {} logical processors, {} RAM", - OS, - Arch, - LpCount, - NiceBytes(static_cast<uint64_t>(TotalMemMiB) * 1024 * 1024)); + OS, + Arch, + LpCount, + NiceBytes(static_cast<uint64_t>(TotalMemMiB) * 1024 * 1024)); } } } @@ -251,8 +236,8 @@ BenchHttpSubCmd::Run(const ZenCliOptions& GlobalOptions) void BenchHttpSubCmd::RunFixedCount(const std::string& BaseUri, const std::string& Path) { - std::atomic<int> NextRequest{0}; - std::vector<double> AllLatencies; + std::atomic<int> NextRequest{0}; + std::vector<double> AllLatencies; AllLatencies.reserve(m_Count); std::mutex LatencyMutex; std::atomic<int> ErrorCount{0}; @@ -264,8 +249,7 @@ BenchHttpSubCmd::RunFixedCount(const std::string& BaseUri, const std::string& Pa auto WorkerFn = [&]() { std::vector<double> LocalLatencies; - HttpClientSettings Settings{.UnixSocketPath = m_SocketPath, - .ForbidReuseConnection = m_NoKeepAlive}; + HttpClientSettings Settings{.UnixSocketPath = m_SocketPath, .ForbidReuseConnection = m_NoKeepAlive}; HttpClient Http(BaseUri, Settings); while (true) @@ -346,17 +330,17 @@ BenchHttpSubCmd::RunFixedCount(const std::string& BaseUri, const std::string& Pa ZEN_CONSOLE(" Requests : {:L} total, {:L} success, {:L} errors", TotalCount, SuccessCount, ErrorCount.load()); ZEN_CONSOLE(" Latency : min={:.1f}ms mean={:.1f}ms p50={:.1f}ms p95={:.1f}ms p99={:.1f}ms max={:.1f}ms", - PercentileMs(0), - MeanMs, - PercentileMs(50), - PercentileMs(95), - PercentileMs(99), - PercentileMs(100)); + PercentileMs(0), + MeanMs, + PercentileMs(50), + PercentileMs(95), + PercentileMs(99), + PercentileMs(100)); ZEN_CONSOLE(" Throughput: {:.1f} req/s down: {}/s up: {}/s (elapsed: {:.2f}s)", - Rps, - NiceBytes(DownBytesPerSec), - NiceBytes(UpBytesPerSec), - TotalSeconds); + Rps, + NiceBytes(DownBytesPerSec), + NiceBytes(UpBytesPerSec), + TotalSeconds); } void @@ -380,8 +364,7 @@ BenchHttpSubCmd::RunContinuous(const std::string& BaseUri, const std::string& Pa Stopwatch RunTimer; auto WorkerFn = [&]() { - HttpClientSettings Settings{.UnixSocketPath = m_SocketPath, - .ForbidReuseConnection = m_NoKeepAlive}; + HttpClientSettings Settings{.UnixSocketPath = m_SocketPath, .ForbidReuseConnection = m_NoKeepAlive}; HttpClient Http(BaseUri, Settings); while (!s_BenchAbort.load(std::memory_order_relaxed)) @@ -435,11 +418,11 @@ BenchHttpSubCmd::RunContinuous(const std::string& BaseUri, const std::string& Pa int64_t UpBytes = IntervalUploadBytes.exchange(0); // Snapshot and reset latency histogram - uint64_t HistCount = LatencyHistogram.Count(); - int64_t HistMin = LatencyHistogram.Min(); - int64_t HistMax = LatencyHistogram.Max(); - double HistMean = LatencyHistogram.Mean(); - metrics::SampleSnapshot Snap = LatencyHistogram.Snapshot(); + uint64_t HistCount = LatencyHistogram.Count(); + int64_t HistMin = LatencyHistogram.Min(); + int64_t HistMax = LatencyHistogram.Max(); + double HistMean = LatencyHistogram.Mean(); + metrics::SampleSnapshot Snap = LatencyHistogram.Snapshot(); LatencyHistogram.Clear(); // Format elapsed as HH:MM:SS @@ -451,7 +434,8 @@ BenchHttpSubCmd::RunContinuous(const std::string& BaseUri, const std::string& Pa if (HistCount > 0) { ZEN_CONSOLE( - "[{:02d}:{:02d}:{:02d}] req/s: {:L} errors: {:L} lat(ms): min={:.1f} mean={:.1f} p95={:.1f} p99={:.1f} max={:.1f} down: {}/s up: {}/s", + "[{:02d}:{:02d}:{:02d}] req/s: {:L} errors: {:L} lat(ms): min={:.1f} mean={:.1f} p95={:.1f} p99={:.1f} max={:.1f} " + "down: {}/s up: {}/s", Hours, Minutes, Secs, @@ -467,11 +451,7 @@ BenchHttpSubCmd::RunContinuous(const std::string& BaseUri, const std::string& Pa } else { - ZEN_CONSOLE("[{:02d}:{:02d}:{:02d}] req/s: 0 errors: {:L} (no successful requests)", - Hours, - Minutes, - Secs, - Errors); + ZEN_CONSOLE("[{:02d}:{:02d}:{:02d}] req/s: 0 errors: {:L} (no successful requests)", Hours, Minutes, Secs, Errors); } } }; @@ -493,8 +473,8 @@ BenchHttpSubCmd::RunContinuous(const std::string& BaseUri, const std::string& Pa std::signal(SIGINT, PrevSigInt); std::signal(SIGTERM, PrevSigTerm); - double TotalSeconds = RunTimer.GetElapsedTimeMs() / 1000.0; - int64_t TotalCount = TotalSuccessCount.load() + TotalErrorCount.load(); + double TotalSeconds = RunTimer.GetElapsedTimeMs() / 1000.0; + int64_t TotalCount = TotalSuccessCount.load() + TotalErrorCount.load(); uint64_t DownPerSec = TotalSeconds > 0.0 ? static_cast<uint64_t>(TotalDownloadBytes.load() / TotalSeconds) : 0; uint64_t UpPerSec = TotalSeconds > 0.0 ? static_cast<uint64_t>(TotalUploadBytes.load() / TotalSeconds) : 0; diff --git a/src/zen/cmds/bench_cmd.h b/src/zen/cmds/bench_cmd.h index f332b3fcc..6700ee410 100644 --- a/src/zen/cmds/bench_cmd.h +++ b/src/zen/cmds/bench_cmd.h @@ -28,11 +28,11 @@ private: std::string m_Url; std::string m_SocketPath; - int m_Count = 100; - int m_Concurrency = 1; - std::string m_Method = "GET"; - bool m_NoKeepAlive = false; - bool m_Continuous = false; + int m_Count = 100; + int m_Concurrency = 1; + std::string m_Method = "GET"; + bool m_NoKeepAlive = false; + bool m_Continuous = false; }; class BenchCommand : public ZenCmdWithSubCommands diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp index 86c29344e..7b1b6e7d7 100644 --- a/src/zen/zen.cpp +++ b/src/zen/zen.cpp @@ -296,7 +296,7 @@ ZenCmdWithSubCommands::Run(const ZenCliOptions& GlobalOptions, int argc, char** PrintHelp(); ExtendableStringBuilder<128> VerbList; - for (bool First = true; ZenSubCmdBase* SubCmd : m_SubCommands) + for (bool First = true; ZenSubCmdBase * SubCmd : m_SubCommands) { if (!First) { diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 6197c7f24..6528fcb2f 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -98,7 +98,7 @@ BlockStoreFile::Create(uint64_t InitialSize) return false; } ZEN_WARN("Failed to create cas block '{}', reason: '{}', retries left: {}.", m_Path, Ec.message(), RetriesLeft); - Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms + Sleep(100 + (3 - RetriesLeft) * 100); // Total 600 ms RetriesLeft--; return true; }); @@ -125,11 +125,7 @@ BlockStoreFile::FileSize() const return 0; } uint64_t Expected = 0; - if (!m_CachedFileSize.compare_exchange_strong(Expected, Size)) - { - // Force a new check next time file size is fetched - m_CachedFileSize.store(0); - } + m_CachedFileSize.compare_exchange_strong(Expected, Size); return Size; } return CachedSize; @@ -388,7 +384,7 @@ BlockStore::~BlockStore() } catch (const std::exception& Ex) { - ZEN_ERROR("~BlockStore() failed with: ", Ex.what()); + ZEN_ERROR("~BlockStore() failed with: {}", Ex.what()); } } @@ -624,7 +620,7 @@ BlockStore::GetFreeBlockIndex(uint32_t ProbeIndex, RwLock::ExclusiveLockScope&, return (uint32_t)m_MaxBlockCount; } - while (true) + for (uint64_t ProbeCount = 0; ProbeCount < m_MaxBlockCount; ++ProbeCount) { if (!m_ChunkBlocks.contains(ProbeIndex)) { @@ -645,7 +641,7 @@ BlockStore::GetFreeBlockIndex(uint32_t ProbeIndex, RwLock::ExclusiveLockScope&, } ProbeIndex = (ProbeIndex + 1) & (m_MaxBlockCount - 1); } - return ProbeIndex; + return (uint32_t)m_MaxBlockCount; } void @@ -808,6 +804,7 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con Count++; RangeSize = NextRangeSize; } + uint32_t AlignmentPadding = AlignedInsertOffset - m_CurrentInsertOffset; m_CurrentInsertOffset = AlignedInsertOffset + RangeSize; Ref<BlockStoreFile> WriteBlock = m_WriteBlock; AddActiveWriteBlock(InsertLock, WriteBlockIndex); @@ -829,13 +826,13 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con WriteBuffer.MidInline(RoundUp(SourceBuffer.GetSize(), Alignment)); } WriteBlock->Write(Buffer.data(), RangeSize, AlignedInsertOffset); - m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed); + m_TotalSize.fetch_add(AlignmentPadding + RangeSize, std::memory_order::relaxed); } else { MemoryView SourceBuffer = Datas[Offset]; WriteBlock->Write(SourceBuffer.GetData(), SourceBuffer.GetSize(), AlignedInsertOffset); - m_TotalSize.fetch_add(SourceBuffer.GetSize(), std::memory_order::relaxed); + m_TotalSize.fetch_add(AlignmentPadding + SourceBuffer.GetSize(), std::memory_order::relaxed); } uint32_t ChunkOffset = AlignedInsertOffset; @@ -1087,7 +1084,10 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, BlockIndex, BlockSize); - SmallSizeCallback(ChunkIndex, nullptr, 0); + if (!SmallSizeCallback(ChunkIndex, nullptr, 0)) + { + return false; + } LocationIndexOffset++; continue; } diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index bc9b5da40..234964f5c 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -262,7 +262,7 @@ public: { std::vector<uint32_t> BlockOrder; BlockOrder.reserve(m_BlockIndexToChunkMapIndex.size()); - for (auto It : m_BlockIndexToChunkMapIndex) + for (const auto& It : m_BlockIndexToChunkMapIndex) { BlockOrder.push_back(It.first); } |