// Copyright Epic Games, Inc. All Rights Reserved. #include "rpcreplay_cmd.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #include #include namespace zen { using namespace std::literals; RpcStartRecordingCommand::RpcStartRecordingCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), ""); m_Options.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), ""); m_Options.parse_positional("path"); } RpcStartRecordingCommand::~RpcStartRecordingCommand() = default; void RpcStartRecordingCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { ZEN_UNUSED(GlobalOptions, argc, argv); if (!ParseOptions(argc, argv)) { return; } m_HostName = ResolveTargetHostSpec(m_HostName); if (m_HostName.empty()) { throw OptionParseException("Unable to resolve server specification", m_Options.help()); } if (m_RecordingPath.empty()) { throw OptionParseException("'--path' is required", m_Options.help()); } HttpClient Http(m_HostName); if (HttpClient::Response Response = Http.Post("/z$/exec$/start-recording"sv, HttpClient::KeyValueMap{}, HttpClient::KeyValueMap({{"path", m_RecordingPath}}))) { ZEN_CONSOLE("{}", Response.ToText()); } else { Response.ThrowError("Failed to start recording"); } } //////////////////////////////////////////////////// RpcStopRecordingCommand::RpcStopRecordingCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), ""); } RpcStopRecordingCommand::~RpcStopRecordingCommand() = default; void RpcStopRecordingCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { ZEN_UNUSED(GlobalOptions, argc, argv); if (!ParseOptions(argc, argv)) { return; } m_HostName = ResolveTargetHostSpec(m_HostName); if (m_HostName.empty()) { throw OptionParseException("Unable to resolve server specification", m_Options.help()); } HttpClient Http(m_HostName); if (HttpClient::Response Response = Http.Post("/z$/exec$/stop-recording"sv)) { ZEN_CONSOLE("{}", Response.ToText()); } else { Response.ThrowError("Failed to stop recording"); } } //////////////////////////////////////////////////// RpcReplayCommand::RpcReplayCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), ""); m_Options.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), ""); m_Options.add_option("", "", "dry", "Do a dry run", cxxopts::value(m_DryRun), ""); m_Options.add_option("", "w", "numthreads", "Number of worker threads per process", cxxopts::value(m_ThreadCount)->default_value(fmt::format("{}", std::thread::hardware_concurrency())), ""); m_Options.add_option("", "", "onhost", "Replay on host, bypassing http/network layer", cxxopts::value(m_OnHost), ""); m_Options.add_option("", "", "showmethodstats", "Show statistics of which RPC methods are used", cxxopts::value(m_ShowMethodStats), ""); m_Options.add_option("", "", "offset", "Offset into request recording to start replay", cxxopts::value(m_Offset)->default_value("0"), ""); m_Options.add_option("", "", "stride", "Stride for request recording when replaying requests", cxxopts::value(m_Stride)->default_value("1"), ""); m_Options.add_option("", "", "numproc", "Number of worker processes", cxxopts::value(m_ProcessCount)->default_value("1"), ""); m_Options.add_option("", "", "forceallowlocalrefs", "Force enable local refs in requests", cxxopts::value(m_ForceAllowLocalRefs), ""); m_Options .add_option("", "", "disablelocalrefs", "Force disable local refs in requests", cxxopts::value(m_DisableLocalRefs), ""); m_Options.add_option("", "", "forceallowlocalhandlerefs", "Force enable local refs as handles in requests", cxxopts::value(m_ForceAllowLocalHandleRef), ""); m_Options.add_option("", "", "disablelocalhandlerefs", "Force disable local refs as handles in requests", cxxopts::value(m_DisableLocalHandleRefs), ""); m_Options.add_option("", "", "forceallowpartiallocalrefs", "Force enable local refs for all sizes", cxxopts::value(m_ForceAllowPartialLocalRefs), ""); m_Options.add_option("", "", "disablepartiallocalrefs", "Force disable local refs for all sizes", cxxopts::value(m_DisablePartialLocalRefs), ""); m_Options.parse_positional("path"); } RpcReplayCommand::~RpcReplayCommand() = default; void RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { ZEN_UNUSED(GlobalOptions, argc, argv); if (!ParseOptions(argc, argv)) { return; } m_HostName = ResolveTargetHostSpec(m_HostName); if (m_HostName.empty()) { throw OptionParseException("Unable to resolve server specification", m_Options.help()); } if (m_RecordingPath.empty()) { throw OptionParseException("'--path' is required", m_Options.help()); } if (!IsDir(m_RecordingPath)) { throw std::runtime_error(fmt::format("could not find recording at '{}'", m_RecordingPath)); } m_ThreadCount = Max(m_ThreadCount, 1); ZEN_CONSOLE("Replay '{}' (start offset {}, stride {}) to '{}', {} threads", m_RecordingPath, m_Offset, m_Stride, m_HostName, m_ThreadCount); Stopwatch TotalTimer; if (m_OnHost) { HttpClient Http(m_HostName); if (HttpClient::Response Response = Http.Post("/z$/exec$/replay-recording"sv, HttpClient::KeyValueMap{}, HttpClient::KeyValueMap({{"path", m_RecordingPath}, {"thread-count", fmt::format("{}", m_ThreadCount)}}))) { ZEN_CONSOLE("{}", Response.ToText()); return; } else { Response.ThrowError("Failed to start replay"); } } std::unique_ptr Replayer = cache::MakeDiskRequestReplayer(m_RecordingPath, true); uint64_t EntryCount = Replayer->GetRequestCount(); std::atomic_uint64_t EntryOffset = m_Offset; std::atomic_uint64_t BytesSent = 0; std::atomic_uint64_t BytesReceived = 0; Stopwatch Timer; if (m_ProcessCount > 1) { std::vector> WorkerProcesses; WorkerProcesses.resize(m_ProcessCount); ProcessMonitor Monitor; for (int ProcessIndex = 0; ProcessIndex < m_ProcessCount; ++ProcessIndex) { std::string CommandLine = fmt::format("{} rpc-record-replay --hosturl {} --path \"{}\" --offset {} --stride {} --numthreads {} --numproc {}"sv, argv[0], m_HostName, m_RecordingPath, m_Stride == 1 ? 0 : m_Offset + ProcessIndex, m_Stride, m_ThreadCount, 1); CreateProcResult Result(CreateProc(std::filesystem::path(std::string(argv[0])), CommandLine)); WorkerProcesses[ProcessIndex] = std::make_unique(); WorkerProcesses[ProcessIndex]->Initialize(Result); Monitor.AddPid(WorkerProcesses[ProcessIndex]->Pid()); } while (Monitor.IsRunning()) { ZEN_CONSOLE("Waiting for worker processes..."); Sleep(1000); } return; } else { std::map MethodTypes; RwLock MethodTypesLock; WorkerThreadPool WorkerPool(m_ThreadCount); Latch WorkLatch(m_ThreadCount); for (int WorkerIndex = 0; WorkerIndex < m_ThreadCount; ++WorkerIndex) { WorkerPool.ScheduleWork( [this, &WorkLatch, EntryCount, &EntryOffset, &Replayer, &BytesSent, &BytesReceived, &MethodTypes, &MethodTypesLock]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); std::map LocalMethodTypes; auto ReduceTypes = MakeGuard([&] { RwLock::ExclusiveLockScope __(MethodTypesLock); for (auto& Entry : LocalMethodTypes) { MethodTypes[Entry.first] += Entry.second; } }); HttpClient Http{m_HostName}; uint64_t EntryIndex = EntryOffset.fetch_add(m_Stride); while (EntryIndex < EntryCount) { IoBuffer Payload; const zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload); if (RequestInfo != zen::cache::RecordedRequestInfo::NullRequest) { CbPackage RequestPackage; CbObject Request; switch (RequestInfo.ContentType) { case ZenContentType::kCbPackage: if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage)) { Request = RequestPackage.GetObject(); } break; case ZenContentType::kCbObject: Request = LoadCompactBinaryObject(Payload); break; } RpcAcceptOptions OriginalAcceptOptions = static_cast(Request["AcceptFlags"sv].AsUInt16(0u)); int OriginalProcessPid = Request["Pid"sv].AsInt32(0); int AdjustedPid = 0; RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone; if (!m_DisableLocalRefs) { if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || m_ForceAllowLocalRefs) { AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences; if (!m_DisablePartialLocalRefs) { if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) || m_ForceAllowPartialLocalRefs) { AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences; } } if (!m_DisableLocalHandleRefs) { if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef) { AdjustedPid = GetCurrentProcessId(); } } } } if (m_ShowMethodStats) { std::string MethodName = std::string(Request["Method"sv].AsString()); if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end()) { It->second++; } else { LocalMethodTypes[MethodName] = 1; } } if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid) { CbObjectWriter RequestCopyWriter; for (const CbFieldView& Field : Request) { if (!Field.HasName()) { RequestCopyWriter.AddField(Field); continue; } std::string_view FieldName = Field.GetName(); if (FieldName == "Pid"sv) { continue; } if (FieldName == "AcceptFlags"sv) { continue; } RequestCopyWriter.AddField(FieldName, Field); } if (AdjustedPid != 0) { RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid); } if (AdjustedAcceptOptions != RpcAcceptOptions::kNone) { RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast(AdjustedAcceptOptions)); } if (RequestInfo.ContentType == ZenContentType::kCbPackage) { RequestPackage.SetObject(RequestCopyWriter.Save()); std::vector Buffers = FormatPackageMessage(RequestPackage); std::vector SharedBuffers(Buffers.begin(), Buffers.end()); Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer(); } else { RequestCopyWriter.Finalize(); Payload = IoBuffer(RequestCopyWriter.GetSaveSize()); RequestCopyWriter.Save(Payload.GetMutableView()); } } if (!m_DryRun) { Http.SetSessionId(RequestInfo.SessionId); Payload.SetContentType(RequestInfo.ContentType); HttpClient::Response Response = Http.Post("/z$/$rpc", Payload, {HttpClient::Accept(RequestInfo.AcceptType)}); BytesSent.fetch_add(Payload.GetSize()); if (!Response) { ZEN_CONSOLE_ERROR("{}", Response); break; } BytesReceived.fetch_add(Response.DownloadedBytes); } } EntryIndex = EntryOffset.fetch_add(m_Stride); } }, WorkerThreadPool::EMode::EnableBacklog); } while (!WorkLatch.Wait(1000)) { const uint64_t RequestsTotal = (EntryCount - m_Offset) / m_Stride; const uint64_t RequestsRemaining = (EntryCount - EntryOffset.load()) / m_Stride; ZEN_CONSOLE("[{:3}%] [{}] {} requests, {} remaining (sent {}, received {})", (RequestsTotal - RequestsRemaining) * 100 / RequestsTotal, NiceTimeSpanMs(Timer.GetElapsedTimeMs()), RequestsTotal, RequestsRemaining, NiceBytes(BytesSent.load()), NiceBytes(BytesReceived.load())); } if (m_ShowMethodStats) { for (const auto& It : MethodTypes) { ZEN_CONSOLE("{:18}: {:10}", It.first, It.second); } } } const uint64_t RequestsSent = (EntryOffset.load() - m_Offset) / m_Stride; const uint64_t ElapsedMS = Timer.GetElapsedTimeMs(); const uint64_t Sent = BytesSent.load(); const uint64_t Received = BytesReceived.load(); ZEN_CONSOLE("Processed requests: {} ({}), payloads sent {} ({}), payloads received {} ({}) in {}.\nTotal runtime: {}", RequestsSent, NiceRate(RequestsSent, ElapsedMS, "req"), NiceBytes(Sent), NiceByteRate(Sent, ElapsedMS), NiceBytes(Received), NiceByteRate(Received, ElapsedMS), NiceTimeSpanMs(ElapsedMS), NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs())); } ////////////////////////////////////////////////////////////////////////// RpcAnalyzeCommand::RpcAnalyzeCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), ""); m_Options.add_option("", "o", "output", "Report directory path", cxxopts::value(m_ReportPath), ""); m_Options.add_option("", "", "clean", "Clean output directory", cxxopts::value(m_Clean), ""); m_Options.parse_positional("path"); m_Options.parse_positional("output"); } RpcAnalyzeCommand::~RpcAnalyzeCommand() = default; void RpcAnalyzeCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { ZEN_UNUSED(GlobalOptions, argc, argv); if (!ParseOptions(argc, argv)) { return; } if (m_RecordingPath.empty()) { throw OptionParseException("Rpc analyze command requires a recording path", m_Options.help()); } if (m_ReportPath.empty()) { throw OptionParseException("Rpc analyze command requires an output path", m_Options.help()); } if (!std::filesystem::exists(m_RecordingPath) || !std::filesystem::is_directory(m_RecordingPath)) { throw std::runtime_error(fmt::format("could not find recording at '{}'", m_RecordingPath)); } std::filesystem::path TargetPath = m_ReportPath; ZenCacheStore::Configuration Config; Config.AllowAutomaticCreationOfNamespaces = true; struct DummyClient : public UpstreamCacheClient { virtual bool IsActive() override { return false; } virtual void GetCacheValues(std::string_view Namespace, std::span CacheValueRequests, OnCacheValueGetComplete&& OnComplete) override { ZEN_UNUSED(Namespace, CacheValueRequests, OnComplete); return; } virtual void GetCacheRecords(std::string_view Namespace, std::span Requests, OnCacheRecordGetComplete&& OnComplete) override { ZEN_UNUSED(Namespace, Requests, OnComplete); return; } virtual void GetCacheChunks(std::string_view Namespace, std::span CacheChunkRequests, OnCacheChunksGetComplete&& OnComplete) override { ZEN_UNUSED(Namespace, CacheChunkRequests, OnComplete); return; } virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { ZEN_UNUSED(CacheRecord); return; } }; if (m_Clean) { ZEN_CONSOLE("cleaning '{}'...", TargetPath); std::error_code Ec; CleanDirectory(TargetPath, true, Ec); if (Ec) { ZEN_CONSOLE_WARN("clean failed: '{}'", Ec.message()); } } GcManager Gc; std::unique_ptr Jq = MakeJobQueue(2, "cache_jobs"); Ref Store(new ZenCacheStore(Gc, *Jq, TargetPath / "cache", Config, nullptr)); CidStore Cids{Gc}; CidStoreConfiguration CidConfig; CidConfig.RootDirectory = TargetPath / "cas"; Cids.Initialize(CidConfig); DummyClient DummyUpstream; CacheStats Stats; CacheRpcHandler RpcHandler{logging::Default(), Stats, DummyUpstream, *Store, Cids, nullptr}; std::unique_ptr Replayer = cache::MakeDiskRequestReplayer(m_RecordingPath, true); const uint64_t EntryCount = Replayer->GetRequestCount(); Stopwatch Timer; std::set RequestMethods; int MaxBatchSeen = 0; uint64_t RequestsSeen = 0; uint64_t InsertedChunkCount = 0; uint64_t InsertedDataBytes = 0; for (uint64_t EntryIndex = 0; EntryIndex < EntryCount; ++EntryIndex) { IoBuffer Payload; const zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload); if (RequestInfo == zen::cache::RecordedRequestInfo::NullRequest) { continue; } CbPackage RequestPackage; CbObject Request; std::unordered_map Attachments; std::vector Hashes; std::vector AttachmentData; auto OnAttachment = [&](const IoHash& Hash, const CompressedBuffer& Data) { Attachments[Hash] = std::move(Data); Hashes.push_back(Hash); AttachmentData.emplace_back(Data.GetCompressed().ToShared().AsIoBuffer()); }; switch (RequestInfo.ContentType) { case ZenContentType::kCbPackage: if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage)) { Request = RequestPackage.GetObject(); for (const auto& Attachment : RequestPackage.GetAttachments()) { const auto& Hash = Attachment.GetHash(); const auto& Binary = Attachment.AsCompressedBinary(); OnAttachment(Hash, Binary); } } break; case ZenContentType::kCbObject: if (ValidateCompactBinary(Payload, CbValidateMode::All) == CbValidateError::None) { Request = LoadCompactBinaryObject(Payload); } break; } if (!Request) { continue; } if (!Hashes.empty()) { auto Results = Cids.AddChunks(AttachmentData, Hashes); for (int i = 0; i < Results.size(); ++i) { if (Results[i].New) { InsertedChunkCount += 1; InsertedDataBytes += AttachmentData[i].GetSize(); } } } ++RequestsSeen; if (RequestsSeen % 100'000 == 0) { ZEN_CONSOLE("Requests: {:12}", RequestsSeen); } std::string_view MethodString = Request["Method"sv].AsString(); const uint32_t MethodHash = HashStringDjb2(MethodString); if (auto It = RequestMethods.find(MethodHash); It == RequestMethods.end()) { RequestMethods.insert(MethodHash); ExtendableStringBuilder<1024> ObjStr; CompactBinaryToJson(Request, ObjStr); ZEN_CONSOLE("{}", ObjStr); } CbObjectView Params = Request["Params"sv].AsObjectView(); std::string_view Namespace = Params["Namespace"sv].AsString(); int BatchSize = 0; for (auto RequestEntry : Params["Requests"sv]) { auto Key = RequestEntry.AsObjectView()["Key"sv].AsObjectView(); std::string_view Bucket = Key["Bucket"sv].AsString(); const IoHash Hash = Key["Hash"sv].AsHash(); if (Bucket.empty()) { continue; } ++BatchSize; switch (MethodHash) { case "GetCacheValues"_djb2: // ZEN_CONSOLE("GETCACHEVALUES: {}/{}/{}", Namespace, Bucket, Hash); break; case "GetCacheRecords"_djb2: // ZEN_CONSOLE("GETCACHERECORDS: {}/{}/{}", Namespace, Bucket, Hash); break; case "PutCacheValues"_djb2: // ZEN_CONSOLE("PUTCACHEVALUES: {}/{}/{}", Namespace, Bucket, Hash); break; case "PutCacheRecords"_djb2: // ZEN_CONSOLE("PUTCACHERECORDS: {}/{}/{}", Namespace, Bucket, Hash); break; case "GetCacheChunks"_djb2: // ZEN_CONSOLE("GETCACHECHUNKS: {}/{}/{}", Namespace, Bucket, Hash); break; } #if 0 static constexpr uint32_t kGetCacheValues = HashStringAsLowerDjb2("GetCacheValues"sv); static constexpr uint32_t kGetCacheRecords = HashStringAsLowerDjb2("GetCacheRecords"sv); static constexpr uint32_t kPutCacheValues = HashStringAsLowerDjb2("PutCacheValues"sv); static constexpr uint32_t kPutCacheRecords = HashStringAsLowerDjb2("PutCacheRecords"sv); static constexpr uint32_t kGetCacheChunks = HashStringAsLowerDjb2("GetCacheChunks"sv); if (MethodHash == kGetCacheValues) { ZEN_CONSOLE("GETCACHEVALUES: {}/{}/{}", Namespace, Bucket, Hash); /* { "Method": "GetCacheValues", "Accept": 2859969228, "Params": { "DefaultPolicy": "Default,PartialRecord", "Namespace": "ue.ddc", "Requests": [ { "Key": { "Bucket": "AnimationSequence", "Hash": "272757e7b8d4638acb6c725d8538b87dfebb3cf9" } } ] } } */ // Record GET } else if (MethodHash == kGetCacheRecords) { if (CbObjectView Params = RequestEntry["Params"sv].AsObject()) { const std::string_view Namesoace = Params["Namespace"sv].AsString(); CbObjectArrayView Requests = Params["Requests"sv].AsArray(); for (auto Request : Requests) { CbObjectView RequestView{Request}; CbObjectView Key = RequestView["Key"sv]; const std::string_view Bucket = Key["Bucket"sv]; const IoHash Hash = Key["Hash"sv]; ZEN_CONSOLE("{}/{}/{}", Namespace, Bucket, Hash); } } /* { "Method": "GetCacheRecords", "Accept": 2859969228, "Params": { "DefaultPolicy": "Default,PartialRecord", "Namespace": "fortnite.sddc", "Requests": [ { "Key": { "Bucket": "BulkDataList", "Hash": "6a9dd7da26cca613821bf5aa434ce64bcef6712e" } } ] } } */ // Record GET } else if (MethodHash == kPutCacheValues) { /* { "Method": "PutCacheValues", "Accept": 2859969228, "Params": { "DefaultPolicy": "Default", "Namespace": "ue.ddc", "Requests": [ { "Key": { "Bucket": "MaterialShaderMap", "Hash": "bbdb638a34449e8cc3c89efc63431fc7193124ec" }, "RawHash": "2a9d7bab0c776ff299b927d70d3e400851cdddd6" } ] } } */ } else if (MethodHash == kPutCacheRecords) { /* { "Method": "PutCacheRecords", "Accept": 2859969228, "Params": { "DefaultPolicy": "Default", "Namespace": "ue.ddc", "Requests": [ { "Record": { "Key": { "Bucket": "MaterialTranslation", "Hash": "4eaea52861838d09645b228e97bd246cbdb76bac" }, "Values": [ { "Id": "0d586d5a600af5163b955814", "RawHash": "bfef9d6bfabecce61880b1d2f06e6e25838d5b8e", "RawSize": 3609 }, { "Id": "52d979e80b2ccf0dbc303f10", "RawHash": "17cbea1698d9904fd1272261f0a08c221e5e6ba7", "RawSize": 12665 }, { "Id": "a2b11ed5fab981f31e6504df", "RawHash": "0542d53449a8f0e9b8f0c7ac8506bc2f6e060d75", "RawSize": 326 } ] } } ] } } */ } else if (MethodHash == kGetCacheChunks) { /* { "Method": "GetCacheChunks", "Accept": 2859969228, "Params": { "DefaultPolicy": "Default", "Namespace": "fortnite.sddc", "ChunkRequests": [ { "Key": { "Bucket": "StaticMesh", "Hash": "267ab8179533a57eaffec8de3bf032df17c23fc4" }, "ValueId": "7fdeebb90ed07ba4a490149b", "ChunkId": "54897be0cf047e5b14476cdea8390e80deab0a29" } ] } } */ } #endif // ZEN_CONSOLE("key: {}/{}", Bucket, Hash); } if (BatchSize > MaxBatchSeen) { ZEN_CONSOLE("wow {}", BatchSize); ExtendableStringBuilder<1024> ObjStr; CompactBinaryToJson(Request, ObjStr); ZEN_CONSOLE("{}", ObjStr); MaxBatchSeen = BatchSize; } } ZEN_CONSOLE("Total Requests: {:12}, inserted {} bytes in {} chunks", RequestsSeen, InsertedDataBytes, InsertedChunkCount); } } // namespace zen