diff options
| author | Stefan Boberg <[email protected]> | 2023-10-20 13:25:58 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-20 13:25:58 +0200 |
| commit | 733dce4c722f101635d60709613d7c97c3bd30df (patch) | |
| tree | 548cfddd0329a2be1e11ddb0b08ce4ec7b238950 /src | |
| parent | clean up GcContributor and GcStorage to be pure interfaces (#485) (diff) | |
| download | zen-733dce4c722f101635d60709613d7c97c3bd30df.tar.xz zen-733dce4c722f101635d60709613d7c97c3bd30df.zip | |
Cache (rpc) activitity recording improvements (#482)
this adds a new RPC recording path aimed at more continuous recording and analysis of recorded sessions
the new strategy is implemented alongside the original in order to retain the ability to read the older format
the main difference between v2 and v1 is that the new strategy splits the recording into segments which are independent from each other. This is done to enable long running sessions with automatic disk cleanup (not implemented yet), appending to an existing recording (not implemented) and/or partial analysis and processing. The recorder will start a new segment when some criteria is fulfilled, including the number of files in the segment directory, disk footprint etc
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/rpcreplay_cmd.cpp | 123 | ||||
| -rw-r--r-- | src/zen/cmds/rpcreplay_cmd.h | 25 | ||||
| -rw-r--r-- | src/zencore/include/zencore/string.h | 2 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpclient.h | 2 | ||||
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 25 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 7 | ||||
| -rw-r--r-- | src/zenstore/caslog.cpp | 4 | ||||
| -rw-r--r-- | src/zenutil/cache/rpcrecording.cpp | 701 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cache/rpcrecording.h | 23 |
9 files changed, 816 insertions, 96 deletions
diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp index 9e43280e1..d2cfc13ac 100644 --- a/src/zen/cmds/rpcreplay_cmd.cpp +++ b/src/zen/cmds/rpcreplay_cmd.cpp @@ -6,6 +6,7 @@ #include <zencore/filesystem.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> +#include <zencore/session.h> #include <zencore/stream.h> #include <zencore/timer.h> #include <zencore/workthreadpool.h> @@ -106,6 +107,7 @@ RpcReplayCommand::RpcReplayCommand() m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); m_Options.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), "<path>"); + m_Options.add_option("", "", "dry", "Do a dry run", cxxopts::value(m_DryRun), "<enable>"); m_Options.add_option("", "w", "numthreads", @@ -192,6 +194,13 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException("Rpc replay command requires a path"); } + if (!std::filesystem::exists(m_RecordingPath) || !std::filesystem::is_directory(m_RecordingPath)) + { + throw std::runtime_error(fmt::format("could not find recording at '{}'", m_RecordingPath)); + } + + Stopwatch TotalTimer; + if (m_OnHost) { cpr::Session Session; @@ -254,20 +263,30 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) [this, &WorkLatch, EntryCount, &EntryOffset, &Replayer, &BytesSent, &BytesReceived, &MethodTypes, &MethodTypesLock]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + std::map<std::string, size_t> LocalMethodTypes; + + auto ReduceTypes = MakeGuard([&] { + RwLock::ExclusiveLockScope __(MethodTypesLock); + + for (auto& Entry : LocalMethodTypes) + { + MethodTypes[Entry.first] += Entry.second; + } + }); + cpr::Session Session; Session.SetUrl(fmt::format("{}/z$/$rpc"sv, m_HostName)); uint64_t EntryIndex = EntryOffset.fetch_add(m_Stride); while (EntryIndex < EntryCount) { - IoBuffer Payload; - std::pair<ZenContentType, ZenContentType> Types = Replayer->GetRequest(EntryIndex, Payload); - ZenContentType RequestContentType = Types.first; - ZenContentType AcceptContentType = Types.second; + IoBuffer Payload; + zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload); CbPackage RequestPackage; CbObject Request; - switch (RequestContentType) + + switch (RequestInfo.ContentType) { case ZenContentType::kCbPackage: { @@ -289,6 +308,7 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) int AdjustedPid = 0; RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone; + if (!m_DisableLocalRefs) { if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || m_ForceAllowLocalRefs) @@ -314,15 +334,14 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (m_ShowMethodStats) { - std::string MethodName = std::string(Request["Method"sv].AsString()); - RwLock::ExclusiveLockScope __(MethodTypesLock); - if (auto It = MethodTypes.find(MethodName); It != MethodTypes.end()) + std::string MethodName = std::string(Request["Method"sv].AsString()); + if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end()) { It->second++; } else { - MethodTypes[MethodName] = 1; + LocalMethodTypes[MethodName] = 1; } } @@ -356,7 +375,7 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions)); } - if (RequestContentType == ZenContentType::kCbPackage) + if (RequestInfo.ContentType == ZenContentType::kCbPackage) { RequestPackage.SetObject(RequestCopyWriter.Save()); std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage); @@ -371,27 +390,44 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } - Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestContentType))}, - {"Accept", std::string(MapContentTypeToString(AcceptContentType))}}); - uint64_t Offset = 0; - auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, Payload.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; - }; - Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - cpr::Response Response = Session.Post(); - BytesSent.fetch_add(Payload.GetSize()); - if (Response.error || !(IsHttpSuccessCode(Response.status_code) || - Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound))) + if (!m_DryRun) { - ZEN_CONSOLE("{}", FormatHttpResponse(Response)); - break; + StringBuilder<32> SessionIdString; + + if (RequestInfo.SessionId != Oid::Zero) + { + RequestInfo.SessionId.ToString(SessionIdString); + } + else + { + GetSessionId().ToString(SessionIdString); + } + + Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestInfo.ContentType))}, + {"Accept", std::string(MapContentTypeToString(RequestInfo.AcceptType))}, + {"UE-Session", std::string(SessionIdString)}}); + + uint64_t Offset = 0; + auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); + MutableMemoryView Data(buffer, size); + Data.CopyFrom(PayloadRange.GetView()); + Offset += size; + return true; + }; + Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + cpr::Response Response = Session.Post(); + BytesSent.fetch_add(Payload.GetSize()); + if (Response.error || !(IsHttpSuccessCode(Response.status_code) || + Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound))) + { + ZEN_CONSOLE("{}", FormatHttpResponse(Response)); + break; + } + BytesReceived.fetch_add(Response.downloaded_bytes); } - BytesReceived.fetch_add(Response.downloaded_bytes); + EntryIndex = EntryOffset.fetch_add(m_Stride); } }); @@ -399,38 +435,41 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) while (!WorkLatch.Wait(1000)) { - ZEN_CONSOLE("Processing {} requests, {} remaining (sent {}, recevied {})...", - (EntryCount - m_Offset) / m_Stride, - (EntryCount - EntryOffset.load()) / m_Stride, + const uint64_t RequestsTotal = (EntryCount - m_Offset) / m_Stride; + const uint64_t RequestsRemaining = (EntryCount - EntryOffset.load()) / m_Stride; + + ZEN_CONSOLE("[{:3}%] [{}] {} requests, {} remaining (sent {}, received {})", + (RequestsTotal - RequestsRemaining) * 100 / RequestsTotal, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + RequestsTotal, + RequestsRemaining, NiceBytes(BytesSent.load()), NiceBytes(BytesReceived.load())); } + if (m_ShowMethodStats) { for (const auto& It : MethodTypes) { - ZEN_CONSOLE("{}: {}", It.first, It.second); + ZEN_CONSOLE("{:18}: {:10}", It.first, It.second); } } } const uint64_t RequestsSent = (EntryOffset.load() - m_Offset) / m_Stride; const uint64_t ElapsedMS = Timer.GetElapsedTimeMs(); - const double ElapsedS = ElapsedMS / 1000.500; const uint64_t Sent = BytesSent.load(); const uint64_t Received = BytesReceived.load(); - const uint64_t RequestsPerS = static_cast<uint64_t>(RequestsSent / ElapsedS); - const uint64_t SentPerS = static_cast<uint64_t>(Sent / ElapsedS); - const uint64_t ReceivedPerS = static_cast<uint64_t>(Received / ElapsedS); - ZEN_CONSOLE("Requests sent {} ({}/s), payloads sent {}B ({}B/s), payloads received {}B ({}B/s) in {}", + ZEN_CONSOLE("Processed requests: {} ({}), payloads sent {} ({}), payloads received {} ({}) in {}.\nTotal runtime: {}", RequestsSent, - RequestsPerS, + NiceRate(RequestsSent, ElapsedMS, "req"), NiceBytes(Sent), - NiceBytes(SentPerS), + NiceByteRate(Sent, ElapsedMS), NiceBytes(Received), - NiceBytes(ReceivedPerS), - NiceTimeSpanMs(ElapsedMS)); + NiceByteRate(Received, ElapsedMS), + NiceTimeSpanMs(ElapsedMS), + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs())); return 0; } diff --git a/src/zen/cmds/rpcreplay_cmd.h b/src/zen/cmds/rpcreplay_cmd.h index 742e5ec5b..e1c2831a5 100644 --- a/src/zen/cmds/rpcreplay_cmd.h +++ b/src/zen/cmds/rpcreplay_cmd.h @@ -48,18 +48,19 @@ private: cxxopts::Options m_Options{"rpc-record-replay", "Replays a previously recorded session of cache rpc requests to a target host"}; std::string m_HostName; std::string m_RecordingPath; - bool m_OnHost = false; - bool m_ShowMethodStats = false; - int m_ProcessCount; - int m_ThreadCount; - uint64_t m_Offset; - uint64_t m_Stride; - bool m_ForceAllowLocalRefs; - bool m_DisableLocalRefs; - bool m_ForceAllowLocalHandleRef; - bool m_DisableLocalHandleRefs; - bool m_ForceAllowPartialLocalRefs; - bool m_DisablePartialLocalRefs; + bool m_OnHost = false; + bool m_ShowMethodStats = false; + int m_ProcessCount = 1; + int m_ThreadCount = 0; + uint64_t m_Offset = 0; + uint64_t m_Stride = 1; + bool m_ForceAllowLocalRefs = false; + bool m_DisableLocalRefs = false; + bool m_ForceAllowLocalHandleRef = false; + bool m_DisableLocalHandleRefs = false; + bool m_ForceAllowPartialLocalRefs = false; + bool m_DisablePartialLocalRefs = false; + bool m_DryRun = false; }; } // namespace zen diff --git a/src/zencore/include/zencore/string.h b/src/zencore/include/zencore/string.h index 2319d7ade..2b61b3954 100644 --- a/src/zencore/include/zencore/string.h +++ b/src/zencore/include/zencore/string.h @@ -690,7 +690,7 @@ struct NiceTimeSpanMs : public NiceBase ////////////////////////////////////////////////////////////////////////// inline std::string -NiceRate(uint64_t Num, uint32_t DurationMilliseconds, const char* Unit = "B") +NiceRate(uint64_t Num, uint64_t DurationMilliseconds, const char* Unit = "B") { char Buffer[32]; diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h index 044a3162e..0663abc43 100644 --- a/src/zenhttp/include/zenhttp/httpclient.h +++ b/src/zenhttp/include/zenhttp/httpclient.h @@ -87,7 +87,7 @@ public: // The number of bytes sent as part of the request int64_t UploadedBytes; - // The number of bytes recevied as part of the response + // The number of bytes received as part of the response int64_t DownloadedBytes; // The elapsed time in seconds for the request to execute diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 4ec7c56db..91f895b0b 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1691,16 +1691,22 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { WorkerPool.ScheduleWork([this, &Context, &JobLatch, &Replayer, RequestIndex]() { - IoBuffer Body; - std::pair<ZenContentType, ZenContentType> ContentType = Replayer.GetRequest(RequestIndex, Body); + IoBuffer Body; + zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body); + if (Body) { uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; int TargetPid = 0; CbPackage RpcResult; - if (IsHttpSuccessCode( - HandleRpcRequest(Context, ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult))) + if (IsHttpSuccessCode(HandleRpcRequest(Context, + RequestInfo.ContentType, + std::move(Body), + AcceptMagic, + AcceptFlags, + TargetPid, + RpcResult))) { if (AcceptMagic == kCbPkgMagic) { @@ -1764,8 +1770,15 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) auto HandleRpc = [this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { - std::uint64_t RequestIndex = - m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull; + uint64_t RequestIndex = ~0ull; + + if (m_RequestRecorder) + { + RequestIndex = m_RequestRecorder->RecordRequest( + {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, + Body); + } + uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; int TargetProcessId = 0; diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 152a47584..6a937089d 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -268,10 +268,11 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen m_VfsService->AddService(Ref<ZenCacheStore>(m_CacheStore)); m_Http->RegisterService(*m_VfsService); - ZEN_INFO("initializing GC, enabled '{}', interval {}s, lightweight interval {}s", + ZEN_INFO("initializing GC, enabled '{}', interval {}, lightweight interval {}", ServerOptions.GcConfig.Enabled, - ServerOptions.GcConfig.IntervalSeconds, - ServerOptions.GcConfig.LightweightIntervalSeconds); + NiceTimeSpanMs(ServerOptions.GcConfig.IntervalSeconds * 1000ull), + NiceTimeSpanMs(ServerOptions.GcConfig.LightweightIntervalSeconds * 1000ull)); + GcSchedulerConfig GcConfig{.RootDirectory = m_DataRoot / "gc", .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds), .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), diff --git a/src/zenstore/caslog.cpp b/src/zenstore/caslog.cpp index 2a978ae12..c04324fbc 100644 --- a/src/zenstore/caslog.cpp +++ b/src/zenstore/caslog.cpp @@ -2,16 +2,12 @@ #include <zenstore/caslog.h> -#include "compactcas.h" - #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/memory.h> #include <zencore/string.h> -#include <zencore/thread.h> -#include <zencore/uid.h> #include <xxhash.h> diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index 4958a27f6..054ac0e56 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -1,5 +1,9 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include <zencore/compactbinarybuilder.h> +#include <zencore/filesystem.h> +#include <zencore/logging.h> +#include <zencore/system.h> #include <zenutil/basicfile.h> #include <zenutil/cache/rpcrecording.h> @@ -9,6 +13,15 @@ ZEN_THIRD_PARTY_INCLUDES_START ZEN_THIRD_PARTY_INCLUDES_END namespace zen::cache { + +const RecordedRequestInfo RecordedRequestInfo::NullRequest = {.ContentType = ZenContentType::kUnknownContentType, + .AcceptType = ZenContentType::kUnknownContentType, + .SessionId = Oid::Zero}; + +} + +namespace zen::cache::v1 { + struct RecordedRequest { uint64_t Offset; @@ -17,7 +30,8 @@ struct RecordedRequest ZenContentType AcceptType; }; -const uint64_t RecordedRequestBlockSize = 1ull << 31u; +const uint64_t RecordedRequestBlockSize = 1ull << 31u; +const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; struct RecordedRequestsWriter { @@ -32,6 +46,31 @@ struct RecordedRequestsWriter RwLock::ExclusiveLockScope _(m_Lock); m_BlockFiles.clear(); + try + { + // Emit some metadata alongside the recording + + DateTime EndTime = DateTime::Now(); + TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()}; + + CbObjectWriter Cbo; + Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration; + Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest); + Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold; + + Cbo.BeginObject("system_info"); + Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); + Describe(GetSystemMetrics(), Cbo); + Cbo.EndObject(); + CbObject Metadata = Cbo.Save(); + + WriteFile(m_BasePath / "rpc_recording_metadata.zcb", Metadata.GetBuffer().AsIoBuffer()); + } + catch (std::exception& Ex) + { + ZEN_WARN("caught exception while generating metadata for RPC recording: {}", Ex.what()); + } + IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); BasicFile IndexFile; IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); @@ -41,15 +80,18 @@ struct RecordedRequestsWriter m_Entries.clear(); } - uint64_t WriteRequest(ZenContentType ContentType, ZenContentType AcceptType, const IoBuffer& RequestBuffer) + uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { RwLock::ExclusiveLockScope Lock(m_Lock); uint64_t RequestIndex = m_Entries.size(); - RecordedRequest& Entry = m_Entries.emplace_back( - RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType, .AcceptType = AcceptType}); - if (Entry.Length < 1 * 1024 * 1024) + RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull, + .Length = RequestBuffer.Size(), + .ContentType = RequestInfo.ContentType, + .AcceptType = RequestInfo.AcceptType}); + + if (Entry.Length < StandaloneFileSizeThreshold) { - uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); + const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); if (BlockIndex == m_BlockFiles.size()) { std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); @@ -92,6 +134,7 @@ struct RecordedRequestsWriter std::vector<RecordedRequest> m_Entries; std::vector<std::unique_ptr<BasicFile>> m_BlockFiles; uint64_t m_ChunkOffset; + zen::DateTime m_StartTime = DateTime::Now(); }; struct RecordedRequestsReader @@ -128,26 +171,31 @@ struct RecordedRequestsReader } void EndRead() { m_BlockFiles.clear(); } - std::pair<ZenContentType, ZenContentType> ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const + RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const { if (RequestIndex >= m_Entries.size()) { - return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType}; + return RecordedRequestInfo::NullRequest; } + const RecordedRequest& Entry = m_Entries[RequestIndex]; if (Entry.Length == 0) { - return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType}; + return RecordedRequestInfo::NullRequest; } + if (Entry.Offset != ~0ull) { uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); - return {Entry.ContentType, Entry.AcceptType}; } - OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); - return {Entry.ContentType, Entry.AcceptType}; + else + { + OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); + } + + return {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Oid::Zero}; } std::filesystem::path m_BasePath; @@ -162,14 +210,13 @@ public: virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } private: - virtual uint64_t RecordRequest(const ZenContentType ContentType, - const ZenContentType AcceptType, - const IoBuffer& RequestBuffer) override + virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override { - return m_RecordedRequests.WriteRequest(ContentType, AcceptType, RequestBuffer); + return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); } - virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} - virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} + virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} + virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} + RecordedRequestsWriter m_RecordedRequests; }; @@ -185,7 +232,7 @@ public: private: virtual uint64_t GetRequestCount() const override { return m_RequestCount; } - virtual std::pair<ZenContentType, ZenContentType> GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override + virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override { return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); } @@ -195,16 +242,628 @@ private: RecordedRequestsReader m_RequestBuffer; }; +} // namespace zen::cache::v1 + +////////////////////////////////////////////////////////////////////////// + +namespace zen::cache::v2 { + +using namespace std::literals; + +struct RecordedRequest +{ + uint32_t Offset; // 4 bytes + uint32_t Length; // 4 bytes + ZenContentType ContentType; // 1 byte + ZenContentType AcceptType; // 1 byte + uint8_t Padding; // 1 byte + uint8_t Padding2; // 1 byte + Oid SessionId; // 12 bytes +}; + +static_assert(sizeof(RecordedRequest) == 24); + +const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB +const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; // 1MiB +const uint64_t SegmentRequestCount = 10 * 1000 * 1000; +const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the + // number of files in a directory below this level + // for performance +const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024; +const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0}; + +struct RecordedRequestsSegmentWriter +{ + RecordedRequestsSegmentWriter() = default; + ~RecordedRequestsSegmentWriter() = default; + + RecordedRequestsSegmentWriter(const RecordedRequestsSegmentWriter&) = delete; + RecordedRequestsSegmentWriter& operator=(const RecordedRequestsSegmentWriter&) = delete; + + void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex); + uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); + void EndWrite(); + + inline uint64_t GetRequestCount() const + { + if (m_RequestCount) + { + return m_RequestCount; + } + + return m_Entries.size(); + } + inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; } + inline uint64_t GetSegmentIndex() const { return m_SegmentIndex; } + inline uint64_t GetFileCount() const { return m_FileCount; } + inline uint64_t GetRequestsByteCount() const { return m_RequestsByteCount; } + inline DateTime GetStartTime() const { return m_StartTime; } + inline DateTime GetEndTime() const { return m_EndTime; } + +private: + std::filesystem::path m_BasePath; + uint64_t m_SegmentIndex = 0; + uint64_t m_RequestBaseIndex = 0; + uint64_t m_RequestCount = 0; + std::atomic_uint64_t m_FileCount{}; + std::atomic_uint64_t m_RequestsByteCount{}; + mutable RwLock m_Lock; + std::vector<RecordedRequest> m_Entries; + std::vector<std::unique_ptr<BasicFile>> m_BlockFiles; + uint64_t m_ChunkOffset; + DateTime m_StartTime = DateTime::Now(); + DateTime m_EndTime = DateTime::Now(); +}; + +struct RecordedRequestsWriter +{ + void BeginWrite(const std::filesystem::path& BasePath); + RecordedRequestsSegmentWriter& EnsureCurrentSegment(); + void CommitCurrentSegment(RwLock::ExclusiveLockScope&); + void EndWrite(); + uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); + +private: + std::filesystem::path m_BasePath; + zen::DateTime m_StartTime = DateTime::Now(); + + mutable RwLock m_Lock; + std::unique_ptr<RecordedRequestsSegmentWriter> m_CurrentWriter; + std::vector<std::unique_ptr<RecordedRequestsSegmentWriter>> m_FinishedSegments; + std::vector<uint64_t> m_SegmentBaseIndex; + uint64_t m_NextSegmentBaseIndex = 0; +}; + +////////////////////////////////////////////////////////////////////////// + +struct RecordedRequestsSegmentReader +{ + RecordedRequestsSegmentReader() = default; + + RecordedRequestsSegmentReader(const RecordedRequestsSegmentReader&) = delete; + RecordedRequestsSegmentReader& operator=(const RecordedRequestsSegmentReader&) = delete; + + uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory); + void EndRead(); + RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const; + +private: + std::filesystem::path m_BasePath; + std::vector<RecordedRequest> m_Entries; + std::vector<IoBuffer> m_BlockFiles; +}; + +struct RecordedRequestsReader +{ + uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory); + void EndRead(); + RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const; + +private: + struct SegmentInfo + { + uint64_t SegmentIndex; + uint64_t BaseRequestIndex; + uint64_t RequestCount; + uint64_t RequestBytes; + DateTime StartTime{0}; + DateTime EndTime{0}; + }; + + bool m_InMemory = false; + std::filesystem::path m_BasePath; + std::vector<SegmentInfo> m_KnownSegments; + + mutable RwLock m_SegmentLock; + mutable std::vector<std::unique_ptr<RecordedRequestsSegmentReader>> m_SegmentReaders; +}; + +////////////////////////////////////////////////////////////////////////// + +void +RecordedRequestsSegmentWriter::BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex) +{ + m_BasePath = BasePath; + m_SegmentIndex = SegmentIndex; + m_RequestBaseIndex = RequestBaseIndex; + std::filesystem::create_directories(m_BasePath); +} + +void +RecordedRequestsSegmentWriter::EndWrite() +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_RequestCount = m_Entries.size(); + m_BlockFiles.clear(); + + // Emit some metadata alongside the recording + + try + { + m_EndTime = DateTime::Now(); + TimeSpan Duration{m_EndTime.GetTicks() - m_StartTime.GetTicks()}; + + CbObjectWriter Cbo; + Cbo << "time_start" << m_StartTime << "time_end" << m_EndTime << "duration" << Duration; + Cbo << "segment_index" << m_SegmentIndex; + Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest); + Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold; + + Cbo.BeginObject("system_info"); + Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); + Cbo.EndObject(); + CbObject Metadata = Cbo.Save(); + + WriteFile(m_BasePath / "rpc_segment_info.zcb", Metadata.GetBuffer().AsIoBuffer()); + } + catch (std::exception& Ex) + { + ZEN_WARN("caught exception while writing segment metadata for RPC recording: {}", Ex.what()); + } + + IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); + BasicFile IndexFile; + IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); + std::error_code Ec; + IndexFile.WriteAll(IndexBuffer, Ec); + IndexFile.Close(); + m_Entries.clear(); +} + +uint64_t +RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) +{ + const uint64_t RequestBufferSize = RequestBuffer.GetSize(); + + RwLock::ExclusiveLockScope Lock(m_Lock); + uint64_t RequestIndex = m_Entries.size(); + RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u, + .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), + .ContentType = RequestInfo.ContentType, + .AcceptType = RequestInfo.AcceptType, + .Padding = 0, + .Padding2 = 0, + .SessionId = RequestInfo.SessionId}); + + if (Entry.Length < StandaloneFileSizeThreshold) + { + const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); + + if (BlockIndex == m_BlockFiles.size()) + { + std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); + NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); + m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; + ++m_FileCount; + } + + ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); + BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); + ZEN_ASSERT(BlockFile != nullptr); + + Entry.Offset = uint32_t(m_ChunkOffset & 0xffffFFFF); + m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); + Lock.ReleaseNow(); + + std::error_code Ec; + BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); + if (Ec) + { + Entry.Length = 0; + return ~0ull; + } + + m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); + + return RequestIndex; + } + Lock.ReleaseNow(); + + // Write request data to standalone file + + try + { + BasicFile RequestFile; + RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); + + if (RequestBufferSize > std::numeric_limits<uint32_t>::max()) + { + // The exact value of the entry is not important, we will use + // the size of the standalone file regardless when performing + // the read + Entry.Length = std::numeric_limits<uint32_t>::max(); + } + + ++m_FileCount; + + std::error_code Ec; + RequestFile.WriteAll(RequestBuffer, Ec); + + if (Ec) + { + Entry.Length = 0; + return ~0ull; + } + + m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); + + return RequestIndex; + } + catch (std::exception&) + { + Entry.Length = 0; + return ~0ull; + } +} + +uint64_t +RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory) +{ + m_BasePath = BasePath; + BasicFile IndexFile; + IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); + m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); + IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); + uint64_t MaxChunkPosition = 0; + for (const RecordedRequest& R : m_Entries) + { + if (R.Offset != ~0u) + { + MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); + } + } + uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1; + m_BlockFiles.resize(BlockCount); + for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) + { + if (InMemory) + { + BasicFile Chunk; + Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead); + m_BlockFiles[BlockIndex] = Chunk.ReadAll(); + continue; + } + m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex)); + } + return m_Entries.size(); +} +void +RecordedRequestsSegmentReader::EndRead() +{ + m_BlockFiles.clear(); +} + +RecordedRequestInfo +RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const +{ + if (RequestIndex >= m_Entries.size()) + { + return RecordedRequestInfo::NullRequest; + } + const RecordedRequest& Entry = m_Entries[RequestIndex]; + if (Entry.Length == 0) + { + return RecordedRequestInfo::NullRequest; + } + + RecordedRequestInfo RequestInfo = {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Entry.SessionId}; + + if (Entry.Offset != ~0u) + { + // Inline in block file + uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); + uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); + OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); + + return RequestInfo; + } + + // Standalone file + OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); + + return RequestInfo; +} + +void +RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath) +{ + m_BasePath = BasePath; + EnsureCurrentSegment(); +} + +RecordedRequestsSegmentWriter& +RecordedRequestsWriter::EnsureCurrentSegment() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + if (m_CurrentWriter) + { + bool StartNewSegment = false; + + if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount) + { + ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount); + StartNewSegment = true; + } + else if (m_CurrentWriter->GetFileCount() >= LooseFileThreshold) + { + ZEN_DEBUG("starting new RPC recording segment due to file count >= {}", LooseFileThreshold); + StartNewSegment = true; + } + else if (m_CurrentWriter->GetRequestsByteCount() >= SegmentByteThreshold) + { + ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold); + StartNewSegment = true; + } + + if (StartNewSegment) + { + CommitCurrentSegment(_); + } + } + + if (!m_CurrentWriter) + { + const uint64_t SegmentIndex = m_FinishedSegments.size(); + m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>(); + + m_CurrentWriter->BeginWrite(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex); + } + + return *m_CurrentWriter; +} + +void +RecordedRequestsWriter::CommitCurrentSegment(RwLock::ExclusiveLockScope&) +{ + if (m_CurrentWriter) + { + RecordedRequestsSegmentWriter& Writer = *m_CurrentWriter; + m_FinishedSegments.push_back(std::move(m_CurrentWriter)); + m_SegmentBaseIndex.push_back(m_NextSegmentBaseIndex); + m_NextSegmentBaseIndex += Writer.GetRequestCount(); + + Writer.EndWrite(); + } +} + +void +RecordedRequestsWriter::EndWrite() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + CommitCurrentSegment(_); + + // Emit some metadata alongside the recording + + try + { + DateTime EndTime = DateTime::Now(); + TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()}; + + CbObjectWriter Cbo; + Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration << "format_version" << 2; + + Cbo.BeginObject("system_info"); + Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); + Cbo << "segment_count" << m_FinishedSegments.size(); + Cbo << "block_size" << RecordedRequestBlockSize; + Cbo << "standalone_threshold" << StandaloneFileSizeThreshold; + Cbo << "segment_request_count" << SegmentRequestCount; + Cbo << "segment_file_threshold" << LooseFileThreshold; + Cbo << "segment_byte_threshold" << SegmentByteThreshold; + + Describe(GetSystemMetrics(), Cbo); + Cbo.EndObject(); + + Cbo.BeginArray("segments"); + + for (auto& Segment : m_FinishedSegments) + { + Cbo.BeginObject(); + Cbo << "segment" << Segment->GetSegmentIndex(); + Cbo << "base_index" << Segment->GetBaseRequestIndex(); + Cbo << "request_count" << Segment->GetRequestCount(); + Cbo << "request_bytes" << Segment->GetRequestsByteCount(); + Cbo << "start_time" << Segment->GetStartTime(); + Cbo << "end_time" << Segment->GetEndTime(); + Cbo.EndObject(); + } + + Cbo.EndArray(); + + CbObject Metadata = Cbo.Save(); + + WriteFile(m_BasePath / "rpc_recording_info.zcb", Metadata.GetBuffer().AsIoBuffer()); + } + catch (std::exception& Ex) + { + ZEN_WARN("caught exception while writing metadata for RPC recording: {}", Ex.what()); + } +} + +uint64_t +RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) +{ + RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); + + const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer); + + return Writer.GetBaseRequestIndex() + SegmentLocalIndex; +} + +////////////////////////////////////////////////////////////////////////// + +uint64_t +RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory) +{ + m_InMemory = InMemory; + m_BasePath = BasePath; + + BasicFile InfoFile; + InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead); + CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll()); + + uint64_t TotalRequestCount = 0; + uint64_t MaxSegmentIndex = 0; + + for (auto SegmentElement : CbInfo["segments"]) + { + CbObjectView Segment = SegmentElement.AsObjectView(); + + const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(), + .BaseRequestIndex = Segment["base_index"sv].AsUInt64(), + .RequestCount = Segment["request_count"sv].AsUInt64(), + .RequestBytes = Segment["request_bytes"sv].AsUInt64(), + .StartTime = Segment["start_time"sv].AsDateTime(), + .EndTime = Segment["end_time"sv].AsDateTime()}); + + TotalRequestCount += Info.RequestCount; + MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + } + + m_SegmentReaders.resize(MaxSegmentIndex + 1); + + return TotalRequestCount; +} + +void +RecordedRequestsReader::EndRead() +{ + RwLock::ExclusiveLockScope _(m_SegmentLock); + m_KnownSegments.clear(); + m_SegmentReaders.clear(); + m_BasePath.clear(); +} + +RecordedRequestInfo +RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const +{ + auto EnsureSegment = [&](uint64_t SegmentIndex) -> const RecordedRequestsSegmentReader& { + { + RwLock::SharedLockScope _(m_SegmentLock); + + if (auto SegmentReaderPtr = m_SegmentReaders[SegmentIndex].get()) + { + return *SegmentReaderPtr; + } + } + + RwLock::ExclusiveLockScope _(m_SegmentLock); + + auto& SegmentReaderPtr = m_SegmentReaders[SegmentIndex]; + + if (!SegmentReaderPtr) + { + RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader; + NewSegment->BeginRead(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), m_InMemory); + SegmentReaderPtr.reset(NewSegment); + } + + return *(SegmentReaderPtr.get()); + }; + + for (auto& Segment : m_KnownSegments) + { + if (Segment.RequestCount > RequestIndex) + { + return EnsureSegment(Segment.SegmentIndex).ReadRequest(RequestIndex, OutBuffer); + } + else + { + RequestIndex -= Segment.RequestCount; + } + } + + return RecordedRequestInfo::NullRequest; +} + +////////////////////////////////////////////////////////////////////////// + +class DiskRequestRecorder : public IRpcRequestRecorder +{ +public: + DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } + virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } + +private: + virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override + { + return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); + } + virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} + virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} + + RecordedRequestsWriter m_RecordedRequests; +}; + +class DiskRequestReplayer : public IRpcRequestReplayer +{ +public: + DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) + { + m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory); + } + virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } + + static bool IsCompatible(const std::filesystem::path& BasePath) { return std::filesystem::exists(BasePath / "rpc_recording_info.zcb"); } + +private: + virtual uint64_t GetRequestCount() const override { return m_RequestCount; } + + virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override + { + return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); + } + virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } + + std::uint64_t m_RequestCount; + RecordedRequestsReader m_RequestBuffer; +}; + +} // namespace zen::cache::v2 + +////////////////////////////////////////////////////////////////////////// + +namespace zen::cache { + std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath) { - return std::make_unique<DiskRequestRecorder>(BasePath); + return std::make_unique<v2::DiskRequestRecorder>(BasePath); } std::unique_ptr<cache::IRpcRequestReplayer> MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) { - return std::make_unique<DiskRequestReplayer>(BasePath, InMemory); + if (v2::DiskRequestReplayer::IsCompatible(BasePath)) + { + return std::make_unique<v2::DiskRequestReplayer>(BasePath, InMemory); + } + else + { + return std::make_unique<v1::DiskRequestReplayer>(BasePath, InMemory); + } } } // namespace zen::cache diff --git a/src/zenutil/include/zenutil/cache/rpcrecording.h b/src/zenutil/include/zenutil/cache/rpcrecording.h index 6d65a532a..fd5df26ad 100644 --- a/src/zenutil/include/zenutil/cache/rpcrecording.h +++ b/src/zenutil/include/zenutil/cache/rpcrecording.h @@ -6,21 +6,32 @@ #include <zencore/iobuffer.h> namespace zen::cache { + +struct RecordedRequestInfo +{ + ZenContentType ContentType; + ZenContentType AcceptType; + Oid SessionId; + + static const RecordedRequestInfo NullRequest; +}; + class IRpcRequestRecorder { public: virtual ~IRpcRequestRecorder() {} - virtual uint64_t RecordRequest(const ZenContentType ContentType, const ZenContentType AcceptType, const IoBuffer& RequestBuffer) = 0; - virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const IoBuffer& ResponseBuffer) = 0; - virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const CompositeBuffer& ResponseBuffer) = 0; + virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) = 0; + virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const IoBuffer& ResponseBuffer) = 0; + virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const CompositeBuffer& ResponseBuffer) = 0; }; + class IRpcRequestReplayer { public: virtual ~IRpcRequestReplayer() {} - virtual uint64_t GetRequestCount() const = 0; - virtual std::pair<ZenContentType, ZenContentType> GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; - virtual ZenContentType GetResponse(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; + virtual uint64_t GetRequestCount() const = 0; + virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; + virtual ZenContentType GetResponse(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; }; std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath); |