// Copyright Epic Games, Inc. All Rights Reserved. #include "bench_cmd.h" #include "bench.h" #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_PLATFORM_WINDOWS # include #else # include # include # include # include #endif #include #include #include #include #include #include #include #include static std::atomic s_BenchAbort{false}; namespace zen { ////////////////////////////////////////////////////////////////////////// // BenchPurgeSubCmd BenchPurgeSubCmd::BenchPurgeSubCmd() : ZenSubCmdBase("purge", "Purge standby memory (system cache)") { SubOptions().add_options()("single", "Do not spawn child processes", cxxopts::value(m_SingleProcess)->default_value("false")); } void BenchPurgeSubCmd::Run(const ZenCliOptions& GlobalOptions) { ZEN_UNUSED(GlobalOptions); bool Ok = false; zen::Stopwatch Timer; try { zen::bench::util::EmptyStandByList(); Ok = true; } catch (const zen::bench::util::elevation_required_exception&) { ZEN_CONSOLE_WARN("Purging standby lists requires elevation. Will try launch as elevated process"); } catch (const std::exception& Ex) { ZEN_CONSOLE_ERROR("{}", Ex.what()); } #if ZEN_PLATFORM_WINDOWS if (!Ok && !m_SingleProcess) { try { zen::CreateProcOptions Cpo; Cpo.Flags = zen::CreateProcOptions::Flag_Elevated | zen::CreateProcOptions::Flag_NewConsole; std::filesystem::path CurExe{zen::GetRunningExecutablePath()}; if (zen::CreateProcResult Cpr = zen::CreateProc(CurExe, fmt::format("bench purge --single"), Cpo)) { zen::ProcessHandle ProcHandle; ProcHandle.Initialize(Cpr); int ExitCode = ProcHandle.WaitExitCode(); if (ExitCode == 0) { Ok = true; } else { ZEN_CONSOLE_ERROR("Elevated child process failed with return code {}", ExitCode); } } } catch (const std::exception& Ex) { ZEN_CONSOLE_ERROR("{}", Ex.what()); } } #endif if (Ok) { // TODO: could also add reporting on just how much memory was purged ZEN_CONSOLE("Purged standby lists! (took {})", zen::NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } } ////////////////////////////////////////////////////////////////////////// // BenchHttpSubCmd BenchHttpSubCmd::BenchHttpSubCmd() : ZenSubCmdBase("http", "Benchmark an HTTP server") { SubOptions().add_option("", "u", "url", "URL to benchmark", cxxopts::value(m_Url), ""); SubOptions().add_option("", "n", "count", "Number of requests to send", cxxopts::value(m_Count)->default_value("100"), ""); SubOptions().add_option("", "c", "concurrency", "Number of concurrent threads", cxxopts::value(m_Concurrency)->default_value("1"), ""); SubOptions().add_option("", "", "method", "HTTP method to use (GET, HEAD)", cxxopts::value(m_Method)->default_value("GET"), ""); SubOptions() .add_option("", "", "unix-socket", "Unix domain socket path (overrides TCP)", cxxopts::value(m_SocketPath), ""); SubOptions().add_options()("no-keepalive", "Close connection after each request (disables keep-alive)", cxxopts::value(m_NoKeepAlive)->default_value("false")); SubOptions().add_options()("continuous", "Run until interrupted (Ctrl+C), printing metrics once per second", cxxopts::value(m_Continuous)->default_value("false")); SubOptions().parse_positional({"url"}); } static std::pair SplitUrl(std::string_view Url) { size_t SchemeEnd = Url.find("://"); size_t SearchFrom = (SchemeEnd != std::string_view::npos) ? SchemeEnd + 3 : 0; size_t PathStart = Url.find('/', SearchFrom); if (PathStart == std::string_view::npos) { return {std::string(Url), "/"}; } return {std::string(Url.substr(0, PathStart)), std::string(Url.substr(PathStart))}; } void BenchHttpSubCmd::Run(const ZenCliOptions& GlobalOptions) { ZEN_UNUSED(GlobalOptions); if (m_Url.empty()) { throw OptionParseException("URL is required", SubOptions().help()); } if (!m_Continuous && m_Count <= 0) { throw OptionParseException("--count must be a positive integer", SubOptions().help()); } if (m_Concurrency <= 0) { throw OptionParseException("--concurrency must be a positive integer", SubOptions().help()); } if (m_Method != "GET" && m_Method != "HEAD") { throw OptionParseException(fmt::format("Unsupported HTTP method '{}'. Supported: GET, HEAD", m_Method), SubOptions().help()); } auto [BaseUri, Path] = SplitUrl(m_Url); std::string ModeStr = m_Continuous ? "continuous" : fmt::format("count={}", m_Count); if (m_SocketPath.empty()) { ZEN_CONSOLE("Benchmarking {} {} ({}, concurrency={})", m_Method, m_Url, ModeStr, m_Concurrency); } else { ZEN_CONSOLE("Benchmarking {} {} via {} ({}, concurrency={})", m_Method, m_Url, m_SocketPath, ModeStr, m_Concurrency); } // Probe for a zenserver identity. If the target exposes /health/info and the // response contains a BuildVersion field we print a short summary. Any failure // (non-zenserver, timeout, unreachable) is silently ignored. try { HttpClientSettings ProbeSettings{.ConnectTimeout = std::chrono::milliseconds(2000), .Timeout = std::chrono::milliseconds(2000), .UnixSocketPath = m_SocketPath}; HttpClient ProbeHttp(BaseUri, ProbeSettings); HttpClient::Response ProbeResp = ProbeHttp.Get("/health/info"); if (ProbeResp.IsSuccess()) { CbObject Info = ProbeResp.AsObject(); std::string_view BuildVersion = Info["BuildVersion"].AsString(); if (!BuildVersion.empty()) { std::string_view Hostname = Info["Hostname"].AsString(); int64_t Pid = Info["Pid"].AsInt64(); std::string_view HttpServerClass = Info["HttpServerClass"].AsString(); ZEN_CONSOLE("Remote : zenserver {} on {} (pid {}, {})", BuildVersion, Hostname, Pid, HttpServerClass); std::string_view OS = Info["OS"].AsString(); std::string_view Arch = Info["Arch"].AsString(); CbObjectView System = Info["System"].AsObjectView(); int64_t LpCount = System["lp_count"].AsInt64(); int64_t TotalMemMiB = System["total_memory_mb"].AsInt64(); ZEN_CONSOLE(" : {}, {}, {} logical processors, {} RAM", OS, Arch, LpCount, NiceBytes(static_cast(TotalMemMiB) * 1024 * 1024)); } } } catch (...) { } if (m_Continuous) { RunContinuous(BaseUri, Path); } else { RunFixedCount(BaseUri, Path); } } void BenchHttpSubCmd::RunFixedCount(const std::string& BaseUri, const std::string& Path) { std::atomic NextRequest{0}; std::vector AllLatencies; AllLatencies.reserve(m_Count); std::mutex LatencyMutex; std::atomic ErrorCount{0}; std::atomic TotalDownloadedBytes{0}; std::atomic TotalUploadedBytes{0}; Stopwatch Timer; auto WorkerFn = [&]() { std::vector LocalLatencies; HttpClientSettings Settings{.UnixSocketPath = m_SocketPath, .ForbidReuseConnection = m_NoKeepAlive}; HttpClient Http(BaseUri, Settings); while (true) { int RequestIndex = NextRequest.fetch_add(1); if (RequestIndex >= m_Count) { break; } try { HttpClient::Response Resp = (m_Method == "HEAD") ? Http.Head(Path) : Http.Get(Path); if (Resp.IsSuccess()) { LocalLatencies.push_back(Resp.ElapsedSeconds); TotalDownloadedBytes.fetch_add(Resp.DownloadedBytes); TotalUploadedBytes.fetch_add(Resp.UploadedBytes); } else { ErrorCount.fetch_add(1); } } catch (const HttpClientError&) { ErrorCount.fetch_add(1); } } std::lock_guard Lock(LatencyMutex); AllLatencies.insert(AllLatencies.end(), LocalLatencies.begin(), LocalLatencies.end()); }; std::vector Threads; Threads.reserve(m_Concurrency); for (int i = 0; i < m_Concurrency; ++i) { Threads.emplace_back(WorkerFn); } for (std::thread& T : Threads) { T.join(); } double TotalSeconds = Timer.GetElapsedTimeMs() / 1000.0; int SuccessCount = static_cast(AllLatencies.size()); int TotalCount = SuccessCount + ErrorCount.load(); std::sort(AllLatencies.begin(), AllLatencies.end()); auto PercentileMs = [&](int Pct) -> double { if (AllLatencies.empty()) { return 0.0; } size_t Index = std::min(AllLatencies.size() * static_cast(Pct) / 100, AllLatencies.size() - 1); return AllLatencies[Index] * 1000.0; }; double SumMs = 0.0; for (double L : AllLatencies) { SumMs += L * 1000.0; } double MeanMs = SuccessCount > 0 ? SumMs / SuccessCount : 0.0; double Rps = TotalSeconds > 0.0 ? TotalCount / TotalSeconds : 0.0; uint64_t DownBytesPerSec = TotalSeconds > 0.0 ? static_cast(TotalDownloadedBytes.load() / TotalSeconds) : 0; uint64_t UpBytesPerSec = TotalSeconds > 0.0 ? static_cast(TotalUploadedBytes.load() / TotalSeconds) : 0; ZEN_CONSOLE(" Requests : {:L} total, {:L} success, {:L} errors", TotalCount, SuccessCount, ErrorCount.load()); ZEN_CONSOLE(" Latency : min={:.1f}ms mean={:.1f}ms p50={:.1f}ms p95={:.1f}ms p99={:.1f}ms max={:.1f}ms", PercentileMs(0), MeanMs, PercentileMs(50), PercentileMs(95), PercentileMs(99), PercentileMs(100)); ZEN_CONSOLE(" Throughput: {:.1f} req/s down: {}/s up: {}/s (elapsed: {:.2f}s)", Rps, NiceBytes(DownBytesPerSec), NiceBytes(UpBytesPerSec), TotalSeconds); } void BenchHttpSubCmd::RunContinuous(const std::string& BaseUri, const std::string& Path) { s_BenchAbort.store(false); auto PrevSigInt = std::signal(SIGINT, [](int) { s_BenchAbort.store(true); }); auto PrevSigTerm = std::signal(SIGTERM, [](int) { s_BenchAbort.store(true); }); metrics::Histogram LatencyHistogram; std::atomic IntervalSuccessCount{0}; std::atomic IntervalErrorCount{0}; std::atomic IntervalDownloadBytes{0}; std::atomic IntervalUploadBytes{0}; std::atomic TotalSuccessCount{0}; std::atomic TotalErrorCount{0}; std::atomic TotalDownloadBytes{0}; std::atomic TotalUploadBytes{0}; Stopwatch RunTimer; auto WorkerFn = [&]() { HttpClientSettings Settings{.UnixSocketPath = m_SocketPath, .ForbidReuseConnection = m_NoKeepAlive}; HttpClient Http(BaseUri, Settings); while (!s_BenchAbort.load(std::memory_order_relaxed)) { try { HttpClient::Response Resp = (m_Method == "HEAD") ? Http.Head(Path) : Http.Get(Path); if (Resp.IsSuccess()) { LatencyHistogram.Update(static_cast(Resp.ElapsedSeconds * 1.0e6)); IntervalSuccessCount.fetch_add(1, std::memory_order_relaxed); IntervalDownloadBytes.fetch_add(Resp.DownloadedBytes, std::memory_order_relaxed); IntervalUploadBytes.fetch_add(Resp.UploadedBytes, std::memory_order_relaxed); TotalSuccessCount.fetch_add(1, std::memory_order_relaxed); TotalDownloadBytes.fetch_add(Resp.DownloadedBytes, std::memory_order_relaxed); TotalUploadBytes.fetch_add(Resp.UploadedBytes, std::memory_order_relaxed); } else { IntervalErrorCount.fetch_add(1, std::memory_order_relaxed); TotalErrorCount.fetch_add(1, std::memory_order_relaxed); } } catch (const HttpClientError&) { IntervalErrorCount.fetch_add(1, std::memory_order_relaxed); TotalErrorCount.fetch_add(1, std::memory_order_relaxed); } } }; auto ReporterFn = [&]() { while (!s_BenchAbort.load(std::memory_order_relaxed)) { // Sleep 1s in short increments to stay responsive to abort for (int i = 0; i < 10 && !s_BenchAbort.load(std::memory_order_relaxed); ++i) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } if (s_BenchAbort.load(std::memory_order_relaxed)) { break; } // Snapshot and reset per-interval counters int64_t Successes = IntervalSuccessCount.exchange(0); int64_t Errors = IntervalErrorCount.exchange(0); int64_t DownBytes = IntervalDownloadBytes.exchange(0); int64_t UpBytes = IntervalUploadBytes.exchange(0); // Snapshot and reset latency histogram uint64_t HistCount = LatencyHistogram.Count(); int64_t HistMin = LatencyHistogram.Min(); int64_t HistMax = LatencyHistogram.Max(); double HistMean = LatencyHistogram.Mean(); metrics::SampleSnapshot Snap = LatencyHistogram.Snapshot(); LatencyHistogram.Clear(); // Format elapsed as HH:MM:SS int TotalSec = static_cast(RunTimer.GetElapsedTimeMs() / 1000.0); int Hours = TotalSec / 3600; int Minutes = (TotalSec % 3600) / 60; int Secs = TotalSec % 60; if (HistCount > 0) { ZEN_CONSOLE( "[{:02d}:{:02d}:{:02d}] req/s: {:L} errors: {:L} lat(ms): min={:.1f} mean={:.1f} p95={:.1f} p99={:.1f} max={:.1f} " "down: {}/s up: {}/s", Hours, Minutes, Secs, Successes, Errors, HistMin / 1000.0, HistMean / 1000.0, Snap.Get95Percentile() / 1000.0, Snap.Get99Percentile() / 1000.0, HistMax / 1000.0, NiceBytes(static_cast(std::max(int64_t{0}, DownBytes))), NiceBytes(static_cast(std::max(int64_t{0}, UpBytes)))); } else { ZEN_CONSOLE("[{:02d}:{:02d}:{:02d}] req/s: 0 errors: {:L} (no successful requests)", Hours, Minutes, Secs, Errors); } } }; std::vector Threads; Threads.reserve(m_Concurrency + 1); Threads.emplace_back(ReporterFn); for (int i = 0; i < m_Concurrency; ++i) { Threads.emplace_back(WorkerFn); } for (std::thread& T : Threads) { T.join(); } std::signal(SIGINT, PrevSigInt); std::signal(SIGTERM, PrevSigTerm); double TotalSeconds = RunTimer.GetElapsedTimeMs() / 1000.0; int64_t TotalCount = TotalSuccessCount.load() + TotalErrorCount.load(); uint64_t DownPerSec = TotalSeconds > 0.0 ? static_cast(TotalDownloadBytes.load() / TotalSeconds) : 0; uint64_t UpPerSec = TotalSeconds > 0.0 ? static_cast(TotalUploadBytes.load() / TotalSeconds) : 0; ZEN_CONSOLE("Stopped. Total: {:L} requests, {:L} success, {:L} errors avg throughput: down {}/s up {}/s (elapsed: {:.2f}s)", TotalCount, TotalSuccessCount.load(), TotalErrorCount.load(), NiceBytes(DownPerSec), NiceBytes(UpPerSec), TotalSeconds); } ////////////////////////////////////////////////////////////////////////// // DirectIoFile - thin wrapper around platform file handles for unbuffered I/O. // On Linux: opens with O_DIRECT, falls back gracefully if the filesystem // doesn't support it (e.g. tmpfs). On macOS: uses F_NOCACHE. On Windows: // opens with FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH. // Buffers, offsets and transfer sizes must all be multiples of kDirectIoAlignment. static constexpr uint64_t kDirectIoAlignment = 4096; static uint8_t* AlignBuffer(uint8_t* Raw, uint64_t Alignment) { return reinterpret_cast((reinterpret_cast(Raw) + Alignment - 1) & ~(Alignment - 1)); } static uint64_t AlignUp(uint64_t Value, uint64_t Alignment) { return (Value + Alignment - 1) & ~(Alignment - 1); } class DirectIoFile { public: DirectIoFile() = default; ~DirectIoFile() { Close(); } DirectIoFile(const DirectIoFile&) = delete; DirectIoFile& operator=(const DirectIoFile&) = delete; // Returns true if the file was successfully opened with direct/unbuffered I/O active. // Returns false if direct I/O was requested but the filesystem doesn't support it and // we fell back to buffered I/O. Throws std::system_error if the file cannot be opened. bool OpenWrite(const std::filesystem::path& Path, bool RequestDirect); // create/truncate bool OpenRead(const std::filesystem::path& Path, bool RequestDirect); // read-only, existing bool OpenReadWrite(const std::filesystem::path& Path, bool RequestDirect); // read-write, existing, no truncate void WriteAt(const void* Data, uint64_t Size, uint64_t Offset); void ReadAt(void* Data, uint64_t Size, uint64_t Offset); void Close(); private: #if ZEN_PLATFORM_WINDOWS HANDLE m_Handle = INVALID_HANDLE_VALUE; #else int m_Fd = -1; #endif }; #if ZEN_PLATFORM_WINDOWS bool DirectIoFile::OpenWrite(const std::filesystem::path& Path, bool RequestDirect) { DWORD Flags = FILE_ATTRIBUTE_NORMAL; if (RequestDirect) { Flags |= FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH; } HANDLE Handle = ::CreateFileW(Path.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, nullptr, CREATE_ALWAYS, Flags, nullptr); if (Handle == INVALID_HANDLE_VALUE && RequestDirect) { // Fall back to buffered I/O Handle = ::CreateFileW(Path.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, nullptr, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, nullptr); if (Handle == INVALID_HANDLE_VALUE) { throw std::system_error(std::error_code(::GetLastError(), std::system_category()), fmt::format("Failed to open '{}' for writing", Path)); } m_Handle = Handle; return false; } if (Handle == INVALID_HANDLE_VALUE) { throw std::system_error(std::error_code(::GetLastError(), std::system_category()), fmt::format("Failed to open '{}' for writing", Path)); } m_Handle = Handle; return RequestDirect; } bool DirectIoFile::OpenRead(const std::filesystem::path& Path, bool RequestDirect) { DWORD Flags = FILE_ATTRIBUTE_NORMAL; if (RequestDirect) { Flags |= FILE_FLAG_NO_BUFFERING; } HANDLE Handle = ::CreateFileW(Path.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING, Flags, nullptr); if (Handle == INVALID_HANDLE_VALUE && RequestDirect) { Handle = ::CreateFileW(Path.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, nullptr); if (Handle == INVALID_HANDLE_VALUE) { throw std::system_error(std::error_code(::GetLastError(), std::system_category()), fmt::format("Failed to open '{}' for reading", Path)); } m_Handle = Handle; return false; } if (Handle == INVALID_HANDLE_VALUE) { throw std::system_error(std::error_code(::GetLastError(), std::system_category()), fmt::format("Failed to open '{}' for reading", Path)); } m_Handle = Handle; return RequestDirect; } void DirectIoFile::WriteAt(const void* Data, uint64_t Size, uint64_t Offset) { OVERLAPPED Ovl{}; Ovl.Offset = DWORD(Offset & 0xFFFF'FFFFu); Ovl.OffsetHigh = DWORD(Offset >> 32); DWORD BytesWritten = 0; if (!::WriteFile(m_Handle, Data, static_cast(Size), &BytesWritten, &Ovl) || BytesWritten != Size) { throw std::system_error(std::error_code(::GetLastError(), std::system_category()), "WriteFile failed"); } } void DirectIoFile::ReadAt(void* Data, uint64_t Size, uint64_t Offset) { OVERLAPPED Ovl{}; Ovl.Offset = DWORD(Offset & 0xFFFF'FFFFu); Ovl.OffsetHigh = DWORD(Offset >> 32); DWORD BytesRead = 0; if (!::ReadFile(m_Handle, Data, static_cast(Size), &BytesRead, &Ovl) || BytesRead != Size) { throw std::system_error(std::error_code(::GetLastError(), std::system_category()), "ReadFile failed"); } } bool DirectIoFile::OpenReadWrite(const std::filesystem::path& Path, bool RequestDirect) { DWORD Flags = FILE_ATTRIBUTE_NORMAL; if (RequestDirect) { Flags |= FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH; } HANDLE Handle = ::CreateFileW(Path.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, nullptr, OPEN_EXISTING, Flags, nullptr); if (Handle == INVALID_HANDLE_VALUE && RequestDirect) { Handle = ::CreateFileW(Path.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, nullptr, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, nullptr); if (Handle == INVALID_HANDLE_VALUE) { throw std::system_error(std::error_code(::GetLastError(), std::system_category()), fmt::format("Failed to open '{}' for read-write", Path)); } m_Handle = Handle; return false; } if (Handle == INVALID_HANDLE_VALUE) { throw std::system_error(std::error_code(::GetLastError(), std::system_category()), fmt::format("Failed to open '{}' for read-write", Path)); } m_Handle = Handle; return RequestDirect; } void DirectIoFile::Close() { if (m_Handle != INVALID_HANDLE_VALUE) { ::CloseHandle(m_Handle); m_Handle = INVALID_HANDLE_VALUE; } } #else // Linux / macOS bool DirectIoFile::OpenWrite(const std::filesystem::path& Path, bool RequestDirect) { int Flags = O_RDWR | O_CREAT | O_TRUNC | O_CLOEXEC; # if ZEN_PLATFORM_LINUX if (RequestDirect) { int TryFd = ::open(Path.c_str(), Flags | O_DIRECT, 0666); if (TryFd >= 0) { ::fchmod(TryFd, 0666); m_Fd = TryFd; return true; } if (errno != EINVAL) { throw std::system_error(std::error_code(errno, std::system_category()), fmt::format("Failed to open '{}' for writing", Path)); } // EINVAL: filesystem doesn't support O_DIRECT, fall through to buffered open } # endif int Fd = ::open(Path.c_str(), Flags, 0666); if (Fd < 0) { throw std::system_error(std::error_code(errno, std::system_category()), fmt::format("Failed to open '{}' for writing", Path)); } ::fchmod(Fd, 0666); # if ZEN_PLATFORM_MAC if (RequestDirect) { ::fcntl(Fd, F_NOCACHE, 1); } # endif m_Fd = Fd; return RequestDirect; // On macOS F_NOCACHE is always "accepted" (best-effort) } bool DirectIoFile::OpenRead(const std::filesystem::path& Path, bool RequestDirect) { int Flags = O_RDONLY | O_CLOEXEC; # if ZEN_PLATFORM_LINUX if (RequestDirect) { int TryFd = ::open(Path.c_str(), Flags | O_DIRECT); if (TryFd >= 0) { m_Fd = TryFd; return true; } if (errno != EINVAL) { throw std::system_error(std::error_code(errno, std::system_category()), fmt::format("Failed to open '{}' for reading", Path)); } } # endif int Fd = ::open(Path.c_str(), Flags); if (Fd < 0) { throw std::system_error(std::error_code(errno, std::system_category()), fmt::format("Failed to open '{}' for reading", Path)); } # if ZEN_PLATFORM_MAC if (RequestDirect) { ::fcntl(Fd, F_NOCACHE, 1); } # endif m_Fd = Fd; return RequestDirect; } void DirectIoFile::WriteAt(const void* Data, uint64_t Size, uint64_t Offset) { ssize_t Result = ::pwrite(m_Fd, Data, static_cast(Size), static_cast(Offset)); if (Result < 0 || static_cast(Result) != Size) { throw std::system_error(std::error_code(errno, std::system_category()), "pwrite failed"); } } void DirectIoFile::ReadAt(void* Data, uint64_t Size, uint64_t Offset) { ssize_t Result = ::pread(m_Fd, Data, static_cast(Size), static_cast(Offset)); if (Result < 0 || static_cast(Result) != Size) { throw std::system_error(std::error_code(errno, std::system_category()), "pread failed"); } } bool DirectIoFile::OpenReadWrite(const std::filesystem::path& Path, bool RequestDirect) { int Flags = O_RDWR | O_CLOEXEC; # if ZEN_PLATFORM_LINUX if (RequestDirect) { int TryFd = ::open(Path.c_str(), Flags | O_DIRECT); if (TryFd >= 0) { m_Fd = TryFd; return true; } if (errno != EINVAL) { throw std::system_error(std::error_code(errno, std::system_category()), fmt::format("Failed to open '{}' for read-write", Path)); } } # endif int Fd = ::open(Path.c_str(), Flags); if (Fd < 0) { throw std::system_error(std::error_code(errno, std::system_category()), fmt::format("Failed to open '{}' for read-write", Path)); } # if ZEN_PLATFORM_MAC if (RequestDirect) { ::fcntl(Fd, F_NOCACHE, 1); } # endif m_Fd = Fd; return RequestDirect; } void DirectIoFile::Close() { if (m_Fd >= 0) { ::close(m_Fd); m_Fd = -1; } } #endif // ZEN_PLATFORM_WINDOWS ////////////////////////////////////////////////////////////////////////// // BenchDiskSubCmd static constexpr uint64_t kDiskBenchBlockSizes[] = {4 * 1024, 64 * 1024, 1024 * 1024, 4 * 1024 * 1024}; BenchDiskSubCmd::BenchDiskSubCmd() : ZenSubCmdBase("disk", "Benchmark local disk I/O bandwidth and file operation throughput") { SubOptions().add_option("", "p", "path", "Directory to use for benchmark files", cxxopts::value(m_Path), ""); SubOptions().add_option("", "", "run", "Comma-separated list of benchmarks to run: seq, rand, ops, clone, sync (default: all except sync)", cxxopts::value(m_Run)->default_value("seq,rand,ops,clone"), ""); SubOptions().add_option("", "", "sync-duration", "Duration of the sync latency benchmark in seconds", cxxopts::value(m_SyncDurationSec)->default_value("10"), ""); SubOptions().add_option("", "c", "concurrency", "Number of concurrent threads", cxxopts::value(m_Concurrency)->default_value("1"), ""); SubOptions().add_option("", "", "file-size", "Size of each sequential test file in bytes (default: 268435456 = 256 MiB)", cxxopts::value(m_FileSize)->default_value("268435456"), ""); SubOptions().add_option("", "", "file-count", "Number of small files to create/delete for the file-ops test", cxxopts::value(m_FileCount)->default_value("1000"), ""); SubOptions().add_option("", "", "rand-ops", "Number of random I/O operations per thread", cxxopts::value(m_RandOps)->default_value("10000"), ""); SubOptions().add_options()("no-direct", "Use buffered I/O instead of direct/unbuffered I/O (may inflate read results due to OS page cache)", cxxopts::value(m_NoDirectIo)->default_value("false")); SubOptions().parse_positional({"path"}); } void BenchDiskSubCmd::Run(const ZenCliOptions& GlobalOptions) { ZEN_UNUSED(GlobalOptions); if (m_Path.empty()) { throw OptionParseException("Path is required", SubOptions().help()); } if (m_Concurrency <= 0) { throw OptionParseException("--concurrency must be a positive integer", SubOptions().help()); } if (m_FileSize == 0) { throw OptionParseException("--file-size must be positive", SubOptions().help()); } if (m_FileCount <= 0) { throw OptionParseException("--file-count must be a positive integer", SubOptions().help()); } if (m_RandOps <= 0) { throw OptionParseException("--rand-ops must be a positive integer", SubOptions().help()); } // Parse the --run list into individual flags bool DoSeq = false; bool DoRand = false; bool DoOps = false; bool DoClone = false; bool DoSync = false; { std::string_view Remaining(m_Run); while (!Remaining.empty()) { size_t Comma = Remaining.find(','); std::string_view Token = Remaining.substr(0, Comma); if (Token == "seq") { DoSeq = true; } else if (Token == "rand") { DoRand = true; } else if (Token == "ops") { DoOps = true; } else if (Token == "clone") { DoClone = true; } else if (Token == "sync") { DoSync = true; } else { throw OptionParseException(fmt::format("Unknown benchmark '{}' in --run (valid: seq, rand, ops, clone, sync)", Token), SubOptions().help()); } Remaining = (Comma == std::string_view::npos) ? std::string_view{} : Remaining.substr(Comma + 1); } } if (!DoSeq && !DoRand && !DoOps && !DoClone && !DoSync) { throw OptionParseException("--run list is empty", SubOptions().help()); } std::filesystem::path Dir(m_Path); if (!std::filesystem::is_directory(Dir)) { throw OptionParseException(fmt::format("'{}' is not a valid directory", m_Path), SubOptions().help()); } bool const DirectIo = !m_NoDirectIo; s_BenchAbort.store(false); auto PrevSigInt = std::signal(SIGINT, [](int) { s_BenchAbort.store(true); }); auto PrevSigTerm = std::signal(SIGTERM, [](int) { s_BenchAbort.store(true); }); uint64_t TotalSeqBytes = m_FileSize * static_cast(m_Concurrency); ZEN_CONSOLE("Disk Benchmark"); ZEN_CONSOLE(" Path : {}", m_Path); ZEN_CONSOLE(" Concurrency : {} thread(s)", m_Concurrency); ZEN_CONSOLE(" File size : {} per thread ({} total)", NiceBytes(m_FileSize), NiceBytes(TotalSeqBytes)); ZEN_CONSOLE(" File count : {} (file-ops test)", m_FileCount); ZEN_CONSOLE(" I/O mode : {}", DirectIo ? "direct (unbuffered)" : "buffered"); ZEN_CONSOLE(""); if (DoSeq) { ZEN_CONSOLE("Sequential I/O"); ZEN_CONSOLE(" {:>10} {:>12} {:>10} {:>12} {:>10}", "Block", "Write", "W IOPS", "Read", "R IOPS"); bool WarnedAboutFallback = false; for (uint64_t BlockSize : kDiskBenchBlockSizes) { if (s_BenchAbort.load(std::memory_order_relaxed)) { break; } SeqResult WriteResult = RunSeqWrite(Dir, BlockSize, DirectIo); SeqResult ReadResult = RunSeqRead(Dir, BlockSize, DirectIo); CleanupSeqFiles(Dir); if (DirectIo && !WriteResult.UsedDirectIo && !WarnedAboutFallback) { ZEN_CONSOLE_WARN(" Note: direct I/O not supported on this filesystem, results reflect buffered I/O"); WarnedAboutFallback = true; } std::string WriteStr = WriteResult.ErrorCount > 0 ? fmt::format("{}/s ({} err)", NiceBytes(WriteResult.BytesPerSec), WriteResult.ErrorCount) : fmt::format("{}/s", NiceBytes(WriteResult.BytesPerSec)); std::string ReadStr = ReadResult.ErrorCount > 0 ? fmt::format("{}/s ({} err)", NiceBytes(ReadResult.BytesPerSec), ReadResult.ErrorCount) : fmt::format("{}/s", NiceBytes(ReadResult.BytesPerSec)); ZEN_CONSOLE(" {:>10} {:>12} {:>10L} {:>12} {:>10L}", NiceBytes(BlockSize), WriteStr, WriteResult.Iops, ReadStr, ReadResult.Iops); } } if (DoRand && !s_BenchAbort.load(std::memory_order_relaxed)) { if (DoSeq) { ZEN_CONSOLE(""); } ZEN_CONSOLE("Random I/O ({:L} ops per thread)", m_RandOps); ZEN_CONSOLE(" {:>10} {:>12} {:>10} {:>12} {:>10}", "Block", "Write", "W IOPS", "Read", "R IOPS"); PrefillRandFiles(Dir, DirectIo); for (uint64_t BlockSize : kDiskBenchBlockSizes) { SeqResult RandWriteResult = RunRandWrite(Dir, BlockSize, DirectIo); SeqResult RandReadResult = RunRandRead(Dir, BlockSize, DirectIo); std::string WriteStr = RandWriteResult.ErrorCount > 0 ? fmt::format("{}/s ({} err)", NiceBytes(RandWriteResult.BytesPerSec), RandWriteResult.ErrorCount) : fmt::format("{}/s", NiceBytes(RandWriteResult.BytesPerSec)); std::string ReadStr = RandReadResult.ErrorCount > 0 ? fmt::format("{}/s ({} err)", NiceBytes(RandReadResult.BytesPerSec), RandReadResult.ErrorCount) : fmt::format("{}/s", NiceBytes(RandReadResult.BytesPerSec)); ZEN_CONSOLE(" {:>10} {:>12} {:>10L} {:>12} {:>10L}", NiceBytes(BlockSize), WriteStr, RandWriteResult.Iops, ReadStr, RandReadResult.Iops); } CleanupRandFiles(Dir); } if (DoOps && !s_BenchAbort.load(std::memory_order_relaxed)) { if (DoSeq || DoRand) { ZEN_CONSOLE(""); } RunFileOps(Dir); } if (DoClone && !s_BenchAbort.load(std::memory_order_relaxed)) { if (DoSeq || DoRand || DoOps) { ZEN_CONSOLE(""); } RunClone(Dir); } if (DoSync && !s_BenchAbort.load(std::memory_order_relaxed)) { if (DoSeq || DoRand || DoOps || DoClone) { ZEN_CONSOLE(""); } RunSync(Dir); } // Always clean up any leftover bench files, even if interrupted by Ctrl-C CleanupSeqFiles(Dir); CleanupRandFiles(Dir); CleanupFileOpFiles(Dir); CleanupCloneFiles(Dir); CleanupSyncFiles(Dir); std::signal(SIGINT, PrevSigInt); std::signal(SIGTERM, PrevSigTerm); } BenchDiskSubCmd::SeqResult BenchDiskSubCmd::RunSeqWrite(const std::filesystem::path& Dir, uint64_t BlockSize, bool DirectIo) { // When using direct I/O, the file size must be a multiple of the alignment. uint64_t const EffectiveFileSize = DirectIo ? AlignUp(m_FileSize, kDirectIoAlignment) : m_FileSize; // Shared read-only write buffer, aligned for direct I/O. Extra padding lets us // hand any thread a pointer that is guaranteed kDirectIoAlignment-aligned. std::vector RawWriteBuf(BlockSize + kDirectIoAlignment - 1); uint8_t* const WritePtr = AlignBuffer(RawWriteBuf.data(), kDirectIoAlignment); for (size_t i = 0; i < BlockSize; ++i) { WritePtr[i] = static_cast(i & 0xFF); } std::atomic TotalBytesWritten{0}; std::atomic TotalOps{0}; std::atomic ErrorCount{0}; std::atomic DirectIoCount{0}; Stopwatch Timer; std::vector Threads; Threads.reserve(m_Concurrency); for (int i = 0; i < m_Concurrency; ++i) { Threads.emplace_back([&, i]() { try { std::filesystem::path FilePath = Dir / fmt::format("bench_disk_{}.tmp", i); DirectIoFile File; bool GotDirect = File.OpenWrite(FilePath, DirectIo); if (GotDirect) { DirectIoCount.fetch_add(1, std::memory_order_relaxed); } uint64_t Remaining = EffectiveFileSize; uint64_t Offset = 0; int64_t LocalOps = 0; while (Remaining > 0 && !s_BenchAbort.load(std::memory_order_relaxed)) { uint64_t WriteSize = std::min(BlockSize, Remaining); File.WriteAt(WritePtr, WriteSize, Offset); Offset += WriteSize; Remaining -= WriteSize; ++LocalOps; } TotalBytesWritten.fetch_add(static_cast(EffectiveFileSize), std::memory_order_relaxed); TotalOps.fetch_add(LocalOps, std::memory_order_relaxed); } catch (const std::exception&) { ErrorCount.fetch_add(1, std::memory_order_relaxed); } }); } for (std::thread& T : Threads) { T.join(); } SeqResult Result; Result.ElapsedSec = Timer.GetElapsedTimeMs() / 1000.0; Result.BytesPerSec = Result.ElapsedSec > 0.0 ? static_cast(TotalBytesWritten.load() / Result.ElapsedSec) : 0; Result.Iops = Result.ElapsedSec > 0.0 ? static_cast(TotalOps.load() / Result.ElapsedSec) : 0; Result.ErrorCount = ErrorCount.load(); Result.UsedDirectIo = DirectIoCount.load() > 0; return Result; } BenchDiskSubCmd::SeqResult BenchDiskSubCmd::RunSeqRead(const std::filesystem::path& Dir, uint64_t BlockSize, bool DirectIo) { uint64_t const EffectiveFileSize = DirectIo ? AlignUp(m_FileSize, kDirectIoAlignment) : m_FileSize; std::atomic TotalBytesRead{0}; std::atomic TotalOps{0}; std::atomic ErrorCount{0}; std::atomic DirectIoCount{0}; Stopwatch Timer; std::vector Threads; Threads.reserve(m_Concurrency); for (int i = 0; i < m_Concurrency; ++i) { Threads.emplace_back([&, i, BlockSize]() { // Each thread owns its aligned read buffer std::vector RawReadBuf(BlockSize + kDirectIoAlignment - 1); uint8_t* ReadPtr = AlignBuffer(RawReadBuf.data(), kDirectIoAlignment); try { std::filesystem::path FilePath = Dir / fmt::format("bench_disk_{}.tmp", i); DirectIoFile File; bool GotDirect = File.OpenRead(FilePath, DirectIo); if (GotDirect) { DirectIoCount.fetch_add(1, std::memory_order_relaxed); } uint64_t Remaining = EffectiveFileSize; uint64_t Offset = 0; int64_t LocalOps = 0; while (Remaining > 0 && !s_BenchAbort.load(std::memory_order_relaxed)) { uint64_t ReadSize = std::min(BlockSize, Remaining); File.ReadAt(ReadPtr, ReadSize, Offset); Offset += ReadSize; Remaining -= ReadSize; ++LocalOps; } TotalBytesRead.fetch_add(static_cast(EffectiveFileSize), std::memory_order_relaxed); TotalOps.fetch_add(LocalOps, std::memory_order_relaxed); } catch (const std::exception&) { ErrorCount.fetch_add(1, std::memory_order_relaxed); } }); } for (std::thread& T : Threads) { T.join(); } SeqResult Result; Result.ElapsedSec = Timer.GetElapsedTimeMs() / 1000.0; Result.BytesPerSec = Result.ElapsedSec > 0.0 ? static_cast(TotalBytesRead.load() / Result.ElapsedSec) : 0; Result.Iops = Result.ElapsedSec > 0.0 ? static_cast(TotalOps.load() / Result.ElapsedSec) : 0; Result.ErrorCount = ErrorCount.load(); Result.UsedDirectIo = DirectIoCount.load() > 0; return Result; } void BenchDiskSubCmd::CleanupSeqFiles(const std::filesystem::path& Dir) { for (int i = 0; i < m_Concurrency; ++i) { std::error_code Ec; std::filesystem::path FilePath = Dir / fmt::format("bench_disk_{}.tmp", i); std::filesystem::remove(FilePath, Ec); } } void BenchDiskSubCmd::PrefillRandFiles(const std::filesystem::path& Dir, bool DirectIo) { // Write files sequentially with a large block size so the random I/O pass // has data to work with. The write itself is not measured. static constexpr uint64_t kPrefillBlock = 1024 * 1024; uint64_t const EffectiveFileSize = DirectIo ? AlignUp(m_FileSize, kDirectIoAlignment) : m_FileSize; std::vector RawBuf(kPrefillBlock + kDirectIoAlignment - 1); uint8_t* const WritePtr = AlignBuffer(RawBuf.data(), kDirectIoAlignment); for (size_t i = 0; i < kPrefillBlock; ++i) { WritePtr[i] = static_cast(i & 0xFF); } std::vector Threads; Threads.reserve(m_Concurrency); for (int i = 0; i < m_Concurrency; ++i) { Threads.emplace_back([&, i]() { std::filesystem::path FilePath = Dir / fmt::format("bench_rand_{}.tmp", i); DirectIoFile File; File.OpenWrite(FilePath, DirectIo); uint64_t Remaining = EffectiveFileSize; uint64_t Offset = 0; while (Remaining > 0 && !s_BenchAbort.load(std::memory_order_relaxed)) { uint64_t WriteSize = std::min(kPrefillBlock, Remaining); File.WriteAt(WritePtr, WriteSize, Offset); Offset += WriteSize; Remaining -= WriteSize; } }); } for (std::thread& T : Threads) { T.join(); } } BenchDiskSubCmd::SeqResult BenchDiskSubCmd::RunRandWrite(const std::filesystem::path& Dir, uint64_t BlockSize, bool DirectIo) { uint64_t const EffectiveFileSize = DirectIo ? AlignUp(m_FileSize, kDirectIoAlignment) : m_FileSize; uint64_t const NumBlocks = EffectiveFileSize / BlockSize; std::vector RawBuf(BlockSize + kDirectIoAlignment - 1); uint8_t* const WritePtr = AlignBuffer(RawBuf.data(), kDirectIoAlignment); for (size_t i = 0; i < BlockSize; ++i) { WritePtr[i] = static_cast(i & 0xFF); } std::atomic TotalBytesWritten{0}; std::atomic TotalOps{0}; std::atomic ErrorCount{0}; std::atomic DirectIoCount{0}; Stopwatch Timer; std::vector Threads; Threads.reserve(m_Concurrency); for (int i = 0; i < m_Concurrency; ++i) { Threads.emplace_back([&, i]() { std::mt19937_64 Rng(uint64_t(i) * 6364136223846793005ULL + 1442695040888963407ULL); std::uniform_int_distribution Dist(0, NumBlocks - 1); try { std::filesystem::path FilePath = Dir / fmt::format("bench_rand_{}.tmp", i); DirectIoFile File; bool GotDirect = File.OpenReadWrite(FilePath, DirectIo); if (GotDirect) { DirectIoCount.fetch_add(1, std::memory_order_relaxed); } int64_t LocalOps = 0; for (int Op = 0; Op < m_RandOps && !s_BenchAbort.load(std::memory_order_relaxed); ++Op) { File.WriteAt(WritePtr, BlockSize, Dist(Rng) * BlockSize); ++LocalOps; } TotalBytesWritten.fetch_add(LocalOps * static_cast(BlockSize), std::memory_order_relaxed); TotalOps.fetch_add(LocalOps, std::memory_order_relaxed); } catch (const std::exception&) { ErrorCount.fetch_add(1, std::memory_order_relaxed); } }); } for (std::thread& T : Threads) { T.join(); } SeqResult Result; Result.ElapsedSec = Timer.GetElapsedTimeMs() / 1000.0; Result.BytesPerSec = Result.ElapsedSec > 0.0 ? static_cast(TotalBytesWritten.load() / Result.ElapsedSec) : 0; Result.Iops = Result.ElapsedSec > 0.0 ? static_cast(TotalOps.load() / Result.ElapsedSec) : 0; Result.ErrorCount = ErrorCount.load(); Result.UsedDirectIo = DirectIoCount.load() > 0; return Result; } BenchDiskSubCmd::SeqResult BenchDiskSubCmd::RunRandRead(const std::filesystem::path& Dir, uint64_t BlockSize, bool DirectIo) { uint64_t const EffectiveFileSize = DirectIo ? AlignUp(m_FileSize, kDirectIoAlignment) : m_FileSize; uint64_t const NumBlocks = EffectiveFileSize / BlockSize; std::atomic TotalBytesRead{0}; std::atomic TotalOps{0}; std::atomic ErrorCount{0}; std::atomic DirectIoCount{0}; Stopwatch Timer; std::vector Threads; Threads.reserve(m_Concurrency); for (int i = 0; i < m_Concurrency; ++i) { Threads.emplace_back([&, i, BlockSize]() { std::vector RawBuf(BlockSize + kDirectIoAlignment - 1); uint8_t* ReadPtr = AlignBuffer(RawBuf.data(), kDirectIoAlignment); std::mt19937_64 Rng(uint64_t(i) * 6364136223846793005ULL + 1442695040888963407ULL); std::uniform_int_distribution Dist(0, NumBlocks - 1); try { std::filesystem::path FilePath = Dir / fmt::format("bench_rand_{}.tmp", i); DirectIoFile File; bool GotDirect = File.OpenRead(FilePath, DirectIo); if (GotDirect) { DirectIoCount.fetch_add(1, std::memory_order_relaxed); } int64_t LocalOps = 0; for (int Op = 0; Op < m_RandOps && !s_BenchAbort.load(std::memory_order_relaxed); ++Op) { File.ReadAt(ReadPtr, BlockSize, Dist(Rng) * BlockSize); ++LocalOps; } TotalBytesRead.fetch_add(LocalOps * static_cast(BlockSize), std::memory_order_relaxed); TotalOps.fetch_add(LocalOps, std::memory_order_relaxed); } catch (const std::exception&) { ErrorCount.fetch_add(1, std::memory_order_relaxed); } }); } for (std::thread& T : Threads) { T.join(); } SeqResult Result; Result.ElapsedSec = Timer.GetElapsedTimeMs() / 1000.0; Result.BytesPerSec = Result.ElapsedSec > 0.0 ? static_cast(TotalBytesRead.load() / Result.ElapsedSec) : 0; Result.Iops = Result.ElapsedSec > 0.0 ? static_cast(TotalOps.load() / Result.ElapsedSec) : 0; Result.ErrorCount = ErrorCount.load(); Result.UsedDirectIo = DirectIoCount.load() > 0; return Result; } void BenchDiskSubCmd::CleanupRandFiles(const std::filesystem::path& Dir) { for (int i = 0; i < m_Concurrency; ++i) { std::error_code Ec; std::filesystem::path FilePath = Dir / fmt::format("bench_rand_{}.tmp", i); std::filesystem::remove(FilePath, Ec); } } void BenchDiskSubCmd::CleanupFileOpFiles(const std::filesystem::path& Dir) { for (int i = 0; i < m_FileCount; ++i) { std::error_code Ec; std::filesystem::path FilePath = Dir / fmt::format("bench_fileop_{}.tmp", i); std::filesystem::remove(FilePath, Ec); } } void BenchDiskSubCmd::CleanupCloneFiles(const std::filesystem::path& Dir) { std::error_code Ec; std::filesystem::remove(Dir / "bench_clone_src.tmp", Ec); for (int i = 0; i < m_FileCount; ++i) { std::filesystem::remove(Dir / fmt::format("bench_clone_{}.tmp", i), Ec); } for (int i = 0; i < m_Concurrency; ++i) { std::filesystem::remove(Dir / fmt::format("bench_rangeclone_{}.tmp", i), Ec); } } void BenchDiskSubCmd::RunFileOps(const std::filesystem::path& Dir) { static constexpr uint64_t kSmallFileSize = 4 * 1024; // 4 KiB std::vector WriteBuffer(kSmallFileSize, 0xAB); ZEN_CONSOLE("File Operations (4 KiB files, count={}, concurrency={})", m_FileCount, m_Concurrency); // Create phase { std::atomic NextFile{0}; std::atomic ErrorCount{0}; Stopwatch Timer; std::vector Threads; Threads.reserve(m_Concurrency); for (int i = 0; i < m_Concurrency; ++i) { Threads.emplace_back([&]() { while (true) { int FileIndex = NextFile.fetch_add(1); if (FileIndex >= m_FileCount || s_BenchAbort.load(std::memory_order_relaxed)) { break; } try { std::filesystem::path FilePath = Dir / fmt::format("bench_fileop_{}.tmp", FileIndex); BasicFile File(FilePath, BasicFile::Mode::kTruncate); File.Write(WriteBuffer.data(), kSmallFileSize, 0); } catch (const std::exception&) { ErrorCount.fetch_add(1, std::memory_order_relaxed); } } }); } for (std::thread& T : Threads) { T.join(); } double ElapsedSec = Timer.GetElapsedTimeMs() / 1000.0; double FilesPerSec = ElapsedSec > 0.0 ? m_FileCount / ElapsedSec : 0.0; ZEN_CONSOLE(" Create: {:.0f} files/s errors: {} (elapsed: {:.2f}s)", FilesPerSec, ErrorCount.load(), ElapsedSec); } // Delete phase { std::atomic NextFile{0}; std::atomic ErrorCount{0}; Stopwatch Timer; std::vector Threads; Threads.reserve(m_Concurrency); for (int i = 0; i < m_Concurrency; ++i) { Threads.emplace_back([&]() { while (true) { int FileIndex = NextFile.fetch_add(1); if (FileIndex >= m_FileCount || s_BenchAbort.load(std::memory_order_relaxed)) { break; } try { std::filesystem::path FilePath = Dir / fmt::format("bench_fileop_{}.tmp", FileIndex); std::filesystem::remove(FilePath); } catch (const std::exception&) { ErrorCount.fetch_add(1, std::memory_order_relaxed); } } }); } for (std::thread& T : Threads) { T.join(); } double ElapsedSec = Timer.GetElapsedTimeMs() / 1000.0; double FilesPerSec = ElapsedSec > 0.0 ? m_FileCount / ElapsedSec : 0.0; ZEN_CONSOLE(" Delete: {:.0f} files/s errors: {} (elapsed: {:.2f}s)", FilesPerSec, ErrorCount.load(), ElapsedSec); } } void BenchDiskSubCmd::RunClone(const std::filesystem::path& Dir) { ZEN_CONSOLE("Block Clone (file-size={}, count={}, concurrency={})", NiceBytes(m_FileSize), m_FileCount, m_Concurrency); std::unique_ptr CloneIface = GetCloneQueryInterface(Dir); if (!CloneIface) { ZEN_CONSOLE(" Skipped: block cloning not supported on this filesystem"); return; } // Write a source file used by both the file-clone and range-clone sub-tests std::filesystem::path SrcPath = Dir / "bench_clone_src.tmp"; { static constexpr uint64_t kWriteBlock = 1024 * 1024; std::vector WriteBuf(kWriteBlock, 0xCD); BasicFile SrcFile(SrcPath, BasicFile::Mode::kTruncate); uint64_t Remaining = m_FileSize; uint64_t Offset = 0; while (Remaining > 0) { uint64_t WriteSize = std::min(kWriteBlock, Remaining); SrcFile.Write(WriteBuf.data(), WriteSize, Offset); Offset += WriteSize; Remaining -= WriteSize; } } // == File Clone (whole-file CoW) == { std::atomic NextFile{0}; std::atomic ErrorCount{0}; std::atomic CloneCount{0}; Stopwatch Timer; std::vector Threads; Threads.reserve(m_Concurrency); for (int i = 0; i < m_Concurrency; ++i) { Threads.emplace_back([&]() { while (true) { int FileIndex = NextFile.fetch_add(1); if (FileIndex >= m_FileCount || s_BenchAbort.load(std::memory_order_relaxed)) { break; } try { std::filesystem::path DstPath = Dir / fmt::format("bench_clone_{}.tmp", FileIndex); if (std::error_code CloneEc = TryCloneFile(SrcPath, DstPath); !CloneEc) { CloneCount.fetch_add(1, std::memory_order_relaxed); } else { ErrorCount.fetch_add(1, std::memory_order_relaxed); } } catch (const std::exception&) { ErrorCount.fetch_add(1, std::memory_order_relaxed); } } }); } for (std::thread& T : Threads) { T.join(); } double ElapsedSec = Timer.GetElapsedTimeMs() / 1000.0; int TotalClones = CloneCount.load(); double FilesPerSec = ElapsedSec > 0.0 ? TotalClones / ElapsedSec : 0.0; double BytesPerSec = ElapsedSec > 0.0 ? TotalClones * static_cast(m_FileSize) / ElapsedSec : 0.0; ZEN_CONSOLE(" File Clone : {:.0f} files/s {}/s errors: {} (elapsed: {:.2f}s)", FilesPerSec, NiceBytes(static_cast(BytesPerSec)), ErrorCount.load(), ElapsedSec); } if (s_BenchAbort.load(std::memory_order_relaxed)) { return; } // == Block Range Clone (sub-file FICLONERANGE / FSCTL_DUPLICATE_EXTENTS_TO_FILE) == // // Measures how fast the filesystem can process individual range-clone operations. // One shared source file is opened in the main thread; each worker thread opens its // own destination file and issues m_RandOps FICLONERANGE calls at random aligned offsets. // Pre-create per-thread destination files at full size so TryClone's ftruncate is a no-op for (int i = 0; i < m_Concurrency; ++i) { BasicFile DstFile(Dir / fmt::format("bench_rangeclone_{}.tmp", i), BasicFile::Mode::kTruncate); DstFile.SetFileSize(m_FileSize); } BasicFile SrcFile(SrcPath, BasicFile::Mode::kRead); ZEN_CONSOLE(""); ZEN_CONSOLE("Block Range Clone (rand-ops={:L}, concurrency={})", m_RandOps, m_Concurrency); ZEN_CONSOLE(" {:>10} {:>12} {:>10}", "Block", "Ops/s", "Throughput"); for (uint64_t BlockSize : kDiskBenchBlockSizes) { if (s_BenchAbort.load(std::memory_order_relaxed)) { break; } uint64_t const NumBlocks = m_FileSize / BlockSize; if (NumBlocks == 0) { continue; } std::atomic TotalOps{0}; std::atomic ErrorCount{0}; Stopwatch Timer; std::vector Threads; Threads.reserve(m_Concurrency); for (int i = 0; i < m_Concurrency; ++i) { Threads.emplace_back([&, i]() { std::mt19937_64 Rng(uint64_t(i) * 6364136223846793005ULL + 1442695040888963407ULL); std::uniform_int_distribution Dist(0, NumBlocks - 1); try { BasicFile DstFile(Dir / fmt::format("bench_rangeclone_{}.tmp", i), BasicFile::Mode::kWrite); int64_t LocalOps = 0; for (int Op = 0; Op < m_RandOps && !s_BenchAbort.load(std::memory_order_relaxed); ++Op) { uint64_t Offset = Dist(Rng) * BlockSize; if (CloneIface->TryClone(SrcFile.Handle(), DstFile.Handle(), Offset, Offset, BlockSize, m_FileSize)) { ++LocalOps; } else { ErrorCount.fetch_add(1, std::memory_order_relaxed); } } TotalOps.fetch_add(LocalOps, std::memory_order_relaxed); } catch (const std::exception&) { ErrorCount.fetch_add(1, std::memory_order_relaxed); } }); } for (std::thread& T : Threads) { T.join(); } double ElapsedSec = Timer.GetElapsedTimeMs() / 1000.0; double OpsPerSec = ElapsedSec > 0.0 ? static_cast(TotalOps.load()) / ElapsedSec : 0.0; double ThroughputSec = OpsPerSec * static_cast(BlockSize); std::string ThroughputStr = ErrorCount.load() > 0 ? fmt::format("{}/s ({} err)", NiceBytes(static_cast(ThroughputSec)), ErrorCount.load()) : fmt::format("{}/s", NiceBytes(static_cast(ThroughputSec))); ZEN_CONSOLE(" {:>10} {:>12L} {}", NiceBytes(BlockSize), static_cast(OpsPerSec), ThroughputStr); } } void BenchDiskSubCmd::RunSync(const std::filesystem::path& Dir) { // Each iteration writes one page to dirty it, then times a full fsync. // Single-threaded; runs until the deadline rather than a fixed count. static constexpr uint64_t kWriteSize = 4 * 1024; // bytes dirtied per sync static constexpr uint64_t kFileSize = 4 * 1024 * 1024; // cycling window ZEN_CONSOLE("Sync Latency (duration={}s, write={}/op)", m_SyncDurationSec, NiceBytes(kWriteSize)); std::filesystem::path FilePath = Dir / "bench_sync.tmp"; BasicFile File(FilePath, BasicFile::Mode::kTruncate); File.SetFileSize(kFileSize); static constexpr uint64_t kNumSlots = kFileSize / kWriteSize; uint8_t WriteBuf[kWriteSize]; std::memset(WriteBuf, 0xEF, kWriteSize); // Collect latencies in microseconds for sub-millisecond resolution std::vector LatenciesUs; auto Deadline = std::chrono::steady_clock::now() + std::chrono::seconds(m_SyncDurationSec); uint64_t SlotIndex = 0; Stopwatch TotalTimer; while (std::chrono::steady_clock::now() < Deadline && !s_BenchAbort.load(std::memory_order_relaxed)) { File.Write(WriteBuf, kWriteSize, (SlotIndex % kNumSlots) * kWriteSize); ++SlotIndex; Stopwatch SyncTimer; File.Flush(); LatenciesUs.push_back(SyncTimer.GetElapsedTimeUs()); } double TotalElapsedSec = TotalTimer.GetElapsedTimeUs() / 1'000'000.0; File.Close(); if (LatenciesUs.empty()) { ZEN_CONSOLE(" No samples collected"); return; } std::sort(LatenciesUs.begin(), LatenciesUs.end()); auto PercentileMs = [&](double Pct) -> double { size_t Index = std::min(static_cast(LatenciesUs.size() * Pct / 100.0), LatenciesUs.size() - 1); return LatenciesUs[Index] / 1000.0; }; double SumUs = 0.0; for (uint64_t L : LatenciesUs) { SumUs += static_cast(L); } double MeanMs = SumUs / static_cast(LatenciesUs.size()) / 1000.0; double SyncsPs = TotalElapsedSec > 0.0 ? static_cast(LatenciesUs.size()) / TotalElapsedSec : 0.0; ZEN_CONSOLE(" Count : {:L} syncs ({:.1f} syncs/s)", LatenciesUs.size(), SyncsPs); ZEN_CONSOLE(" Latency: min={:.3f}ms mean={:.3f}ms p50={:.3f}ms p95={:.3f}ms p99={:.3f}ms max={:.3f}ms", LatenciesUs.front() / 1000.0, MeanMs, PercentileMs(50), PercentileMs(95), PercentileMs(99), LatenciesUs.back() / 1000.0); } void BenchDiskSubCmd::CleanupSyncFiles(const std::filesystem::path& Dir) { std::error_code Ec; std::filesystem::remove(Dir / "bench_sync.tmp", Ec); } ////////////////////////////////////////////////////////////////////////// // BenchCommand BenchCommand::BenchCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("__hidden__", "", "subcommand", "", cxxopts::value(m_SubCommand)->default_value(""), ""); m_Options.parse_positional({"subcommand"}); AddSubCommand(m_PurgeSubCmd); AddSubCommand(m_HttpSubCmd); AddSubCommand(m_DiskSubCmd); } BenchCommand::~BenchCommand() = default; } // namespace zen