aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/rpcreplay_cmd.cpp123
-rw-r--r--src/zen/cmds/rpcreplay_cmd.h25
-rw-r--r--src/zencore/include/zencore/string.h2
-rw-r--r--src/zenhttp/include/zenhttp/httpclient.h2
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp25
-rw-r--r--src/zenserver/zenserver.cpp7
-rw-r--r--src/zenstore/caslog.cpp4
-rw-r--r--src/zenutil/cache/rpcrecording.cpp701
-rw-r--r--src/zenutil/include/zenutil/cache/rpcrecording.h23
9 files changed, 816 insertions, 96 deletions
diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp
index 9e43280e1..d2cfc13ac 100644
--- a/src/zen/cmds/rpcreplay_cmd.cpp
+++ b/src/zen/cmds/rpcreplay_cmd.cpp
@@ -6,6 +6,7 @@
#include <zencore/filesystem.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
+#include <zencore/session.h>
#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zencore/workthreadpool.h>
@@ -106,6 +107,7 @@ RpcReplayCommand::RpcReplayCommand()
m_Options.add_options()("h,help", "Print help");
m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
m_Options.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), "<path>");
+ m_Options.add_option("", "", "dry", "Do a dry run", cxxopts::value(m_DryRun), "<enable>");
m_Options.add_option("",
"w",
"numthreads",
@@ -192,6 +194,13 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException("Rpc replay command requires a path");
}
+ if (!std::filesystem::exists(m_RecordingPath) || !std::filesystem::is_directory(m_RecordingPath))
+ {
+ throw std::runtime_error(fmt::format("could not find recording at '{}'", m_RecordingPath));
+ }
+
+ Stopwatch TotalTimer;
+
if (m_OnHost)
{
cpr::Session Session;
@@ -254,20 +263,30 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
[this, &WorkLatch, EntryCount, &EntryOffset, &Replayer, &BytesSent, &BytesReceived, &MethodTypes, &MethodTypesLock]() {
auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ std::map<std::string, size_t> LocalMethodTypes;
+
+ auto ReduceTypes = MakeGuard([&] {
+ RwLock::ExclusiveLockScope __(MethodTypesLock);
+
+ for (auto& Entry : LocalMethodTypes)
+ {
+ MethodTypes[Entry.first] += Entry.second;
+ }
+ });
+
cpr::Session Session;
Session.SetUrl(fmt::format("{}/z$/$rpc"sv, m_HostName));
uint64_t EntryIndex = EntryOffset.fetch_add(m_Stride);
while (EntryIndex < EntryCount)
{
- IoBuffer Payload;
- std::pair<ZenContentType, ZenContentType> Types = Replayer->GetRequest(EntryIndex, Payload);
- ZenContentType RequestContentType = Types.first;
- ZenContentType AcceptContentType = Types.second;
+ IoBuffer Payload;
+ zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload);
CbPackage RequestPackage;
CbObject Request;
- switch (RequestContentType)
+
+ switch (RequestInfo.ContentType)
{
case ZenContentType::kCbPackage:
{
@@ -289,6 +308,7 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
int AdjustedPid = 0;
RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone;
+
if (!m_DisableLocalRefs)
{
if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || m_ForceAllowLocalRefs)
@@ -314,15 +334,14 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (m_ShowMethodStats)
{
- std::string MethodName = std::string(Request["Method"sv].AsString());
- RwLock::ExclusiveLockScope __(MethodTypesLock);
- if (auto It = MethodTypes.find(MethodName); It != MethodTypes.end())
+ std::string MethodName = std::string(Request["Method"sv].AsString());
+ if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end())
{
It->second++;
}
else
{
- MethodTypes[MethodName] = 1;
+ LocalMethodTypes[MethodName] = 1;
}
}
@@ -356,7 +375,7 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions));
}
- if (RequestContentType == ZenContentType::kCbPackage)
+ if (RequestInfo.ContentType == ZenContentType::kCbPackage)
{
RequestPackage.SetObject(RequestCopyWriter.Save());
std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage);
@@ -371,27 +390,44 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
}
- Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestContentType))},
- {"Accept", std::string(MapContentTypeToString(AcceptContentType))}});
- uint64_t Offset = 0;
- auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, Payload.GetSize() - Offset);
- IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
- MutableMemoryView Data(buffer, size);
- Data.CopyFrom(PayloadRange.GetView());
- Offset += size;
- return true;
- };
- Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
- cpr::Response Response = Session.Post();
- BytesSent.fetch_add(Payload.GetSize());
- if (Response.error || !(IsHttpSuccessCode(Response.status_code) ||
- Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound)))
+ if (!m_DryRun)
{
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- break;
+ StringBuilder<32> SessionIdString;
+
+ if (RequestInfo.SessionId != Oid::Zero)
+ {
+ RequestInfo.SessionId.ToString(SessionIdString);
+ }
+ else
+ {
+ GetSessionId().ToString(SessionIdString);
+ }
+
+ Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestInfo.ContentType))},
+ {"Accept", std::string(MapContentTypeToString(RequestInfo.AcceptType))},
+ {"UE-Session", std::string(SessionIdString)}});
+
+ uint64_t Offset = 0;
+ auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, Payload.GetSize() - Offset);
+ IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
+ MutableMemoryView Data(buffer, size);
+ Data.CopyFrom(PayloadRange.GetView());
+ Offset += size;
+ return true;
+ };
+ Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+ cpr::Response Response = Session.Post();
+ BytesSent.fetch_add(Payload.GetSize());
+ if (Response.error || !(IsHttpSuccessCode(Response.status_code) ||
+ Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound)))
+ {
+ ZEN_CONSOLE("{}", FormatHttpResponse(Response));
+ break;
+ }
+ BytesReceived.fetch_add(Response.downloaded_bytes);
}
- BytesReceived.fetch_add(Response.downloaded_bytes);
+
EntryIndex = EntryOffset.fetch_add(m_Stride);
}
});
@@ -399,38 +435,41 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
while (!WorkLatch.Wait(1000))
{
- ZEN_CONSOLE("Processing {} requests, {} remaining (sent {}, recevied {})...",
- (EntryCount - m_Offset) / m_Stride,
- (EntryCount - EntryOffset.load()) / m_Stride,
+ const uint64_t RequestsTotal = (EntryCount - m_Offset) / m_Stride;
+ const uint64_t RequestsRemaining = (EntryCount - EntryOffset.load()) / m_Stride;
+
+ ZEN_CONSOLE("[{:3}%] [{}] {} requests, {} remaining (sent {}, received {})",
+ (RequestsTotal - RequestsRemaining) * 100 / RequestsTotal,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ RequestsTotal,
+ RequestsRemaining,
NiceBytes(BytesSent.load()),
NiceBytes(BytesReceived.load()));
}
+
if (m_ShowMethodStats)
{
for (const auto& It : MethodTypes)
{
- ZEN_CONSOLE("{}: {}", It.first, It.second);
+ ZEN_CONSOLE("{:18}: {:10}", It.first, It.second);
}
}
}
const uint64_t RequestsSent = (EntryOffset.load() - m_Offset) / m_Stride;
const uint64_t ElapsedMS = Timer.GetElapsedTimeMs();
- const double ElapsedS = ElapsedMS / 1000.500;
const uint64_t Sent = BytesSent.load();
const uint64_t Received = BytesReceived.load();
- const uint64_t RequestsPerS = static_cast<uint64_t>(RequestsSent / ElapsedS);
- const uint64_t SentPerS = static_cast<uint64_t>(Sent / ElapsedS);
- const uint64_t ReceivedPerS = static_cast<uint64_t>(Received / ElapsedS);
- ZEN_CONSOLE("Requests sent {} ({}/s), payloads sent {}B ({}B/s), payloads received {}B ({}B/s) in {}",
+ ZEN_CONSOLE("Processed requests: {} ({}), payloads sent {} ({}), payloads received {} ({}) in {}.\nTotal runtime: {}",
RequestsSent,
- RequestsPerS,
+ NiceRate(RequestsSent, ElapsedMS, "req"),
NiceBytes(Sent),
- NiceBytes(SentPerS),
+ NiceByteRate(Sent, ElapsedMS),
NiceBytes(Received),
- NiceBytes(ReceivedPerS),
- NiceTimeSpanMs(ElapsedMS));
+ NiceByteRate(Received, ElapsedMS),
+ NiceTimeSpanMs(ElapsedMS),
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()));
return 0;
}
diff --git a/src/zen/cmds/rpcreplay_cmd.h b/src/zen/cmds/rpcreplay_cmd.h
index 742e5ec5b..e1c2831a5 100644
--- a/src/zen/cmds/rpcreplay_cmd.h
+++ b/src/zen/cmds/rpcreplay_cmd.h
@@ -48,18 +48,19 @@ private:
cxxopts::Options m_Options{"rpc-record-replay", "Replays a previously recorded session of cache rpc requests to a target host"};
std::string m_HostName;
std::string m_RecordingPath;
- bool m_OnHost = false;
- bool m_ShowMethodStats = false;
- int m_ProcessCount;
- int m_ThreadCount;
- uint64_t m_Offset;
- uint64_t m_Stride;
- bool m_ForceAllowLocalRefs;
- bool m_DisableLocalRefs;
- bool m_ForceAllowLocalHandleRef;
- bool m_DisableLocalHandleRefs;
- bool m_ForceAllowPartialLocalRefs;
- bool m_DisablePartialLocalRefs;
+ bool m_OnHost = false;
+ bool m_ShowMethodStats = false;
+ int m_ProcessCount = 1;
+ int m_ThreadCount = 0;
+ uint64_t m_Offset = 0;
+ uint64_t m_Stride = 1;
+ bool m_ForceAllowLocalRefs = false;
+ bool m_DisableLocalRefs = false;
+ bool m_ForceAllowLocalHandleRef = false;
+ bool m_DisableLocalHandleRefs = false;
+ bool m_ForceAllowPartialLocalRefs = false;
+ bool m_DisablePartialLocalRefs = false;
+ bool m_DryRun = false;
};
} // namespace zen
diff --git a/src/zencore/include/zencore/string.h b/src/zencore/include/zencore/string.h
index 2319d7ade..2b61b3954 100644
--- a/src/zencore/include/zencore/string.h
+++ b/src/zencore/include/zencore/string.h
@@ -690,7 +690,7 @@ struct NiceTimeSpanMs : public NiceBase
//////////////////////////////////////////////////////////////////////////
inline std::string
-NiceRate(uint64_t Num, uint32_t DurationMilliseconds, const char* Unit = "B")
+NiceRate(uint64_t Num, uint64_t DurationMilliseconds, const char* Unit = "B")
{
char Buffer[32];
diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h
index 044a3162e..0663abc43 100644
--- a/src/zenhttp/include/zenhttp/httpclient.h
+++ b/src/zenhttp/include/zenhttp/httpclient.h
@@ -87,7 +87,7 @@ public:
// The number of bytes sent as part of the request
int64_t UploadedBytes;
- // The number of bytes recevied as part of the response
+ // The number of bytes received as part of the response
int64_t DownloadedBytes;
// The elapsed time in seconds for the request to execute
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 4ec7c56db..91f895b0b 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -1691,16 +1691,22 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
{
WorkerPool.ScheduleWork([this, &Context, &JobLatch, &Replayer, RequestIndex]() {
- IoBuffer Body;
- std::pair<ZenContentType, ZenContentType> ContentType = Replayer.GetRequest(RequestIndex, Body);
+ IoBuffer Body;
+ zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body);
+
if (Body)
{
uint32_t AcceptMagic = 0;
RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
int TargetPid = 0;
CbPackage RpcResult;
- if (IsHttpSuccessCode(
- HandleRpcRequest(Context, ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult)))
+ if (IsHttpSuccessCode(HandleRpcRequest(Context,
+ RequestInfo.ContentType,
+ std::move(Body),
+ AcceptMagic,
+ AcceptFlags,
+ TargetPid,
+ RpcResult)))
{
if (AcceptMagic == kCbPkgMagic)
{
@@ -1764,8 +1770,15 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
auto HandleRpc =
[this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable {
- std::uint64_t RequestIndex =
- m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull;
+ uint64_t RequestIndex = ~0ull;
+
+ if (m_RequestRecorder)
+ {
+ RequestIndex = m_RequestRecorder->RecordRequest(
+ {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId},
+ Body);
+ }
+
uint32_t AcceptMagic = 0;
RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
int TargetProcessId = 0;
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 152a47584..6a937089d 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -268,10 +268,11 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
m_VfsService->AddService(Ref<ZenCacheStore>(m_CacheStore));
m_Http->RegisterService(*m_VfsService);
- ZEN_INFO("initializing GC, enabled '{}', interval {}s, lightweight interval {}s",
+ ZEN_INFO("initializing GC, enabled '{}', interval {}, lightweight interval {}",
ServerOptions.GcConfig.Enabled,
- ServerOptions.GcConfig.IntervalSeconds,
- ServerOptions.GcConfig.LightweightIntervalSeconds);
+ NiceTimeSpanMs(ServerOptions.GcConfig.IntervalSeconds * 1000ull),
+ NiceTimeSpanMs(ServerOptions.GcConfig.LightweightIntervalSeconds * 1000ull));
+
GcSchedulerConfig GcConfig{.RootDirectory = m_DataRoot / "gc",
.MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds),
.Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds),
diff --git a/src/zenstore/caslog.cpp b/src/zenstore/caslog.cpp
index 2a978ae12..c04324fbc 100644
--- a/src/zenstore/caslog.cpp
+++ b/src/zenstore/caslog.cpp
@@ -2,16 +2,12 @@
#include <zenstore/caslog.h>
-#include "compactcas.h"
-
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/memory.h>
#include <zencore/string.h>
-#include <zencore/thread.h>
-#include <zencore/uid.h>
#include <xxhash.h>
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
index 4958a27f6..054ac0e56 100644
--- a/src/zenutil/cache/rpcrecording.cpp
+++ b/src/zenutil/cache/rpcrecording.cpp
@@ -1,5 +1,9 @@
// Copyright Epic Games, Inc. All Rights Reserved.
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/filesystem.h>
+#include <zencore/logging.h>
+#include <zencore/system.h>
#include <zenutil/basicfile.h>
#include <zenutil/cache/rpcrecording.h>
@@ -9,6 +13,15 @@ ZEN_THIRD_PARTY_INCLUDES_START
ZEN_THIRD_PARTY_INCLUDES_END
namespace zen::cache {
+
+const RecordedRequestInfo RecordedRequestInfo::NullRequest = {.ContentType = ZenContentType::kUnknownContentType,
+ .AcceptType = ZenContentType::kUnknownContentType,
+ .SessionId = Oid::Zero};
+
+}
+
+namespace zen::cache::v1 {
+
struct RecordedRequest
{
uint64_t Offset;
@@ -17,7 +30,8 @@ struct RecordedRequest
ZenContentType AcceptType;
};
-const uint64_t RecordedRequestBlockSize = 1ull << 31u;
+const uint64_t RecordedRequestBlockSize = 1ull << 31u;
+const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull;
struct RecordedRequestsWriter
{
@@ -32,6 +46,31 @@ struct RecordedRequestsWriter
RwLock::ExclusiveLockScope _(m_Lock);
m_BlockFiles.clear();
+ try
+ {
+ // Emit some metadata alongside the recording
+
+ DateTime EndTime = DateTime::Now();
+ TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()};
+
+ CbObjectWriter Cbo;
+ Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration;
+ Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest);
+ Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold;
+
+ Cbo.BeginObject("system_info");
+ Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName();
+ Describe(GetSystemMetrics(), Cbo);
+ Cbo.EndObject();
+ CbObject Metadata = Cbo.Save();
+
+ WriteFile(m_BasePath / "rpc_recording_metadata.zcb", Metadata.GetBuffer().AsIoBuffer());
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("caught exception while generating metadata for RPC recording: {}", Ex.what());
+ }
+
IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest));
BasicFile IndexFile;
IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate);
@@ -41,15 +80,18 @@ struct RecordedRequestsWriter
m_Entries.clear();
}
- uint64_t WriteRequest(ZenContentType ContentType, ZenContentType AcceptType, const IoBuffer& RequestBuffer)
+ uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
{
RwLock::ExclusiveLockScope Lock(m_Lock);
uint64_t RequestIndex = m_Entries.size();
- RecordedRequest& Entry = m_Entries.emplace_back(
- RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType, .AcceptType = AcceptType});
- if (Entry.Length < 1 * 1024 * 1024)
+ RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull,
+ .Length = RequestBuffer.Size(),
+ .ContentType = RequestInfo.ContentType,
+ .AcceptType = RequestInfo.AcceptType});
+
+ if (Entry.Length < StandaloneFileSizeThreshold)
{
- uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
+ const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
if (BlockIndex == m_BlockFiles.size())
{
std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
@@ -92,6 +134,7 @@ struct RecordedRequestsWriter
std::vector<RecordedRequest> m_Entries;
std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
uint64_t m_ChunkOffset;
+ zen::DateTime m_StartTime = DateTime::Now();
};
struct RecordedRequestsReader
@@ -128,26 +171,31 @@ struct RecordedRequestsReader
}
void EndRead() { m_BlockFiles.clear(); }
- std::pair<ZenContentType, ZenContentType> ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
+ RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
{
if (RequestIndex >= m_Entries.size())
{
- return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType};
+ return RecordedRequestInfo::NullRequest;
}
+
const RecordedRequest& Entry = m_Entries[RequestIndex];
if (Entry.Length == 0)
{
- return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType};
+ return RecordedRequestInfo::NullRequest;
}
+
if (Entry.Offset != ~0ull)
{
uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
- return {Entry.ContentType, Entry.AcceptType};
}
- OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
- return {Entry.ContentType, Entry.AcceptType};
+ else
+ {
+ OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
+ }
+
+ return {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Oid::Zero};
}
std::filesystem::path m_BasePath;
@@ -162,14 +210,13 @@ public:
virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
private:
- virtual uint64_t RecordRequest(const ZenContentType ContentType,
- const ZenContentType AcceptType,
- const IoBuffer& RequestBuffer) override
+ virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
{
- return m_RecordedRequests.WriteRequest(ContentType, AcceptType, RequestBuffer);
+ return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
}
- virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
- virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
+ virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
+ virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
+
RecordedRequestsWriter m_RecordedRequests;
};
@@ -185,7 +232,7 @@ public:
private:
virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
- virtual std::pair<ZenContentType, ZenContentType> GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
+ virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
{
return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
}
@@ -195,16 +242,628 @@ private:
RecordedRequestsReader m_RequestBuffer;
};
+} // namespace zen::cache::v1
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen::cache::v2 {
+
+using namespace std::literals;
+
+struct RecordedRequest
+{
+ uint32_t Offset; // 4 bytes
+ uint32_t Length; // 4 bytes
+ ZenContentType ContentType; // 1 byte
+ ZenContentType AcceptType; // 1 byte
+ uint8_t Padding; // 1 byte
+ uint8_t Padding2; // 1 byte
+ Oid SessionId; // 12 bytes
+};
+
+static_assert(sizeof(RecordedRequest) == 24);
+
+const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB
+const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; // 1MiB
+const uint64_t SegmentRequestCount = 10 * 1000 * 1000;
+const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the
+ // number of files in a directory below this level
+ // for performance
+const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024;
+const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0};
+
+struct RecordedRequestsSegmentWriter
+{
+ RecordedRequestsSegmentWriter() = default;
+ ~RecordedRequestsSegmentWriter() = default;
+
+ RecordedRequestsSegmentWriter(const RecordedRequestsSegmentWriter&) = delete;
+ RecordedRequestsSegmentWriter& operator=(const RecordedRequestsSegmentWriter&) = delete;
+
+ void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex);
+ uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
+ void EndWrite();
+
+ inline uint64_t GetRequestCount() const
+ {
+ if (m_RequestCount)
+ {
+ return m_RequestCount;
+ }
+
+ return m_Entries.size();
+ }
+ inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; }
+ inline uint64_t GetSegmentIndex() const { return m_SegmentIndex; }
+ inline uint64_t GetFileCount() const { return m_FileCount; }
+ inline uint64_t GetRequestsByteCount() const { return m_RequestsByteCount; }
+ inline DateTime GetStartTime() const { return m_StartTime; }
+ inline DateTime GetEndTime() const { return m_EndTime; }
+
+private:
+ std::filesystem::path m_BasePath;
+ uint64_t m_SegmentIndex = 0;
+ uint64_t m_RequestBaseIndex = 0;
+ uint64_t m_RequestCount = 0;
+ std::atomic_uint64_t m_FileCount{};
+ std::atomic_uint64_t m_RequestsByteCount{};
+ mutable RwLock m_Lock;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
+ uint64_t m_ChunkOffset;
+ DateTime m_StartTime = DateTime::Now();
+ DateTime m_EndTime = DateTime::Now();
+};
+
+struct RecordedRequestsWriter
+{
+ void BeginWrite(const std::filesystem::path& BasePath);
+ RecordedRequestsSegmentWriter& EnsureCurrentSegment();
+ void CommitCurrentSegment(RwLock::ExclusiveLockScope&);
+ void EndWrite();
+ uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
+
+private:
+ std::filesystem::path m_BasePath;
+ zen::DateTime m_StartTime = DateTime::Now();
+
+ mutable RwLock m_Lock;
+ std::unique_ptr<RecordedRequestsSegmentWriter> m_CurrentWriter;
+ std::vector<std::unique_ptr<RecordedRequestsSegmentWriter>> m_FinishedSegments;
+ std::vector<uint64_t> m_SegmentBaseIndex;
+ uint64_t m_NextSegmentBaseIndex = 0;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+struct RecordedRequestsSegmentReader
+{
+ RecordedRequestsSegmentReader() = default;
+
+ RecordedRequestsSegmentReader(const RecordedRequestsSegmentReader&) = delete;
+ RecordedRequestsSegmentReader& operator=(const RecordedRequestsSegmentReader&) = delete;
+
+ uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory);
+ void EndRead();
+ RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const;
+
+private:
+ std::filesystem::path m_BasePath;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<IoBuffer> m_BlockFiles;
+};
+
+struct RecordedRequestsReader
+{
+ uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory);
+ void EndRead();
+ RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const;
+
+private:
+ struct SegmentInfo
+ {
+ uint64_t SegmentIndex;
+ uint64_t BaseRequestIndex;
+ uint64_t RequestCount;
+ uint64_t RequestBytes;
+ DateTime StartTime{0};
+ DateTime EndTime{0};
+ };
+
+ bool m_InMemory = false;
+ std::filesystem::path m_BasePath;
+ std::vector<SegmentInfo> m_KnownSegments;
+
+ mutable RwLock m_SegmentLock;
+ mutable std::vector<std::unique_ptr<RecordedRequestsSegmentReader>> m_SegmentReaders;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+RecordedRequestsSegmentWriter::BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex)
+{
+ m_BasePath = BasePath;
+ m_SegmentIndex = SegmentIndex;
+ m_RequestBaseIndex = RequestBaseIndex;
+ std::filesystem::create_directories(m_BasePath);
+}
+
+void
+RecordedRequestsSegmentWriter::EndWrite()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_RequestCount = m_Entries.size();
+ m_BlockFiles.clear();
+
+ // Emit some metadata alongside the recording
+
+ try
+ {
+ m_EndTime = DateTime::Now();
+ TimeSpan Duration{m_EndTime.GetTicks() - m_StartTime.GetTicks()};
+
+ CbObjectWriter Cbo;
+ Cbo << "time_start" << m_StartTime << "time_end" << m_EndTime << "duration" << Duration;
+ Cbo << "segment_index" << m_SegmentIndex;
+ Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest);
+ Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold;
+
+ Cbo.BeginObject("system_info");
+ Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName();
+ Cbo.EndObject();
+ CbObject Metadata = Cbo.Save();
+
+ WriteFile(m_BasePath / "rpc_segment_info.zcb", Metadata.GetBuffer().AsIoBuffer());
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("caught exception while writing segment metadata for RPC recording: {}", Ex.what());
+ }
+
+ IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest));
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ IndexFile.WriteAll(IndexBuffer, Ec);
+ IndexFile.Close();
+ m_Entries.clear();
+}
+
+uint64_t
+RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
+{
+ const uint64_t RequestBufferSize = RequestBuffer.GetSize();
+
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ uint64_t RequestIndex = m_Entries.size();
+ RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u,
+ .Length = uint32_t(RequestBufferSize & 0xffffFFFFu),
+ .ContentType = RequestInfo.ContentType,
+ .AcceptType = RequestInfo.AcceptType,
+ .Padding = 0,
+ .Padding2 = 0,
+ .SessionId = RequestInfo.SessionId});
+
+ if (Entry.Length < StandaloneFileSizeThreshold)
+ {
+ const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
+
+ if (BlockIndex == m_BlockFiles.size())
+ {
+ std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
+ NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
+ m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
+ ++m_FileCount;
+ }
+
+ ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
+ BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
+ ZEN_ASSERT(BlockFile != nullptr);
+
+ Entry.Offset = uint32_t(m_ChunkOffset & 0xffffFFFF);
+ m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u);
+ Lock.ReleaseNow();
+
+ std::error_code Ec;
+ BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec);
+ if (Ec)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+
+ m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
+
+ return RequestIndex;
+ }
+ Lock.ReleaseNow();
+
+ // Write request data to standalone file
+
+ try
+ {
+ BasicFile RequestFile;
+ RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
+
+ if (RequestBufferSize > std::numeric_limits<uint32_t>::max())
+ {
+ // The exact value of the entry is not important, we will use
+ // the size of the standalone file regardless when performing
+ // the read
+ Entry.Length = std::numeric_limits<uint32_t>::max();
+ }
+
+ ++m_FileCount;
+
+ std::error_code Ec;
+ RequestFile.WriteAll(RequestBuffer, Ec);
+
+ if (Ec)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+
+ m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
+
+ return RequestIndex;
+ }
+ catch (std::exception&)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+}
+
+uint64_t
+RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory)
+{
+ m_BasePath = BasePath;
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead);
+ m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest));
+ IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0);
+ uint64_t MaxChunkPosition = 0;
+ for (const RecordedRequest& R : m_Entries)
+ {
+ if (R.Offset != ~0u)
+ {
+ MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length);
+ }
+ }
+ uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1;
+ m_BlockFiles.resize(BlockCount);
+ for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex)
+ {
+ if (InMemory)
+ {
+ BasicFile Chunk;
+ Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead);
+ m_BlockFiles[BlockIndex] = Chunk.ReadAll();
+ continue;
+ }
+ m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex));
+ }
+ return m_Entries.size();
+}
+void
+RecordedRequestsSegmentReader::EndRead()
+{
+ m_BlockFiles.clear();
+}
+
+RecordedRequestInfo
+RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
+{
+ if (RequestIndex >= m_Entries.size())
+ {
+ return RecordedRequestInfo::NullRequest;
+ }
+ const RecordedRequest& Entry = m_Entries[RequestIndex];
+ if (Entry.Length == 0)
+ {
+ return RecordedRequestInfo::NullRequest;
+ }
+
+ RecordedRequestInfo RequestInfo = {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Entry.SessionId};
+
+ if (Entry.Offset != ~0u)
+ {
+ // Inline in block file
+ uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
+ uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
+ OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
+
+ return RequestInfo;
+ }
+
+ // Standalone file
+ OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
+
+ return RequestInfo;
+}
+
+void
+RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath)
+{
+ m_BasePath = BasePath;
+ EnsureCurrentSegment();
+}
+
+RecordedRequestsSegmentWriter&
+RecordedRequestsWriter::EnsureCurrentSegment()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (m_CurrentWriter)
+ {
+ bool StartNewSegment = false;
+
+ if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount)
+ {
+ ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount);
+ StartNewSegment = true;
+ }
+ else if (m_CurrentWriter->GetFileCount() >= LooseFileThreshold)
+ {
+ ZEN_DEBUG("starting new RPC recording segment due to file count >= {}", LooseFileThreshold);
+ StartNewSegment = true;
+ }
+ else if (m_CurrentWriter->GetRequestsByteCount() >= SegmentByteThreshold)
+ {
+ ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold);
+ StartNewSegment = true;
+ }
+
+ if (StartNewSegment)
+ {
+ CommitCurrentSegment(_);
+ }
+ }
+
+ if (!m_CurrentWriter)
+ {
+ const uint64_t SegmentIndex = m_FinishedSegments.size();
+ m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>();
+
+ m_CurrentWriter->BeginWrite(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex);
+ }
+
+ return *m_CurrentWriter;
+}
+
+void
+RecordedRequestsWriter::CommitCurrentSegment(RwLock::ExclusiveLockScope&)
+{
+ if (m_CurrentWriter)
+ {
+ RecordedRequestsSegmentWriter& Writer = *m_CurrentWriter;
+ m_FinishedSegments.push_back(std::move(m_CurrentWriter));
+ m_SegmentBaseIndex.push_back(m_NextSegmentBaseIndex);
+ m_NextSegmentBaseIndex += Writer.GetRequestCount();
+
+ Writer.EndWrite();
+ }
+}
+
+void
+RecordedRequestsWriter::EndWrite()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ CommitCurrentSegment(_);
+
+ // Emit some metadata alongside the recording
+
+ try
+ {
+ DateTime EndTime = DateTime::Now();
+ TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()};
+
+ CbObjectWriter Cbo;
+ Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration << "format_version" << 2;
+
+ Cbo.BeginObject("system_info");
+ Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName();
+ Cbo << "segment_count" << m_FinishedSegments.size();
+ Cbo << "block_size" << RecordedRequestBlockSize;
+ Cbo << "standalone_threshold" << StandaloneFileSizeThreshold;
+ Cbo << "segment_request_count" << SegmentRequestCount;
+ Cbo << "segment_file_threshold" << LooseFileThreshold;
+ Cbo << "segment_byte_threshold" << SegmentByteThreshold;
+
+ Describe(GetSystemMetrics(), Cbo);
+ Cbo.EndObject();
+
+ Cbo.BeginArray("segments");
+
+ for (auto& Segment : m_FinishedSegments)
+ {
+ Cbo.BeginObject();
+ Cbo << "segment" << Segment->GetSegmentIndex();
+ Cbo << "base_index" << Segment->GetBaseRequestIndex();
+ Cbo << "request_count" << Segment->GetRequestCount();
+ Cbo << "request_bytes" << Segment->GetRequestsByteCount();
+ Cbo << "start_time" << Segment->GetStartTime();
+ Cbo << "end_time" << Segment->GetEndTime();
+ Cbo.EndObject();
+ }
+
+ Cbo.EndArray();
+
+ CbObject Metadata = Cbo.Save();
+
+ WriteFile(m_BasePath / "rpc_recording_info.zcb", Metadata.GetBuffer().AsIoBuffer());
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("caught exception while writing metadata for RPC recording: {}", Ex.what());
+ }
+}
+
+uint64_t
+RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
+{
+ RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
+
+ const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer);
+
+ return Writer.GetBaseRequestIndex() + SegmentLocalIndex;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+uint64_t
+RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory)
+{
+ m_InMemory = InMemory;
+ m_BasePath = BasePath;
+
+ BasicFile InfoFile;
+ InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead);
+ CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll());
+
+ uint64_t TotalRequestCount = 0;
+ uint64_t MaxSegmentIndex = 0;
+
+ for (auto SegmentElement : CbInfo["segments"])
+ {
+ CbObjectView Segment = SegmentElement.AsObjectView();
+
+ const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(),
+ .BaseRequestIndex = Segment["base_index"sv].AsUInt64(),
+ .RequestCount = Segment["request_count"sv].AsUInt64(),
+ .RequestBytes = Segment["request_bytes"sv].AsUInt64(),
+ .StartTime = Segment["start_time"sv].AsDateTime(),
+ .EndTime = Segment["end_time"sv].AsDateTime()});
+
+ TotalRequestCount += Info.RequestCount;
+ MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
+ }
+
+ m_SegmentReaders.resize(MaxSegmentIndex + 1);
+
+ return TotalRequestCount;
+}
+
+void
+RecordedRequestsReader::EndRead()
+{
+ RwLock::ExclusiveLockScope _(m_SegmentLock);
+ m_KnownSegments.clear();
+ m_SegmentReaders.clear();
+ m_BasePath.clear();
+}
+
+RecordedRequestInfo
+RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
+{
+ auto EnsureSegment = [&](uint64_t SegmentIndex) -> const RecordedRequestsSegmentReader& {
+ {
+ RwLock::SharedLockScope _(m_SegmentLock);
+
+ if (auto SegmentReaderPtr = m_SegmentReaders[SegmentIndex].get())
+ {
+ return *SegmentReaderPtr;
+ }
+ }
+
+ RwLock::ExclusiveLockScope _(m_SegmentLock);
+
+ auto& SegmentReaderPtr = m_SegmentReaders[SegmentIndex];
+
+ if (!SegmentReaderPtr)
+ {
+ RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader;
+ NewSegment->BeginRead(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), m_InMemory);
+ SegmentReaderPtr.reset(NewSegment);
+ }
+
+ return *(SegmentReaderPtr.get());
+ };
+
+ for (auto& Segment : m_KnownSegments)
+ {
+ if (Segment.RequestCount > RequestIndex)
+ {
+ return EnsureSegment(Segment.SegmentIndex).ReadRequest(RequestIndex, OutBuffer);
+ }
+ else
+ {
+ RequestIndex -= Segment.RequestCount;
+ }
+ }
+
+ return RecordedRequestInfo::NullRequest;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+class DiskRequestRecorder : public IRpcRequestRecorder
+{
+public:
+ DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
+ virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
+
+private:
+ virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
+ {
+ return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
+ }
+ virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
+ virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
+
+ RecordedRequestsWriter m_RecordedRequests;
+};
+
+class DiskRequestReplayer : public IRpcRequestReplayer
+{
+public:
+ DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
+ {
+ m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory);
+ }
+ virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
+
+ static bool IsCompatible(const std::filesystem::path& BasePath) { return std::filesystem::exists(BasePath / "rpc_recording_info.zcb"); }
+
+private:
+ virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
+
+ virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
+ {
+ return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
+ }
+ virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
+
+ std::uint64_t m_RequestCount;
+ RecordedRequestsReader m_RequestBuffer;
+};
+
+} // namespace zen::cache::v2
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen::cache {
+
std::unique_ptr<cache::IRpcRequestRecorder>
MakeDiskRequestRecorder(const std::filesystem::path& BasePath)
{
- return std::make_unique<DiskRequestRecorder>(BasePath);
+ return std::make_unique<v2::DiskRequestRecorder>(BasePath);
}
std::unique_ptr<cache::IRpcRequestReplayer>
MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
{
- return std::make_unique<DiskRequestReplayer>(BasePath, InMemory);
+ if (v2::DiskRequestReplayer::IsCompatible(BasePath))
+ {
+ return std::make_unique<v2::DiskRequestReplayer>(BasePath, InMemory);
+ }
+ else
+ {
+ return std::make_unique<v1::DiskRequestReplayer>(BasePath, InMemory);
+ }
}
} // namespace zen::cache
diff --git a/src/zenutil/include/zenutil/cache/rpcrecording.h b/src/zenutil/include/zenutil/cache/rpcrecording.h
index 6d65a532a..fd5df26ad 100644
--- a/src/zenutil/include/zenutil/cache/rpcrecording.h
+++ b/src/zenutil/include/zenutil/cache/rpcrecording.h
@@ -6,21 +6,32 @@
#include <zencore/iobuffer.h>
namespace zen::cache {
+
+struct RecordedRequestInfo
+{
+ ZenContentType ContentType;
+ ZenContentType AcceptType;
+ Oid SessionId;
+
+ static const RecordedRequestInfo NullRequest;
+};
+
class IRpcRequestRecorder
{
public:
virtual ~IRpcRequestRecorder() {}
- virtual uint64_t RecordRequest(const ZenContentType ContentType, const ZenContentType AcceptType, const IoBuffer& RequestBuffer) = 0;
- virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const IoBuffer& ResponseBuffer) = 0;
- virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const CompositeBuffer& ResponseBuffer) = 0;
+ virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) = 0;
+ virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const IoBuffer& ResponseBuffer) = 0;
+ virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const CompositeBuffer& ResponseBuffer) = 0;
};
+
class IRpcRequestReplayer
{
public:
virtual ~IRpcRequestReplayer() {}
- virtual uint64_t GetRequestCount() const = 0;
- virtual std::pair<ZenContentType, ZenContentType> GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0;
- virtual ZenContentType GetResponse(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0;
+ virtual uint64_t GetRequestCount() const = 0;
+ virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0;
+ virtual ZenContentType GetResponse(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0;
};
std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath);