diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/rpcreplay_cmd.cpp | 123 | ||||
| -rw-r--r-- | src/zen/cmds/rpcreplay_cmd.h | 25 | ||||
| -rw-r--r-- | src/zencore/include/zencore/string.h | 2 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpclient.h | 2 | ||||
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 25 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 7 | ||||
| -rw-r--r-- | src/zenstore/caslog.cpp | 4 | ||||
| -rw-r--r-- | src/zenutil/cache/rpcrecording.cpp | 701 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cache/rpcrecording.h | 23 |
9 files changed, 816 insertions, 96 deletions
diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp index 9e43280e1..d2cfc13ac 100644 --- a/src/zen/cmds/rpcreplay_cmd.cpp +++ b/src/zen/cmds/rpcreplay_cmd.cpp @@ -6,6 +6,7 @@ #include <zencore/filesystem.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> +#include <zencore/session.h> #include <zencore/stream.h> #include <zencore/timer.h> #include <zencore/workthreadpool.h> @@ -106,6 +107,7 @@ RpcReplayCommand::RpcReplayCommand() m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); m_Options.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), "<path>"); + m_Options.add_option("", "", "dry", "Do a dry run", cxxopts::value(m_DryRun), "<enable>"); m_Options.add_option("", "w", "numthreads", @@ -192,6 +194,13 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException("Rpc replay command requires a path"); } + if (!std::filesystem::exists(m_RecordingPath) || !std::filesystem::is_directory(m_RecordingPath)) + { + throw std::runtime_error(fmt::format("could not find recording at '{}'", m_RecordingPath)); + } + + Stopwatch TotalTimer; + if (m_OnHost) { cpr::Session Session; @@ -254,20 +263,30 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) [this, &WorkLatch, EntryCount, &EntryOffset, &Replayer, &BytesSent, &BytesReceived, &MethodTypes, &MethodTypesLock]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + std::map<std::string, size_t> LocalMethodTypes; + + auto ReduceTypes = MakeGuard([&] { + RwLock::ExclusiveLockScope __(MethodTypesLock); + + for (auto& Entry : LocalMethodTypes) + { + MethodTypes[Entry.first] += Entry.second; + } + }); + cpr::Session Session; Session.SetUrl(fmt::format("{}/z$/$rpc"sv, m_HostName)); uint64_t EntryIndex = EntryOffset.fetch_add(m_Stride); while (EntryIndex < EntryCount) { - IoBuffer Payload; - std::pair<ZenContentType, ZenContentType> Types = Replayer->GetRequest(EntryIndex, Payload); - ZenContentType RequestContentType = Types.first; - ZenContentType AcceptContentType = Types.second; + IoBuffer Payload; + zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload); CbPackage RequestPackage; CbObject Request; - switch (RequestContentType) + + switch (RequestInfo.ContentType) { case ZenContentType::kCbPackage: { @@ -289,6 +308,7 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) int AdjustedPid = 0; RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone; + if (!m_DisableLocalRefs) { if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || m_ForceAllowLocalRefs) @@ -314,15 +334,14 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (m_ShowMethodStats) { - std::string MethodName = std::string(Request["Method"sv].AsString()); - RwLock::ExclusiveLockScope __(MethodTypesLock); - if (auto It = MethodTypes.find(MethodName); It != MethodTypes.end()) + std::string MethodName = std::string(Request["Method"sv].AsString()); + if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end()) { It->second++; } else { - MethodTypes[MethodName] = 1; + LocalMethodTypes[MethodName] = 1; } } @@ -356,7 +375,7 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions)); } - if (RequestContentType == ZenContentType::kCbPackage) + if (RequestInfo.ContentType == ZenContentType::kCbPackage) { RequestPackage.SetObject(RequestCopyWriter.Save()); std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage); @@ -371,27 +390,44 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } - Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestContentType))}, - {"Accept", std::string(MapContentTypeToString(AcceptContentType))}}); - uint64_t Offset = 0; - auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, Payload.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; - }; - Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - cpr::Response Response = Session.Post(); - BytesSent.fetch_add(Payload.GetSize()); - if (Response.error || !(IsHttpSuccessCode(Response.status_code) || - Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound))) + if (!m_DryRun) { - ZEN_CONSOLE("{}", FormatHttpResponse(Response)); - break; + StringBuilder<32> SessionIdString; + + if (RequestInfo.SessionId != Oid::Zero) + { + RequestInfo.SessionId.ToString(SessionIdString); + } + else + { + GetSessionId().ToString(SessionIdString); + } + + Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestInfo.ContentType))}, + {"Accept", std::string(MapContentTypeToString(RequestInfo.AcceptType))}, + {"UE-Session", std::string(SessionIdString)}}); + + uint64_t Offset = 0; + auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); + MutableMemoryView Data(buffer, size); + Data.CopyFrom(PayloadRange.GetView()); + Offset += size; + return true; + }; + Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + cpr::Response Response = Session.Post(); + BytesSent.fetch_add(Payload.GetSize()); + if (Response.error || !(IsHttpSuccessCode(Response.status_code) || + Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound))) + { + ZEN_CONSOLE("{}", FormatHttpResponse(Response)); + break; + } + BytesReceived.fetch_add(Response.downloaded_bytes); } - BytesReceived.fetch_add(Response.downloaded_bytes); + EntryIndex = EntryOffset.fetch_add(m_Stride); } }); @@ -399,38 +435,41 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) while (!WorkLatch.Wait(1000)) { - ZEN_CONSOLE("Processing {} requests, {} remaining (sent {}, recevied {})...", - (EntryCount - m_Offset) / m_Stride, - (EntryCount - EntryOffset.load()) / m_Stride, + const uint64_t RequestsTotal = (EntryCount - m_Offset) / m_Stride; + const uint64_t RequestsRemaining = (EntryCount - EntryOffset.load()) / m_Stride; + + ZEN_CONSOLE("[{:3}%] [{}] {} requests, {} remaining (sent {}, received {})", + (RequestsTotal - RequestsRemaining) * 100 / RequestsTotal, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + RequestsTotal, + RequestsRemaining, NiceBytes(BytesSent.load()), NiceBytes(BytesReceived.load())); } + if (m_ShowMethodStats) { for (const auto& It : MethodTypes) { - ZEN_CONSOLE("{}: {}", It.first, It.second); + ZEN_CONSOLE("{:18}: {:10}", It.first, It.second); } } } const uint64_t RequestsSent = (EntryOffset.load() - m_Offset) / m_Stride; const uint64_t ElapsedMS = Timer.GetElapsedTimeMs(); - const double ElapsedS = ElapsedMS / 1000.500; const uint64_t Sent = BytesSent.load(); const uint64_t Received = BytesReceived.load(); - const uint64_t RequestsPerS = static_cast<uint64_t>(RequestsSent / ElapsedS); - const uint64_t SentPerS = static_cast<uint64_t>(Sent / ElapsedS); - const uint64_t ReceivedPerS = static_cast<uint64_t>(Received / ElapsedS); - ZEN_CONSOLE("Requests sent {} ({}/s), payloads sent {}B ({}B/s), payloads received {}B ({}B/s) in {}", + ZEN_CONSOLE("Processed requests: {} ({}), payloads sent {} ({}), payloads received {} ({}) in {}.\nTotal runtime: {}", RequestsSent, - RequestsPerS, + NiceRate(RequestsSent, ElapsedMS, "req"), NiceBytes(Sent), - NiceBytes(SentPerS), + NiceByteRate(Sent, ElapsedMS), NiceBytes(Received), - NiceBytes(ReceivedPerS), - NiceTimeSpanMs(ElapsedMS)); + NiceByteRate(Received, ElapsedMS), + NiceTimeSpanMs(ElapsedMS), + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs())); return 0; } diff --git a/src/zen/cmds/rpcreplay_cmd.h b/src/zen/cmds/rpcreplay_cmd.h index 742e5ec5b..e1c2831a5 100644 --- a/src/zen/cmds/rpcreplay_cmd.h +++ b/src/zen/cmds/rpcreplay_cmd.h @@ -48,18 +48,19 @@ private: cxxopts::Options m_Options{"rpc-record-replay", "Replays a previously recorded session of cache rpc requests to a target host"}; std::string m_HostName; std::string m_RecordingPath; - bool m_OnHost = false; - bool m_ShowMethodStats = false; - int m_ProcessCount; - int m_ThreadCount; - uint64_t m_Offset; - uint64_t m_Stride; - bool m_ForceAllowLocalRefs; - bool m_DisableLocalRefs; - bool m_ForceAllowLocalHandleRef; - bool m_DisableLocalHandleRefs; - bool m_ForceAllowPartialLocalRefs; - bool m_DisablePartialLocalRefs; + bool m_OnHost = false; + bool m_ShowMethodStats = false; + int m_ProcessCount = 1; + int m_ThreadCount = 0; + uint64_t m_Offset = 0; + uint64_t m_Stride = 1; + bool m_ForceAllowLocalRefs = false; + bool m_DisableLocalRefs = false; + bool m_ForceAllowLocalHandleRef = false; + bool m_DisableLocalHandleRefs = false; + bool m_ForceAllowPartialLocalRefs = false; + bool m_DisablePartialLocalRefs = false; + bool m_DryRun = false; }; } // namespace zen diff --git a/src/zencore/include/zencore/string.h b/src/zencore/include/zencore/string.h index 2319d7ade..2b61b3954 100644 --- a/src/zencore/include/zencore/string.h +++ b/src/zencore/include/zencore/string.h @@ -690,7 +690,7 @@ struct NiceTimeSpanMs : public NiceBase ////////////////////////////////////////////////////////////////////////// inline std::string -NiceRate(uint64_t Num, uint32_t DurationMilliseconds, const char* Unit = "B") +NiceRate(uint64_t Num, uint64_t DurationMilliseconds, const char* Unit = "B") { char Buffer[32]; diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h index 044a3162e..0663abc43 100644 --- a/src/zenhttp/include/zenhttp/httpclient.h +++ b/src/zenhttp/include/zenhttp/httpclient.h @@ -87,7 +87,7 @@ public: // The number of bytes sent as part of the request int64_t UploadedBytes; - // The number of bytes recevied as part of the response + // The number of bytes received as part of the response int64_t DownloadedBytes; // The elapsed time in seconds for the request to execute diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 4ec7c56db..91f895b0b 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1691,16 +1691,22 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { WorkerPool.ScheduleWork([this, &Context, &JobLatch, &Replayer, RequestIndex]() { - IoBuffer Body; - std::pair<ZenContentType, ZenContentType> ContentType = Replayer.GetRequest(RequestIndex, Body); + IoBuffer Body; + zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body); + if (Body) { uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; int TargetPid = 0; CbPackage RpcResult; - if (IsHttpSuccessCode( - HandleRpcRequest(Context, ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult))) + if (IsHttpSuccessCode(HandleRpcRequest(Context, + RequestInfo.ContentType, + std::move(Body), + AcceptMagic, + AcceptFlags, + TargetPid, + RpcResult))) { if (AcceptMagic == kCbPkgMagic) { @@ -1764,8 +1770,15 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) auto HandleRpc = [this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { - std::uint64_t RequestIndex = - m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull; + uint64_t RequestIndex = ~0ull; + + if (m_RequestRecorder) + { + RequestIndex = m_RequestRecorder->RecordRequest( + {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, + Body); + } + uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; int TargetProcessId = 0; diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 152a47584..6a937089d 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -268,10 +268,11 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen m_VfsService->AddService(Ref<ZenCacheStore>(m_CacheStore)); m_Http->RegisterService(*m_VfsService); - ZEN_INFO("initializing GC, enabled '{}', interval {}s, lightweight interval {}s", + ZEN_INFO("initializing GC, enabled '{}', interval {}, lightweight interval {}", ServerOptions.GcConfig.Enabled, - ServerOptions.GcConfig.IntervalSeconds, - ServerOptions.GcConfig.LightweightIntervalSeconds); + NiceTimeSpanMs(ServerOptions.GcConfig.IntervalSeconds * 1000ull), + NiceTimeSpanMs(ServerOptions.GcConfig.LightweightIntervalSeconds * 1000ull)); + GcSchedulerConfig GcConfig{.RootDirectory = m_DataRoot / "gc", .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds), .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), diff --git a/src/zenstore/caslog.cpp b/src/zenstore/caslog.cpp index 2a978ae12..c04324fbc 100644 --- a/src/zenstore/caslog.cpp +++ b/src/zenstore/caslog.cpp @@ -2,16 +2,12 @@ #include <zenstore/caslog.h> -#include "compactcas.h" - #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/memory.h> #include <zencore/string.h> -#include <zencore/thread.h> -#include <zencore/uid.h> #include <xxhash.h> diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index 4958a27f6..054ac0e56 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -1,5 +1,9 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include <zencore/compactbinarybuilder.h> +#include <zencore/filesystem.h> +#include <zencore/logging.h> +#include <zencore/system.h> #include <zenutil/basicfile.h> #include <zenutil/cache/rpcrecording.h> @@ -9,6 +13,15 @@ ZEN_THIRD_PARTY_INCLUDES_START ZEN_THIRD_PARTY_INCLUDES_END namespace zen::cache { + +const RecordedRequestInfo RecordedRequestInfo::NullRequest = {.ContentType = ZenContentType::kUnknownContentType, + .AcceptType = ZenContentType::kUnknownContentType, + .SessionId = Oid::Zero}; + +} + +namespace zen::cache::v1 { + struct RecordedRequest { uint64_t Offset; @@ -17,7 +30,8 @@ struct RecordedRequest ZenContentType AcceptType; }; -const uint64_t RecordedRequestBlockSize = 1ull << 31u; +const uint64_t RecordedRequestBlockSize = 1ull << 31u; +const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; struct RecordedRequestsWriter { @@ -32,6 +46,31 @@ struct RecordedRequestsWriter RwLock::ExclusiveLockScope _(m_Lock); m_BlockFiles.clear(); + try + { + // Emit some metadata alongside the recording + + DateTime EndTime = DateTime::Now(); + TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()}; + + CbObjectWriter Cbo; + Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration; + Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest); + Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold; + + Cbo.BeginObject("system_info"); + Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); + Describe(GetSystemMetrics(), Cbo); + Cbo.EndObject(); + CbObject Metadata = Cbo.Save(); + + WriteFile(m_BasePath / "rpc_recording_metadata.zcb", Metadata.GetBuffer().AsIoBuffer()); + } + catch (std::exception& Ex) + { + ZEN_WARN("caught exception while generating metadata for RPC recording: {}", Ex.what()); + } + IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); BasicFile IndexFile; IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); @@ -41,15 +80,18 @@ struct RecordedRequestsWriter m_Entries.clear(); } - uint64_t WriteRequest(ZenContentType ContentType, ZenContentType AcceptType, const IoBuffer& RequestBuffer) + uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { RwLock::ExclusiveLockScope Lock(m_Lock); uint64_t RequestIndex = m_Entries.size(); - RecordedRequest& Entry = m_Entries.emplace_back( - RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType, .AcceptType = AcceptType}); - if (Entry.Length < 1 * 1024 * 1024) + RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull, + .Length = RequestBuffer.Size(), + .ContentType = RequestInfo.ContentType, + .AcceptType = RequestInfo.AcceptType}); + + if (Entry.Length < StandaloneFileSizeThreshold) { - uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); + const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); if (BlockIndex == m_BlockFiles.size()) { std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); @@ -92,6 +134,7 @@ struct RecordedRequestsWriter std::vector<RecordedRequest> m_Entries; std::vector<std::unique_ptr<BasicFile>> m_BlockFiles; uint64_t m_ChunkOffset; + zen::DateTime m_StartTime = DateTime::Now(); }; struct RecordedRequestsReader @@ -128,26 +171,31 @@ struct RecordedRequestsReader } void EndRead() { m_BlockFiles.clear(); } - std::pair<ZenContentType, ZenContentType> ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const + RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const { if (RequestIndex >= m_Entries.size()) { - return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType}; + return RecordedRequestInfo::NullRequest; } + const RecordedRequest& Entry = m_Entries[RequestIndex]; if (Entry.Length == 0) { - return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType}; + return RecordedRequestInfo::NullRequest; } + if (Entry.Offset != ~0ull) { uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); - return {Entry.ContentType, Entry.AcceptType}; } - OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); - return {Entry.ContentType, Entry.AcceptType}; + else + { + OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); + } + + return {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Oid::Zero}; } std::filesystem::path m_BasePath; @@ -162,14 +210,13 @@ public: virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } private: - virtual uint64_t RecordRequest(const ZenContentType ContentType, - const ZenContentType AcceptType, - const IoBuffer& RequestBuffer) override + virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override { - return m_RecordedRequests.WriteRequest(ContentType, AcceptType, RequestBuffer); + return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); } - virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} - virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} + virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} + virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} + RecordedRequestsWriter m_RecordedRequests; }; @@ -185,7 +232,7 @@ public: private: virtual uint64_t GetRequestCount() const override { return m_RequestCount; } - virtual std::pair<ZenContentType, ZenContentType> GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override + virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override { return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); } @@ -195,16 +242,628 @@ private: RecordedRequestsReader m_RequestBuffer; }; +} // namespace zen::cache::v1 + +////////////////////////////////////////////////////////////////////////// + +namespace zen::cache::v2 { + +using namespace std::literals; + +struct RecordedRequest +{ + uint32_t Offset; // 4 bytes + uint32_t Length; // 4 bytes + ZenContentType ContentType; // 1 byte + ZenContentType AcceptType; // 1 byte + uint8_t Padding; // 1 byte + uint8_t Padding2; // 1 byte + Oid SessionId; // 12 bytes +}; + +static_assert(sizeof(RecordedRequest) == 24); + +const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB +const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; // 1MiB +const uint64_t SegmentRequestCount = 10 * 1000 * 1000; +const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the + // number of files in a directory below this level + // for performance +const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024; +const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0}; + +struct RecordedRequestsSegmentWriter +{ + RecordedRequestsSegmentWriter() = default; + ~RecordedRequestsSegmentWriter() = default; + + RecordedRequestsSegmentWriter(const RecordedRequestsSegmentWriter&) = delete; + RecordedRequestsSegmentWriter& operator=(const RecordedRequestsSegmentWriter&) = delete; + + void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex); + uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); + void EndWrite(); + + inline uint64_t GetRequestCount() const + { + if (m_RequestCount) + { + return m_RequestCount; + } + + return m_Entries.size(); + } + inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; } + inline uint64_t GetSegmentIndex() const { return m_SegmentIndex; } + inline uint64_t GetFileCount() const { return m_FileCount; } + inline uint64_t GetRequestsByteCount() const { return m_RequestsByteCount; } + inline DateTime GetStartTime() const { return m_StartTime; } + inline DateTime GetEndTime() const { return m_EndTime; } + +private: + std::filesystem::path m_BasePath; + uint64_t m_SegmentIndex = 0; + uint64_t m_RequestBaseIndex = 0; + uint64_t m_RequestCount = 0; + std::atomic_uint64_t m_FileCount{}; + std::atomic_uint64_t m_RequestsByteCount{}; + mutable RwLock m_Lock; + std::vector<RecordedRequest> m_Entries; + std::vector<std::unique_ptr<BasicFile>> m_BlockFiles; + uint64_t m_ChunkOffset; + DateTime m_StartTime = DateTime::Now(); + DateTime m_EndTime = DateTime::Now(); +}; + +struct RecordedRequestsWriter +{ + void BeginWrite(const std::filesystem::path& BasePath); + RecordedRequestsSegmentWriter& EnsureCurrentSegment(); + void CommitCurrentSegment(RwLock::ExclusiveLockScope&); + void EndWrite(); + uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); + +private: + std::filesystem::path m_BasePath; + zen::DateTime m_StartTime = DateTime::Now(); + + mutable RwLock m_Lock; + std::unique_ptr<RecordedRequestsSegmentWriter> m_CurrentWriter; + std::vector<std::unique_ptr<RecordedRequestsSegmentWriter>> m_FinishedSegments; + std::vector<uint64_t> m_SegmentBaseIndex; + uint64_t m_NextSegmentBaseIndex = 0; +}; + +////////////////////////////////////////////////////////////////////////// + +struct RecordedRequestsSegmentReader +{ + RecordedRequestsSegmentReader() = default; + + RecordedRequestsSegmentReader(const RecordedRequestsSegmentReader&) = delete; + RecordedRequestsSegmentReader& operator=(const RecordedRequestsSegmentReader&) = delete; + + uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory); + void EndRead(); + RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const; + +private: + std::filesystem::path m_BasePath; + std::vector<RecordedRequest> m_Entries; + std::vector<IoBuffer> m_BlockFiles; +}; + +struct RecordedRequestsReader +{ + uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory); + void EndRead(); + RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const; + +private: + struct SegmentInfo + { + uint64_t SegmentIndex; + uint64_t BaseRequestIndex; + uint64_t RequestCount; + uint64_t RequestBytes; + DateTime StartTime{0}; + DateTime EndTime{0}; + }; + + bool m_InMemory = false; + std::filesystem::path m_BasePath; + std::vector<SegmentInfo> m_KnownSegments; + + mutable RwLock m_SegmentLock; + mutable std::vector<std::unique_ptr<RecordedRequestsSegmentReader>> m_SegmentReaders; +}; + +////////////////////////////////////////////////////////////////////////// + +void +RecordedRequestsSegmentWriter::BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex) +{ + m_BasePath = BasePath; + m_SegmentIndex = SegmentIndex; + m_RequestBaseIndex = RequestBaseIndex; + std::filesystem::create_directories(m_BasePath); +} + +void +RecordedRequestsSegmentWriter::EndWrite() +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_RequestCount = m_Entries.size(); + m_BlockFiles.clear(); + + // Emit some metadata alongside the recording + + try + { + m_EndTime = DateTime::Now(); + TimeSpan Duration{m_EndTime.GetTicks() - m_StartTime.GetTicks()}; + + CbObjectWriter Cbo; + Cbo << "time_start" << m_StartTime << "time_end" << m_EndTime << "duration" << Duration; + Cbo << "segment_index" << m_SegmentIndex; + Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest); + Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold; + + Cbo.BeginObject("system_info"); + Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); + Cbo.EndObject(); + CbObject Metadata = Cbo.Save(); + + WriteFile(m_BasePath / "rpc_segment_info.zcb", Metadata.GetBuffer().AsIoBuffer()); + } + catch (std::exception& Ex) + { + ZEN_WARN("caught exception while writing segment metadata for RPC recording: {}", Ex.what()); + } + + IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); + BasicFile IndexFile; + IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); + std::error_code Ec; + IndexFile.WriteAll(IndexBuffer, Ec); + IndexFile.Close(); + m_Entries.clear(); +} + +uint64_t +RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) +{ + const uint64_t RequestBufferSize = RequestBuffer.GetSize(); + + RwLock::ExclusiveLockScope Lock(m_Lock); + uint64_t RequestIndex = m_Entries.size(); + RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u, + .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), + .ContentType = RequestInfo.ContentType, + .AcceptType = RequestInfo.AcceptType, + .Padding = 0, + .Padding2 = 0, + .SessionId = RequestInfo.SessionId}); + + if (Entry.Length < StandaloneFileSizeThreshold) + { + const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); + + if (BlockIndex == m_BlockFiles.size()) + { + std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); + NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); + m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; + ++m_FileCount; + } + + ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); + BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); + ZEN_ASSERT(BlockFile != nullptr); + + Entry.Offset = uint32_t(m_ChunkOffset & 0xffffFFFF); + m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); + Lock.ReleaseNow(); + + std::error_code Ec; + BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); + if (Ec) + { + Entry.Length = 0; + return ~0ull; + } + + m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); + + return RequestIndex; + } + Lock.ReleaseNow(); + + // Write request data to standalone file + + try + { + BasicFile RequestFile; + RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); + + if (RequestBufferSize > std::numeric_limits<uint32_t>::max()) + { + // The exact value of the entry is not important, we will use + // the size of the standalone file regardless when performing + // the read + Entry.Length = std::numeric_limits<uint32_t>::max(); + } + + ++m_FileCount; + + std::error_code Ec; + RequestFile.WriteAll(RequestBuffer, Ec); + + if (Ec) + { + Entry.Length = 0; + return ~0ull; + } + + m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); + + return RequestIndex; + } + catch (std::exception&) + { + Entry.Length = 0; + return ~0ull; + } +} + +uint64_t +RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory) +{ + m_BasePath = BasePath; + BasicFile IndexFile; + IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); + m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); + IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); + uint64_t MaxChunkPosition = 0; + for (const RecordedRequest& R : m_Entries) + { + if (R.Offset != ~0u) + { + MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); + } + } + uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1; + m_BlockFiles.resize(BlockCount); + for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) + { + if (InMemory) + { + BasicFile Chunk; + Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead); + m_BlockFiles[BlockIndex] = Chunk.ReadAll(); + continue; + } + m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex)); + } + return m_Entries.size(); +} +void +RecordedRequestsSegmentReader::EndRead() +{ + m_BlockFiles.clear(); +} + +RecordedRequestInfo +RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const +{ + if (RequestIndex >= m_Entries.size()) + { + return RecordedRequestInfo::NullRequest; + } + const RecordedRequest& Entry = m_Entries[RequestIndex]; + if (Entry.Length == 0) + { + return RecordedRequestInfo::NullRequest; + } + + RecordedRequestInfo RequestInfo = {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Entry.SessionId}; + + if (Entry.Offset != ~0u) + { + // Inline in block file + uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); + uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); + OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); + + return RequestInfo; + } + + // Standalone file + OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); + + return RequestInfo; +} + +void +RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath) +{ + m_BasePath = BasePath; + EnsureCurrentSegment(); +} + +RecordedRequestsSegmentWriter& +RecordedRequestsWriter::EnsureCurrentSegment() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + if (m_CurrentWriter) + { + bool StartNewSegment = false; + + if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount) + { + ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount); + StartNewSegment = true; + } + else if (m_CurrentWriter->GetFileCount() >= LooseFileThreshold) + { + ZEN_DEBUG("starting new RPC recording segment due to file count >= {}", LooseFileThreshold); + StartNewSegment = true; + } + else if (m_CurrentWriter->GetRequestsByteCount() >= SegmentByteThreshold) + { + ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold); + StartNewSegment = true; + } + + if (StartNewSegment) + { + CommitCurrentSegment(_); + } + } + + if (!m_CurrentWriter) + { + const uint64_t SegmentIndex = m_FinishedSegments.size(); + m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>(); + + m_CurrentWriter->BeginWrite(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex); + } + + return *m_CurrentWriter; +} + +void +RecordedRequestsWriter::CommitCurrentSegment(RwLock::ExclusiveLockScope&) +{ + if (m_CurrentWriter) + { + RecordedRequestsSegmentWriter& Writer = *m_CurrentWriter; + m_FinishedSegments.push_back(std::move(m_CurrentWriter)); + m_SegmentBaseIndex.push_back(m_NextSegmentBaseIndex); + m_NextSegmentBaseIndex += Writer.GetRequestCount(); + + Writer.EndWrite(); + } +} + +void +RecordedRequestsWriter::EndWrite() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + CommitCurrentSegment(_); + + // Emit some metadata alongside the recording + + try + { + DateTime EndTime = DateTime::Now(); + TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()}; + + CbObjectWriter Cbo; + Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration << "format_version" << 2; + + Cbo.BeginObject("system_info"); + Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); + Cbo << "segment_count" << m_FinishedSegments.size(); + Cbo << "block_size" << RecordedRequestBlockSize; + Cbo << "standalone_threshold" << StandaloneFileSizeThreshold; + Cbo << "segment_request_count" << SegmentRequestCount; + Cbo << "segment_file_threshold" << LooseFileThreshold; + Cbo << "segment_byte_threshold" << SegmentByteThreshold; + + Describe(GetSystemMetrics(), Cbo); + Cbo.EndObject(); + + Cbo.BeginArray("segments"); + + for (auto& Segment : m_FinishedSegments) + { + Cbo.BeginObject(); + Cbo << "segment" << Segment->GetSegmentIndex(); + Cbo << "base_index" << Segment->GetBaseRequestIndex(); + Cbo << "request_count" << Segment->GetRequestCount(); + Cbo << "request_bytes" << Segment->GetRequestsByteCount(); + Cbo << "start_time" << Segment->GetStartTime(); + Cbo << "end_time" << Segment->GetEndTime(); + Cbo.EndObject(); + } + + Cbo.EndArray(); + + CbObject Metadata = Cbo.Save(); + + WriteFile(m_BasePath / "rpc_recording_info.zcb", Metadata.GetBuffer().AsIoBuffer()); + } + catch (std::exception& Ex) + { + ZEN_WARN("caught exception while writing metadata for RPC recording: {}", Ex.what()); + } +} + +uint64_t +RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) +{ + RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); + + const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer); + + return Writer.GetBaseRequestIndex() + SegmentLocalIndex; +} + +////////////////////////////////////////////////////////////////////////// + +uint64_t +RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory) +{ + m_InMemory = InMemory; + m_BasePath = BasePath; + + BasicFile InfoFile; + InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead); + CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll()); + + uint64_t TotalRequestCount = 0; + uint64_t MaxSegmentIndex = 0; + + for (auto SegmentElement : CbInfo["segments"]) + { + CbObjectView Segment = SegmentElement.AsObjectView(); + + const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(), + .BaseRequestIndex = Segment["base_index"sv].AsUInt64(), + .RequestCount = Segment["request_count"sv].AsUInt64(), + .RequestBytes = Segment["request_bytes"sv].AsUInt64(), + .StartTime = Segment["start_time"sv].AsDateTime(), + .EndTime = Segment["end_time"sv].AsDateTime()}); + + TotalRequestCount += Info.RequestCount; + MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + } + + m_SegmentReaders.resize(MaxSegmentIndex + 1); + + return TotalRequestCount; +} + +void +RecordedRequestsReader::EndRead() +{ + RwLock::ExclusiveLockScope _(m_SegmentLock); + m_KnownSegments.clear(); + m_SegmentReaders.clear(); + m_BasePath.clear(); +} + +RecordedRequestInfo +RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const +{ + auto EnsureSegment = [&](uint64_t SegmentIndex) -> const RecordedRequestsSegmentReader& { + { + RwLock::SharedLockScope _(m_SegmentLock); + + if (auto SegmentReaderPtr = m_SegmentReaders[SegmentIndex].get()) + { + return *SegmentReaderPtr; + } + } + + RwLock::ExclusiveLockScope _(m_SegmentLock); + + auto& SegmentReaderPtr = m_SegmentReaders[SegmentIndex]; + + if (!SegmentReaderPtr) + { + RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader; + NewSegment->BeginRead(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), m_InMemory); + SegmentReaderPtr.reset(NewSegment); + } + + return *(SegmentReaderPtr.get()); + }; + + for (auto& Segment : m_KnownSegments) + { + if (Segment.RequestCount > RequestIndex) + { + return EnsureSegment(Segment.SegmentIndex).ReadRequest(RequestIndex, OutBuffer); + } + else + { + RequestIndex -= Segment.RequestCount; + } + } + + return RecordedRequestInfo::NullRequest; +} + +////////////////////////////////////////////////////////////////////////// + +class DiskRequestRecorder : public IRpcRequestRecorder +{ +public: + DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } + virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } + +private: + virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override + { + return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); + } + virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} + virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} + + RecordedRequestsWriter m_RecordedRequests; +}; + +class DiskRequestReplayer : public IRpcRequestReplayer +{ +public: + DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) + { + m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory); + } + virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } + + static bool IsCompatible(const std::filesystem::path& BasePath) { return std::filesystem::exists(BasePath / "rpc_recording_info.zcb"); } + +private: + virtual uint64_t GetRequestCount() const override { return m_RequestCount; } + + virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override + { + return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); + } + virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } + + std::uint64_t m_RequestCount; + RecordedRequestsReader m_RequestBuffer; +}; + +} // namespace zen::cache::v2 + +////////////////////////////////////////////////////////////////////////// + +namespace zen::cache { + std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath) { - return std::make_unique<DiskRequestRecorder>(BasePath); + return std::make_unique<v2::DiskRequestRecorder>(BasePath); } std::unique_ptr<cache::IRpcRequestReplayer> MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) { - return std::make_unique<DiskRequestReplayer>(BasePath, InMemory); + if (v2::DiskRequestReplayer::IsCompatible(BasePath)) + { + return std::make_unique<v2::DiskRequestReplayer>(BasePath, InMemory); + } + else + { + return std::make_unique<v1::DiskRequestReplayer>(BasePath, InMemory); + } } } // namespace zen::cache diff --git a/src/zenutil/include/zenutil/cache/rpcrecording.h b/src/zenutil/include/zenutil/cache/rpcrecording.h index 6d65a532a..fd5df26ad 100644 --- a/src/zenutil/include/zenutil/cache/rpcrecording.h +++ b/src/zenutil/include/zenutil/cache/rpcrecording.h @@ -6,21 +6,32 @@ #include <zencore/iobuffer.h> namespace zen::cache { + +struct RecordedRequestInfo +{ + ZenContentType ContentType; + ZenContentType AcceptType; + Oid SessionId; + + static const RecordedRequestInfo NullRequest; +}; + class IRpcRequestRecorder { public: virtual ~IRpcRequestRecorder() {} - virtual uint64_t RecordRequest(const ZenContentType ContentType, const ZenContentType AcceptType, const IoBuffer& RequestBuffer) = 0; - virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const IoBuffer& ResponseBuffer) = 0; - virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const CompositeBuffer& ResponseBuffer) = 0; + virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) = 0; + virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const IoBuffer& ResponseBuffer) = 0; + virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const CompositeBuffer& ResponseBuffer) = 0; }; + class IRpcRequestReplayer { public: virtual ~IRpcRequestReplayer() {} - virtual uint64_t GetRequestCount() const = 0; - virtual std::pair<ZenContentType, ZenContentType> GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; - virtual ZenContentType GetResponse(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; + virtual uint64_t GetRequestCount() const = 0; + virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; + virtual ZenContentType GetResponse(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; }; std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath); |