aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-10 18:44:45 +0100
committerGitHub Enterprise <[email protected]>2026-03-10 18:44:45 +0100
commitda0af02b6f4adf4592168667cd6a68c16030eb87 (patch)
tree61375bdeed179a353c9bf318701968fd2e740894 /src
parentHttpClient using libcurl, Unix Sockets for HTTP. HTTPS support (#770) (diff)
downloadzen-da0af02b6f4adf4592168667cd6a68c16030eb87.tar.xz
zen-da0af02b6f4adf4592168667cd6a68c16030eb87.zip
minor zenstore/blockstore fixes (#821)
- Fix clang-format error accidentally introduced by recent PR - Fix `FileSize()` CAS race that repeatedly invalidated the cache when concurrent callers both missed; remove `store(0)` on CAS failure - Fix `WriteChunks` not accounting for initial alignment padding in `m_TotalSize`, causing drift vs `WriteChunk`'s correct accounting - Fix Create retry sleep computing negative values (100 - N*100 instead of 100 + N*100), matching the Open retry pattern - Fix `~BlockStore` error log missing format placeholder for `Ex.what()` - Fix `GetFreeBlockIndex` infinite loop when all indexes have orphan files on disk but aren't in `m_ChunkBlocks`; bound probe to `m_MaxBlockCount` - Fix `IterateBlock` ignoring `SmallSizeCallback` return value for single out-of-bounds chunks, preventing early termination - Fix `BlockStoreCompactState::IterateBlocks` iterating map by value instead of const reference
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/bench_cmd.cpp108
-rw-r--r--src/zen/cmds/bench_cmd.h10
-rw-r--r--src/zen/zen.cpp2
-rw-r--r--src/zenstore/blockstore.cpp24
-rw-r--r--src/zenstore/include/zenstore/blockstore.h2
5 files changed, 63 insertions, 83 deletions
diff --git a/src/zen/cmds/bench_cmd.cpp b/src/zen/cmds/bench_cmd.cpp
index 658b42da6..908d62257 100644
--- a/src/zen/cmds/bench_cmd.cpp
+++ b/src/zen/cmds/bench_cmd.cpp
@@ -28,12 +28,9 @@ namespace zen {
//////////////////////////////////////////////////////////////////////////
// BenchPurgeSubCmd
-BenchPurgeSubCmd::BenchPurgeSubCmd()
-: ZenSubCmdBase("purge", "Purge standby memory (system cache)")
+BenchPurgeSubCmd::BenchPurgeSubCmd() : ZenSubCmdBase("purge", "Purge standby memory (system cache)")
{
- SubOptions().add_options()("single",
- "Do not spawn child processes",
- cxxopts::value<bool>(m_SingleProcess)->default_value("false"));
+ SubOptions().add_options()("single", "Do not spawn child processes", cxxopts::value<bool>(m_SingleProcess)->default_value("false"));
}
void
@@ -104,17 +101,10 @@ BenchPurgeSubCmd::Run(const ZenCliOptions& GlobalOptions)
//////////////////////////////////////////////////////////////////////////
// BenchHttpSubCmd
-
-BenchHttpSubCmd::BenchHttpSubCmd()
-: ZenSubCmdBase("http", "Benchmark an HTTP server")
+BenchHttpSubCmd::BenchHttpSubCmd() : ZenSubCmdBase("http", "Benchmark an HTTP server")
{
SubOptions().add_option("", "u", "url", "URL to benchmark", cxxopts::value<std::string>(m_Url), "<url>");
- SubOptions().add_option("",
- "n",
- "count",
- "Number of requests to send",
- cxxopts::value<int>(m_Count)->default_value("100"),
- "<count>");
+ SubOptions().add_option("", "n", "count", "Number of requests to send", cxxopts::value<int>(m_Count)->default_value("100"), "<count>");
SubOptions().add_option("",
"c",
"concurrency",
@@ -127,12 +117,8 @@ BenchHttpSubCmd::BenchHttpSubCmd()
"HTTP method to use (GET, HEAD)",
cxxopts::value<std::string>(m_Method)->default_value("GET"),
"<method>");
- SubOptions().add_option("",
- "",
- "unix-socket",
- "Unix domain socket path (overrides TCP)",
- cxxopts::value<std::string>(m_SocketPath),
- "<path>");
+ SubOptions()
+ .add_option("", "", "unix-socket", "Unix domain socket path (overrides TCP)", cxxopts::value<std::string>(m_SocketPath), "<path>");
SubOptions().add_options()("no-keepalive",
"Close connection after each request (disables keep-alive)",
cxxopts::value<bool>(m_NoKeepAlive)->default_value("false"));
@@ -179,8 +165,7 @@ BenchHttpSubCmd::Run(const ZenCliOptions& GlobalOptions)
if (m_Method != "GET" && m_Method != "HEAD")
{
- throw OptionParseException(fmt::format("Unsupported HTTP method '{}'. Supported: GET, HEAD", m_Method),
- SubOptions().help());
+ throw OptionParseException(fmt::format("Unsupported HTTP method '{}'. Supported: GET, HEAD", m_Method), SubOptions().help());
}
auto [BaseUri, Path] = SplitUrl(m_Url);
@@ -201,16 +186,16 @@ BenchHttpSubCmd::Run(const ZenCliOptions& GlobalOptions)
// (non-zenserver, timeout, unreachable) is silently ignored.
try
{
- HttpClientSettings ProbeSettings{.ConnectTimeout = std::chrono::milliseconds(2000),
- .Timeout = std::chrono::milliseconds(2000),
- .UnixSocketPath = m_SocketPath};
- HttpClient ProbeHttp(BaseUri, ProbeSettings);
+ HttpClientSettings ProbeSettings{.ConnectTimeout = std::chrono::milliseconds(2000),
+ .Timeout = std::chrono::milliseconds(2000),
+ .UnixSocketPath = m_SocketPath};
+ HttpClient ProbeHttp(BaseUri, ProbeSettings);
HttpClient::Response ProbeResp = ProbeHttp.Get("/health/info");
if (ProbeResp.IsSuccess())
{
- CbObject Info = ProbeResp.AsObject();
- std::string_view BuildVersion = Info["BuildVersion"].AsString();
+ CbObject Info = ProbeResp.AsObject();
+ std::string_view BuildVersion = Info["BuildVersion"].AsString();
if (!BuildVersion.empty())
{
@@ -222,15 +207,15 @@ BenchHttpSubCmd::Run(const ZenCliOptions& GlobalOptions)
std::string_view OS = Info["OS"].AsString();
std::string_view Arch = Info["Arch"].AsString();
- CbObjectView System = Info["System"].AsObjectView();
- int64_t LpCount = System["lp_count"].AsInt64();
- int64_t TotalMemMiB = System["total_memory_mb"].AsInt64();
+ CbObjectView System = Info["System"].AsObjectView();
+ int64_t LpCount = System["lp_count"].AsInt64();
+ int64_t TotalMemMiB = System["total_memory_mb"].AsInt64();
ZEN_CONSOLE(" : {}, {}, {} logical processors, {} RAM",
- OS,
- Arch,
- LpCount,
- NiceBytes(static_cast<uint64_t>(TotalMemMiB) * 1024 * 1024));
+ OS,
+ Arch,
+ LpCount,
+ NiceBytes(static_cast<uint64_t>(TotalMemMiB) * 1024 * 1024));
}
}
}
@@ -251,8 +236,8 @@ BenchHttpSubCmd::Run(const ZenCliOptions& GlobalOptions)
void
BenchHttpSubCmd::RunFixedCount(const std::string& BaseUri, const std::string& Path)
{
- std::atomic<int> NextRequest{0};
- std::vector<double> AllLatencies;
+ std::atomic<int> NextRequest{0};
+ std::vector<double> AllLatencies;
AllLatencies.reserve(m_Count);
std::mutex LatencyMutex;
std::atomic<int> ErrorCount{0};
@@ -264,8 +249,7 @@ BenchHttpSubCmd::RunFixedCount(const std::string& BaseUri, const std::string& Pa
auto WorkerFn = [&]() {
std::vector<double> LocalLatencies;
- HttpClientSettings Settings{.UnixSocketPath = m_SocketPath,
- .ForbidReuseConnection = m_NoKeepAlive};
+ HttpClientSettings Settings{.UnixSocketPath = m_SocketPath, .ForbidReuseConnection = m_NoKeepAlive};
HttpClient Http(BaseUri, Settings);
while (true)
@@ -346,17 +330,17 @@ BenchHttpSubCmd::RunFixedCount(const std::string& BaseUri, const std::string& Pa
ZEN_CONSOLE(" Requests : {:L} total, {:L} success, {:L} errors", TotalCount, SuccessCount, ErrorCount.load());
ZEN_CONSOLE(" Latency : min={:.1f}ms mean={:.1f}ms p50={:.1f}ms p95={:.1f}ms p99={:.1f}ms max={:.1f}ms",
- PercentileMs(0),
- MeanMs,
- PercentileMs(50),
- PercentileMs(95),
- PercentileMs(99),
- PercentileMs(100));
+ PercentileMs(0),
+ MeanMs,
+ PercentileMs(50),
+ PercentileMs(95),
+ PercentileMs(99),
+ PercentileMs(100));
ZEN_CONSOLE(" Throughput: {:.1f} req/s down: {}/s up: {}/s (elapsed: {:.2f}s)",
- Rps,
- NiceBytes(DownBytesPerSec),
- NiceBytes(UpBytesPerSec),
- TotalSeconds);
+ Rps,
+ NiceBytes(DownBytesPerSec),
+ NiceBytes(UpBytesPerSec),
+ TotalSeconds);
}
void
@@ -380,8 +364,7 @@ BenchHttpSubCmd::RunContinuous(const std::string& BaseUri, const std::string& Pa
Stopwatch RunTimer;
auto WorkerFn = [&]() {
- HttpClientSettings Settings{.UnixSocketPath = m_SocketPath,
- .ForbidReuseConnection = m_NoKeepAlive};
+ HttpClientSettings Settings{.UnixSocketPath = m_SocketPath, .ForbidReuseConnection = m_NoKeepAlive};
HttpClient Http(BaseUri, Settings);
while (!s_BenchAbort.load(std::memory_order_relaxed))
@@ -435,11 +418,11 @@ BenchHttpSubCmd::RunContinuous(const std::string& BaseUri, const std::string& Pa
int64_t UpBytes = IntervalUploadBytes.exchange(0);
// Snapshot and reset latency histogram
- uint64_t HistCount = LatencyHistogram.Count();
- int64_t HistMin = LatencyHistogram.Min();
- int64_t HistMax = LatencyHistogram.Max();
- double HistMean = LatencyHistogram.Mean();
- metrics::SampleSnapshot Snap = LatencyHistogram.Snapshot();
+ uint64_t HistCount = LatencyHistogram.Count();
+ int64_t HistMin = LatencyHistogram.Min();
+ int64_t HistMax = LatencyHistogram.Max();
+ double HistMean = LatencyHistogram.Mean();
+ metrics::SampleSnapshot Snap = LatencyHistogram.Snapshot();
LatencyHistogram.Clear();
// Format elapsed as HH:MM:SS
@@ -451,7 +434,8 @@ BenchHttpSubCmd::RunContinuous(const std::string& BaseUri, const std::string& Pa
if (HistCount > 0)
{
ZEN_CONSOLE(
- "[{:02d}:{:02d}:{:02d}] req/s: {:L} errors: {:L} lat(ms): min={:.1f} mean={:.1f} p95={:.1f} p99={:.1f} max={:.1f} down: {}/s up: {}/s",
+ "[{:02d}:{:02d}:{:02d}] req/s: {:L} errors: {:L} lat(ms): min={:.1f} mean={:.1f} p95={:.1f} p99={:.1f} max={:.1f} "
+ "down: {}/s up: {}/s",
Hours,
Minutes,
Secs,
@@ -467,11 +451,7 @@ BenchHttpSubCmd::RunContinuous(const std::string& BaseUri, const std::string& Pa
}
else
{
- ZEN_CONSOLE("[{:02d}:{:02d}:{:02d}] req/s: 0 errors: {:L} (no successful requests)",
- Hours,
- Minutes,
- Secs,
- Errors);
+ ZEN_CONSOLE("[{:02d}:{:02d}:{:02d}] req/s: 0 errors: {:L} (no successful requests)", Hours, Minutes, Secs, Errors);
}
}
};
@@ -493,8 +473,8 @@ BenchHttpSubCmd::RunContinuous(const std::string& BaseUri, const std::string& Pa
std::signal(SIGINT, PrevSigInt);
std::signal(SIGTERM, PrevSigTerm);
- double TotalSeconds = RunTimer.GetElapsedTimeMs() / 1000.0;
- int64_t TotalCount = TotalSuccessCount.load() + TotalErrorCount.load();
+ double TotalSeconds = RunTimer.GetElapsedTimeMs() / 1000.0;
+ int64_t TotalCount = TotalSuccessCount.load() + TotalErrorCount.load();
uint64_t DownPerSec = TotalSeconds > 0.0 ? static_cast<uint64_t>(TotalDownloadBytes.load() / TotalSeconds) : 0;
uint64_t UpPerSec = TotalSeconds > 0.0 ? static_cast<uint64_t>(TotalUploadBytes.load() / TotalSeconds) : 0;
diff --git a/src/zen/cmds/bench_cmd.h b/src/zen/cmds/bench_cmd.h
index f332b3fcc..6700ee410 100644
--- a/src/zen/cmds/bench_cmd.h
+++ b/src/zen/cmds/bench_cmd.h
@@ -28,11 +28,11 @@ private:
std::string m_Url;
std::string m_SocketPath;
- int m_Count = 100;
- int m_Concurrency = 1;
- std::string m_Method = "GET";
- bool m_NoKeepAlive = false;
- bool m_Continuous = false;
+ int m_Count = 100;
+ int m_Concurrency = 1;
+ std::string m_Method = "GET";
+ bool m_NoKeepAlive = false;
+ bool m_Continuous = false;
};
class BenchCommand : public ZenCmdWithSubCommands
diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp
index 86c29344e..7b1b6e7d7 100644
--- a/src/zen/zen.cpp
+++ b/src/zen/zen.cpp
@@ -296,7 +296,7 @@ ZenCmdWithSubCommands::Run(const ZenCliOptions& GlobalOptions, int argc, char**
PrintHelp();
ExtendableStringBuilder<128> VerbList;
- for (bool First = true; ZenSubCmdBase* SubCmd : m_SubCommands)
+ for (bool First = true; ZenSubCmdBase * SubCmd : m_SubCommands)
{
if (!First)
{
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 6197c7f24..6528fcb2f 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -98,7 +98,7 @@ BlockStoreFile::Create(uint64_t InitialSize)
return false;
}
ZEN_WARN("Failed to create cas block '{}', reason: '{}', retries left: {}.", m_Path, Ec.message(), RetriesLeft);
- Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms
+ Sleep(100 + (3 - RetriesLeft) * 100); // Total 600 ms
RetriesLeft--;
return true;
});
@@ -125,11 +125,7 @@ BlockStoreFile::FileSize() const
return 0;
}
uint64_t Expected = 0;
- if (!m_CachedFileSize.compare_exchange_strong(Expected, Size))
- {
- // Force a new check next time file size is fetched
- m_CachedFileSize.store(0);
- }
+ m_CachedFileSize.compare_exchange_strong(Expected, Size);
return Size;
}
return CachedSize;
@@ -388,7 +384,7 @@ BlockStore::~BlockStore()
}
catch (const std::exception& Ex)
{
- ZEN_ERROR("~BlockStore() failed with: ", Ex.what());
+ ZEN_ERROR("~BlockStore() failed with: {}", Ex.what());
}
}
@@ -624,7 +620,7 @@ BlockStore::GetFreeBlockIndex(uint32_t ProbeIndex, RwLock::ExclusiveLockScope&,
return (uint32_t)m_MaxBlockCount;
}
- while (true)
+ for (uint64_t ProbeCount = 0; ProbeCount < m_MaxBlockCount; ++ProbeCount)
{
if (!m_ChunkBlocks.contains(ProbeIndex))
{
@@ -645,7 +641,7 @@ BlockStore::GetFreeBlockIndex(uint32_t ProbeIndex, RwLock::ExclusiveLockScope&,
}
ProbeIndex = (ProbeIndex + 1) & (m_MaxBlockCount - 1);
}
- return ProbeIndex;
+ return (uint32_t)m_MaxBlockCount;
}
void
@@ -808,6 +804,7 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con
Count++;
RangeSize = NextRangeSize;
}
+ uint32_t AlignmentPadding = AlignedInsertOffset - m_CurrentInsertOffset;
m_CurrentInsertOffset = AlignedInsertOffset + RangeSize;
Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
AddActiveWriteBlock(InsertLock, WriteBlockIndex);
@@ -829,13 +826,13 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con
WriteBuffer.MidInline(RoundUp(SourceBuffer.GetSize(), Alignment));
}
WriteBlock->Write(Buffer.data(), RangeSize, AlignedInsertOffset);
- m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed);
+ m_TotalSize.fetch_add(AlignmentPadding + RangeSize, std::memory_order::relaxed);
}
else
{
MemoryView SourceBuffer = Datas[Offset];
WriteBlock->Write(SourceBuffer.GetData(), SourceBuffer.GetSize(), AlignedInsertOffset);
- m_TotalSize.fetch_add(SourceBuffer.GetSize(), std::memory_order::relaxed);
+ m_TotalSize.fetch_add(AlignmentPadding + SourceBuffer.GetSize(), std::memory_order::relaxed);
}
uint32_t ChunkOffset = AlignedInsertOffset;
@@ -1087,7 +1084,10 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
BlockIndex,
BlockSize);
- SmallSizeCallback(ChunkIndex, nullptr, 0);
+ if (!SmallSizeCallback(ChunkIndex, nullptr, 0))
+ {
+ return false;
+ }
LocationIndexOffset++;
continue;
}
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index bc9b5da40..234964f5c 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -262,7 +262,7 @@ public:
{
std::vector<uint32_t> BlockOrder;
BlockOrder.reserve(m_BlockIndexToChunkMapIndex.size());
- for (auto It : m_BlockIndexToChunkMapIndex)
+ for (const auto& It : m_BlockIndexToChunkMapIndex)
{
BlockOrder.push_back(It.first);
}