diff options
Diffstat (limited to 'src')
44 files changed, 1280 insertions, 487 deletions
diff --git a/src/zen/cmds/info_cmd.cpp b/src/zen/cmds/info_cmd.cpp new file mode 100644 index 000000000..aec8ca46b --- /dev/null +++ b/src/zen/cmds/info_cmd.cpp @@ -0,0 +1,51 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "info_cmd.h" + +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/string.h> +#include <zenhttp/httpclient.h> + +using namespace std::literals; + +namespace zen { + +InfoCommand::InfoCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); +} + +InfoCommand::~InfoCommand() +{ +} + +int +InfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions); + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_HostName.empty()) + { + throw OptionParseException("unable to resolve server specification"); + } + + HttpClient Http(m_HostName); + + if (HttpClient::Response Result = Http.Get("/admin/info", HttpClient::Accept(ZenContentType::kJSON))) + { + ZEN_CONSOLE("{}", Result.AsText()); + } + + return 0; +} + +} // namespace zen diff --git a/src/zen/cmds/info_cmd.h b/src/zen/cmds/info_cmd.h new file mode 100644 index 000000000..9723a075b --- /dev/null +++ b/src/zen/cmds/info_cmd.h @@ -0,0 +1,24 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "../zen.h" + +namespace zen { + +class InfoCommand : public ZenCmdBase +{ +public: + InfoCommand(); + ~InfoCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options& Options() override { return m_Options; } + // virtual ZenCmdCategory& CommandCategory() const override { return g_UtilitiesCategory; } + +private: + cxxopts::Options m_Options{"info", "Show high level zen store information"}; + std::string m_HostName; +}; + +} // namespace zen diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp index 409d3393e..d307ef0e8 100644 --- a/src/zen/cmds/rpcreplay_cmd.cpp +++ b/src/zen/cmds/rpcreplay_cmd.cpp @@ -201,6 +201,8 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw std::runtime_error(fmt::format("could not find recording at '{}'", m_RecordingPath)); } + m_ThreadCount = Max(m_ThreadCount, 1); + Stopwatch TotalTimer; if (m_OnHost) @@ -282,152 +284,156 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) uint64_t EntryIndex = EntryOffset.fetch_add(m_Stride); while (EntryIndex < EntryCount) { - IoBuffer Payload; - zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload); - - CbPackage RequestPackage; - CbObject Request; + IoBuffer Payload; + const zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload); - switch (RequestInfo.ContentType) + if (RequestInfo != zen::cache::RecordedRequestInfo::NullRequest) { - case ZenContentType::kCbPackage: - { - if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage)) + CbPackage RequestPackage; + CbObject Request; + + switch (RequestInfo.ContentType) + { + case ZenContentType::kCbPackage: { - Request = RequestPackage.GetObject(); + if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage)) + { + Request = RequestPackage.GetObject(); + } } - } - break; - case ZenContentType::kCbObject: - { - Request = LoadCompactBinaryObject(Payload); - } - break; - } + break; + case ZenContentType::kCbObject: + { + Request = LoadCompactBinaryObject(Payload); + } + break; + } - RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u)); - int OriginalProcessPid = Request["Pid"sv].AsInt32(0); + RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u)); + int OriginalProcessPid = Request["Pid"sv].AsInt32(0); - int AdjustedPid = 0; - RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone; + int AdjustedPid = 0; + RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone; - if (!m_DisableLocalRefs) - { - if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || m_ForceAllowLocalRefs) + if (!m_DisableLocalRefs) { - AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences; - if (!m_DisablePartialLocalRefs) + if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || + m_ForceAllowLocalRefs) { - if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) || - m_ForceAllowPartialLocalRefs) + AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences; + if (!m_DisablePartialLocalRefs) { - AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences; + if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) || + m_ForceAllowPartialLocalRefs) + { + AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences; + } } - } - if (!m_DisableLocalHandleRefs) - { - if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef) + if (!m_DisableLocalHandleRefs) { - AdjustedPid = GetCurrentProcessId(); + if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef) + { + AdjustedPid = GetCurrentProcessId(); + } } } } - } - if (m_ShowMethodStats) - { - std::string MethodName = std::string(Request["Method"sv].AsString()); - if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end()) - { - It->second++; - } - else + if (m_ShowMethodStats) { - LocalMethodTypes[MethodName] = 1; + std::string MethodName = std::string(Request["Method"sv].AsString()); + if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end()) + { + It->second++; + } + else + { + LocalMethodTypes[MethodName] = 1; + } } - } - if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid) - { - CbObjectWriter RequestCopyWriter; - for (const CbFieldView& Field : Request) + if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid) { - if (!Field.HasName()) + CbObjectWriter RequestCopyWriter; + for (const CbFieldView& Field : Request) { - RequestCopyWriter.AddField(Field); - continue; + if (!Field.HasName()) + { + RequestCopyWriter.AddField(Field); + continue; + } + std::string_view FieldName = Field.GetName(); + if (FieldName == "Pid"sv) + { + continue; + } + if (FieldName == "AcceptFlags"sv) + { + continue; + } + RequestCopyWriter.AddField(FieldName, Field); } - std::string_view FieldName = Field.GetName(); - if (FieldName == "Pid"sv) + if (AdjustedPid != 0) { - continue; + RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid); } - if (FieldName == "AcceptFlags"sv) + if (AdjustedAcceptOptions != RpcAcceptOptions::kNone) { - continue; + RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions)); } - RequestCopyWriter.AddField(FieldName, Field); - } - if (AdjustedPid != 0) - { - RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid); - } - if (AdjustedAcceptOptions != RpcAcceptOptions::kNone) - { - RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions)); - } - if (RequestInfo.ContentType == ZenContentType::kCbPackage) - { - RequestPackage.SetObject(RequestCopyWriter.Save()); - std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage); - std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end()); - Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer(); - } - else - { - RequestCopyWriter.Finalize(); - Payload = IoBuffer(RequestCopyWriter.GetSaveSize()); - RequestCopyWriter.Save(Payload.GetMutableView()); + if (RequestInfo.ContentType == ZenContentType::kCbPackage) + { + RequestPackage.SetObject(RequestCopyWriter.Save()); + std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage); + std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end()); + Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer(); + } + else + { + RequestCopyWriter.Finalize(); + Payload = IoBuffer(RequestCopyWriter.GetSaveSize()); + RequestCopyWriter.Save(Payload.GetMutableView()); + } } - } - - if (!m_DryRun) - { - StringBuilder<32> SessionIdString; - if (RequestInfo.SessionId != Oid::Zero) + if (!m_DryRun) { - RequestInfo.SessionId.ToString(SessionIdString); - } - else - { - GetSessionId().ToString(SessionIdString); - } + StringBuilder<32> SessionIdString; - Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestInfo.ContentType))}, - {"Accept", std::string(MapContentTypeToString(RequestInfo.AcceptType))}, - {"UE-Session", std::string(SessionIdString)}}); - - uint64_t Offset = 0; - auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, Payload.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; - }; - Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - cpr::Response Response = Session.Post(); - BytesSent.fetch_add(Payload.GetSize()); - if (Response.error || !(IsHttpSuccessCode(Response.status_code) || - Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound))) - { - ZEN_CONSOLE("{}", FormatHttpResponse(Response)); - break; + if (RequestInfo.SessionId != Oid::Zero) + { + RequestInfo.SessionId.ToString(SessionIdString); + } + else + { + GetSessionId().ToString(SessionIdString); + } + + Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestInfo.ContentType))}, + {"Accept", std::string(MapContentTypeToString(RequestInfo.AcceptType))}, + {"UE-Session", std::string(SessionIdString)}}); + + uint64_t Offset = 0; + auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); + MutableMemoryView Data(buffer, size); + Data.CopyFrom(PayloadRange.GetView()); + Offset += size; + return true; + }; + Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + cpr::Response Response = Session.Post(); + BytesSent.fetch_add(Payload.GetSize()); + if (Response.error || !(IsHttpSuccessCode(Response.status_code) || + Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound))) + { + ZEN_CONSOLE("{}", FormatHttpResponse(Response)); + break; + } + BytesReceived.fetch_add(Response.downloaded_bytes); } - BytesReceived.fetch_add(Response.downloaded_bytes); } EntryIndex = EntryOffset.fetch_add(m_Stride); diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp index c949008ff..10d2f5593 100644 --- a/src/zen/zen.cpp +++ b/src/zen/zen.cpp @@ -10,6 +10,7 @@ #include "cmds/cache_cmd.h" #include "cmds/copy_cmd.h" #include "cmds/dedup_cmd.h" +#include "cmds/info_cmd.h" #include "cmds/print_cmd.h" #include "cmds/projectstore_cmd.h" #include "cmds/rpcreplay_cmd.h" @@ -273,6 +274,7 @@ main(int argc, char** argv) GcStatusCommand GcStatusCmd; GcStopCommand GcStopCmd; ImportOplogCommand ImportOplogCmd; + InfoCommand InfoCmd; JobCommand JobCmd; OplogMirrorCommand OplogMirrorCmd; PrintCommand PrintCmd; @@ -316,6 +318,7 @@ main(int argc, char** argv) {"gc-status", &GcStatusCmd, "Garbage collect zen storage status check"}, {"gc-stop", &GcStopCmd, "Request cancel of running garbage collection in zen storage"}, {"gc", &GcCmd, "Garbage collect zen storage"}, + {"info", &InfoCmd, "Show high level Zen server information"}, {"jobs", &JobCmd, "Show/cancel zen background jobs"}, {"logs", &LoggingCmd, "Show/control zen logging"}, {"oplog-create", &CreateOplogCmd, "Create a project oplog"}, diff --git a/src/zenbase/include/zenbase/refcount.h b/src/zenbase/include/zenbase/refcount.h index 3afcf467c..6ad49cba2 100644 --- a/src/zenbase/include/zenbase/refcount.h +++ b/src/zenbase/include/zenbase/refcount.h @@ -107,6 +107,8 @@ public: Rhs.m_Ref = nullptr; } + inline void Swap(RefPtr& Rhs) noexcept { std::swap(m_Ref, Rhs.m_Ref); } + private: T* m_Ref = nullptr; template<typename U> diff --git a/src/zencore/compactbinary.cpp b/src/zencore/compactbinary.cpp index 9152a8bfc..6677b5a61 100644 --- a/src/zencore/compactbinary.cpp +++ b/src/zencore/compactbinary.cpp @@ -2463,6 +2463,58 @@ TEST_CASE("json.uson") } } +////////////////////////////////////////////////////////////////////////// + +TEST_SUITE_BEGIN("core.datetime"); + +TEST_CASE("core.datetime.compare") +{ + DateTime T1(2000, 12, 13); + DateTime T2(2000, 12, 14); + CHECK(T1 < T2); + CHECK(T2 > T1); + CHECK(T1 == T1); + CHECK(T1 != T2); + CHECK(T1 >= T1); + CHECK(T2 >= T1); + CHECK(T1 <= T1); + CHECK(T1 <= T2); +} + +TEST_CASE("core.datetime.add") +{ + DateTime T1(2000, 12, 13); + DateTime T2(2000, 12, 14); + TimeSpan dT = T2 - T1; + TimeSpan dT1 = T1 - T1; + + CHECK(T1 + dT == T2); + CHECK(dT + T1 == T2); + CHECK(dT + T1 - T2 == dT1); +} + +TEST_SUITE_END(); + +TEST_SUITE_BEGIN("core.timespan"); + +TEST_CASE("core.timespan.compare") +{ + TimeSpan T1(1000); + TimeSpan T2(1001); + CHECK(T1 < T2); + CHECK(T2 > T1); + CHECK(T1 == T1); + CHECK(T1 != T2); + CHECK(T1 >= T1); + CHECK(T2 >= T1); + CHECK(T1 <= T1); + CHECK(T1 <= T2); +} + +TEST_SUITE_END(); + +////////////////////////////////////////////////////////////////////////// + #endif } // namespace zen diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp index 2362d8e78..c41bdac42 100644 --- a/src/zencore/compress.cpp +++ b/src/zencore/compress.cpp @@ -1268,7 +1268,7 @@ CompressedBuffer::FromCompressed(SharedBuffer&& InCompressedData, IoHash& OutRaw CompressedBuffer CompressedBuffer::FromCompressedNoValidate(IoBuffer&& InCompressedData) { - if (InCompressedData.GetSize() <= sizeof(detail::BufferHeader)) + if (InCompressedData.GetSize() < sizeof(detail::BufferHeader)) { return CompressedBuffer(); } @@ -1280,7 +1280,7 @@ CompressedBuffer::FromCompressedNoValidate(IoBuffer&& InCompressedData) CompressedBuffer CompressedBuffer::FromCompressedNoValidate(CompositeBuffer&& InCompressedData) { - if (InCompressedData.GetSize() <= sizeof(detail::BufferHeader)) + if (InCompressedData.GetSize() < sizeof(detail::BufferHeader)) { return CompressedBuffer(); } diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index a0ff3793e..36195f7c7 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -1287,12 +1287,12 @@ PathFromHandle(void* NativeHandle, std::error_code& Ec) { if (NativeHandle == nullptr) { - return std::filesystem::path(); + return "<error handle 'nullptr'>"; } #if ZEN_PLATFORM_WINDOWS if (NativeHandle == INVALID_HANDLE_VALUE) { - return std::filesystem::path(); + return "<error handle 'invalid handle'>"; } auto GetFinalPathNameByHandleWRetry = @@ -1329,7 +1329,7 @@ PathFromHandle(void* NativeHandle, std::error_code& Ec) if (Error != ERROR_SUCCESS) { Ec = MakeErrorCodeFromLastError(); - return std::filesystem::path(); + return fmt::format("<error handle '{}'>", Ec.message()); } if (RequiredLengthIncludingNul < PathDataSize) @@ -1346,7 +1346,7 @@ PathFromHandle(void* NativeHandle, std::error_code& Ec) if (Error != ERROR_SUCCESS) { Ec = MakeErrorCodeFromLastError(); - return std::filesystem::path(); + return fmt::format("<error handle '{}'>", Ec.message()); } ZEN_UNUSED(FinalLength); return FullPath; @@ -1360,7 +1360,7 @@ PathFromHandle(void* NativeHandle, std::error_code& Ec) if (BytesRead <= 0) { Ec = MakeErrorCodeFromLastError(); - return {}; + return fmt::format("<error handle '{}'>", Ec.message()); } Link[BytesRead] = '\0'; @@ -1371,7 +1371,7 @@ PathFromHandle(void* NativeHandle, std::error_code& Ec) if (fcntl(Fd, F_GETPATH, Path) < 0) { Ec = MakeErrorCodeFromLastError(); - return {}; + return fmt::format("<error handle '{}'>", Ec.message()); } return Path; diff --git a/src/zencore/include/zencore/compactbinary.h b/src/zencore/include/zencore/compactbinary.h index cb032e34a..675e2a8d4 100644 --- a/src/zencore/include/zencore/compactbinary.h +++ b/src/zencore/include/zencore/compactbinary.h @@ -26,12 +26,13 @@ namespace zen { -class CbObjectView; class CbArrayView; +class CbObjectView; +class CbValue; +class CompressedBuffer; class BinaryReader; class BinaryWriter; -class CompressedBuffer; -class CbValue; +class TimeSpan; class DateTime { @@ -58,7 +59,11 @@ public: void GetDate(int& Year, int& Month, int& Day) const; inline bool operator==(const DateTime& Rhs) const { return Ticks == Rhs.Ticks; } - inline auto operator<=>(const DateTime& Rhs) const { return Ticks - Rhs.Ticks; } + inline auto operator<=>(const DateTime& Rhs) const = default; + + friend inline TimeSpan operator-(const DateTime& Lhs, const DateTime& Rhs); + friend inline DateTime operator+(const DateTime& Lhs, const TimeSpan& Rhs); + friend inline DateTime operator+(const TimeSpan& Lhs, const DateTime& Rhs); std::string ToString(const char* Format) const; std::string ToIso8601() const; @@ -78,7 +83,7 @@ public: inline uint64_t GetTicks() const { return Ticks; } inline bool operator==(const TimeSpan& Rhs) const { return Ticks == Rhs.Ticks; } - inline auto operator<=>(const TimeSpan& Rhs) const { return Ticks - Rhs.Ticks; } + inline auto operator<=>(const TimeSpan& Rhs) const = default; /** * Time span related constants. @@ -136,12 +141,33 @@ public: ZENCORE_API std::string ToString(const char* Format) const; ZENCORE_API std::string ToString() const; + friend inline DateTime operator+(const DateTime& Lhs, const TimeSpan& Rhs); + friend inline DateTime operator+(const TimeSpan& Lhs, const DateTime& Rhs); + private: void Set(int Days, int Hours, int Minutes, int Seconds, int FractionNano); uint64_t Ticks; }; +inline TimeSpan +operator-(const DateTime& Lhs, const DateTime& Rhs) +{ + return TimeSpan(Lhs.Ticks - Rhs.Ticks); +} + +inline DateTime +operator+(const DateTime& Lhs, const TimeSpan& Rhs) +{ + return DateTime(Lhs.Ticks + Rhs.Ticks); +} + +inline DateTime +operator+(const TimeSpan& Lhs, const DateTime& Rhs) +{ + return DateTime(Lhs.Ticks + Rhs.Ticks); +} + ////////////////////////////////////////////////////////////////////////// /** diff --git a/src/zencore/include/zencore/compactbinarybuilder.h b/src/zencore/include/zencore/compactbinarybuilder.h index dcb767d96..9c81cf490 100644 --- a/src/zencore/include/zencore/compactbinarybuilder.h +++ b/src/zencore/include/zencore/compactbinarybuilder.h @@ -10,7 +10,6 @@ #include <zencore/enumflags.h> #include <zencore/iobuffer.h> #include <zencore/iohash.h> -#include <zencore/sha1.h> #include <atomic> #include <memory> diff --git a/src/zencore/include/zencore/compactbinaryvalidation.h b/src/zencore/include/zencore/compactbinaryvalidation.h index b23c6d51d..ddecc8a38 100644 --- a/src/zencore/include/zencore/compactbinaryvalidation.h +++ b/src/zencore/include/zencore/compactbinaryvalidation.h @@ -9,7 +9,6 @@ #include <zencore/enumflags.h> #include <zencore/iobuffer.h> #include <zencore/iohash.h> -#include <zencore/sha1.h> #include <gsl/gsl-lite.hpp> diff --git a/src/zencore/include/zencore/iobuffer.h b/src/zencore/include/zencore/iobuffer.h index d891ed55b..b9e503354 100644 --- a/src/zencore/include/zencore/iobuffer.h +++ b/src/zencore/include/zencore/iobuffer.h @@ -337,11 +337,20 @@ public: BorrowedFile }; - inline IoBuffer() = default; - inline IoBuffer(IoBuffer&& Rhs) noexcept = default; - inline IoBuffer(const IoBuffer& Rhs) = default; + inline IoBuffer() = default; + inline IoBuffer(IoBuffer&& Rhs) noexcept + { + m_Core.Swap(Rhs.m_Core); + Rhs.m_Core = NullBufferCore; + } + inline IoBuffer(const IoBuffer& Rhs) = default; inline IoBuffer& operator=(const IoBuffer& Rhs) = default; - inline IoBuffer& operator=(IoBuffer&& Rhs) noexcept = default; + inline IoBuffer& operator =(IoBuffer&& Rhs) noexcept + { + m_Core.Swap(Rhs.m_Core); + Rhs.m_Core = NullBufferCore; + return *this; + } /** Create an uninitialized buffer of the given size */ diff --git a/src/zencore/include/zencore/string.h b/src/zencore/include/zencore/string.h index 3aec1647d..b0232d883 100644 --- a/src/zencore/include/zencore/string.h +++ b/src/zencore/include/zencore/string.h @@ -638,7 +638,12 @@ ToHexNumber(UnsignedIntegral auto Value, char* OutString) bool ParseHexNumber(const std::string_view HexString, UnsignedIntegral auto& OutValue) { - return ParseHexNumber(HexString.data(), sizeof(OutValue) * 2, (uint8_t*)&OutValue); + size_t ExpectedCharacterCount = sizeof(OutValue) * 2; + if (HexString.size() != ExpectedCharacterCount) + { + return false; + } + return ParseHexNumber(HexString.data(), ExpectedCharacterCount, (uint8_t*)&OutValue); } ////////////////////////////////////////////////////////////////////////// diff --git a/src/zencore/include/zencore/trace.h b/src/zencore/include/zencore/trace.h index 2d4c1e610..89e4b76bf 100644 --- a/src/zencore/include/zencore/trace.h +++ b/src/zencore/include/zencore/trace.h @@ -35,6 +35,7 @@ bool TraceStop(); #else #define ZEN_TRACE_CPU(x) +#define ZEN_TRACE_CPU_FLUSH(x) #endif // ZEN_WITH_TRACE diff --git a/src/zencore/iobuffer.cpp b/src/zencore/iobuffer.cpp index 912f9ce4e..80d0f4ee4 100644 --- a/src/zencore/iobuffer.cpp +++ b/src/zencore/iobuffer.cpp @@ -209,6 +209,8 @@ IoBufferExtendedCore::~IoBufferExtendedCore() uint64_t MapSize = ~uint64_t(uintptr_t(m_MmapHandle)); munmap(m_MappedPointer, MapSize); #endif + + m_DataPtr = nullptr; // prevent any buffer deallocation attempts } const uint32_t LocalFlags = m_Flags.load(std::memory_order_relaxed); @@ -244,8 +246,6 @@ IoBufferExtendedCore::~IoBufferExtendedCore() ZEN_WARN("Error reported on file handle close, reason '{}'", GetLastErrorAsString()); } } - - m_DataPtr = nullptr; } static constexpr size_t MappingLockCount = 128; diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp index 1755b9fe9..4bcc5c885 100644 --- a/src/zencore/jobqueue.cpp +++ b/src/zencore/jobqueue.cpp @@ -422,8 +422,10 @@ TEST_CASE("JobQueue") { JobsLatch.AddCount(1); Pool.ScheduleWork([&Queue, &JobsLatch, I]() { - auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); - auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&](JobContext& Context) { + auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); + JobsLatch.AddCount(1); + auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&JobsLatch, I](JobContext& Context) { + auto $ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); }); if (Context.IsCancelled()) { return; @@ -523,7 +525,6 @@ TEST_CASE("JobQueue") } JobsLatch.Wait(); } - #endif } // namespace zen diff --git a/src/zencore/memory.cpp b/src/zencore/memory.cpp index 546296b10..808c9fcb6 100644 --- a/src/zencore/memory.cpp +++ b/src/zencore/memory.cpp @@ -7,13 +7,12 @@ #include <zencore/testing.h> #include <zencore/zencore.h> -#if ZEN_PLATFORM_WINDOWS -# include <malloc.h> +#include <cstdlib> + +#if ZEN_USE_MIMALLOC ZEN_THIRD_PARTY_INCLUDES_START # include <mimalloc.h> ZEN_THIRD_PARTY_INCLUDES_END -#else -# include <cstdlib> #endif namespace zen { @@ -23,16 +22,15 @@ namespace zen { static void* AlignedAllocImpl(size_t Size, size_t Alignment) { -#if ZEN_PLATFORM_WINDOWS -# if ZEN_USE_MIMALLOC && 0 /* this path is not functional */ - return mi_aligned_alloc(Alignment, Size); -# else - return _aligned_malloc(Size, Alignment); -# endif -#else // aligned_alloc() states that size must be a multiple of alignment. Some // platforms return null if this requirement isn't met. Size = (Size + Alignment - 1) & ~(Alignment - 1); + +#if ZEN_USE_MIMALLOC + return mi_aligned_alloc(Alignment, Size); +#elif ZEN_PLATFORM_WINDOWS + return _aligned_malloc(Size, Alignment); +#else return std::aligned_alloc(Alignment, Size); #endif } @@ -43,12 +41,10 @@ AlignedFreeImpl(void* ptr) if (ptr == nullptr) return; -#if ZEN_PLATFORM_WINDOWS -# if ZEN_USE_MIMALLOC && 0 /* this path is not functional */ +#if ZEN_USE_MIMALLOC return mi_free(ptr); -# else +#elif ZEN_PLATFORM_WINDOWS _aligned_free(ptr); -# endif #else std::free(ptr); #endif diff --git a/src/zencore/thread.cpp b/src/zencore/thread.cpp index 149a0d781..cb3aced33 100644 --- a/src/zencore/thread.cpp +++ b/src/zencore/thread.cpp @@ -156,6 +156,7 @@ Event::Event() auto* Inner = new EventInner(); Inner->bSet = bInitialState; m_EventHandle = Inner; + std::atomic_thread_fence(std::memory_order_release); #endif } @@ -170,12 +171,13 @@ Event::Set() #if ZEN_USE_WINDOWS_EVENTS SetEvent(m_EventHandle); #else - auto* Inner = (EventInner*)m_EventHandle; + std::atomic_thread_fence(std::memory_order_acquire); + auto* Inner = (EventInner*)m_EventHandle; { std::unique_lock Lock(Inner->Mutex); Inner->bSet.store(true); + Inner->CondVar.notify_all(); } - Inner->CondVar.notify_all(); #endif } @@ -185,6 +187,7 @@ Event::Reset() #if ZEN_USE_WINDOWS_EVENTS ResetEvent(m_EventHandle); #else + std::atomic_thread_fence(std::memory_order_acquire); auto* Inner = (EventInner*)m_EventHandle; { std::unique_lock Lock(Inner->Mutex); @@ -198,15 +201,18 @@ Event::Close() { #if ZEN_USE_WINDOWS_EVENTS CloseHandle(m_EventHandle); + m_EventHandle = nullptr; #else + std::atomic_thread_fence(std::memory_order_acquire); auto* Inner = (EventInner*)m_EventHandle; { std::unique_lock Lock(Inner->Mutex); Inner->bSet.store(true); } + m_EventHandle = nullptr; + std::atomic_thread_fence(std::memory_order_release); delete Inner; #endif - m_EventHandle = nullptr; } bool @@ -226,6 +232,7 @@ Event::Wait(int TimeoutMs) return (Result == WAIT_OBJECT_0); #else + std::atomic_thread_fence(std::memory_order_acquire); auto* Inner = reinterpret_cast<EventInner*>(m_EventHandle); if (Inner->bSet.load()) diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index 6ff6463dd..16b2310ff 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -132,7 +132,9 @@ struct WorkerThreadPool::Impl Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName) { +# if ZEN_WITH_TRACE trace::ThreadGroupBegin(m_WorkerThreadBaseName.c_str()); +# endif zen::Latch WorkerLatch{InThreadCount}; @@ -143,7 +145,9 @@ struct WorkerThreadPool::Impl WorkerLatch.Wait(); +# if ZEN_WITH_TRACE trace::ThreadGroupEnd(); +# endif } ~Impl() diff --git a/src/zencore/zencore.cpp b/src/zencore/zencore.cpp index 3b938a6ef..8dd687fbd 100644 --- a/src/zencore/zencore.cpp +++ b/src/zencore/zencore.cpp @@ -36,6 +36,8 @@ #include <fmt/format.h> +#include <atomic> + namespace zen::assert { void @@ -103,8 +105,8 @@ IsInteractiveSession() ////////////////////////////////////////////////////////////////////////// -static int s_ApplicationExitCode = 0; -static bool s_ApplicationExitRequested; +static std::atomic_int s_ApplicationExitCode{0}; +static std::atomic_bool s_ApplicationExitRequested{false}; bool IsApplicationExitRequested() diff --git a/src/zenhttp/httpshared.cpp b/src/zenhttp/httpshared.cpp index 5421fcba5..ca014bf1c 100644 --- a/src/zenhttp/httpshared.cpp +++ b/src/zenhttp/httpshared.cpp @@ -19,6 +19,10 @@ #include <span> #include <vector> +#if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> +#endif + ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> ZEN_THIRD_PARTY_INCLUDES_END diff --git a/src/zenhttp/servers/httpparser.h b/src/zenhttp/servers/httpparser.h index 219ac351d..bdbcab4d9 100644 --- a/src/zenhttp/servers/httpparser.h +++ b/src/zenhttp/servers/httpparser.h @@ -9,6 +9,8 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <http_parser.h> ZEN_THIRD_PARTY_INCLUDES_END +#include <atomic> + namespace zen { class HttpRequestParserCallbacks @@ -85,7 +87,7 @@ private: int8_t m_ContentTypeHeaderIndex; int8_t m_RangeHeaderIndex; HttpVerb m_RequestVerb; - bool m_KeepAlive = false; + std::atomic_bool m_KeepAlive{false}; bool m_Expect100Continue = false; int m_RequestId = -1; Oid m_SessionId{}; diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp index c2df847ad..cc1ffdcdc 100644 --- a/src/zenserver/admin/admin.cpp +++ b/src/zenserver/admin/admin.cpp @@ -3,6 +3,7 @@ #include "admin.h" #include <zencore/compactbinarybuilder.h> +#include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/jobqueue.h> #include <zencore/logging.h> @@ -20,24 +21,86 @@ #include <zenstore/gc.h> #include "cache/structuredcachestore.h" +#include "config.h" #include "projectstore/projectstore.h" #include <chrono> namespace zen { -HttpAdminService::HttpAdminService(GcScheduler& Scheduler, - JobQueue& BackgroundJobQueue, - ZenCacheStore* CacheStore, - CidStore* CidStore, - ProjectStore* ProjectStore, - const LogPaths& LogPaths) +struct DirStats +{ + uint64_t FileCount = 0; + uint64_t DirCount = 0; + uint64_t ByteCount = 0; +}; + +DirStats +GetStatsForDirectory(std::filesystem::path Dir) +{ + if (!std::filesystem::exists(Dir)) + return {}; + + FileSystemTraversal Traversal; + + struct StatsTraversal : public FileSystemTraversal::TreeVisitor + { + virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override + { + ZEN_UNUSED(Parent, File); + ++TotalFileCount; + TotalBytes += FileSize; + } + virtual bool VisitDirectory(const std::filesystem::path&, const path_view&) override + { + ++TotalDirCount; + return true; + } + + uint64_t TotalBytes = 0; + uint64_t TotalFileCount = 0; + uint64_t TotalDirCount = 0; + + DirStats GetStats() { return {.FileCount = TotalFileCount, .DirCount = TotalDirCount, .ByteCount = TotalBytes}; } + }; + + StatsTraversal DirTraverser; + Traversal.TraverseFileSystem(Dir, DirTraverser); + + return DirTraverser.GetStats(); +} + +struct StateDiskStats +{ + DirStats CacheStats; + DirStats CasStats; + DirStats ProjectStats; +}; + +StateDiskStats +GetStatsForStateDirectory(std::filesystem::path StateDir) +{ + StateDiskStats Stats; + Stats.CacheStats = GetStatsForDirectory(StateDir / "cache"); + Stats.CasStats = GetStatsForDirectory(StateDir / "cas"); + Stats.ProjectStats = GetStatsForDirectory(StateDir / "projects"); + return Stats; +} + +HttpAdminService::HttpAdminService(GcScheduler& Scheduler, + JobQueue& BackgroundJobQueue, + ZenCacheStore* CacheStore, + CidStore* CidStore, + ProjectStore* ProjectStore, + const LogPaths& LogPaths, + const ZenServerOptions& ServerOptions) : m_GcScheduler(Scheduler) , m_BackgroundJobQueue(BackgroundJobQueue) , m_CacheStore(CacheStore) , m_CidStore(CidStore) , m_ProjectStore(ProjectStore) , m_LogPaths(LogPaths) +, m_ServerOptions(ServerOptions) { using namespace std::literals; @@ -509,6 +572,60 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, #endif // ZEN_WITH_TRACE m_Router.RegisterRoute( + "info", + [this](HttpRouterRequest& Req) { + CbObjectWriter Obj; + + Obj << "root" << m_ServerOptions.SystemRootDir.generic_wstring(); + Obj << "install" << (m_ServerOptions.SystemRootDir / "Install").generic_wstring(); + + Obj.BeginObject("primary"); + Obj << "data" << m_ServerOptions.DataDir.generic_wstring(); + + try + { + auto Stats = GetStatsForStateDirectory(m_ServerOptions.DataDir); + + auto EmitStats = [&](std::string_view Tag, const DirStats& Stats) { + Obj.BeginObject(Tag); + Obj << "bytes" << Stats.ByteCount; + Obj << "files" << Stats.FileCount; + Obj << "dirs" << Stats.DirCount; + Obj.EndObject(); + }; + + EmitStats("cache", Stats.CacheStats); + EmitStats("cas", Stats.CasStats); + EmitStats("project", Stats.ProjectStats); + } + catch (std::exception& Ex) + { + ZEN_WARN("exception in disk stats gathering for '{}': {}", m_ServerOptions.DataDir, Ex.what()); + } + Obj.EndObject(); + + try + { + std::vector<CbObject> Manifests = ReadAllCentralManifests(m_ServerOptions.SystemRootDir); + + Obj.BeginArray("known"); + + for (const auto& Manifest : Manifests) + { + Obj.AddObject(Manifest); + } + + Obj.EndArray(); + } + catch (std::exception& Ex) + { + ZEN_WARN("exception in state gathering for '{}': {}", m_ServerOptions.SystemRootDir, Ex.what()); + } + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( "logs", [this](HttpRouterRequest& Req) { CbObjectWriter Obj; diff --git a/src/zenserver/admin/admin.h b/src/zenserver/admin/admin.h index 9d8bdfe50..563c4f536 100644 --- a/src/zenserver/admin/admin.h +++ b/src/zenserver/admin/admin.h @@ -12,6 +12,7 @@ class JobQueue; class ZenCacheStore; class CidStore; class ProjectStore; +struct ZenServerOptions; class HttpAdminService : public zen::HttpService { @@ -22,25 +23,27 @@ public: std::filesystem::path HttpLogPath; std::filesystem::path CacheLogPath; }; - HttpAdminService(GcScheduler& Scheduler, - JobQueue& BackgroundJobQueue, - ZenCacheStore* CacheStore, - CidStore* CidStore, - ProjectStore* ProjectStore, - const LogPaths& LogPaths); + HttpAdminService(GcScheduler& Scheduler, + JobQueue& BackgroundJobQueue, + ZenCacheStore* CacheStore, + CidStore* CidStore, + ProjectStore* ProjectStore, + const LogPaths& LogPaths, + const ZenServerOptions& ServerOptions); ~HttpAdminService(); virtual const char* BaseUri() const override; virtual void HandleRequest(zen::HttpServerRequest& Request) override; private: - HttpRequestRouter m_Router; - GcScheduler& m_GcScheduler; - JobQueue& m_BackgroundJobQueue; - ZenCacheStore* m_CacheStore; - CidStore* m_CidStore; - ProjectStore* m_ProjectStore; - LogPaths m_LogPaths; + HttpRequestRouter m_Router; + GcScheduler& m_GcScheduler; + JobQueue& m_BackgroundJobQueue; + ZenCacheStore* m_CacheStore; + CidStore* m_CidStore; + ProjectStore* m_ProjectStore; + LogPaths m_LogPaths; + const ZenServerOptions& m_ServerOptions; }; } // namespace zen diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 13f3c9e58..8d046105d 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -209,9 +209,6 @@ namespace { zen::Sleep(100); } while (true); } - - uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize) { return 8u + 32u + RoundUp(PayloadSize, 8u); } - } // namespace namespace fs = std::filesystem; @@ -507,6 +504,8 @@ BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& B std::vector<AccessTime>& AccessTimes, std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads) { + ZEN_TRACE_CPU("Z$::ReadSidecarFile"); + ZEN_ASSERT(AccessTimes.size() == Payloads.size()); std::error_code Ec; @@ -593,6 +592,8 @@ BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&, const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads, const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas) { + ZEN_TRACE_CPU("Z$::WriteSidecarFile"); + BucketMetaHeader Header; Header.EntryCount = m_ManifestEntryCount; Header.LogPosition = SnapshotLogPosition; @@ -701,7 +702,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo { using namespace std::literals; - ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate"); + ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate"); ZEN_ASSERT(m_IsFlushing.load()); // We want to take the lock here since we register as a GC referencer a construction @@ -768,7 +769,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo void ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uint64_t()>& ClaimDiskReserveFunc) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::WriteIndexSnapshot"); + ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot"); const uint64_t LogCount = m_SlogFile.GetLogCount(); if (m_LogFlushPosition == LogCount) @@ -878,7 +879,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uin uint64_t ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadIndexFile"); + ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile"); if (!std::filesystem::is_regular_file(IndexPath)) { @@ -967,7 +968,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const uint64_t ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadLog"); + ZEN_TRACE_CPU("Z$::Bucket::ReadLog"); if (!std::filesystem::is_regular_file(LogPath)) { @@ -1037,7 +1038,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std:: void ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockScope& IndexLock, const bool IsNew) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog"); + ZEN_TRACE_CPU("Z$::Bucket::Initialize"); m_StandaloneSize = 0; @@ -1139,7 +1140,7 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& H IoBuffer ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const { - ZEN_TRACE_CPU("Z$::Disk::Bucket::GetInlineCacheValue"); + ZEN_TRACE_CPU("Z$::Bucket::GetInlineCacheValue"); BlockStoreLocation Location = Loc.GetBlockLocation(m_Configuration.PayloadAlignment); @@ -1155,7 +1156,7 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) con IoBuffer ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const { - ZEN_TRACE_CPU("Z$::Disk::Bucket::GetStandaloneCacheValue"); + ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue"); ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); @@ -1175,6 +1176,8 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentTy bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { + ZEN_TRACE_CPU("Z$::Bucket::Get"); + metrics::RequestStats::Scope StatsScope(m_GetOps, 0); RwLock::SharedLockScope IndexLock(m_IndexLock); @@ -1189,7 +1192,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal return false; } - size_t EntryIndex = It.value(); + PayloadIndex EntryIndex = It.value(); m_AccessTimes[EntryIndex] = GcClock::TickCount(); DiskLocation Location = m_Payloads[EntryIndex].Location; @@ -1206,7 +1209,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal if (Payload->MemCached) { - OutValue.Value = m_MemCachedPayloads[Payload->MemCached]; + OutValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload; Payload = nullptr; IndexLock.ReleaseNow(); m_MemoryHitCount++; @@ -1231,7 +1234,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal size_t ValueSize = OutValue.Value.GetSize(); if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MemCache"); + ZEN_TRACE_CPU("Z$::Bucket::Get::MemCache"); OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value); RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock); if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end()) @@ -1240,7 +1243,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal // Only update if it has not already been updated by other thread if (!WritePayload.MemCached) { - SetMemCachedData(UpdateIndexLock, WritePayload, OutValue.Value); + SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value); } } } @@ -1250,7 +1253,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal if (FillRawHashAndRawSize) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MetaData"); + ZEN_TRACE_CPU("Z$::Bucket::Get::MetaData"); if (Location.IsFlagSet(DiskLocation::kCompressed)) { if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize)) @@ -1293,6 +1296,8 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal void ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) { + ZEN_TRACE_CPU("Z$::Bucket::Put"); + metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) @@ -1307,71 +1312,91 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& m_DiskWriteCount++; } -void +uint64_t ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime) { + ZEN_TRACE_CPU("Z$::Bucket::MemCacheTrim"); + + uint64_t Trimmed = 0; GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - if (m_MemCachedPayloads.empty()) + uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size()); + if (MemCachedCount == 0) { - return; + return 0; } - for (const auto& Kv : m_Index) + + uint32_t WriteIndex = 0; + for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex) { - size_t Index = Kv.second; - BucketPayload& Payload = m_Payloads[Index]; - if (!Payload.MemCached) + MemCacheData& Data = m_MemCachedPayloads[ReadIndex]; + if (!Data.Payload) + { + continue; + } + PayloadIndex Index = Data.OwnerIndex; + ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex)); + GcClock::Tick AccessTime = m_AccessTimes[Index]; + if (AccessTime < ExpireTicks) { + size_t PayloadSize = Data.Payload.GetSize(); + RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); + Data = {}; + m_Payloads[Index].MemCached = {}; + Trimmed += PayloadSize; continue; } - if (m_AccessTimes[Index] < ExpireTicks) + if (ReadIndex > WriteIndex) { - RemoveMemCachedData(IndexLock, Payload); + m_MemCachedPayloads[WriteIndex] = MemCacheData{.Payload = std::move(Data.Payload), .OwnerIndex = Index}; + m_Payloads[Index].MemCached = MemCachedIndex(WriteIndex); } + WriteIndex++; } + m_MemCachedPayloads.resize(WriteIndex); m_MemCachedPayloads.shrink_to_fit(); - m_FreeMemCachedPayloads.shrink_to_fit(); - m_FreeMetaDatas.shrink_to_fit(); + zen::Reset(m_FreeMemCachedPayloads); + return Trimmed; } void -ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart, - GcClock::Duration SectionLength, - std::vector<uint64_t>& InOutUsageSlots) +ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots) { + ZEN_TRACE_CPU("Z$::Bucket::GetUsageByAccess"); + + size_t SlotCount = InOutUsageSlots.capacity(); RwLock::SharedLockScope _(m_IndexLock); - if (m_MemCachedPayloads.empty()) + uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size()); + if (MemCachedCount == 0) { return; } - for (const auto& It : m_Index) + for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex) { - size_t Index = It.second; - BucketPayload& Payload = m_Payloads[Index]; - if (!Payload.MemCached) + MemCacheData& Data = m_MemCachedPayloads[ReadIndex]; + if (!Data.Payload) { continue; } + PayloadIndex Index = Data.OwnerIndex; + ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex)); GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index])); - GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch(); - uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0); - if (Slot >= InOutUsageSlots.capacity()) - { - Slot = InOutUsageSlots.capacity() - 1; - } - if (Slot > InOutUsageSlots.size()) + GcClock::Duration Age = Now > ItemAccessTime ? Now - ItemAccessTime : GcClock::Duration(0); + size_t Slot = Age < MaxAge ? gsl::narrow<size_t>((Age.count() * SlotCount) / MaxAge.count()) : (SlotCount - 1); + ZEN_ASSERT_SLOW(Slot < SlotCount); + if (Slot >= InOutUsageSlots.size()) { - InOutUsageSlots.resize(uint64_t(Slot + 1), 0); + InOutUsageSlots.resize(Slot + 1, 0); } - InOutUsageSlots[Slot] += m_MemCachedPayloads[Payload.MemCached].GetSize(); + InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(Data.Payload.GetSize()); } } bool ZenCacheDiskLayer::CacheBucket::Drop() { - ZEN_TRACE_CPU("Z$::Disk::Bucket::Drop"); + ZEN_TRACE_CPU("Z$::Bucket::Drop"); RwLock::ExclusiveLockScope _(m_IndexLock); @@ -1407,7 +1432,7 @@ ZenCacheDiskLayer::CacheBucket::Drop() void ZenCacheDiskLayer::CacheBucket::Flush() { - ZEN_TRACE_CPU("Z$::Disk::Bucket::Flush"); + ZEN_TRACE_CPU("Z$::Bucket::Flush"); bool Expected = false; if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) { @@ -1433,6 +1458,7 @@ ZenCacheDiskLayer::CacheBucket::Flush() void ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc) { + ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot"); try { bool UseLegacyScheme = false; @@ -1607,7 +1633,7 @@ ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer) void ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::Scrub"); + ZEN_TRACE_CPU("Z$::Bucket::Scrub"); ZEN_INFO("scrubbing '{}'", m_BucketDir); @@ -1823,7 +1849,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) std::vector<BucketPayload> Payloads; std::vector<AccessTime> AccessTimes; std::vector<BucketMetaData> MetaDatas; - std::vector<IoBuffer> MemCachedPayloads; + std::vector<MemCacheData> MemCachedPayloads; std::vector<ReferenceIndex> FirstReferenceIndex; IndexMap Index; @@ -1847,7 +1873,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) void ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::GatherReferences"); + ZEN_TRACE_CPU("Z$::Bucket::GatherReferences"); #define CALCULATE_BLOCKING_TIME 0 @@ -1999,10 +2025,10 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) #endif // CALCULATE_BLOCKING_TIME if (auto It = m_Index.find(Key); It != m_Index.end()) { - const BucketPayload& CachedPayload = Payloads[It->second]; + const BucketPayload& CachedPayload = m_Payloads[It->second]; if (CachedPayload.MemCached) { - Buffer = m_MemCachedPayloads[CachedPayload.MemCached]; + Buffer = m_MemCachedPayloads[CachedPayload.MemCached].Payload; ZEN_ASSERT_SLOW(Buffer); } else @@ -2065,7 +2091,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) void ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage"); + ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage"); ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir); @@ -2124,7 +2150,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) std::vector<BucketPayload> Payloads; std::vector<AccessTime> AccessTimes; std::vector<BucketMetaData> MetaDatas; - std::vector<IoBuffer> MemCachedPayloads; + std::vector<MemCacheData> MemCachedPayloads; std::vector<ReferenceIndex> FirstReferenceIndex; IndexMap Index; { @@ -2165,7 +2191,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State"); + ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::State"); RwLock::SharedLockScope IndexLock(m_IndexLock); Stopwatch Timer; @@ -2213,7 +2239,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) if (GcCtx.IsDeletionMode()) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::Delete"); + ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::Delete"); ExtendablePathBuilder<256> Path; @@ -2281,7 +2307,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment); size_t ChunkIndex = ChunkLocations.size(); ChunkLocations.push_back(Location); - ChunkIndexToChunkHash[ChunkIndex] = Key; + ChunkIndexToChunkHash.push_back(Key); if (ExpiredCacheKeys.contains(Key)) { continue; @@ -2453,7 +2479,7 @@ ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents( void ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) { - ZEN_TRACE_CPU("Z$::Disk::CollectGarbage"); + ZEN_TRACE_CPU("Z$::CollectGarbage"); std::vector<CacheBucket*> Buckets; { @@ -2468,13 +2494,16 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) { Bucket->CollectGarbage(GcCtx); } - MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); + if (!m_IsMemCacheTrimming) + { + MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); + } } void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::PutStandaloneCacheValue"); + ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue"); uint64_t NewFileSize = Value.Value.Size(); @@ -2671,16 +2700,17 @@ ZenCacheDiskLayer::CacheBucket::RemoveMetaData(RwLock::ExclusiveLockScope&, Buck } void -ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload, IoBuffer& MemCachedData) +ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData) { - uint64_t PayloadSize = MemCachedData.GetSize(); + BucketPayload& Payload = m_Payloads[PayloadIndex]; + uint64_t PayloadSize = MemCachedData.GetSize(); ZEN_ASSERT(PayloadSize != 0); if (m_FreeMemCachedPayloads.empty()) { if (m_MemCachedPayloads.size() != std::numeric_limits<uint32_t>::max()) { Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(m_MemCachedPayloads.size())); - m_MemCachedPayloads.push_back(MemCachedData); + m_MemCachedPayloads.emplace_back(MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex}); AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemoryWriteCount++; } @@ -2689,7 +2719,7 @@ ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, Bu { Payload.MemCached = m_FreeMemCachedPayloads.back(); m_FreeMemCachedPayloads.pop_back(); - m_MemCachedPayloads[Payload.MemCached] = MemCachedData; + m_MemCachedPayloads[Payload.MemCached] = MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex}; AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemoryWriteCount++; } @@ -2700,9 +2730,9 @@ ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(RwLock::ExclusiveLockScope&, { if (Payload.MemCached) { - size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].GetSize(); + size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].Payload.GetSize(); RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); - m_MemCachedPayloads[Payload.MemCached] = IoBuffer{}; + m_MemCachedPayloads[Payload.MemCached] = {}; m_FreeMemCachedPayloads.push_back(Payload.MemCached); Payload.MemCached = {}; return PayloadSize; @@ -2723,7 +2753,7 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck void ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::PutInlineCacheValue"); + ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue"); uint8_t EntryFlags = 0; @@ -2800,7 +2830,7 @@ public: virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactStore"); + ZEN_TRACE_CPU("Z$::Bucket::CompactStore"); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3023,7 +3053,7 @@ private: GcStoreCompactor* ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveExpiredData"); + ZEN_TRACE_CPU("Z$::Bucket::RemoveExpiredData"); size_t TotalEntries = 0; @@ -3117,7 +3147,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) std::vector<BucketPayload> Payloads; std::vector<AccessTime> AccessTimes; std::vector<BucketMetaData> MetaDatas; - std::vector<IoBuffer> MemCachedPayloads; + std::vector<MemCacheData> MemCachedPayloads; std::vector<ReferenceIndex> FirstReferenceIndex; IndexMap Index; { @@ -3164,7 +3194,7 @@ public: virtual void PreCache(GcCtx& Ctx) override { - ZEN_TRACE_CPU("Z$::Disk::Bucket::PreCache"); + ZEN_TRACE_CPU("Z$::Bucket::PreCache"); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3385,7 +3415,7 @@ public: virtual void LockState(GcCtx& Ctx) override { - ZEN_TRACE_CPU("Z$::Disk::Bucket::LockState"); + ZEN_TRACE_CPU("Z$::Bucket::LockState"); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3458,7 +3488,7 @@ public: virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override { - ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveUsedReferencesFromSet"); + ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet"); ZEN_ASSERT(m_IndexLock); size_t InitialCount = IoCids.size(); @@ -3505,7 +3535,7 @@ public: std::vector<GcReferenceChecker*> ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CreateReferenceCheckers"); + ZEN_TRACE_CPU("Z$::Bucket::CreateReferenceCheckers"); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3530,7 +3560,7 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) void ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactReferences"); + ZEN_TRACE_CPU("Z$::Bucket::CompactReferences"); std::vector<ReferenceIndex> FirstReferenceIndex; std::vector<IoHash> NewReferenceHashes; @@ -3708,12 +3738,12 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, std::vector<BucketPayload>& Payloads, std::vector<AccessTime>& AccessTimes, std::vector<BucketMetaData>& MetaDatas, - std::vector<IoBuffer>& MemCachedPayloads, + std::vector<MemCacheData>& MemCachedPayloads, std::vector<ReferenceIndex>& FirstReferenceIndex, IndexMap& Index, RwLock::ExclusiveLockScope& IndexLock) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactState"); + ZEN_TRACE_CPU("Z$::Bucket::CompactState"); size_t EntryCount = m_Index.size(); Payloads.reserve(EntryCount); @@ -3738,7 +3768,8 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, } if (Payload.MemCached) { - MemCachedPayloads.push_back(std::move(m_MemCachedPayloads[Payload.MemCached])); + MemCachedPayloads.emplace_back( + MemCacheData{.Payload = std::move(m_MemCachedPayloads[Payload.MemCached].Payload), .OwnerIndex = EntryIndex}); Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(MemCachedPayloads.size() - 1)); } if (m_Configuration.EnableReferenceCaching) @@ -3811,7 +3842,7 @@ ZenCacheDiskLayer::~ZenCacheDiskLayer() ZenCacheDiskLayer::CacheBucket* ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) { - ZEN_TRACE_CPU("Z$::Disk::GetOrCreateBucket"); + ZEN_TRACE_CPU("Z$::GetOrCreateBucket"); const auto BucketName = std::string(InBucket); { @@ -3858,7 +3889,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) bool ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { - ZEN_TRACE_CPU("Z$::Disk::Get"); + ZEN_TRACE_CPU("Z$::Get"); if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { @@ -3874,7 +3905,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach void ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) { - ZEN_TRACE_CPU("Z$::Disk::Put"); + ZEN_TRACE_CPU("Z$::Put"); if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { @@ -3886,6 +3917,8 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z void ZenCacheDiskLayer::DiscoverBuckets() { + ZEN_TRACE_CPU("Z$::DiscoverBuckets"); + DirectoryContent DirContent; GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent); @@ -3986,6 +4019,8 @@ ZenCacheDiskLayer::DiscoverBuckets() bool ZenCacheDiskLayer::DropBucket(std::string_view InBucket) { + ZEN_TRACE_CPU("Z$::DropBucket"); + RwLock::ExclusiveLockScope _(m_Lock); auto It = m_Buckets.find(std::string(InBucket)); @@ -4008,6 +4043,8 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket) bool ZenCacheDiskLayer::Drop() { + ZEN_TRACE_CPU("Z$::Drop"); + RwLock::ExclusiveLockScope _(m_Lock); std::vector<std::unique_ptr<CacheBucket>> Buckets; @@ -4029,6 +4066,8 @@ ZenCacheDiskLayer::Drop() void ZenCacheDiskLayer::Flush() { + ZEN_TRACE_CPU("Z$::Flush"); + std::vector<CacheBucket*> Buckets; Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -4070,6 +4109,8 @@ ZenCacheDiskLayer::Flush() void ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) { + ZEN_TRACE_CPU("Z$::ScrubStorage"); + RwLock::SharedLockScope _(m_Lock); { std::vector<std::future<void>> Results; @@ -4096,7 +4137,7 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) void ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) { - ZEN_TRACE_CPU("Z$::Disk::GatherReferences"); + ZEN_TRACE_CPU("Z$::GatherReferences"); std::vector<CacheBucket*> Buckets; { @@ -4213,20 +4254,11 @@ ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const st void ZenCacheDiskLayer::MemCacheTrim() { - ZEN_TRACE_CPU("Z$::Disk::MemCacheTrim"); + ZEN_TRACE_CPU("Z$::MemCacheTrim"); ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0); - - const GcClock::TimePoint Now = GcClock::Now(); - - const GcClock::Tick NowTick = Now.time_since_epoch().count(); - const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); - GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim; - const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count(); - if (NowTick < NextAllowedTrimTick) - { - return; - } + ZEN_ASSERT(m_Configuration.MemCacheMaxAgeSeconds != 0); + ZEN_ASSERT(m_Configuration.MemCacheTrimIntervalSeconds != 0); bool Expected = false; if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true)) @@ -4234,75 +4266,90 @@ ZenCacheDiskLayer::MemCacheTrim() return; } - // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong - const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); - m_LastTickMemCacheTrim.store(NextTrimTick); + try + { + m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) { + ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]"); + + const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); + uint64_t TrimmedSize = 0; + Stopwatch Timer; + const auto Guard = MakeGuard([&] { + ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", + NiceBytes(TrimmedSize), + NiceBytes(m_TotalMemCachedSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + + const GcClock::Tick NowTick = GcClock::TickCount(); + const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); + m_NextAllowedTrimTick.store(NextTrimTick); + m_IsMemCacheTrimming.store(false); + }); - m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this, Now, TrimInterval](JobContext&) { - ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]"); + const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds); - uint64_t StartSize = m_TotalMemCachedSize.load(); - Stopwatch Timer; - const auto Guard = MakeGuard([&] { - uint64_t EndSize = m_TotalMemCachedSize.load(); - ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", - NiceBytes(StartSize > EndSize ? StartSize - EndSize : 0), - NiceBytes(m_TotalMemCachedSize), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - m_IsMemCacheTrimming.store(false); - }); + static const size_t UsageSlotCount = 2048; + std::vector<uint64_t> UsageSlots; + UsageSlots.reserve(UsageSlotCount); - const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds); - - std::vector<uint64_t> UsageSlots; - UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count()); + std::vector<CacheBucket*> Buckets; + { + RwLock::SharedLockScope __(m_Lock); + Buckets.reserve(m_Buckets.size()); + for (auto& Kv : m_Buckets) + { + Buckets.push_back(Kv.second.get()); + } + } - std::vector<CacheBucket*> Buckets; - { - RwLock::SharedLockScope __(m_Lock); - Buckets.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) + const GcClock::TimePoint Now = GcClock::Now(); { - Buckets.push_back(Kv.second.get()); + ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess"); + for (CacheBucket* Bucket : Buckets) + { + Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots); + } } - } - for (CacheBucket* Bucket : Buckets) - { - Bucket->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots); - } - uint64_t TotalSize = 0; - for (size_t Index = 0; Index < UsageSlots.size(); ++Index) - { - TotalSize += UsageSlots[Index]; - if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes) + uint64_t TotalSize = 0; + for (size_t Index = 0; Index < UsageSlots.size(); ++Index) { - GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index); - MemCacheTrim(Buckets, ExpireTime); - break; + TotalSize += UsageSlots[Index]; + if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes) + { + GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount); + TrimmedSize = MemCacheTrim(Buckets, ExpireTime); + break; + } } - } - }); + }); + } + catch (std::exception& Ex) + { + ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what()); + m_IsMemCacheTrimming.store(false); + } } -void +uint64_t ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime) { if (m_Configuration.MemCacheTargetFootprintBytes == 0) { - return; + return 0; } - RwLock::SharedLockScope __(m_Lock); + uint64_t TrimmedSize = 0; for (CacheBucket* Bucket : Buckets) { - Bucket->MemCacheTrim(ExpireTime); + TrimmedSize += Bucket->MemCacheTrim(ExpireTime); } const GcClock::TimePoint Now = GcClock::Now(); const GcClock::Tick NowTick = Now.time_since_epoch().count(); const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); - GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim; + GcClock::Tick LastTrimTick = m_NextAllowedTrimTick; const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); - m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); + m_NextAllowedTrimTick.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); + return TrimmedSize; } #if ZEN_WITH_TESTS diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h index 277371f2c..6997a12e4 100644 --- a/src/zenserver/cache/cachedisklayer.h +++ b/src/zenserver/cache/cachedisklayer.h @@ -197,15 +197,15 @@ public: CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config); ~CacheBucket(); - bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); - bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); - void MemCacheTrim(GcClock::TimePoint ExpireTime); - bool Drop(); - void Flush(); - void ScrubStorage(ScrubContext& Ctx); - void GatherReferences(GcContext& GcCtx); - void CollectGarbage(GcContext& GcCtx); + bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); + bool Drop(); + void Flush(); + void ScrubStorage(ScrubContext& Ctx); + void GatherReferences(GcContext& GcCtx); + void CollectGarbage(GcContext& GcCtx); inline GcStorageSize StorageSize() const { @@ -218,7 +218,7 @@ public: CacheValueDetails::BucketDetails GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const; void EnumerateBucketContents(std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const; - void GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector<uint64_t>& InOutUsageSlots); + void GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots); #if ZEN_WITH_TESTS void SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time); #endif // ZEN_WITH_TESTS @@ -286,6 +286,11 @@ public: operator bool() const { return RawSize != 0 || RawHash != IoHash::Zero; }; }; + struct MemCacheData + { + IoBuffer Payload; + PayloadIndex OwnerIndex; + }; #pragma pack(pop) static_assert(sizeof(BucketPayload) == 20u); static_assert(sizeof(BucketMetaData) == 28u); @@ -323,7 +328,7 @@ public: std::vector<BucketPayload> m_Payloads; std::vector<BucketMetaData> m_MetaDatas; std::vector<MetaDataIndex> m_FreeMetaDatas; - std::vector<IoBuffer> m_MemCachedPayloads; + std::vector<MemCacheData> m_MemCachedPayloads; std::vector<MemCachedIndex> m_FreeMemCachedPayloads; std::vector<ReferenceIndex> m_FirstReferenceIndex; std::vector<IoHash> m_ReferenceHashes; @@ -364,7 +369,7 @@ public: const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData); void RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload); BucketMetaData GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const; - void SetMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload, IoBuffer& MemCachedData); + void SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData); size_t RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload); void InitializeIndexFromDisk(RwLock::ExclusiveLockScope&, bool IsNew); @@ -390,7 +395,7 @@ public: std::vector<BucketPayload>& Payloads, std::vector<AccessTime>& AccessTimes, std::vector<BucketMetaData>& MetaDatas, - std::vector<IoBuffer>& MemCachedPayloads, + std::vector<MemCacheData>& MemCachedPayloads, std::vector<ReferenceIndex>& FirstReferenceIndex, IndexMap& Index, RwLock::ExclusiveLockScope& IndexLock); @@ -405,6 +410,10 @@ public: m_MemCachedSize.fetch_sub(ValueSize, std::memory_order::relaxed); m_OuterCacheMemoryUsage.fetch_sub(ValueSize, std::memory_order::relaxed); } + static inline uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize) + { + return sizeof(MemCacheData) + sizeof(IoBufferCore) + RoundUp(PayloadSize, 8u); + } // These locks are here to avoid contention on file creation, therefore it's sufficient // that we take the same lock for the same hash @@ -436,10 +445,21 @@ private: { return; } + if (m_IsMemCacheTrimming) + { + return; + } + + const GcClock::Tick NowTick = GcClock::TickCount(); + if (NowTick < m_NextAllowedTrimTick) + { + return; + } + MemCacheTrim(); } - void MemCacheTrim(); - void MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime); + void MemCacheTrim(); + uint64_t MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime); GcManager& m_Gc; JobQueue& m_JobQueue; @@ -447,7 +467,7 @@ private: Configuration m_Configuration; std::atomic_uint64_t m_TotalMemCachedSize{}; std::atomic_bool m_IsMemCacheTrimming = false; - std::atomic<GcClock::Tick> m_LastTickMemCacheTrim; + std::atomic<GcClock::Tick> m_NextAllowedTrimTick; mutable RwLock m_Lock; std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 8db96f914..f61fbd8bc 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -338,7 +338,11 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach HttpStructuredCacheService::~HttpStructuredCacheService() { ZEN_INFO("closing structured cache"); - m_RequestRecorder.reset(); + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + } m_StatsService.UnregisterHandler("z$", *this); m_StatusService.UnregisterHandler("z$", *this); @@ -615,24 +619,44 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) if (Key == HttpZCacheUtilStartRecording) { - m_RequestRecorder.reset(); HttpServerRequest::QueryParams Params = Request.GetQueryParams(); std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); - m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath); + + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + + m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath); + m_RequestRecordingEnabled.store(true); + } + ZEN_INFO("cache RPC recording STARTED -> '{}'", RecordPath); Request.WriteResponse(HttpResponseCode::OK); return; } + if (Key == HttpZCacheUtilStopRecording) { - m_RequestRecorder.reset(); + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + } + ZEN_INFO("cache RPC recording STOPPED"); Request.WriteResponse(HttpResponseCode::OK); return; } + if (Key == HttpZCacheUtilReplayRecording) { CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; - m_RequestRecorder.reset(); + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + } + HttpServerRequest::QueryParams Params = Request.GetQueryParams(); std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); uint32_t ThreadCount = std::thread::hardware_concurrency(); @@ -643,11 +667,18 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) ThreadCount = gsl::narrow<uint32_t>(Value.value()); } } + + ZEN_INFO("initiating cache RPC replay using {} threads, from '{}'", ThreadCount, RecordPath); + std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false)); ReplayRequestRecorder(RequestContext, *Replayer, ThreadCount < 1 ? 1 : ThreadCount); + + ZEN_INFO("cache RPC replay STARTED"); + Request.WriteResponse(HttpResponseCode::OK); return; } + if (Key.starts_with(HttpZCacheDetailsPrefix)) { HandleDetailsRequest(Request); @@ -1776,11 +1807,15 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) [this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { uint64_t RequestIndex = ~0ull; - if (m_RequestRecorder) + if (m_RequestRecordingEnabled) { - RequestIndex = m_RequestRecorder->RecordRequest( - {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, - Body); + RwLock::SharedLockScope _(m_RequestRecordingLock); + if (m_RequestRecorder) + { + RequestIndex = m_RequestRecorder->RecordRequest( + {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, + Body); + } } uint32_t AcceptMagic = 0; @@ -1816,8 +1851,11 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle); if (RequestIndex != ~0ull) { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); + RwLock::SharedLockScope _(m_RequestRecordingLock); + if (m_RequestRecorder) + { + m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); + } } AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } @@ -1828,10 +1866,13 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) if (RequestIndex != ~0ull) { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->RecordResponse(RequestIndex, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + RwLock::SharedLockScope _(m_RequestRecordingLock); + if (m_RequestRecorder) + { + m_RequestRecorder->RecordResponse(RequestIndex, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } } AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h index 57a533029..2feaaead8 100644 --- a/src/zenserver/cache/httpstructuredcache.h +++ b/src/zenserver/cache/httpstructuredcache.h @@ -190,6 +190,12 @@ private: void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount); + // This exists to avoid taking locks when recording is not enabled + std::atomic_bool m_RequestRecordingEnabled{false}; + + // This lock should be taken in SHARED mode when calling into the recorder, + // and taken in EXCLUSIVE mode whenever the recorder is created or destroyed + RwLock m_RequestRecordingLock; std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder; }; diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index 5f2c3351e..012925b51 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -5,6 +5,8 @@ #include "config/luaconfig.h" #include "diag/logging.h" +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> #include <zencore/crypto.h> #include <zencore/except.h> #include <zencore/fmtutils.h> @@ -41,7 +43,7 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { std::filesystem::path -PickDefaultStateDirectory() +PickDefaultSystemRootDirectory() { // Pick sensible default PWSTR ProgramDataDir = nullptr; @@ -50,7 +52,7 @@ PickDefaultStateDirectory() if (SUCCEEDED(hRes)) { std::filesystem::path FinalPath(ProgramDataDir); - FinalPath /= L"Epic\\Zen\\Data"; + FinalPath /= L"Epic\\Zen"; ::CoTaskMemFree(ProgramDataDir); return FinalPath; @@ -66,7 +68,7 @@ PickDefaultStateDirectory() namespace zen { std::filesystem::path -PickDefaultStateDirectory() +PickDefaultSystemRootDirectory() { int UserId = getuid(); const passwd* Passwd = getpwuid(UserId); @@ -79,6 +81,62 @@ PickDefaultStateDirectory() namespace zen { +std::filesystem::path +PickDefaultStateDirectory(std::filesystem::path SystemRoot) +{ + if (SystemRoot.empty()) + return SystemRoot; + + return SystemRoot / "Data"; +} + +void +EmitCentralManifest(const std::filesystem::path& SystemRoot, Oid Identifier, CbObject Manifest, std::filesystem::path ManifestPath) +{ + CbObjectWriter Cbo; + Cbo << "path" << ManifestPath.generic_wstring(); + Cbo << "manifest" << Manifest; + + const std::filesystem::path StatesPath = SystemRoot / "States"; + + CreateDirectories(StatesPath); + WriteFile(StatesPath / fmt::format("{}", Identifier), Cbo.Save().GetBuffer().AsIoBuffer()); +} + +std::vector<CbObject> +ReadAllCentralManifests(const std::filesystem::path& SystemRoot) +{ + std::vector<CbObject> Manifests; + + DirectoryContent Content; + GetDirectoryContent(SystemRoot / "States", DirectoryContent::IncludeFilesFlag, Content); + + for (std::filesystem::path& File : Content.Files) + { + try + { + FileContents FileData = ReadFile(File); + IoBuffer DataBuffer = FileData.Flatten(); + CbValidateError ValidateError = ValidateCompactBinary(DataBuffer, CbValidateMode::All); + + if (ValidateError == CbValidateError::None) + { + Manifests.push_back(LoadCompactBinaryObject(DataBuffer)); + } + else + { + ZEN_WARN("failed to load manifest '{}': {}", File, ToString(ValidateError)); + } + } + catch (std::exception& Ex) + { + ZEN_WARN("failed to load manifest '{}': {}", File, Ex.what()); + } + } + + return Manifests; +} + void ValidateOptions(ZenServerOptions& ServerOptions) { @@ -343,6 +401,7 @@ ParseConfigFile(const std::filesystem::path& Path, LuaOptions.AddOption("server.logid"sv, ServerOptions.LogId, "log-id"sv); LuaOptions.AddOption("server.sentry.disable"sv, ServerOptions.NoSentry, "no-sentry"sv); LuaOptions.AddOption("server.sentry.allowpersonalinfo"sv, ServerOptions.SentryAllowPII, "sentry-allow-personal-info"sv); + LuaOptions.AddOption("server.systemrootdir"sv, ServerOptions.SystemRootDir, "system-dir"sv); LuaOptions.AddOption("server.datadir"sv, ServerOptions.DataDir, "data-dir"sv); LuaOptions.AddOption("server.contentdir"sv, ServerOptions.ContentDir, "content-dir"sv); LuaOptions.AddOption("server.abslog"sv, ServerOptions.AbsLogFile, "abslog"sv); @@ -370,9 +429,11 @@ ParseConfigFile(const std::filesystem::path& Path, ServerOptions.HttpServerConfig.HttpSys.IsRequestLoggingEnabled, "httpsys-enable-request-logging"sv); +#if ZEN_WITH_TRACE ////// trace LuaOptions.AddOption("trace.host"sv, ServerOptions.TraceHost, "tracehost"sv); LuaOptions.AddOption("trace.file"sv, ServerOptions.TraceFile, "tracefile"sv); +#endif ////// stats LuaOptions.AddOption("stats.enable"sv, ServerOptions.StatsConfig.Enabled); @@ -503,6 +564,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) // stream operator to convert argv value into the options type. std::fs::path // expects paths in streams to be quoted but argv paths are unquoted. By // going into a std::string first, paths with whitespace parse correctly. + std::string SystemRootDir; std::string DataDir; std::string ContentDir; std::string AbsLogFile; @@ -525,6 +587,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) options.add_options()("help", "Show command line help"); options.add_options()("t, test", "Enable test mode", cxxopts::value<bool>(ServerOptions.IsTest)->default_value("false")); options.add_options()("data-dir", "Specify persistence root", cxxopts::value<std::string>(DataDir)); + options.add_options()("system-dir", "Specify system root", cxxopts::value<std::string>(SystemRootDir)); options.add_options()("snapshot-dir", "Specify a snapshot of server state to mirror into the persistence root at startup", cxxopts::value<std::string>(BaseSnapshotDir)); @@ -975,6 +1038,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) } logging::RefreshLogLevels(); + ServerOptions.SystemRootDir = MakeSafePath(SystemRootDir); ServerOptions.DataDir = MakeSafePath(DataDir); ServerOptions.BaseSnapshotDir = MakeSafePath(BaseSnapshotDir); ServerOptions.ContentDir = MakeSafePath(ContentDir); @@ -1022,9 +1086,14 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) throw; } + if (ServerOptions.SystemRootDir.empty()) + { + ServerOptions.SystemRootDir = PickDefaultSystemRootDirectory(); + } + if (ServerOptions.DataDir.empty()) { - ServerOptions.DataDir = PickDefaultStateDirectory(); + ServerOptions.DataDir = PickDefaultStateDirectory(ServerOptions.SystemRootDir); } if (ServerOptions.AbsLogFile.empty()) diff --git a/src/zenserver/config.h b/src/zenserver/config.h index cd2d92523..b5314b600 100644 --- a/src/zenserver/config.h +++ b/src/zenserver/config.h @@ -128,6 +128,7 @@ struct ZenServerOptions zen::HttpServerConfig HttpServerConfig; ZenStructuredCacheConfig StructuredCacheConfig; ZenStatsConfig StatsConfig; + std::filesystem::path SystemRootDir; // System root directory (used for machine level config) std::filesystem::path DataDir; // Root directory for state (used for testing) std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental) std::filesystem::path AbsLogFile; // Absolute path to main log file @@ -162,4 +163,7 @@ struct ZenServerOptions void ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions); +void EmitCentralManifest(const std::filesystem::path& SystemRoot, Oid Identifier, CbObject Manifest, std::filesystem::path ManifestPath); +std::vector<CbObject> ReadAllCentralManifests(const std::filesystem::path& SystemRoot); + } // namespace zen diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 73cb35fb8..b7507bd17 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1176,8 +1176,6 @@ ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, const OplogEntryMapping& OpMapping, const OplogEntry& OpEntry) { - ZEN_TRACE_CPU("Store::Oplog::RegisterOplogEntry"); - // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however @@ -3662,11 +3660,11 @@ namespace testutils { return Result; } - uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset) + uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64_t RawOffset) { if (RawOffset > 0) { - uint64 BlockSize = 0; + uint64_t BlockSize = 0; OodleCompressor Compressor; OodleCompressionLevel CompressionLevel; if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) diff --git a/src/zenserver/upstream/zen.cpp b/src/zenserver/upstream/zen.cpp index 8ae33597a..2d52236b3 100644 --- a/src/zenserver/upstream/zen.cpp +++ b/src/zenserver/upstream/zen.cpp @@ -59,6 +59,11 @@ ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClien ZenStructuredCacheClient::~ZenStructuredCacheClient() { + RwLock::ExclusiveLockScope _(m_SessionStateLock); + for (auto& CacheEntry : m_SessionStateCache) + { + delete CacheEntry; + } } detail::ZenCacheSessionState* diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 336f715f4..f80f95f8e 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -305,7 +305,8 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen m_ProjectStore, HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile, .HttpLogPath = ServerOptions.DataDir / "logs" / "http.log", - .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"}); + .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"}, + ServerOptions); m_Http->RegisterService(*m_AdminService); return EffectiveBasePort; @@ -329,6 +330,8 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions) bool UpdateManifest = false; std::filesystem::path ManifestPath = m_DataRoot / "root_manifest"; + Oid StateId = Oid::Zero; + DateTime CreatedWhen{0}; if (!WipeState) { @@ -365,6 +368,8 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions) m_RootManifest = LoadCompactBinaryObject(Manifest); const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0); + StateId = m_RootManifest["state_id"].AsObjectId(); + CreatedWhen = m_RootManifest["created"].AsDateTime(); if (ManifestVersion != ZEN_CFG_SCHEMA_VERSION) { @@ -391,6 +396,20 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions) } } + if (StateId == Oid::Zero) + { + StateId = Oid::NewOid(); + UpdateManifest = true; + } + + const DateTime Now = DateTime::Now(); + + if (CreatedWhen.GetTicks() == 0) + { + CreatedWhen = Now; + UpdateManifest = true; + } + // Handle any state wipe if (WipeState) @@ -418,19 +437,36 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions) UpdateManifest = true; } - if (UpdateManifest) - { - // Write new manifest - - const DateTime Now = DateTime::Now(); + // Write manifest + { CbObjectWriter Cbo; - Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "created" << Now << "updated" << Now << "state_id" << Oid::NewOid(); + Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "created" << CreatedWhen << "updated" << Now << "state_id" << StateId; m_RootManifest = Cbo.Save(); - WriteFile(ManifestPath, m_RootManifest.GetBuffer().AsIoBuffer()); + if (UpdateManifest) + { + IoBuffer ManifestBuffer = m_RootManifest.GetBuffer().AsIoBuffer(); + + WriteFile(ManifestPath, ManifestBuffer); + } + + if (!ServerOptions.IsTest) + { + try + { + EmitCentralManifest(ServerOptions.SystemRootDir, StateId, m_RootManifest, ManifestPath); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Unable to emit central manifest: ", Ex.what()); + } + } } + + // Write state marker + { std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker"; static const std::string_view StateMarkerContent = "deleting this file will cause " ZEN_APP_NAME " to exit"sv; diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 71e306eca..73a8ad538 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -51,6 +51,7 @@ BlockStoreFile::GetPath() const void BlockStoreFile::Open() { + ZEN_TRACE_CPU("BlockStoreFile::Open"); uint32_t RetriesLeft = 3; m_File.Open(m_Path, BasicFile::Mode::kDelete, [&](std::error_code& Ec) { if (RetriesLeft == 0) @@ -149,6 +150,11 @@ BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::functio { m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun)); } +bool +BlockStoreFile::IsOpen() const +{ + return !!m_IoBuffer; +} constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024; @@ -285,6 +291,7 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations) BlockStore::BlockEntryCountMap BlockStore::GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent) { + ZEN_TRACE_CPU("BlockStoreFile::GetBlocksToCompact"); BlockEntryCountMap Result; { RwLock::SharedLockScope InsertLock(m_InsertLock); @@ -345,6 +352,7 @@ BlockStore::GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUs void BlockStore::Close() { + ZEN_TRACE_CPU("BlockStore::Close"); RwLock::ExclusiveLockScope InsertLock(m_InsertLock); m_WriteBlock = nullptr; m_CurrentInsertOffset = 0; @@ -666,7 +674,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, ZEN_TRACE_CPU("BlockStore::ReclaimSpace::Compact"); Ref<BlockStoreFile> NewBlockFile; auto NewBlockFileGuard = MakeGuard([&]() { - if (NewBlockFile) + if (NewBlockFile && NewBlockFile->IsOpen()) { ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath()); m_TotalSize.fetch_sub(NewBlockFile->FileSize(), std::memory_order::relaxed); @@ -754,10 +762,26 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, if (OldBlockFile) { ZEN_TRACE_CPU("BlockStore::ReclaimSpace::MoveBlock"); + + ZEN_INFO("Moving {} chunks from '{}' to new block", KeepMap.size(), GetBlockPath(m_BlocksBasePath, BlockIndex)); + + uint64_t OldBlockSize = OldBlockFile->FileSize(); std::vector<uint8_t> Chunk; for (const size_t& ChunkIndex : KeepMap) { const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; + if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize) + { + ZEN_WARN( + "ReclaimSpace skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block " + "size {}", + m_BlocksBasePath, + ChunkLocation.Offset, + ChunkLocation.Size, + OldBlockFile->GetPath(), + OldBlockSize); + continue; + } Chunk.resize(ChunkLocation.Size); OldBlockFile->Read(Chunk.data(), ChunkLocation.Size, ChunkLocation.Offset); @@ -767,6 +791,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, if (NewBlockFile) { + ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); NewBlockFile->Flush(); NewBlockFile = nullptr; } @@ -820,8 +845,8 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); + ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen()); m_ChunkBlocks.erase(NextBlockIndex); - NewBlockFile->MarkAsDeleteOnClose(); return; } @@ -846,6 +871,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, Chunk.clear(); if (NewBlockFile) { + ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); NewBlockFile->Flush(); } } @@ -880,6 +906,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, } if (NewBlockFile) { + ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); NewBlockFile->Flush(); NewBlockFile = nullptr; } @@ -1044,6 +1071,8 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, const CompactCallback& ChangeCallback, const ClaimDiskReserveCallback& DiskReserveCallback) { + ZEN_TRACE_CPU("BlockStore::CompactBlocks"); + uint64_t DeletedSize = 0; uint64_t MovedCount = 0; uint64_t MovedSize = 0; @@ -1069,7 +1098,6 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, auto NewBlockFileGuard = MakeGuard([&]() { if (NewBlockFile) { - ZEN_DEBUG("Dropping incomplete cas block store file '{}'", NewBlockFile->GetPath()); { RwLock::ExclusiveLockScope _l(m_InsertLock); if (m_ChunkBlocks[NewBlockIndex] == NewBlockFile) @@ -1077,7 +1105,11 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, m_ChunkBlocks.erase(NewBlockIndex); } } - NewBlockFile->MarkAsDeleteOnClose(); + if (NewBlockFile->IsOpen()) + { + ZEN_DEBUG("Dropping incomplete cas block store file '{}'", NewBlockFile->GetPath()); + NewBlockFile->MarkAsDeleteOnClose(); + } } }); @@ -1100,6 +1132,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, CompactState.IterateBlocks([&](uint32_t BlockIndex, const std::vector<size_t>& KeepChunkIndexes, const std::vector<BlockStoreLocation>& ChunkLocations) -> bool { + ZEN_TRACE_CPU("BlockStore::CompactBlock"); Ref<BlockStoreFile> OldBlockFile; { RwLock::SharedLockScope _(m_InsertLock); @@ -1125,6 +1158,8 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, } ZEN_ASSERT(OldBlockFile); + ZEN_INFO("Moving {} chunks from '{}' to new block", KeepChunkIndexes.size(), GetBlockPath(m_BlocksBasePath, BlockIndex)); + uint64_t OldBlockSize = OldBlockFile->FileSize(); std::vector<uint8_t> Chunk; @@ -1151,6 +1186,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, { if (NewBlockFile) { + ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); NewBlockFile->Flush(); MovedSize += NewBlockFile->FileSize(); NewBlockFile = nullptr; @@ -1179,7 +1215,6 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, NewBlockFile = new BlockStoreFile(NewBlockPath); m_ChunkBlocks[NextBlockIndex] = NewBlockFile; } - ZEN_ASSERT(NewBlockFile); std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); @@ -1191,7 +1226,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); m_ChunkBlocks.erase(NextBlockIndex); } - NewBlockFile->MarkAsDeleteOnClose(); + ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen()); NewBlockFile = nullptr; return false; } @@ -1210,7 +1245,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); m_ChunkBlocks.erase(NextBlockIndex); } - NewBlockFile->MarkAsDeleteOnClose(); + ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen()); NewBlockFile = nullptr; return false; } @@ -1251,6 +1286,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, if (NewBlockFile) { + ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); NewBlockFile->Flush(); MovedSize += NewBlockFile->FileSize(); NewBlockFile = nullptr; diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index d38099117..b20f2049a 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -91,7 +91,7 @@ CasImpl::~CasImpl() void CasImpl::Initialize(const CidStoreConfiguration& InConfig) { - ZEN_TRACE_CPU("Cas::Initialize"); + ZEN_TRACE_CPU("CAS::Initialize"); m_Config = InConfig; @@ -127,6 +127,7 @@ CasImpl::Initialize(const CidStoreConfiguration& InConfig) bool CasImpl::OpenOrCreateManifest() { + ZEN_TRACE_CPU("CAS::OpenOrCreateManifest"); bool IsNewStore = false; std::filesystem::path ManifestPath = m_Config.RootDirectory; @@ -189,6 +190,7 @@ CasImpl::OpenOrCreateManifest() void CasImpl::UpdateManifest() { + ZEN_TRACE_CPU("CAS::UpdateManifest"); if (!m_ManifestObject) { CbObjectWriter Cbo; @@ -266,6 +268,7 @@ CasImpl::ContainsChunk(const IoHash& ChunkHash) void CasImpl::FilterChunks(HashKeySet& InOutChunks) { + ZEN_TRACE_CPU("CAS::FilterChunks"); m_SmallStrategy.FilterChunks(InOutChunks); m_TinyStrategy.FilterChunks(InOutChunks); m_LargeStrategy.FilterChunks(InOutChunks); @@ -274,6 +277,7 @@ CasImpl::FilterChunks(HashKeySet& InOutChunks) void CasImpl::Flush() { + ZEN_TRACE_CPU("CAS::Flush"); ZEN_INFO("flushing CAS pool at '{}'", m_Config.RootDirectory); m_SmallStrategy.Flush(); m_TinyStrategy.Flush(); diff --git a/src/zenstore/caslog.cpp b/src/zenstore/caslog.cpp index cf3bd76da..2c26e522f 100644 --- a/src/zenstore/caslog.cpp +++ b/src/zenstore/caslog.cpp @@ -229,7 +229,8 @@ CasLogFile::Append(const void* DataPointer, uint64_t DataSize) if (Ec) { - throw std::system_error(Ec, fmt::format("Failed to write to log file '{}'", PathFromHandle(m_File.Handle()))); + std::error_code Dummy; + throw std::system_error(Ec, fmt::format("Failed to write to log file '{}'", PathFromHandle(m_File.Handle(), Dummy))); } } diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index b21f9f8d8..64c1dadf8 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -244,6 +244,7 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) void CasContainerStrategy::Flush() { + ZEN_TRACE_CPU("CasContainer::Flush"); m_BlockStore.Flush(/*ForceNewBlock*/ false); m_CasLog.Flush(); MakeIndexSnapshot(); @@ -470,7 +471,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) size_t ChunkIndex = ChunkLocations.size(); ChunkLocations.push_back(Location); - ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; + ChunkIndexToChunkHash.push_back(ChunkHash); if (Keep) { KeepChunkIndexes.push_back(ChunkIndex); diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index de653b0e3..4cc2c3ed1 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -1876,18 +1876,17 @@ GcScheduler::SchedulerThread() NextTriggerStatus = Sb; } - ZEN_INFO( - "{} used{}. '{}': {} in use, {} free. Disk writes last {} per {} [{}], peak {}/s.{}", - NiceBytes(TotalSize.DiskSize), - DiskSizeSoftLimit == 0 ? "" : fmt::format(", {} soft limit", NiceBytes(DiskSizeSoftLimit)), - m_Config.RootDirectory, - NiceBytes(Space.Total - Space.Free), - NiceBytes(Space.Free), - NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(LoadGraphTime).count())), - NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(LoadGraphTime).count() / PressureGraphLength)), - LoadGraph, - NiceBytes(MaxLoad * uint64_t(std::chrono::seconds(1).count()) / uint64_t(std::chrono::seconds(LoadGraphTime).count())), - NextTriggerStatus); + ZEN_INFO("{} used{}. '{}': {} in use, {} free. Disk writes last {} per {} [{}], peak {}/s.{}", + NiceBytes(TotalSize.DiskSize), + DiskSizeSoftLimit == 0 ? "" : fmt::format(", {} soft limit", NiceBytes(DiskSizeSoftLimit)), + m_Config.RootDirectory, + NiceBytes(Space.Total - Space.Free), + NiceBytes(Space.Free), + NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(LoadGraphTime).count())), + NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(LoadGraphTime).count() / PressureGraphLength)), + LoadGraph, + NiceBytes(MaxLoad / uint64_t(std::chrono::seconds(m_Config.MonitorInterval).count())), + NextTriggerStatus); if (!DiskSpaceGCTriggered && !TimeBasedGCTriggered) { diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index 786780b5e..bb36cb3cd 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -101,6 +101,7 @@ struct BlockStoreFile : public RefCounted void Flush(); BasicFile& GetBasicFile(); void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); + bool IsOpen() const; private: const std::filesystem::path m_Path; diff --git a/src/zenutil/basicfile.cpp b/src/zenutil/basicfile.cpp index 173b22449..024b1e5bf 100644 --- a/src/zenutil/basicfile.cpp +++ b/src/zenutil/basicfile.cpp @@ -241,7 +241,8 @@ BasicFile::Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset) if (!Success) { - ThrowLastError(fmt::format("Failed to read from file '{}'", zen::PathFromHandle(m_FileHandle))); + std::error_code Dummy; + ThrowLastError(fmt::format("Failed to read from file '{}'", zen::PathFromHandle(m_FileHandle, Dummy))); } BytesToRead -= NumberOfBytesToRead; @@ -374,7 +375,8 @@ BasicFile::Write(const void* Data, uint64_t Size, uint64_t Offset) if (Ec) { - throw std::system_error(Ec, fmt::format("Failed to write to file '{}'", zen::PathFromHandle(m_FileHandle))); + std::error_code Dummy; + throw std::system_error(Ec, fmt::format("Failed to write to file '{}'", zen::PathFromHandle(m_FileHandle, Dummy))); } } @@ -426,7 +428,8 @@ BasicFile::FileSize() int Error = zen::GetLastError(); if (Error) { - ThrowSystemError(Error, fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle))); + std::error_code Dummy; + ThrowSystemError(Error, fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy))); } } return uint64_t(liFileSize.QuadPart); @@ -436,7 +439,8 @@ BasicFile::FileSize() struct stat Stat; if (fstat(Fd, &Stat) == -1) { - ThrowSystemError(GetLastError(), fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle))); + std::error_code Dummy; + ThrowSystemError(GetLastError(), fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy))); } return uint64_t(Stat.st_size); #endif @@ -483,7 +487,9 @@ BasicFile::SetFileSize(uint64_t FileSize) int Error = zen::GetLastError(); if (Error) { - ThrowSystemError(Error, fmt::format("Failed to set file pointer to {} for file {}", FileSize, PathFromHandle(m_FileHandle))); + std::error_code Dummy; + ThrowSystemError(Error, + fmt::format("Failed to set file pointer to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy))); } } OK = ::SetEndOfFile(m_FileHandle); @@ -492,7 +498,9 @@ BasicFile::SetFileSize(uint64_t FileSize) int Error = zen::GetLastError(); if (Error) { - ThrowSystemError(Error, fmt::format("Failed to set end of file to {} for file {}", FileSize, PathFromHandle(m_FileHandle))); + std::error_code Dummy; + ThrowSystemError(Error, + fmt::format("Failed to set end of file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy))); } } #elif ZEN_PLATFORM_MAC @@ -502,7 +510,9 @@ BasicFile::SetFileSize(uint64_t FileSize) int Error = zen::GetLastError(); if (Error) { - ThrowSystemError(Error, fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle))); + std::error_code Dummy; + ThrowSystemError(Error, + fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy))); } } #else @@ -512,7 +522,9 @@ BasicFile::SetFileSize(uint64_t FileSize) int Error = zen::GetLastError(); if (Error) { - ThrowSystemError(Error, fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle))); + std::error_code Dummy; + ThrowSystemError(Error, + fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy))); } } if (FileSize > 0) @@ -520,7 +532,9 @@ BasicFile::SetFileSize(uint64_t FileSize) int Error = posix_fallocate64(Fd, 0, (off64_t)FileSize); if (Error) { - ThrowSystemError(Error, fmt::format("Failed to allocate space of {} for file {}", FileSize, PathFromHandle(m_FileHandle))); + std::error_code Dummy; + ThrowSystemError(Error, + fmt::format("Failed to allocate space of {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy))); } } #endif diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index 054ac0e56..b8f9d65ef 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -2,8 +2,12 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/filesystem.h> +#include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/session.h> #include <zencore/system.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> #include <zenutil/basicfile.h> #include <zenutil/cache/rpcrecording.h> @@ -229,7 +233,6 @@ public: } virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } -private: virtual uint64_t GetRequestCount() const override { return m_RequestCount; } virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override @@ -238,6 +241,7 @@ private: } virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } +private: std::uint64_t m_RequestCount; RecordedRequestsReader m_RequestBuffer; }; @@ -256,15 +260,22 @@ struct RecordedRequest uint32_t Length; // 4 bytes ZenContentType ContentType; // 1 byte ZenContentType AcceptType; // 1 byte - uint8_t Padding; // 1 byte + uint8_t OffsetHigh; // 1 byte uint8_t Padding2; // 1 byte Oid SessionId; // 12 bytes + + inline uint64_t GetOffset() const { return uint64_t(Offset) + (uint64_t(OffsetHigh) << 32); } + inline void SetOffset(uint64_t NewOffset) + { + Offset = gsl::narrow_cast<uint32_t>(NewOffset & 0xffff'ffff); + OffsetHigh = gsl::narrow_cast<uint8_t>(NewOffset >> 32); + } }; static_assert(sizeof(RecordedRequest) == 24); const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB -const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; // 1MiB +const uint64_t StandaloneFileSizeThreshold = 16 * 1024 * 1024ull; // 16MiB const uint64_t SegmentRequestCount = 10 * 1000 * 1000; const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the // number of files in a directory below this level @@ -272,6 +283,12 @@ const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024; const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0}; +std::string +MakeSegmentPath(uint64_t SegmentIndex) +{ + return fmt::format("segment_{:06}", SegmentIndex); +} + struct RecordedRequestsSegmentWriter { RecordedRequestsSegmentWriter() = default; @@ -291,6 +308,7 @@ struct RecordedRequestsSegmentWriter return m_RequestCount; } + RwLock::SharedLockScope _(m_Lock); return m_Entries.size(); } inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; } @@ -321,6 +339,7 @@ struct RecordedRequestsWriter RecordedRequestsSegmentWriter& EnsureCurrentSegment(); void CommitCurrentSegment(RwLock::ExclusiveLockScope&); void EndWrite(); + void WriteRecordingMetadata(); uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); private: @@ -434,50 +453,58 @@ uint64_t RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { const uint64_t RequestBufferSize = RequestBuffer.GetSize(); + uint64_t RequestIndex = ~0ull; - RwLock::ExclusiveLockScope Lock(m_Lock); - uint64_t RequestIndex = m_Entries.size(); - RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u, - .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), - .ContentType = RequestInfo.ContentType, - .AcceptType = RequestInfo.AcceptType, - .Padding = 0, - .Padding2 = 0, - .SessionId = RequestInfo.SessionId}); - - if (Entry.Length < StandaloneFileSizeThreshold) { - const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); + RwLock::ExclusiveLockScope Lock(m_Lock); + RequestIndex = m_Entries.size(); + RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u, + .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), + .ContentType = RequestInfo.ContentType, + .AcceptType = RequestInfo.AcceptType, + .OffsetHigh = 0, + .Padding2 = 0, + .SessionId = RequestInfo.SessionId}); - if (BlockIndex == m_BlockFiles.size()) + if (Entry.Length < StandaloneFileSizeThreshold) { - std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); - NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); - m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; - ++m_FileCount; - } + const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); - ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); - BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); - ZEN_ASSERT(BlockFile != nullptr); + if (BlockIndex == m_BlockFiles.size()) + { + std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); + NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); + m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; + ++m_FileCount; + } - Entry.Offset = uint32_t(m_ChunkOffset & 0xffffFFFF); - m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); - Lock.ReleaseNow(); + ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); + BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); + ZEN_ASSERT(BlockFile != nullptr); - std::error_code Ec; - BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); - if (Ec) - { - Entry.Length = 0; - return ~0ull; - } + // Note that this is the overall logical offset, not the offset within a single file + const uint64_t ChunkWriteOffset = m_ChunkOffset; + m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u); + Entry.SetOffset(ChunkWriteOffset); + Lock.ReleaseNow(); - m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); + std::error_code Ec; + BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec); + if (Ec) + { + // We cannot simply use `Entry` here because the vector may + // have been reallocated causing the entry to be in a different + // location + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = 0; + return ~0ull; + } - return RequestIndex; + m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); + + return RequestIndex; + } } - Lock.ReleaseNow(); // Write request data to standalone file @@ -491,7 +518,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn // The exact value of the entry is not important, we will use // the size of the standalone file regardless when performing // the read - Entry.Length = std::numeric_limits<uint32_t>::max(); + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = std::numeric_limits<uint32_t>::max(); } ++m_FileCount; @@ -501,7 +529,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn if (Ec) { - Entry.Length = 0; + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = 0; return ~0ull; } @@ -511,7 +540,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn } catch (std::exception&) { - Entry.Length = 0; + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = 0; return ~0ull; } } @@ -529,7 +559,7 @@ RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, { if (R.Offset != ~0u) { - MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); + MaxChunkPosition = Max(MaxChunkPosition, R.GetOffset() + R.Length); } } uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1; @@ -547,6 +577,7 @@ RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, } return m_Entries.size(); } + void RecordedRequestsSegmentReader::EndRead() { @@ -571,9 +602,10 @@ RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutB if (Entry.Offset != ~0u) { // Inline in block file - uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); - uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); - OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); + const uint64_t EntryOffset = Entry.GetOffset(); + const uint32_t BlockIndex = gsl::narrow<uint32_t>((EntryOffset + Entry.Length) / RecordedRequestBlockSize); + const uint64_t ChunkOffset = EntryOffset - (BlockIndex * RecordedRequestBlockSize); + OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); return RequestInfo; } @@ -600,6 +632,8 @@ RecordedRequestsWriter::EnsureCurrentSegment() { bool StartNewSegment = false; + TimeSpan SegmentAge(DateTime::NowTicks() - m_CurrentWriter->GetStartTime().GetTicks()); + if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount) { ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount); @@ -615,6 +649,12 @@ RecordedRequestsWriter::EnsureCurrentSegment() ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold); StartNewSegment = true; } + else if (SegmentAge >= SegmentTimeThreshold) + { + ZEN_DEBUG("starting new RPC recording segment due to age >= {}", + NiceTimeSpanMs(SegmentTimeThreshold.GetTicks() / TimeSpan::TicksPerMillisecond)); + StartNewSegment = true; + } if (StartNewSegment) { @@ -627,7 +667,7 @@ RecordedRequestsWriter::EnsureCurrentSegment() const uint64_t SegmentIndex = m_FinishedSegments.size(); m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>(); - m_CurrentWriter->BeginWrite(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex); + m_CurrentWriter->BeginWrite(m_BasePath / MakeSegmentPath(SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex); } return *m_CurrentWriter; @@ -654,8 +694,22 @@ RecordedRequestsWriter::EndWrite() CommitCurrentSegment(_); - // Emit some metadata alongside the recording + WriteRecordingMetadata(); +} + +uint64_t +RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) +{ + RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); + + const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer); + + return Writer.GetBaseRequestIndex() + SegmentLocalIndex; +} +void +RecordedRequestsWriter::WriteRecordingMetadata() +{ try { DateTime EndTime = DateTime::Now(); @@ -702,16 +756,6 @@ RecordedRequestsWriter::EndWrite() } } -uint64_t -RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) -{ - RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); - - const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer); - - return Writer.GetBaseRequestIndex() + SegmentLocalIndex; -} - ////////////////////////////////////////////////////////////////////////// uint64_t @@ -720,26 +764,89 @@ RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool In m_InMemory = InMemory; m_BasePath = BasePath; - BasicFile InfoFile; - InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead); - CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll()); + std::error_code Ec; + BasicFile InfoFile; + InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead, Ec); + + if (!Ec) + { + try + { + CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll()); + + uint64_t TotalRequestCount = 0; + uint64_t MaxSegmentIndex = 0; + + for (auto SegmentElement : CbInfo["segments"]) + { + CbObjectView Segment = SegmentElement.AsObjectView(); + + const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(), + .BaseRequestIndex = Segment["base_index"sv].AsUInt64(), + .RequestCount = Segment["request_count"sv].AsUInt64(), + .RequestBytes = Segment["request_bytes"sv].AsUInt64(), + .StartTime = Segment["start_time"sv].AsDateTime(), + .EndTime = Segment["end_time"sv].AsDateTime()}); + + TotalRequestCount += Info.RequestCount; + MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + } + + m_SegmentReaders.resize(MaxSegmentIndex + 1); + + return TotalRequestCount; + } + catch (std::exception& Ex) + { + ZEN_WARN("could not read metadata file: {}", Ex.what()); + } + } + + ZEN_INFO("recovering segment info for '{}'", BasePath); uint64_t TotalRequestCount = 0; uint64_t MaxSegmentIndex = 0; - for (auto SegmentElement : CbInfo["segments"]) + try + { + for (int SegmentIndex = 0;; ++SegmentIndex) + { + const std::filesystem::path ZcbPath = BasePath / MakeSegmentPath(SegmentIndex) / "rpc_segment_info.zcb"; + FileContents Fc = ReadFile(ZcbPath); + + if (Fc.ErrorCode) + break; + + if (IoBuffer SegmentInfoBuffer = Fc.Flatten()) + { + CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer); + + const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(), + .BaseRequestIndex = 0, + .RequestCount = Segment["entry_count"sv].AsUInt64(), + .RequestBytes = 0, + .StartTime = Segment["time_start"sv].AsDateTime(), + .EndTime = Segment["time_end"sv].AsDateTime()}); + + TotalRequestCount += Info.RequestCount; + MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + } + } + } + catch (std::exception&) { - CbObjectView Segment = SegmentElement.AsObjectView(); + } + + std::sort(begin(m_KnownSegments), end(m_KnownSegments), [](const auto& Lhs, const auto& Rhs) { + return Lhs.SegmentIndex < Rhs.SegmentIndex; + }); - const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(), - .BaseRequestIndex = Segment["base_index"sv].AsUInt64(), - .RequestCount = Segment["request_count"sv].AsUInt64(), - .RequestBytes = Segment["request_bytes"sv].AsUInt64(), - .StartTime = Segment["start_time"sv].AsDateTime(), - .EndTime = Segment["end_time"sv].AsDateTime()}); + uint64_t SegmentRequestOffset = 0; - TotalRequestCount += Info.RequestCount; - MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + for (SegmentInfo& Info : m_KnownSegments) + { + Info.BaseRequestIndex = SegmentRequestOffset; + SegmentRequestOffset += Info.RequestCount; } m_SegmentReaders.resize(MaxSegmentIndex + 1); @@ -776,7 +883,7 @@ RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) if (!SegmentReaderPtr) { RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader; - NewSegment->BeginRead(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), m_InMemory); + NewSegment->BeginRead(m_BasePath / MakeSegmentPath(SegmentIndex), m_InMemory); SegmentReaderPtr.reset(NewSegment); } @@ -806,7 +913,6 @@ public: DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } -private: virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override { return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); @@ -814,6 +920,7 @@ private: virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} +private: RecordedRequestsWriter m_RecordedRequests; }; @@ -822,23 +929,41 @@ class DiskRequestReplayer : public IRpcRequestReplayer public: DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) { - m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory); + m_RequestCount = m_RequestReader.BeginRead(BasePath, InMemory); } - virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } + virtual ~DiskRequestReplayer() { m_RequestReader.EndRead(); } - static bool IsCompatible(const std::filesystem::path& BasePath) { return std::filesystem::exists(BasePath / "rpc_recording_info.zcb"); } + static bool IsCompatible(const std::filesystem::path& BasePath) + { + if (std::filesystem::exists(BasePath / "rpc_recording_info.zcb")) + { + return true; + } + + const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0); + + if (std::filesystem::exists(SegmentZero / "rpc_segment_info.zcb") && std::filesystem::exists(SegmentZero / "index.bin")) + { + // top-level metadata is missing, possibly because of premature exit + // on the recording side + + return true; + } + + return false; + } -private: virtual uint64_t GetRequestCount() const override { return m_RequestCount; } virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override { - return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); + return m_RequestReader.ReadRequest(RequestIndex, OutBuffer); } virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } +private: std::uint64_t m_RequestCount; - RecordedRequestsReader m_RequestBuffer; + RecordedRequestsReader m_RequestReader; }; } // namespace zen::cache::v2 @@ -866,4 +991,66 @@ MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) } } +#if ZEN_WITH_TESTS + +void +rpcrecord_forcelink() +{ +} + +TEST_SUITE_BEGIN("rpc.recording"); + +TEST_CASE("rpc.record") +{ + ScopedTemporaryDirectory TempDir; + auto Path = TempDir.Path(); + + const Oid SessionId = GetSessionId(); + + using namespace std::literals; + + { + cache::v2::DiskRequestRecorder Recorder{Path}; + + for (int i = 0; i < 1000; ++i) + { + RecordedRequestInfo RequestInfo{.ContentType = ZenContentType::kCbObject, + .AcceptType = ZenContentType::kCbObject, + .SessionId = SessionId}; + + CbObjectWriter RequestPayload; + RequestPayload << "test"sv << true; + RequestPayload << "index"sv << i; + CbObject Req = RequestPayload.Save(); + IoBuffer RequestBuffer = Req.GetBuffer().AsIoBuffer(); + + const uint64_t Index = Recorder.RecordRequest(RequestInfo, RequestBuffer); + + CHECK(Index == i); + } + } + + { + cache::v2::DiskRequestReplayer Replayer{Path, false}; + + for (int i = 0; i < 1000; ++i) + { + IoBuffer RequestBuffer; + RecordedRequestInfo RequestInfo = Replayer.GetRequest(i, RequestBuffer); + + CHECK(RequestInfo.AcceptType == ZenContentType::kCbObject); + CHECK(RequestInfo.ContentType == ZenContentType::kCbObject); + CHECK(RequestInfo.SessionId == SessionId); + + CbObject Req = LoadCompactBinaryObject(RequestBuffer); + CHECK_EQ(Req["index"sv].AsInt32(), i); + CHECK_EQ(Req["test"sv].AsBool(), true); + } + } +} + +TEST_SUITE_END(); + +#endif + } // namespace zen::cache diff --git a/src/zenutil/include/zenutil/cache/rpcrecording.h b/src/zenutil/include/zenutil/cache/rpcrecording.h index fd5df26ad..ab9b92dd3 100644 --- a/src/zenutil/include/zenutil/cache/rpcrecording.h +++ b/src/zenutil/include/zenutil/cache/rpcrecording.h @@ -4,6 +4,9 @@ #include <zencore/compositebuffer.h> #include <zencore/iobuffer.h> +#include <zencore/uid.h> + +#include <compare> namespace zen::cache { @@ -13,6 +16,7 @@ struct RecordedRequestInfo ZenContentType AcceptType; Oid SessionId; + inline std::strong_ordering operator<=>(const RecordedRequestInfo& Rhs) const = default; static const RecordedRequestInfo NullRequest; }; @@ -37,4 +41,6 @@ public: std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath); std::unique_ptr<cache::IRpcRequestReplayer> MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory); +void rpcrecord_forcelink(); + } // namespace zen::cache diff --git a/src/zenutil/logging.cpp b/src/zenutil/logging.cpp index fedfdc7e8..64230ea81 100644 --- a/src/zenutil/logging.cpp +++ b/src/zenutil/logging.cpp @@ -23,6 +23,7 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { +static bool g_IsLoggingInitialized; spdlog::sink_ptr g_FileSink; spdlog::sink_ptr @@ -83,6 +84,14 @@ BeginInitializeLogging(const LoggingOptions& LogOptions) /* max size */ 128 * 1024 * 1024, /* max files */ 16, /* rotate on open */ true); + if (LogOptions.AbsLogFile.extension() == ".json") + { + FileSink->set_formatter(std::make_unique<logging::json_formatter>(LogOptions.LogId)); + } + else + { + FileSink->set_formatter(std::make_unique<logging::full_formatter>(LogOptions.LogId)); // this will have a date prefix + } } std::set_terminate([]() { ZEN_CRITICAL("Program exited abnormally via std::terminate()"); }); @@ -173,31 +182,25 @@ FinishInitializeLogging(const LoggingOptions& LogOptions) spdlog::set_formatter( std::make_unique<logging::full_formatter>(LogOptions.LogId, std::chrono::system_clock::now())); // default to duration prefix - if (g_FileSink) - { - if (LogOptions.AbsLogFile.extension() == ".json") - { - g_FileSink->set_formatter(std::make_unique<logging::json_formatter>(LogOptions.LogId)); - } - else - { - g_FileSink->set_formatter(std::make_unique<logging::full_formatter>(LogOptions.LogId)); // this will have a date prefix - } - } - const std::string StartLogTime = zen::DateTime::Now().ToIso8601(); spdlog::apply_all([&](auto Logger) { Logger->info("log starting at {}", StartLogTime); }); + + g_IsLoggingInitialized = true; } void ShutdownLogging() { - g_FileSink.reset(); + if (g_IsLoggingInitialized) + { + auto DefaultLogger = zen::logging::Default(); + ZEN_LOG_INFO(DefaultLogger, "log ending at {}", zen::DateTime::Now().ToIso8601()); + } - auto DefaultLogger = zen::logging::Default(); - ZEN_LOG_INFO(DefaultLogger, "log ending at {}", zen::DateTime::Now().ToIso8601()); zen::logging::ShutdownLogging(); + + g_FileSink.reset(); } } // namespace zen diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp index 00db5a25b..eba3613f1 100644 --- a/src/zenutil/zenutil.cpp +++ b/src/zenutil/zenutil.cpp @@ -6,6 +6,7 @@ # include <zenutil/basicfile.h> # include <zenutil/process.h> +# include <zenutil/cache/rpcrecording.h> namespace zen { @@ -14,6 +15,7 @@ zenutil_forcelinktests() { basicfile_forcelink(); process_forcelink(); + cache::rpcrecord_forcelink(); } } // namespace zen |