aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-19 20:29:35 +0100
committerGitHub Enterprise <[email protected]>2026-03-19 20:29:35 +0100
commit43fb80586043da50f81e2e935afb2fbc6f55ec24 (patch)
tree2fbdeee593de2437b6a5f8cc464d4f550eb30f97 /src
parentadd --hub-hydration-target-spec to zen hub (#867) (diff)
downloadzen-43fb80586043da50f81e2e935afb2fbc6f55ec24.tar.xz
zen-43fb80586043da50f81e2e935afb2fbc6f55ec24.zip
Zen disk benchmark utility (#868)HEADmain
This PR adds a `zen bench disk` subcommand to help with gathering user performance metrics. It also contains a fix for `xmake precommit`: the task now probes to find the most appropriate way to launch pre-commit.
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/bench_cmd.cpp1381
-rw-r--r--src/zen/cmds/bench_cmd.h43
2 files changed, 1424 insertions, 0 deletions
diff --git a/src/zen/cmds/bench_cmd.cpp b/src/zen/cmds/bench_cmd.cpp
index 908d62257..b1639105a 100644
--- a/src/zen/cmds/bench_cmd.cpp
+++ b/src/zen/cmds/bench_cmd.cpp
@@ -3,6 +3,7 @@
#include "bench_cmd.h"
#include "bench.h"
+#include <zencore/basicfile.h>
#include <zencore/compactbinary.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
@@ -15,10 +16,22 @@
#include <zenhttp/httpclient.h>
#include <zentelemetry/stats.h>
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+#else
+# include <errno.h>
+# include <fcntl.h>
+# include <sys/stat.h>
+# include <unistd.h>
+#endif
+
#include <algorithm>
#include <atomic>
+#include <chrono>
#include <csignal>
+#include <cstring>
#include <mutex>
+#include <random>
#include <thread>
static std::atomic<bool> s_BenchAbort{false};
@@ -488,6 +501,1373 @@ BenchHttpSubCmd::RunContinuous(const std::string& BaseUri, const std::string& Pa
}
//////////////////////////////////////////////////////////////////////////
+// DirectIoFile - thin wrapper around platform file handles for unbuffered I/O.
+// On Linux: opens with O_DIRECT, falls back gracefully if the filesystem
+// doesn't support it (e.g. tmpfs). On macOS: uses F_NOCACHE. On Windows:
+// opens with FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH.
+// Buffers, offsets and transfer sizes must all be multiples of kDirectIoAlignment.
+
+static constexpr uint64_t kDirectIoAlignment = 4096;
+
+// Rounds Raw up to the next Alignment boundary. Alignment must be a power of
+// two (the mask trick below relies on it). The caller must over-allocate by at
+// least Alignment-1 bytes so the aligned pointer stays inside the buffer.
+static uint8_t*
+AlignBuffer(uint8_t* Raw, uint64_t Alignment)
+{
+    return reinterpret_cast<uint8_t*>((reinterpret_cast<uintptr_t>(Raw) + Alignment - 1) & ~(Alignment - 1));
+}
+
+// Rounds Value up to the next multiple of Alignment (power of two only).
+static uint64_t
+AlignUp(uint64_t Value, uint64_t Alignment)
+{
+    return (Value + Alignment - 1) & ~(Alignment - 1);
+}
+
+class DirectIoFile
+{
+public:
+    DirectIoFile() = default;
+    ~DirectIoFile() { Close(); }
+    // Non-copyable; declaring the destructor also suppresses the implicit
+    // moves, so the type is effectively non-movable (each instance owns one
+    // OS handle for its whole lifetime).
+    DirectIoFile(const DirectIoFile&) = delete;
+    DirectIoFile& operator=(const DirectIoFile&) = delete;
+
+    // Returns true if the file was successfully opened with direct/unbuffered I/O active.
+    // Returns false if direct I/O was requested but the filesystem doesn't support it and
+    // we fell back to buffered I/O. Throws std::system_error if the file cannot be opened.
+    bool OpenWrite(const std::filesystem::path& Path, bool RequestDirect); // create/truncate
+    bool OpenRead(const std::filesystem::path& Path, bool RequestDirect); // read-only, existing
+    bool OpenReadWrite(const std::filesystem::path& Path, bool RequestDirect); // read-write, existing, no truncate
+
+    // Positional transfer of exactly Size bytes at Offset; a short transfer or
+    // OS error throws std::system_error. With direct I/O active, Data, Size and
+    // Offset must all be kDirectIoAlignment-aligned.
+    void WriteAt(const void* Data, uint64_t Size, uint64_t Offset);
+    void ReadAt(void* Data, uint64_t Size, uint64_t Offset);
+    // Idempotent; also invoked by the destructor.
+    void Close();
+
+private:
+#if ZEN_PLATFORM_WINDOWS
+    HANDLE m_Handle = INVALID_HANDLE_VALUE;
+#else
+    int m_Fd = -1;
+#endif
+};
+
+#if ZEN_PLATFORM_WINDOWS
+
+// Creates/truncates Path for read-write access. Tries unbuffered I/O first
+// when RequestDirect is set and falls back to a buffered open on failure.
+// Returns whether direct I/O is active; throws std::system_error on failure.
+bool
+DirectIoFile::OpenWrite(const std::filesystem::path& Path, bool RequestDirect)
+{
+    DWORD Flags = FILE_ATTRIBUTE_NORMAL;
+    if (RequestDirect)
+    {
+        Flags |= FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH;
+    }
+
+    HANDLE Handle = ::CreateFileW(Path.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, nullptr, CREATE_ALWAYS, Flags, nullptr);
+
+    bool UsingDirect = RequestDirect;
+    if (Handle == INVALID_HANDLE_VALUE && RequestDirect)
+    {
+        // Fall back to buffered I/O
+        Handle = ::CreateFileW(Path.c_str(),
+                               GENERIC_READ | GENERIC_WRITE,
+                               FILE_SHARE_READ,
+                               nullptr,
+                               CREATE_ALWAYS,
+                               FILE_ATTRIBUTE_NORMAL,
+                               nullptr);
+        UsingDirect = false;
+    }
+
+    if (Handle == INVALID_HANDLE_VALUE)
+    {
+        // Capture the error code immediately: fmt::format allocates and may
+        // clobber the thread's last-error value, and ctor arguments are
+        // indeterminately sequenced.
+        DWORD const LastError = ::GetLastError();
+        throw std::system_error(std::error_code(LastError, std::system_category()),
+                                fmt::format("Failed to open '{}' for writing", Path));
+    }
+
+    m_Handle = Handle;
+    return UsingDirect;
+}
+
+// Opens an existing Path read-only; same direct-I/O negotiation and return
+// contract as OpenWrite. Throws std::system_error if the file cannot be opened.
+bool
+DirectIoFile::OpenRead(const std::filesystem::path& Path, bool RequestDirect)
+{
+    DWORD Flags = FILE_ATTRIBUTE_NORMAL;
+    if (RequestDirect)
+    {
+        Flags |= FILE_FLAG_NO_BUFFERING;
+    }
+
+    HANDLE Handle = ::CreateFileW(Path.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING, Flags, nullptr);
+
+    bool UsingDirect = RequestDirect;
+    if (Handle == INVALID_HANDLE_VALUE && RequestDirect)
+    {
+        // Fall back to buffered I/O
+        Handle = ::CreateFileW(Path.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, nullptr);
+        UsingDirect = false;
+    }
+
+    if (Handle == INVALID_HANDLE_VALUE)
+    {
+        // Capture before fmt::format (which allocates) can clobber last-error.
+        DWORD const LastError = ::GetLastError();
+        throw std::system_error(std::error_code(LastError, std::system_category()),
+                                fmt::format("Failed to open '{}' for reading", Path));
+    }
+
+    m_Handle = Handle;
+    return UsingDirect;
+}
+
+// Writes exactly Size bytes at Offset via the synchronous-handle OVERLAPPED
+// positional form. Throws std::system_error on failure or short write.
+void
+DirectIoFile::WriteAt(const void* Data, uint64_t Size, uint64_t Offset)
+{
+    OVERLAPPED Ovl{};
+    Ovl.Offset = DWORD(Offset & 0xFFFF'FFFFu);
+    Ovl.OffsetHigh = DWORD(Offset >> 32);
+
+    DWORD BytesWritten = 0;
+    if (!::WriteFile(m_Handle, Data, static_cast<DWORD>(Size), &BytesWritten, &Ovl))
+    {
+        throw std::system_error(std::error_code(::GetLastError(), std::system_category()), "WriteFile failed");
+    }
+    if (BytesWritten != Size)
+    {
+        // WriteFile succeeded, so GetLastError() is stale here (often
+        // ERROR_SUCCESS) — report a generic I/O error instead.
+        throw std::system_error(std::make_error_code(std::errc::io_error), "WriteFile wrote fewer bytes than requested");
+    }
+}
+
+// Reads exactly Size bytes at Offset. Throws std::system_error on failure or
+// short read (including reading past end-of-file).
+void
+DirectIoFile::ReadAt(void* Data, uint64_t Size, uint64_t Offset)
+{
+    OVERLAPPED Ovl{};
+    Ovl.Offset = DWORD(Offset & 0xFFFF'FFFFu);
+    Ovl.OffsetHigh = DWORD(Offset >> 32);
+
+    DWORD BytesRead = 0;
+    if (!::ReadFile(m_Handle, Data, static_cast<DWORD>(Size), &BytesRead, &Ovl))
+    {
+        throw std::system_error(std::error_code(::GetLastError(), std::system_category()), "ReadFile failed");
+    }
+    if (BytesRead != Size)
+    {
+        // ReadFile succeeded, so GetLastError() is stale for a short read —
+        // report a generic I/O error instead.
+        throw std::system_error(std::make_error_code(std::errc::io_error), "ReadFile read fewer bytes than requested");
+    }
+}
+
+// Opens an existing Path for read-write without truncation; same direct-I/O
+// negotiation and return contract as OpenWrite.
+bool
+DirectIoFile::OpenReadWrite(const std::filesystem::path& Path, bool RequestDirect)
+{
+    DWORD Flags = FILE_ATTRIBUTE_NORMAL;
+    if (RequestDirect)
+    {
+        Flags |= FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH;
+    }
+
+    HANDLE Handle = ::CreateFileW(Path.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, nullptr, OPEN_EXISTING, Flags, nullptr);
+
+    bool UsingDirect = RequestDirect;
+    if (Handle == INVALID_HANDLE_VALUE && RequestDirect)
+    {
+        // Fall back to buffered I/O
+        Handle = ::CreateFileW(Path.c_str(),
+                               GENERIC_READ | GENERIC_WRITE,
+                               FILE_SHARE_READ,
+                               nullptr,
+                               OPEN_EXISTING,
+                               FILE_ATTRIBUTE_NORMAL,
+                               nullptr);
+        UsingDirect = false;
+    }
+
+    if (Handle == INVALID_HANDLE_VALUE)
+    {
+        // Capture before fmt::format (which allocates) can clobber last-error.
+        DWORD const LastError = ::GetLastError();
+        throw std::system_error(std::error_code(LastError, std::system_category()),
+                                fmt::format("Failed to open '{}' for read-write", Path));
+    }
+
+    m_Handle = Handle;
+    return UsingDirect;
+}
+
+// Releases the OS handle if one is held; safe to call repeatedly.
+void
+DirectIoFile::Close()
+{
+    if (m_Handle == INVALID_HANDLE_VALUE)
+    {
+        return;
+    }
+    ::CloseHandle(m_Handle);
+    m_Handle = INVALID_HANDLE_VALUE;
+}
+
+#else // Linux / macOS
+
+// Creates/truncates Path for read-write. On Linux, attempts O_DIRECT and falls
+// back to a buffered open on EINVAL (filesystem doesn't support it). On macOS,
+// requests F_NOCACHE best-effort. Returns whether direct I/O is considered
+// active; throws std::system_error if the file cannot be opened.
+bool
+DirectIoFile::OpenWrite(const std::filesystem::path& Path, bool RequestDirect)
+{
+    int Flags = O_RDWR | O_CREAT | O_TRUNC | O_CLOEXEC;
+
+# if ZEN_PLATFORM_LINUX
+    if (RequestDirect)
+    {
+        int TryFd = ::open(Path.c_str(), Flags | O_DIRECT, 0666);
+        if (TryFd >= 0)
+        {
+            ::fchmod(TryFd, 0666); // override the process umask
+            m_Fd = TryFd;
+            return true;
+        }
+        // Capture errno before fmt::format: its allocations may clobber errno,
+        // and ctor arguments are indeterminately sequenced.
+        int const OpenErrno = errno;
+        if (OpenErrno != EINVAL)
+        {
+            throw std::system_error(std::error_code(OpenErrno, std::system_category()), fmt::format("Failed to open '{}' for writing", Path));
+        }
+        // EINVAL: filesystem doesn't support O_DIRECT, fall through to buffered open
+    }
+# endif
+
+    int Fd = ::open(Path.c_str(), Flags, 0666);
+    if (Fd < 0)
+    {
+        int const OpenErrno = errno;
+        throw std::system_error(std::error_code(OpenErrno, std::system_category()), fmt::format("Failed to open '{}' for writing", Path));
+    }
+    ::fchmod(Fd, 0666);
+
+# if ZEN_PLATFORM_MAC
+    if (RequestDirect)
+    {
+        ::fcntl(Fd, F_NOCACHE, 1);
+    }
+# endif
+
+    m_Fd = Fd;
+    return RequestDirect; // On macOS F_NOCACHE is always "accepted" (best-effort)
+}
+
+// Opens an existing Path read-only; same direct-I/O negotiation and return
+// contract as OpenWrite. Throws std::system_error if the file cannot be opened.
+bool
+DirectIoFile::OpenRead(const std::filesystem::path& Path, bool RequestDirect)
+{
+    int Flags = O_RDONLY | O_CLOEXEC;
+
+# if ZEN_PLATFORM_LINUX
+    if (RequestDirect)
+    {
+        int TryFd = ::open(Path.c_str(), Flags | O_DIRECT);
+        if (TryFd >= 0)
+        {
+            m_Fd = TryFd;
+            return true;
+        }
+        // Capture errno before fmt::format can clobber it.
+        int const OpenErrno = errno;
+        if (OpenErrno != EINVAL)
+        {
+            throw std::system_error(std::error_code(OpenErrno, std::system_category()), fmt::format("Failed to open '{}' for reading", Path));
+        }
+        // EINVAL: no O_DIRECT support here, fall through to buffered open
+    }
+# endif
+
+    int Fd = ::open(Path.c_str(), Flags);
+    if (Fd < 0)
+    {
+        int const OpenErrno = errno;
+        throw std::system_error(std::error_code(OpenErrno, std::system_category()), fmt::format("Failed to open '{}' for reading", Path));
+    }
+
+# if ZEN_PLATFORM_MAC
+    if (RequestDirect)
+    {
+        ::fcntl(Fd, F_NOCACHE, 1);
+    }
+# endif
+
+    m_Fd = Fd;
+    return RequestDirect;
+}
+
+// Writes exactly Size bytes at Offset. pwrite() may legitimately transfer
+// fewer bytes than requested or fail with EINTR, so loop until the whole
+// range is written; any other failure throws std::system_error.
+void
+DirectIoFile::WriteAt(const void* Data, uint64_t Size, uint64_t Offset)
+{
+    const uint8_t* Cursor = static_cast<const uint8_t*>(Data);
+    uint64_t Remaining = Size;
+    while (Remaining > 0)
+    {
+        ssize_t Result = ::pwrite(m_Fd, Cursor, static_cast<size_t>(Remaining), static_cast<off_t>(Offset));
+        if (Result < 0)
+        {
+            if (errno == EINTR)
+            {
+                continue; // interrupted by a signal, retry the same range
+            }
+            throw std::system_error(std::error_code(errno, std::system_category()), "pwrite failed");
+        }
+        if (Result == 0)
+        {
+            // A zero return for a non-zero request means no progress; errno is
+            // not set in this case, so report a generic I/O error.
+            throw std::system_error(std::make_error_code(std::errc::io_error), "pwrite made no progress");
+        }
+        Cursor += Result;
+        Offset += static_cast<uint64_t>(Result);
+        Remaining -= static_cast<uint64_t>(Result);
+    }
+}
+
+// Reads exactly Size bytes at Offset, retrying short reads and EINTR.
+// Hitting end-of-file before Size bytes is treated as an I/O error, matching
+// the all-or-nothing contract of this wrapper.
+void
+DirectIoFile::ReadAt(void* Data, uint64_t Size, uint64_t Offset)
+{
+    uint8_t* Cursor = static_cast<uint8_t*>(Data);
+    uint64_t Remaining = Size;
+    while (Remaining > 0)
+    {
+        ssize_t Result = ::pread(m_Fd, Cursor, static_cast<size_t>(Remaining), static_cast<off_t>(Offset));
+        if (Result < 0)
+        {
+            if (errno == EINTR)
+            {
+                continue; // interrupted by a signal, retry the same range
+            }
+            throw std::system_error(std::error_code(errno, std::system_category()), "pread failed");
+        }
+        if (Result == 0)
+        {
+            // Unexpected end-of-file; errno is not set here.
+            throw std::system_error(std::make_error_code(std::errc::io_error), "pread hit end-of-file");
+        }
+        Cursor += Result;
+        Offset += static_cast<uint64_t>(Result);
+        Remaining -= static_cast<uint64_t>(Result);
+    }
+}
+
+// Opens an existing Path for read-write without truncation; same direct-I/O
+// negotiation and return contract as OpenWrite.
+bool
+DirectIoFile::OpenReadWrite(const std::filesystem::path& Path, bool RequestDirect)
+{
+    int Flags = O_RDWR | O_CLOEXEC;
+
+# if ZEN_PLATFORM_LINUX
+    if (RequestDirect)
+    {
+        int TryFd = ::open(Path.c_str(), Flags | O_DIRECT);
+        if (TryFd >= 0)
+        {
+            m_Fd = TryFd;
+            return true;
+        }
+        // Capture errno before fmt::format can clobber it.
+        int const OpenErrno = errno;
+        if (OpenErrno != EINVAL)
+        {
+            throw std::system_error(std::error_code(OpenErrno, std::system_category()),
+                                    fmt::format("Failed to open '{}' for read-write", Path));
+        }
+        // EINVAL: no O_DIRECT support here, fall through to buffered open
+    }
+# endif
+
+    int Fd = ::open(Path.c_str(), Flags);
+    if (Fd < 0)
+    {
+        int const OpenErrno = errno;
+        throw std::system_error(std::error_code(OpenErrno, std::system_category()), fmt::format("Failed to open '{}' for read-write", Path));
+    }
+
+# if ZEN_PLATFORM_MAC
+    if (RequestDirect)
+    {
+        ::fcntl(Fd, F_NOCACHE, 1);
+    }
+# endif
+
+    m_Fd = Fd;
+    return RequestDirect;
+}
+
+// Releases the file descriptor if one is held; safe to call repeatedly.
+void
+DirectIoFile::Close()
+{
+    if (m_Fd < 0)
+    {
+        return;
+    }
+    ::close(m_Fd);
+    m_Fd = -1;
+}
+
+#endif // ZEN_PLATFORM_WINDOWS
+
+//////////////////////////////////////////////////////////////////////////
+// BenchDiskSubCmd
+
+// Transfer sizes exercised by the sequential and random passes (4 KiB to
+// 4 MiB); all are multiples of kDirectIoAlignment as direct I/O requires.
+static constexpr uint64_t kDiskBenchBlockSizes[] = {4 * 1024, 64 * 1024, 1024 * 1024, 4 * 1024 * 1024};
+
+// Registers the `zen bench disk` subcommand options. The single positional
+// argument is bound to --path; all other options carry defaults.
+BenchDiskSubCmd::BenchDiskSubCmd() : ZenSubCmdBase("disk", "Benchmark local disk I/O bandwidth and file operation throughput")
+{
+    SubOptions().add_option("", "p", "path", "Directory to use for benchmark files", cxxopts::value<std::string>(m_Path), "<path>");
+    SubOptions().add_option("",
+                            "",
+                            "run",
+                            "Comma-separated list of benchmarks to run: seq, rand, ops, clone, sync (default: all except sync)",
+                            cxxopts::value<std::string>(m_Run)->default_value("seq,rand,ops,clone"),
+                            "<list>");
+    SubOptions().add_option("",
+                            "",
+                            "sync-duration",
+                            "Duration of the sync latency benchmark in seconds",
+                            cxxopts::value<int>(m_SyncDurationSec)->default_value("10"),
+                            "<seconds>");
+    SubOptions().add_option("",
+                            "c",
+                            "concurrency",
+                            "Number of concurrent threads",
+                            cxxopts::value<int>(m_Concurrency)->default_value("1"),
+                            "<threads>");
+    SubOptions().add_option("",
+                            "",
+                            "file-size",
+                            "Size of each sequential test file in bytes (default: 268435456 = 256 MiB)",
+                            cxxopts::value<uint64_t>(m_FileSize)->default_value("268435456"),
+                            "<bytes>");
+    SubOptions().add_option("",
+                            "",
+                            "file-count",
+                            "Number of small files to create/delete for the file-ops test",
+                            cxxopts::value<int>(m_FileCount)->default_value("1000"),
+                            "<count>");
+    SubOptions().add_option("",
+                            "",
+                            "rand-ops",
+                            "Number of random I/O operations per thread",
+                            cxxopts::value<int>(m_RandOps)->default_value("10000"),
+                            "<count>");
+    SubOptions().add_options()("no-direct",
+                               "Use buffered I/O instead of direct/unbuffered I/O (may inflate read results due to OS page cache)",
+                               cxxopts::value<bool>(m_NoDirectIo)->default_value("false"));
+    SubOptions().parse_positional({"path"});
+}
+
+// Entry point for `zen bench disk`: validates options, parses the --run list,
+// installs cooperative Ctrl-C/TERM abort handlers, then executes the selected
+// benchmark phases in a fixed order (seq, rand, ops, clone, sync), cleaning
+// up all scratch files before restoring the previous signal handlers.
+void
+BenchDiskSubCmd::Run(const ZenCliOptions& GlobalOptions)
+{
+    ZEN_UNUSED(GlobalOptions);
+
+    // Validate options up front, before touching the filesystem.
+    if (m_Path.empty())
+    {
+        throw OptionParseException("Path is required", SubOptions().help());
+    }
+
+    if (m_Concurrency <= 0)
+    {
+        throw OptionParseException("--concurrency must be a positive integer", SubOptions().help());
+    }
+
+    if (m_FileSize == 0)
+    {
+        throw OptionParseException("--file-size must be positive", SubOptions().help());
+    }
+
+    if (m_FileCount <= 0)
+    {
+        throw OptionParseException("--file-count must be a positive integer", SubOptions().help());
+    }
+
+    if (m_RandOps <= 0)
+    {
+        throw OptionParseException("--rand-ops must be a positive integer", SubOptions().help());
+    }
+
+    // Parse the --run list into individual flags
+    bool DoSeq = false;
+    bool DoRand = false;
+    bool DoOps = false;
+    bool DoClone = false;
+    bool DoSync = false;
+
+    {
+        std::string_view Remaining(m_Run);
+        while (!Remaining.empty())
+        {
+            // substr(0, npos) yields the whole remainder for the last token.
+            size_t Comma = Remaining.find(',');
+            std::string_view Token = Remaining.substr(0, Comma);
+            if (Token == "seq")
+            {
+                DoSeq = true;
+            }
+            else if (Token == "rand")
+            {
+                DoRand = true;
+            }
+            else if (Token == "ops")
+            {
+                DoOps = true;
+            }
+            else if (Token == "clone")
+            {
+                DoClone = true;
+            }
+            else if (Token == "sync")
+            {
+                DoSync = true;
+            }
+            else
+            {
+                throw OptionParseException(fmt::format("Unknown benchmark '{}' in --run (valid: seq, rand, ops, clone, sync)", Token),
+                                           SubOptions().help());
+            }
+            Remaining = (Comma == std::string_view::npos) ? std::string_view{} : Remaining.substr(Comma + 1);
+        }
+    }
+
+    if (!DoSeq && !DoRand && !DoOps && !DoClone && !DoSync)
+    {
+        throw OptionParseException("--run list is empty", SubOptions().help());
+    }
+
+    std::filesystem::path Dir(m_Path);
+
+    if (!std::filesystem::is_directory(Dir))
+    {
+        throw OptionParseException(fmt::format("'{}' is not a valid directory", m_Path), SubOptions().help());
+    }
+
+    bool const DirectIo = !m_NoDirectIo;
+
+    // Cooperative abort: SIGINT/SIGTERM only set a flag that the worker loops
+    // poll; the previous handlers are restored before returning.
+    s_BenchAbort.store(false);
+    auto PrevSigInt = std::signal(SIGINT, [](int) { s_BenchAbort.store(true); });
+    auto PrevSigTerm = std::signal(SIGTERM, [](int) { s_BenchAbort.store(true); });
+
+    uint64_t TotalSeqBytes = m_FileSize * static_cast<uint64_t>(m_Concurrency);
+
+    ZEN_CONSOLE("Disk Benchmark");
+    ZEN_CONSOLE("  Path        : {}", m_Path);
+    ZEN_CONSOLE("  Concurrency : {} thread(s)", m_Concurrency);
+    ZEN_CONSOLE("  File size   : {} per thread ({} total)", NiceBytes(m_FileSize), NiceBytes(TotalSeqBytes));
+    ZEN_CONSOLE("  File count  : {} (file-ops test)", m_FileCount);
+    ZEN_CONSOLE("  I/O mode    : {}", DirectIo ? "direct (unbuffered)" : "buffered");
+    ZEN_CONSOLE("");
+
+    // Phase 1: sequential write/read bandwidth per block size.
+    if (DoSeq)
+    {
+        ZEN_CONSOLE("Sequential I/O");
+        ZEN_CONSOLE("  {:>10} {:>12} {:>10} {:>12} {:>10}", "Block", "Write", "W IOPS", "Read", "R IOPS");
+
+        bool WarnedAboutFallback = false;
+
+        for (uint64_t BlockSize : kDiskBenchBlockSizes)
+        {
+            if (s_BenchAbort.load(std::memory_order_relaxed))
+            {
+                break;
+            }
+
+            SeqResult WriteResult = RunSeqWrite(Dir, BlockSize, DirectIo);
+            SeqResult ReadResult = RunSeqRead(Dir, BlockSize, DirectIo);
+
+            CleanupSeqFiles(Dir);
+
+            if (DirectIo && !WriteResult.UsedDirectIo && !WarnedAboutFallback)
+            {
+                ZEN_CONSOLE_WARN("  Note: direct I/O not supported on this filesystem, results reflect buffered I/O");
+                WarnedAboutFallback = true;
+            }
+
+            std::string WriteStr = WriteResult.ErrorCount > 0
+                                       ? fmt::format("{}/s ({} err)", NiceBytes(WriteResult.BytesPerSec), WriteResult.ErrorCount)
+                                       : fmt::format("{}/s", NiceBytes(WriteResult.BytesPerSec));
+
+            std::string ReadStr = ReadResult.ErrorCount > 0
+                                      ? fmt::format("{}/s ({} err)", NiceBytes(ReadResult.BytesPerSec), ReadResult.ErrorCount)
+                                      : fmt::format("{}/s", NiceBytes(ReadResult.BytesPerSec));
+
+            ZEN_CONSOLE("  {:>10} {:>12} {:>10L} {:>12} {:>10L}",
+                        NiceBytes(BlockSize),
+                        WriteStr,
+                        WriteResult.Iops,
+                        ReadStr,
+                        ReadResult.Iops);
+        }
+    }
+
+    // Phase 2: random write/read over pre-filled files.
+    if (DoRand && !s_BenchAbort.load(std::memory_order_relaxed))
+    {
+        if (DoSeq)
+        {
+            ZEN_CONSOLE("");
+        }
+
+        ZEN_CONSOLE("Random I/O ({:L} ops per thread)", m_RandOps);
+        ZEN_CONSOLE("  {:>10} {:>12} {:>10} {:>12} {:>10}", "Block", "Write", "W IOPS", "Read", "R IOPS");
+
+        PrefillRandFiles(Dir, DirectIo);
+
+        for (uint64_t BlockSize : kDiskBenchBlockSizes)
+        {
+            SeqResult RandWriteResult = RunRandWrite(Dir, BlockSize, DirectIo);
+            SeqResult RandReadResult = RunRandRead(Dir, BlockSize, DirectIo);
+
+            std::string WriteStr = RandWriteResult.ErrorCount > 0
+                                       ? fmt::format("{}/s ({} err)", NiceBytes(RandWriteResult.BytesPerSec), RandWriteResult.ErrorCount)
+                                       : fmt::format("{}/s", NiceBytes(RandWriteResult.BytesPerSec));
+
+            std::string ReadStr = RandReadResult.ErrorCount > 0
+                                      ? fmt::format("{}/s ({} err)", NiceBytes(RandReadResult.BytesPerSec), RandReadResult.ErrorCount)
+                                      : fmt::format("{}/s", NiceBytes(RandReadResult.BytesPerSec));
+
+            ZEN_CONSOLE("  {:>10} {:>12} {:>10L} {:>12} {:>10L}",
+                        NiceBytes(BlockSize),
+                        WriteStr,
+                        RandWriteResult.Iops,
+                        ReadStr,
+                        RandReadResult.Iops);
+        }
+
+        CleanupRandFiles(Dir);
+    }
+
+    // Phase 3: small-file create/delete throughput.
+    if (DoOps && !s_BenchAbort.load(std::memory_order_relaxed))
+    {
+        if (DoSeq || DoRand)
+        {
+            ZEN_CONSOLE("");
+        }
+
+        RunFileOps(Dir);
+    }
+
+    // Phase 4: copy-on-write block cloning (if the filesystem supports it).
+    if (DoClone && !s_BenchAbort.load(std::memory_order_relaxed))
+    {
+        if (DoSeq || DoRand || DoOps)
+        {
+            ZEN_CONSOLE("");
+        }
+
+        RunClone(Dir);
+    }
+
+    // Phase 5 (opt-in): sync latency.
+    if (DoSync && !s_BenchAbort.load(std::memory_order_relaxed))
+    {
+        if (DoSeq || DoRand || DoOps || DoClone)
+        {
+            ZEN_CONSOLE("");
+        }
+
+        RunSync(Dir);
+    }
+
+    // Always clean up any leftover bench files, even if interrupted by Ctrl-C
+    CleanupSeqFiles(Dir);
+    CleanupRandFiles(Dir);
+    CleanupFileOpFiles(Dir);
+    CleanupCloneFiles(Dir);
+    CleanupSyncFiles(Dir);
+
+    std::signal(SIGINT, PrevSigInt);
+    std::signal(SIGTERM, PrevSigTerm);
+}
+
+// Measures sequential write bandwidth: each worker thread streams one private
+// file of m_FileSize bytes (rounded up for direct I/O) in BlockSize chunks.
+// Returns aggregate bytes/sec, IOPS, per-thread error count, and whether any
+// thread actually got direct I/O.
+BenchDiskSubCmd::SeqResult
+BenchDiskSubCmd::RunSeqWrite(const std::filesystem::path& Dir, uint64_t BlockSize, bool DirectIo)
+{
+    // When using direct I/O, the file size must be a multiple of the alignment.
+    uint64_t const EffectiveFileSize = DirectIo ? AlignUp(m_FileSize, kDirectIoAlignment) : m_FileSize;
+
+    // Shared read-only write buffer, aligned for direct I/O. Extra padding lets us
+    // hand any thread a pointer that is guaranteed kDirectIoAlignment-aligned.
+    std::vector<uint8_t> RawWriteBuf(BlockSize + kDirectIoAlignment - 1);
+    uint8_t* const WritePtr = AlignBuffer(RawWriteBuf.data(), kDirectIoAlignment);
+    for (size_t i = 0; i < BlockSize; ++i)
+    {
+        WritePtr[i] = static_cast<uint8_t>(i & 0xFF);
+    }
+
+    std::atomic<int64_t> TotalBytesWritten{0};
+    std::atomic<int64_t> TotalOps{0};
+    std::atomic<int> ErrorCount{0};
+    std::atomic<int> DirectIoCount{0};
+
+    Stopwatch Timer;
+
+    std::vector<std::thread> Threads;
+    Threads.reserve(m_Concurrency);
+
+    for (int i = 0; i < m_Concurrency; ++i)
+    {
+        Threads.emplace_back([&, i]() {
+            try
+            {
+                std::filesystem::path FilePath = Dir / fmt::format("bench_disk_{}.tmp", i);
+                DirectIoFile File;
+                bool GotDirect = File.OpenWrite(FilePath, DirectIo);
+
+                if (GotDirect)
+                {
+                    DirectIoCount.fetch_add(1, std::memory_order_relaxed);
+                }
+
+                uint64_t Remaining = EffectiveFileSize;
+                uint64_t Offset = 0;
+                int64_t LocalOps = 0;
+
+                while (Remaining > 0 && !s_BenchAbort.load(std::memory_order_relaxed))
+                {
+                    uint64_t WriteSize = std::min(BlockSize, Remaining);
+                    File.WriteAt(WritePtr, WriteSize, Offset);
+                    Offset += WriteSize;
+                    Remaining -= WriteSize;
+                    ++LocalOps;
+                }
+
+                // Credit only the bytes actually written (Offset), not the full
+                // EffectiveFileSize: on an early abort (Ctrl-C) the latter would
+                // inflate the reported bandwidth.
+                TotalBytesWritten.fetch_add(static_cast<int64_t>(Offset), std::memory_order_relaxed);
+                TotalOps.fetch_add(LocalOps, std::memory_order_relaxed);
+            }
+            catch (const std::exception&)
+            {
+                ErrorCount.fetch_add(1, std::memory_order_relaxed);
+            }
+        });
+    }
+
+    for (std::thread& T : Threads)
+    {
+        T.join();
+    }
+
+    SeqResult Result;
+    Result.ElapsedSec = Timer.GetElapsedTimeMs() / 1000.0;
+    Result.BytesPerSec = Result.ElapsedSec > 0.0 ? static_cast<uint64_t>(TotalBytesWritten.load() / Result.ElapsedSec) : 0;
+    Result.Iops = Result.ElapsedSec > 0.0 ? static_cast<uint64_t>(TotalOps.load() / Result.ElapsedSec) : 0;
+    Result.ErrorCount = ErrorCount.load();
+    Result.UsedDirectIo = DirectIoCount.load() > 0;
+
+    return Result;
+}
+
+// Measures sequential read bandwidth over the files RunSeqWrite created; each
+// thread streams its own file in BlockSize chunks into a private aligned buffer.
+BenchDiskSubCmd::SeqResult
+BenchDiskSubCmd::RunSeqRead(const std::filesystem::path& Dir, uint64_t BlockSize, bool DirectIo)
+{
+    uint64_t const EffectiveFileSize = DirectIo ? AlignUp(m_FileSize, kDirectIoAlignment) : m_FileSize;
+
+    std::atomic<int64_t> TotalBytesRead{0};
+    std::atomic<int64_t> TotalOps{0};
+    std::atomic<int> ErrorCount{0};
+    std::atomic<int> DirectIoCount{0};
+
+    Stopwatch Timer;
+
+    std::vector<std::thread> Threads;
+    Threads.reserve(m_Concurrency);
+
+    for (int i = 0; i < m_Concurrency; ++i)
+    {
+        Threads.emplace_back([&, i, BlockSize]() {
+            // Each thread owns its aligned read buffer
+            std::vector<uint8_t> RawReadBuf(BlockSize + kDirectIoAlignment - 1);
+            uint8_t* ReadPtr = AlignBuffer(RawReadBuf.data(), kDirectIoAlignment);
+
+            try
+            {
+                std::filesystem::path FilePath = Dir / fmt::format("bench_disk_{}.tmp", i);
+                DirectIoFile File;
+                bool GotDirect = File.OpenRead(FilePath, DirectIo);
+
+                if (GotDirect)
+                {
+                    DirectIoCount.fetch_add(1, std::memory_order_relaxed);
+                }
+
+                uint64_t Remaining = EffectiveFileSize;
+                uint64_t Offset = 0;
+                int64_t LocalOps = 0;
+
+                while (Remaining > 0 && !s_BenchAbort.load(std::memory_order_relaxed))
+                {
+                    uint64_t ReadSize = std::min(BlockSize, Remaining);
+                    File.ReadAt(ReadPtr, ReadSize, Offset);
+                    Offset += ReadSize;
+                    Remaining -= ReadSize;
+                    ++LocalOps;
+                }
+
+                // Credit only the bytes actually read (Offset), not the full
+                // EffectiveFileSize: an early abort must not inflate the result.
+                TotalBytesRead.fetch_add(static_cast<int64_t>(Offset), std::memory_order_relaxed);
+                TotalOps.fetch_add(LocalOps, std::memory_order_relaxed);
+            }
+            catch (const std::exception&)
+            {
+                ErrorCount.fetch_add(1, std::memory_order_relaxed);
+            }
+        });
+    }
+
+    for (std::thread& T : Threads)
+    {
+        T.join();
+    }
+
+    SeqResult Result;
+    Result.ElapsedSec = Timer.GetElapsedTimeMs() / 1000.0;
+    Result.BytesPerSec = Result.ElapsedSec > 0.0 ? static_cast<uint64_t>(TotalBytesRead.load() / Result.ElapsedSec) : 0;
+    Result.Iops = Result.ElapsedSec > 0.0 ? static_cast<uint64_t>(TotalOps.load() / Result.ElapsedSec) : 0;
+    Result.ErrorCount = ErrorCount.load();
+    Result.UsedDirectIo = DirectIoCount.load() > 0;
+
+    return Result;
+}
+
+// Best-effort removal of every per-thread sequential scratch file; any
+// filesystem errors are deliberately ignored.
+void
+BenchDiskSubCmd::CleanupSeqFiles(const std::filesystem::path& Dir)
+{
+    std::error_code Ignored;
+    for (int ThreadIndex = 0; ThreadIndex < m_Concurrency; ++ThreadIndex)
+    {
+        std::filesystem::remove(Dir / fmt::format("bench_disk_{}.tmp", ThreadIndex), Ignored);
+    }
+}
+
+// Writes files sequentially with a large block size so the random I/O pass
+// has data to work with. The write itself is not measured.
+void
+BenchDiskSubCmd::PrefillRandFiles(const std::filesystem::path& Dir, bool DirectIo)
+{
+    static constexpr uint64_t kPrefillBlock = 1024 * 1024;
+
+    uint64_t const EffectiveFileSize = DirectIo ? AlignUp(m_FileSize, kDirectIoAlignment) : m_FileSize;
+
+    std::vector<uint8_t> RawBuf(kPrefillBlock + kDirectIoAlignment - 1);
+    uint8_t* const WritePtr = AlignBuffer(RawBuf.data(), kDirectIoAlignment);
+    for (size_t i = 0; i < kPrefillBlock; ++i)
+    {
+        WritePtr[i] = static_cast<uint8_t>(i & 0xFF);
+    }
+
+    std::vector<std::thread> Threads;
+    Threads.reserve(m_Concurrency);
+
+    for (int i = 0; i < m_Concurrency; ++i)
+    {
+        Threads.emplace_back([&, i]() {
+            // An exception escaping a std::thread entry point would call
+            // std::terminate, so the whole body is guarded. Failures here are
+            // swallowed; the measured random pass will surface them as
+            // per-thread errors when it fails to open/read the missing file.
+            try
+            {
+                std::filesystem::path FilePath = Dir / fmt::format("bench_rand_{}.tmp", i);
+                DirectIoFile File;
+                File.OpenWrite(FilePath, DirectIo);
+
+                uint64_t Remaining = EffectiveFileSize;
+                uint64_t Offset = 0;
+
+                while (Remaining > 0 && !s_BenchAbort.load(std::memory_order_relaxed))
+                {
+                    uint64_t WriteSize = std::min(kPrefillBlock, Remaining);
+                    File.WriteAt(WritePtr, WriteSize, Offset);
+                    Offset += WriteSize;
+                    Remaining -= WriteSize;
+                }
+            }
+            catch (const std::exception&)
+            {
+            }
+        });
+    }
+
+    for (std::thread& T : Threads)
+    {
+        T.join();
+    }
+}
+
+// Measures random write throughput: each thread issues m_RandOps writes of
+// BlockSize at uniformly random block-aligned offsets within its prefilled file.
+BenchDiskSubCmd::SeqResult
+BenchDiskSubCmd::RunRandWrite(const std::filesystem::path& Dir, uint64_t BlockSize, bool DirectIo)
+{
+    uint64_t const EffectiveFileSize = DirectIo ? AlignUp(m_FileSize, kDirectIoAlignment) : m_FileSize;
+    uint64_t const NumBlocks = EffectiveFileSize / BlockSize;
+    if (NumBlocks == 0)
+    {
+        // --file-size smaller than one block: there is no valid offset to hit.
+        // Without this guard uniform_int_distribution(0, NumBlocks - 1) would
+        // wrap to the full uint64_t range and generate absurd file offsets.
+        return SeqResult{};
+    }
+
+    std::vector<uint8_t> RawBuf(BlockSize + kDirectIoAlignment - 1);
+    uint8_t* const WritePtr = AlignBuffer(RawBuf.data(), kDirectIoAlignment);
+    for (size_t i = 0; i < BlockSize; ++i)
+    {
+        WritePtr[i] = static_cast<uint8_t>(i & 0xFF);
+    }
+
+    std::atomic<int64_t> TotalBytesWritten{0};
+    std::atomic<int64_t> TotalOps{0};
+    std::atomic<int> ErrorCount{0};
+    std::atomic<int> DirectIoCount{0};
+
+    Stopwatch Timer;
+
+    std::vector<std::thread> Threads;
+    Threads.reserve(m_Concurrency);
+
+    for (int i = 0; i < m_Concurrency; ++i)
+    {
+        Threads.emplace_back([&, i]() {
+            // Deterministic per-thread seed so runs are reproducible.
+            std::mt19937_64 Rng(uint64_t(i) * 6364136223846793005ULL + 1442695040888963407ULL);
+            std::uniform_int_distribution<uint64_t> Dist(0, NumBlocks - 1);
+
+            try
+            {
+                std::filesystem::path FilePath = Dir / fmt::format("bench_rand_{}.tmp", i);
+                DirectIoFile File;
+                bool GotDirect = File.OpenReadWrite(FilePath, DirectIo);
+
+                if (GotDirect)
+                {
+                    DirectIoCount.fetch_add(1, std::memory_order_relaxed);
+                }
+
+                int64_t LocalOps = 0;
+
+                for (int Op = 0; Op < m_RandOps && !s_BenchAbort.load(std::memory_order_relaxed); ++Op)
+                {
+                    File.WriteAt(WritePtr, BlockSize, Dist(Rng) * BlockSize);
+                    ++LocalOps;
+                }
+
+                TotalBytesWritten.fetch_add(LocalOps * static_cast<int64_t>(BlockSize), std::memory_order_relaxed);
+                TotalOps.fetch_add(LocalOps, std::memory_order_relaxed);
+            }
+            catch (const std::exception&)
+            {
+                ErrorCount.fetch_add(1, std::memory_order_relaxed);
+            }
+        });
+    }
+
+    for (std::thread& T : Threads)
+    {
+        T.join();
+    }
+
+    SeqResult Result;
+    Result.ElapsedSec = Timer.GetElapsedTimeMs() / 1000.0;
+    Result.BytesPerSec = Result.ElapsedSec > 0.0 ? static_cast<uint64_t>(TotalBytesWritten.load() / Result.ElapsedSec) : 0;
+    Result.Iops = Result.ElapsedSec > 0.0 ? static_cast<uint64_t>(TotalOps.load() / Result.ElapsedSec) : 0;
+    Result.ErrorCount = ErrorCount.load();
+    Result.UsedDirectIo = DirectIoCount.load() > 0;
+
+    return Result;
+}
+
+// Measures random read throughput: each thread issues m_RandOps reads of
+// BlockSize at uniformly random block-aligned offsets within its prefilled file.
+BenchDiskSubCmd::SeqResult
+BenchDiskSubCmd::RunRandRead(const std::filesystem::path& Dir, uint64_t BlockSize, bool DirectIo)
+{
+    uint64_t const EffectiveFileSize = DirectIo ? AlignUp(m_FileSize, kDirectIoAlignment) : m_FileSize;
+    uint64_t const NumBlocks = EffectiveFileSize / BlockSize;
+    if (NumBlocks == 0)
+    {
+        // Guard against --file-size smaller than one block: otherwise
+        // uniform_int_distribution(0, NumBlocks - 1) wraps to the full
+        // uint64_t range and every read targets a nonsense offset.
+        return SeqResult{};
+    }
+
+    std::atomic<int64_t> TotalBytesRead{0};
+    std::atomic<int64_t> TotalOps{0};
+    std::atomic<int> ErrorCount{0};
+    std::atomic<int> DirectIoCount{0};
+
+    Stopwatch Timer;
+
+    std::vector<std::thread> Threads;
+    Threads.reserve(m_Concurrency);
+
+    for (int i = 0; i < m_Concurrency; ++i)
+    {
+        Threads.emplace_back([&, i, BlockSize]() {
+            std::vector<uint8_t> RawBuf(BlockSize + kDirectIoAlignment - 1);
+            uint8_t* ReadPtr = AlignBuffer(RawBuf.data(), kDirectIoAlignment);
+            // Deterministic per-thread seed so runs are reproducible.
+            std::mt19937_64 Rng(uint64_t(i) * 6364136223846793005ULL + 1442695040888963407ULL);
+            std::uniform_int_distribution<uint64_t> Dist(0, NumBlocks - 1);
+
+            try
+            {
+                std::filesystem::path FilePath = Dir / fmt::format("bench_rand_{}.tmp", i);
+                DirectIoFile File;
+                bool GotDirect = File.OpenRead(FilePath, DirectIo);
+
+                if (GotDirect)
+                {
+                    DirectIoCount.fetch_add(1, std::memory_order_relaxed);
+                }
+
+                int64_t LocalOps = 0;
+
+                for (int Op = 0; Op < m_RandOps && !s_BenchAbort.load(std::memory_order_relaxed); ++Op)
+                {
+                    File.ReadAt(ReadPtr, BlockSize, Dist(Rng) * BlockSize);
+                    ++LocalOps;
+                }
+
+                TotalBytesRead.fetch_add(LocalOps * static_cast<int64_t>(BlockSize), std::memory_order_relaxed);
+                TotalOps.fetch_add(LocalOps, std::memory_order_relaxed);
+            }
+            catch (const std::exception&)
+            {
+                ErrorCount.fetch_add(1, std::memory_order_relaxed);
+            }
+        });
+    }
+
+    for (std::thread& T : Threads)
+    {
+        T.join();
+    }
+
+    SeqResult Result;
+    Result.ElapsedSec = Timer.GetElapsedTimeMs() / 1000.0;
+    Result.BytesPerSec = Result.ElapsedSec > 0.0 ? static_cast<uint64_t>(TotalBytesRead.load() / Result.ElapsedSec) : 0;
+    Result.Iops = Result.ElapsedSec > 0.0 ? static_cast<uint64_t>(TotalOps.load() / Result.ElapsedSec) : 0;
+    Result.ErrorCount = ErrorCount.load();
+    Result.UsedDirectIo = DirectIoCount.load() > 0;
+
+    return Result;
+}
+
+// Best-effort removal of every per-thread random-I/O scratch file; any
+// filesystem errors are deliberately ignored.
+void
+BenchDiskSubCmd::CleanupRandFiles(const std::filesystem::path& Dir)
+{
+    std::error_code Ignored;
+    for (int ThreadIndex = 0; ThreadIndex < m_Concurrency; ++ThreadIndex)
+    {
+        std::filesystem::remove(Dir / fmt::format("bench_rand_{}.tmp", ThreadIndex), Ignored);
+    }
+}
+
+// Best-effort removal of the small files created by the file-ops test; any
+// filesystem errors are deliberately ignored.
+void
+BenchDiskSubCmd::CleanupFileOpFiles(const std::filesystem::path& Dir)
+{
+    std::error_code Ignored;
+    for (int FileIndex = 0; FileIndex < m_FileCount; ++FileIndex)
+    {
+        std::filesystem::remove(Dir / fmt::format("bench_fileop_{}.tmp", FileIndex), Ignored);
+    }
+}
+
+// Best-effort removal of the clone source plus every per-file and per-thread
+// clone target; any filesystem errors are deliberately ignored.
+void
+BenchDiskSubCmd::CleanupCloneFiles(const std::filesystem::path& Dir)
+{
+    std::error_code Ignored;
+    std::filesystem::remove(Dir / "bench_clone_src.tmp", Ignored);
+    for (int FileIndex = 0; FileIndex < m_FileCount; ++FileIndex)
+    {
+        std::filesystem::remove(Dir / fmt::format("bench_clone_{}.tmp", FileIndex), Ignored);
+    }
+    for (int ThreadIndex = 0; ThreadIndex < m_Concurrency; ++ThreadIndex)
+    {
+        std::filesystem::remove(Dir / fmt::format("bench_rangeclone_{}.tmp", ThreadIndex), Ignored);
+    }
+}
+
+// Measures small-file metadata throughput in two timed phases: first
+// m_FileCount 4 KiB files are created (work distributed to m_Concurrency
+// threads via an atomic ticket counter), then the same files are deleted.
+// Results are printed as files/sec per phase; per-file failures are counted,
+// not fatal.
+void
+BenchDiskSubCmd::RunFileOps(const std::filesystem::path& Dir)
+{
+    static constexpr uint64_t kSmallFileSize = 4 * 1024; // 4 KiB
+
+    std::vector<uint8_t> WriteBuffer(kSmallFileSize, 0xAB);
+
+    ZEN_CONSOLE("File Operations (4 KiB files, count={}, concurrency={})", m_FileCount, m_Concurrency);
+
+    // Create phase
+    {
+        // NextFile hands out unique file indices across threads.
+        std::atomic<int> NextFile{0};
+        std::atomic<int> ErrorCount{0};
+
+        Stopwatch Timer;
+
+        std::vector<std::thread> Threads;
+        Threads.reserve(m_Concurrency);
+
+        for (int i = 0; i < m_Concurrency; ++i)
+        {
+            Threads.emplace_back([&]() {
+                while (true)
+                {
+                    int FileIndex = NextFile.fetch_add(1);
+
+                    if (FileIndex >= m_FileCount || s_BenchAbort.load(std::memory_order_relaxed))
+                    {
+                        break;
+                    }
+
+                    try
+                    {
+                        std::filesystem::path FilePath = Dir / fmt::format("bench_fileop_{}.tmp", FileIndex);
+                        BasicFile File(FilePath, BasicFile::Mode::kTruncate);
+                        File.Write(WriteBuffer.data(), kSmallFileSize, 0);
+                    }
+                    catch (const std::exception&)
+                    {
+                        ErrorCount.fetch_add(1, std::memory_order_relaxed);
+                    }
+                }
+            });
+        }
+
+        for (std::thread& T : Threads)
+        {
+            T.join();
+        }
+
+        double ElapsedSec = Timer.GetElapsedTimeMs() / 1000.0;
+        // NOTE(review): rate assumes all m_FileCount files were processed; an
+        // early abort would overstate it slightly.
+        double FilesPerSec = ElapsedSec > 0.0 ? m_FileCount / ElapsedSec : 0.0;
+
+        ZEN_CONSOLE("  Create: {:.0f} files/s   errors: {}   (elapsed: {:.2f}s)", FilesPerSec, ErrorCount.load(), ElapsedSec);
+    }
+
+    // Delete phase
+    {
+        std::atomic<int> NextFile{0};
+        std::atomic<int> ErrorCount{0};
+
+        Stopwatch Timer;
+
+        std::vector<std::thread> Threads;
+        Threads.reserve(m_Concurrency);
+
+        for (int i = 0; i < m_Concurrency; ++i)
+        {
+            Threads.emplace_back([&]() {
+                while (true)
+                {
+                    int FileIndex = NextFile.fetch_add(1);
+
+                    if (FileIndex >= m_FileCount || s_BenchAbort.load(std::memory_order_relaxed))
+                    {
+                        break;
+                    }
+
+                    try
+                    {
+                        std::filesystem::path FilePath = Dir / fmt::format("bench_fileop_{}.tmp", FileIndex);
+                        std::filesystem::remove(FilePath);
+                    }
+                    catch (const std::exception&)
+                    {
+                        ErrorCount.fetch_add(1, std::memory_order_relaxed);
+                    }
+                }
+            });
+        }
+
+        for (std::thread& T : Threads)
+        {
+            T.join();
+        }
+
+        double ElapsedSec = Timer.GetElapsedTimeMs() / 1000.0;
+        double FilesPerSec = ElapsedSec > 0.0 ? m_FileCount / ElapsedSec : 0.0;
+
+        ZEN_CONSOLE("  Delete: {:.0f} files/s   errors: {}   (elapsed: {:.2f}s)", FilesPerSec, ErrorCount.load(), ElapsedSec);
+    }
+}
+
// Benchmarks filesystem block-cloning (copy-on-write) support in two stages:
//   1. whole-file clones of a single m_FileSize source file (m_FileCount copies
//      spread across m_Concurrency threads), and
//   2. random sub-file range clones at each block size in kDiskBenchBlockSizes.
// Skips entirely when GetCloneQueryInterface reports no clone support for Dir.
void
BenchDiskSubCmd::RunClone(const std::filesystem::path& Dir)
{
    ZEN_CONSOLE("Block Clone (file-size={}, count={}, concurrency={})", NiceBytes(m_FileSize), m_FileCount, m_Concurrency);

    std::unique_ptr<CloneQueryInterface> CloneIface = GetCloneQueryInterface(Dir);

    if (!CloneIface)
    {
        ZEN_CONSOLE(" Skipped: block cloning not supported on this filesystem");
        return;
    }

    // Write a source file used by both the file-clone and range-clone sub-tests
    std::filesystem::path SrcPath = Dir / "bench_clone_src.tmp";
    {
        static constexpr uint64_t kWriteBlock = 1024 * 1024; // write in 1 MiB chunks
        std::vector<uint8_t> WriteBuf(kWriteBlock, 0xCD);
        BasicFile SrcFile(SrcPath, BasicFile::Mode::kTruncate);
        uint64_t Remaining = m_FileSize;
        uint64_t Offset = 0;
        while (Remaining > 0)
        {
            // Final chunk may be smaller when m_FileSize is not 1 MiB-aligned.
            uint64_t WriteSize = std::min(kWriteBlock, Remaining);
            SrcFile.Write(WriteBuf.data(), WriteSize, Offset);
            Offset += WriteSize;
            Remaining -= WriteSize;
        }
    }

    // == File Clone (whole-file CoW) ==
    {
        std::atomic<int> NextFile{0};   // ticket counter: next destination index
        std::atomic<int> ErrorCount{0};
        std::atomic<int> CloneCount{0}; // successful clones; basis for the reported rate

        Stopwatch Timer;

        std::vector<std::thread> Threads;
        Threads.reserve(m_Concurrency);

        for (int i = 0; i < m_Concurrency; ++i)
        {
            Threads.emplace_back([&]() {
                while (true)
                {
                    int FileIndex = NextFile.fetch_add(1);

                    if (FileIndex >= m_FileCount || s_BenchAbort.load(std::memory_order_relaxed))
                    {
                        break;
                    }

                    try
                    {
                        // TryClone returning false (unsupported/failed) counts as an
                        // error but does not stop the run.
                        std::filesystem::path DstPath = Dir / fmt::format("bench_clone_{}.tmp", FileIndex);
                        if (TryCloneFile(SrcPath, DstPath))
                        {
                            CloneCount.fetch_add(1, std::memory_order_relaxed);
                        }
                        else
                        {
                            ErrorCount.fetch_add(1, std::memory_order_relaxed);
                        }
                    }
                    catch (const std::exception&)
                    {
                        ErrorCount.fetch_add(1, std::memory_order_relaxed);
                    }
                }
            });
        }

        for (std::thread& T : Threads)
        {
            T.join();
        }

        double ElapsedSec = Timer.GetElapsedTimeMs() / 1000.0;
        int TotalClones = CloneCount.load();
        // Rates are based on successful clones only; "bytes" here are logical
        // (the clone shares extents, no data is physically copied).
        double FilesPerSec = ElapsedSec > 0.0 ? TotalClones / ElapsedSec : 0.0;
        double BytesPerSec = ElapsedSec > 0.0 ? TotalClones * static_cast<double>(m_FileSize) / ElapsedSec : 0.0;

        ZEN_CONSOLE(" File Clone : {:.0f} files/s {}/s errors: {} (elapsed: {:.2f}s)",
                    FilesPerSec,
                    NiceBytes(static_cast<uint64_t>(BytesPerSec)),
                    ErrorCount.load(),
                    ElapsedSec);
    }

    if (s_BenchAbort.load(std::memory_order_relaxed))
    {
        return;
    }

    // == Block Range Clone (sub-file FICLONERANGE / FSCTL_DUPLICATE_EXTENTS_TO_FILE) ==
    //
    // Measures how fast the filesystem can process individual range-clone operations.
    // One shared source file is opened in the main thread; each worker thread opens its
    // own destination file and issues m_RandOps FICLONERANGE calls at random aligned offsets.

    // Pre-create per-thread destination files at full size so TryClone's ftruncate is a no-op
    for (int i = 0; i < m_Concurrency; ++i)
    {
        BasicFile DstFile(Dir / fmt::format("bench_rangeclone_{}.tmp", i), BasicFile::Mode::kTruncate);
        DstFile.SetFileSize(m_FileSize);
    }

    // Single read-only source handle shared by all workers.
    // NOTE(review): CloneIface->TryClone is invoked concurrently on this handle
    // from every worker thread — assumes the interface is thread-safe; confirm.
    BasicFile SrcFile(SrcPath, BasicFile::Mode::kRead);

    ZEN_CONSOLE("");
    ZEN_CONSOLE("Block Range Clone (rand-ops={:L}, concurrency={})", m_RandOps, m_Concurrency);
    ZEN_CONSOLE(" {:>10} {:>12} {:>10}", "Block", "Ops/s", "Throughput");

    for (uint64_t BlockSize : kDiskBenchBlockSizes)
    {
        if (s_BenchAbort.load(std::memory_order_relaxed))
        {
            break;
        }

        uint64_t const NumBlocks = m_FileSize / BlockSize;
        if (NumBlocks == 0)
        {
            // Block size larger than the file — nothing to clone at this size.
            continue;
        }

        std::atomic<int64_t> TotalOps{0};
        std::atomic<int> ErrorCount{0};

        Stopwatch Timer;

        std::vector<std::thread> Threads;
        Threads.reserve(m_Concurrency);

        for (int i = 0; i < m_Concurrency; ++i)
        {
            Threads.emplace_back([&, i]() {
                // Deterministic per-thread seed (LCG-style mix of the thread index)
                // so runs are repeatable and threads draw distinct offset sequences.
                std::mt19937_64 Rng(uint64_t(i) * 6364136223846793005ULL + 1442695040888963407ULL);
                std::uniform_int_distribution<uint64_t> Dist(0, NumBlocks - 1);

                try
                {
                    BasicFile DstFile(Dir / fmt::format("bench_rangeclone_{}.tmp", i), BasicFile::Mode::kWrite);
                    int64_t LocalOps = 0;

                    for (int Op = 0; Op < m_RandOps && !s_BenchAbort.load(std::memory_order_relaxed); ++Op)
                    {
                        // Random block-aligned offset; source and destination offsets
                        // are identical, as range-clone ioctls generally require alignment.
                        uint64_t Offset = Dist(Rng) * BlockSize;
                        if (CloneIface->TryClone(SrcFile.Handle(), DstFile.Handle(), Offset, Offset, BlockSize, m_FileSize))
                        {
                            ++LocalOps;
                        }
                        else
                        {
                            ErrorCount.fetch_add(1, std::memory_order_relaxed);
                        }
                    }

                    // Publish the per-thread tally once to limit atomic traffic.
                    TotalOps.fetch_add(LocalOps, std::memory_order_relaxed);
                }
                catch (const std::exception&)
                {
                    ErrorCount.fetch_add(1, std::memory_order_relaxed);
                }
            });
        }

        for (std::thread& T : Threads)
        {
            T.join();
        }

        double ElapsedSec = Timer.GetElapsedTimeMs() / 1000.0;
        double OpsPerSec = ElapsedSec > 0.0 ? static_cast<double>(TotalOps.load()) / ElapsedSec : 0.0;
        double ThroughputSec = OpsPerSec * static_cast<double>(BlockSize);

        // Append the error count to the throughput column only when non-zero.
        std::string ThroughputStr = ErrorCount.load() > 0
                                        ? fmt::format("{}/s ({} err)", NiceBytes(static_cast<uint64_t>(ThroughputSec)), ErrorCount.load())
                                        : fmt::format("{}/s", NiceBytes(static_cast<uint64_t>(ThroughputSec)));

        ZEN_CONSOLE(" {:>10} {:>12L} {}", NiceBytes(BlockSize), static_cast<int64_t>(OpsPerSec), ThroughputStr);
    }
}
+
// Measures fsync/flush latency: repeatedly dirties one 4 KiB slot of a 4 MiB
// scratch file and times BasicFile::Flush for each iteration, then reports the
// sync rate plus min/mean/p50/p95/p99/max latency percentiles.
void
BenchDiskSubCmd::RunSync(const std::filesystem::path& Dir)
{
    // Each iteration writes one page to dirty it, then times a full fsync.
    // Single-threaded; runs until the deadline rather than a fixed count.
    static constexpr uint64_t kWriteSize = 4 * 1024; // bytes dirtied per sync
    static constexpr uint64_t kFileSize = 4 * 1024 * 1024; // cycling window

    ZEN_CONSOLE("Sync Latency (duration={}s, write={}/op)", m_SyncDurationSec, NiceBytes(kWriteSize));

    std::filesystem::path FilePath = Dir / "bench_sync.tmp";
    BasicFile File(FilePath, BasicFile::Mode::kTruncate);
    File.SetFileSize(kFileSize);

    static constexpr uint64_t kNumSlots = kFileSize / kWriteSize;
    uint8_t WriteBuf[kWriteSize];
    std::memset(WriteBuf, 0xEF, kWriteSize);

    // Collect latencies in microseconds for sub-millisecond resolution
    std::vector<uint64_t> LatenciesUs;

    auto Deadline = std::chrono::steady_clock::now() + std::chrono::seconds(m_SyncDurationSec);
    uint64_t SlotIndex = 0;
    Stopwatch TotalTimer;

    while (std::chrono::steady_clock::now() < Deadline && !s_BenchAbort.load(std::memory_order_relaxed))
    {
        // Cycle through slots so successive syncs dirty different pages rather
        // than rewriting the same one. Only the Flush call itself is timed.
        File.Write(WriteBuf, kWriteSize, (SlotIndex % kNumSlots) * kWriteSize);
        ++SlotIndex;

        Stopwatch SyncTimer;
        File.Flush();
        LatenciesUs.push_back(SyncTimer.GetElapsedTimeUs());
    }

    double TotalElapsedSec = TotalTimer.GetElapsedTimeUs() / 1'000'000.0;

    File.Close();

    if (LatenciesUs.empty())
    {
        ZEN_CONSOLE(" No samples collected");
        return;
    }

    // Sorted ascending so front/back give min/max and indexing gives percentiles.
    std::sort(LatenciesUs.begin(), LatenciesUs.end());

    // Nearest-rank percentile, clamped to the last sample; returns milliseconds.
    auto PercentileMs = [&](double Pct) -> double {
        size_t Index = std::min(static_cast<size_t>(LatenciesUs.size() * Pct / 100.0), LatenciesUs.size() - 1);
        return LatenciesUs[Index] / 1000.0;
    };

    double SumUs = 0.0;
    for (uint64_t L : LatenciesUs)
    {
        SumUs += static_cast<double>(L);
    }
    double MeanMs = SumUs / static_cast<double>(LatenciesUs.size()) / 1000.0;
    // Rate uses wall time for the whole loop (writes included), not just sync time.
    double SyncsPs = TotalElapsedSec > 0.0 ? static_cast<double>(LatenciesUs.size()) / TotalElapsedSec : 0.0;

    ZEN_CONSOLE(" Count : {:L} syncs ({:.1f} syncs/s)", LatenciesUs.size(), SyncsPs);
    ZEN_CONSOLE(" Latency: min={:.3f}ms mean={:.3f}ms p50={:.3f}ms p95={:.3f}ms p99={:.3f}ms max={:.3f}ms",
                LatenciesUs.front() / 1000.0,
                MeanMs,
                PercentileMs(50),
                PercentileMs(95),
                PercentileMs(99),
                LatenciesUs.back() / 1000.0);
}
+
+void
+BenchDiskSubCmd::CleanupSyncFiles(const std::filesystem::path& Dir)
+{
+ std::error_code Ec;
+ std::filesystem::remove(Dir / "bench_sync.tmp", Ec);
+}
+
+//////////////////////////////////////////////////////////////////////////
// BenchCommand
BenchCommand::BenchCommand()
@@ -498,6 +1878,7 @@ BenchCommand::BenchCommand()
AddSubCommand(m_PurgeSubCmd);
AddSubCommand(m_HttpSubCmd);
+ AddSubCommand(m_DiskSubCmd);
}
BenchCommand::~BenchCommand() = default;
diff --git a/src/zen/cmds/bench_cmd.h b/src/zen/cmds/bench_cmd.h
index 6700ee410..96f0c9616 100644
--- a/src/zen/cmds/bench_cmd.h
+++ b/src/zen/cmds/bench_cmd.h
@@ -4,6 +4,8 @@
#include "../zen.h"
+#include <filesystem>
+
namespace zen {
class BenchPurgeSubCmd : public ZenSubCmdBase
@@ -35,6 +37,46 @@ private:
bool m_Continuous = false;
};
// `zen bench disk` sub-command: benchmarks a target directory's storage with
// sequential/random throughput passes, small-file create/delete rates, block
// (CoW) clone tests and fsync latency sampling. Which tests run is selected by
// m_Run; each Run*/Cleanup* pair implements one test and its scratch-file cleanup.
class BenchDiskSubCmd : public ZenSubCmdBase
{
public:
    BenchDiskSubCmd();
    void Run(const ZenCliOptions& GlobalOptions) override;

private:
    // Aggregated outcome of one sequential/random throughput pass.
    struct SeqResult
    {
        uint64_t BytesPerSec = 0;      // sustained throughput
        uint64_t Iops = 0;             // I/O operations per second
        double ElapsedSec = 0.0;       // wall time of the pass
        int ErrorCount = 0;            // failed operations during the pass
        bool UsedDirectIo = false;     // whether unbuffered (direct) I/O was actually used
    };

    SeqResult RunSeqWrite(const std::filesystem::path& Dir, uint64_t BlockSize, bool DirectIo);
    SeqResult RunSeqRead(const std::filesystem::path& Dir, uint64_t BlockSize, bool DirectIo);
    SeqResult RunRandWrite(const std::filesystem::path& Dir, uint64_t BlockSize, bool DirectIo);
    SeqResult RunRandRead(const std::filesystem::path& Dir, uint64_t BlockSize, bool DirectIo);
    void PrefillRandFiles(const std::filesystem::path& Dir, bool DirectIo);
    void RunFileOps(const std::filesystem::path& Dir);
    void RunClone(const std::filesystem::path& Dir);
    void CleanupSeqFiles(const std::filesystem::path& Dir);
    void CleanupRandFiles(const std::filesystem::path& Dir);
    void CleanupFileOpFiles(const std::filesystem::path& Dir);
    void CleanupCloneFiles(const std::filesystem::path& Dir);
    void RunSync(const std::filesystem::path& Dir);
    void CleanupSyncFiles(const std::filesystem::path& Dir);

    // CLI-configurable options (defaults below).
    std::string m_Path;                       // target location to benchmark — presumably a directory path; confirm against the option parser
    std::string m_Run = "seq,rand,ops,clone"; // comma-separated list of tests to run ("sync" not included by default)
    int m_SyncDurationSec = 10;               // duration of the sync-latency test
    int m_Concurrency = 1;                    // worker thread count for multithreaded tests
    uint64_t m_FileSize = 256 * 1024 * 1024;  // size of large benchmark files (256 MiB)
    int m_FileCount = 1000;                   // number of files for file-op and clone tests
    int m_RandOps = 10000;                    // random operations per thread
    bool m_NoDirectIo = false;                // when true, skip unbuffered (direct) I/O variants
};
+
class BenchCommand : public ZenCmdWithSubCommands
{
public:
@@ -52,6 +94,7 @@ private:
std::string m_SubCommand;
BenchPurgeSubCmd m_PurgeSubCmd;
BenchHttpSubCmd m_HttpSubCmd;
+ BenchDiskSubCmd m_DiskSubCmd;
};
} // namespace zen