aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-12-19 12:06:13 +0100
committerGitHub <[email protected]>2023-12-19 12:06:13 +0100
commit519d942d809e740a3b1fe5a1f6a57a4cfe43408b (patch)
tree9b3c084e21bb7fd5e6bb3335e890647062d0703b
parentadded mimalloc_hooks (diff)
parentensure we can build without trace (#619) (diff)
downloadzen-273-integrated-memory-tracking.tar.xz
zen-273-integrated-memory-tracking.zip
Merge branch 'main' into 273-integrated-memory-tracking273-integrated-memory-tracking
-rw-r--r--.github/workflows/create_release.yml2
-rw-r--r--.github/workflows/validate.yml3
-rw-r--r--CHANGELOG.md31
-rw-r--r--VERSION.txt2
-rw-r--r--scripts/bundle.lua30
-rw-r--r--src/zen/cmds/info_cmd.cpp51
-rw-r--r--src/zen/cmds/info_cmd.h24
-rw-r--r--src/zen/cmds/rpcreplay_cmd.cpp234
-rw-r--r--src/zen/zen.cpp3
-rw-r--r--src/zenbase/include/zenbase/refcount.h2
-rw-r--r--src/zencore/compactbinary.cpp52
-rw-r--r--src/zencore/compress.cpp4
-rw-r--r--src/zencore/filesystem.cpp12
-rw-r--r--src/zencore/include/zencore/compactbinary.h36
-rw-r--r--src/zencore/include/zencore/compactbinarybuilder.h1
-rw-r--r--src/zencore/include/zencore/compactbinaryvalidation.h1
-rw-r--r--src/zencore/include/zencore/iobuffer.h17
-rw-r--r--src/zencore/include/zencore/string.h7
-rw-r--r--src/zencore/include/zencore/trace.h1
-rw-r--r--src/zencore/iobuffer.cpp4
-rw-r--r--src/zencore/jobqueue.cpp7
-rw-r--r--src/zencore/memory.cpp26
-rw-r--r--src/zencore/thread.cpp13
-rw-r--r--src/zencore/workthreadpool.cpp4
-rw-r--r--src/zencore/zencore.cpp6
-rw-r--r--src/zenhttp/httpshared.cpp4
-rw-r--r--src/zenhttp/servers/httpparser.h4
-rw-r--r--src/zenserver/admin/admin.cpp129
-rw-r--r--src/zenserver/admin/admin.h29
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp323
-rw-r--r--src/zenserver/cache/cachedisklayer.h52
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp71
-rw-r--r--src/zenserver/cache/httpstructuredcache.h6
-rw-r--r--src/zenserver/config.cpp77
-rw-r--r--src/zenserver/config.h4
-rw-r--r--src/zenserver/projectstore/projectstore.cpp6
-rw-r--r--src/zenserver/upstream/zen.cpp5
-rw-r--r--src/zenserver/zenserver.cpp52
-rw-r--r--src/zenstore/blockstore.cpp50
-rw-r--r--src/zenstore/cas.cpp6
-rw-r--r--src/zenstore/caslog.cpp3
-rw-r--r--src/zenstore/compactcas.cpp3
-rw-r--r--src/zenstore/gc.cpp23
-rw-r--r--src/zenstore/include/zenstore/blockstore.h1
-rw-r--r--src/zenutil/basicfile.cpp32
-rw-r--r--src/zenutil/cache/rpcrecording.cpp341
-rw-r--r--src/zenutil/include/zenutil/cache/rpcrecording.h6
-rw-r--r--src/zenutil/logging.cpp33
-rw-r--r--src/zenutil/zenutil.cpp2
49 files changed, 1343 insertions, 492 deletions
diff --git a/.github/workflows/create_release.yml b/.github/workflows/create_release.yml
index e6616aa25..03295eee5 100644
--- a/.github/workflows/create_release.yml
+++ b/.github/workflows/create_release.yml
@@ -44,7 +44,7 @@ jobs:
- name: Bundle
run: |
- xmake bundle -v -y
+ xmake bundle -v -y --codesignidentity="Epic Games"
env:
VCPKG_ROOT: ${{ github.workspace }}/.vcpkg
diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml
index bdaf672f2..0702e6fa0 100644
--- a/.github/workflows/validate.yml
+++ b/.github/workflows/validate.yml
@@ -2,6 +2,7 @@ name: Validate
env:
VCPKG_VERSION: 2023.07.21
XMAKE_VERSION: 2.8.2 # 2.8.3 breaks fetching of asio package on MacOS ARM
+ WINDOWS_SDK_VERSION: 22621
on:
pull_request:
@@ -96,7 +97,7 @@ jobs:
- name: Bundle
if: ${{ matrix.config == 'release' }}
run: |
- xmake bundle -v -y
+ xmake bundle -v -y --codesignidentity="Epic Games"
env:
VCPKG_ROOT: ${{ github.workspace }}/.vcpkg
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3d38746af..9c6b761da 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,27 @@
##
+- Bugfix: Cache RPC recording would drop data when it reached 4GB of inline chunk data in a segment
+- Bugfix: Fixed thread safety issues in RPC recorder v2
+- Bugfix: `IoBuffer::Materialize` would leak memory for small buffers
+- Bugfix: Fix crash bug when trying to inspect non-open block file in GC
+- Bugfix: Fixed up code so we can build everything even when trace support is disabled
+- Bugfix: Make sure we initialize the pattern of FileSink before it is added as a usable logger
+- Bugfix: Various minor TSAN/ASAN fixes (see PR #622)
+- Improvement: Cache RPC replay can now process partial recordings by recovering metadata from available files
+- Improvement: Cache RPC recording now limits duration of individual segments to 1h
+- Improvement: Made RPC replay command line parsing more robust by ensuring at least one processing thread is in use
+- Improvement: Windows executables are now signed with official cert when creating a release
+- Improvement: Each block in block store that is rewritten will now be logged for better feedback
+
+## 0.2 37
+- Bugfix: ShutdownLogging code would throw an exception if it was called before everything had been initialised properly
+- Bugfix: Reorder shutdown to avoid crash due to late async log messages (spdlog workaround)
+- Bugfix: Correctly calculate peak disk write size in GC status message
+- Bugfix: Skip invalid chunks in block store GC when moving existing chunks
+- Bugfix: Don't use copy of Payloads array when fetching memcached payload in GC
+- Bugfix: Make sure IoBuffer is a valid null-buffer after move operation
+- Improvement: Adjusted and added some trace scopes
+
+## 0.2.36
- Feature: Added xmake task `updatefrontend` which updates the zip file containing the frontend html (`/src/zenserver/frontend/html.zip`)
- Feature: Added `--powercycle` option to zenserver which causes it do shut down immediately after initialization is completed. This is useful for profiling startup/shutdown primarily but could also be useful for some kinds of validation/state upgrade scenarios
- Feature: New endpoint `/admin/gc-stop` to cancel a running garbage collect operation
@@ -14,6 +37,8 @@
- Bugfix: Use correct lookup index when checking for memcached buffer when finding references in diskcache GC
- Bugfix: CasContainerStrategy::ReadIndexFile issue could cause CAS items to not be found after a shutdown/restart cycle
- Bugfix: Make sure we don't hold the namespace bucket lock when we create buckets to avoid deadlock
+- Bugfix: Make sure that PathFromHandle don't hide true error when throwing exceptions
+- Bugfix: Allow attachments that contains a raw size of zero
- Improvement: The frontend html content is no longer appended at the end of the executable which prevented signing, instead it is compiled in from the `/src/zenserver/frontend/html.zip` archive
- Improvement: MacOS now does ad-hoc code signing by default when issuing `xmake bundle`, signing with proper cert is done on CI builds
- Improvement: Updated branding to be consistent with current working name ("Unreal Zen Storage Server" etc)
@@ -36,9 +61,15 @@
- Improvement: Make a more accurate estimation of memory usage for in-memory cache values
- Improvement: Added detailed debug logging for pluggable transports
- Improvement: Improved formatting of multi-line logging. Each line is now indented to line up with the initial line to make reading the output easier
+- Improvement: Refactor memory cache for faster trimming and correct trim reporting
+- Improvement: Added trace scopes for memory cache trimming
- Improvement: Pass lock scope to helper functions to clarify locking rules
- Improvement: Block flush and gc operations for a bucket that is not yet initialized
- Improvement: Add ZenCacheDiskLayer::GetOrCreateBucket to avoid code duplication
+- Improvement: Scrub operation now validates compressed buffer hashes in filecas storage (used for large chunks)
+- Improvement: Added `--dry`, `--no-gc` and `--no-cas` options to `zen scrub` command
+- Improvement: Implemented oplog scrubbing (previously was a no-op)
+- Improvement: Implemented support for running scrubbint at startup with --scrub=<options>
## 0.2.35
- Bugfix: Fix timeout calculation for semtimedop call
diff --git a/VERSION.txt b/VERSION.txt
index 48a7bba93..834592c35 100644
--- a/VERSION.txt
+++ b/VERSION.txt
@@ -1 +1 @@
-0.2.36-pre3 \ No newline at end of file
+0.2.38-pre1 \ No newline at end of file
diff --git a/scripts/bundle.lua b/scripts/bundle.lua
index 207122345..7294043ec 100644
--- a/scripts/bundle.lua
+++ b/scripts/bundle.lua
@@ -156,7 +156,7 @@ local function _find_vcpkg_binary(triple, port, binary)
end
--------------------------------------------------------------------------------
-local function main_windows()
+local function main_windows(signidentity)
import("core.base.option")
zip_path = "build/zenserver-win64.zip"
@@ -168,6 +168,31 @@ local function main_windows()
_build("x64", false, config_args)
+ if signidentity == nil or signidentity == "" then
+ print("Skipping signing since no signidentity was given")
+ else
+ program_files_path = os.getenv("PROGRAMFILES(x86)")
+ signtool_path = program_files_path .. "/Windows Kits/10/App Certification Kit/signtool.exe"
+ if not os.isfile(signtool_path) then
+ raise("Failed signing, unable to locate signtool at "..signtool_path)
+ end
+ local ret = _exec(signtool_path,
+ "sign",
+ "/sm",
+ "/a",
+ "/n",
+ signidentity,
+ "/tr",
+ "http://timestamp.digicert.com",
+ "/v",
+ "/as",
+ "build/windows/x64/release/zenserver.exe",
+ "build/windows/x64/release/zen.exe")
+ if ret > 0 then
+ raise("Failed signing zenserver binary")
+ end
+ end
+
local crashpad_handler_path = _find_vcpkg_binary(
"x64-windows-static",
"sentry-native",
@@ -272,7 +297,8 @@ import("core.base.option")
function main()
if is_host("windows") then
- return main_windows()
+ signidentity = option.get("codesignidentity")
+ return main_windows(signidentity)
end
if is_host("mac") then
diff --git a/src/zen/cmds/info_cmd.cpp b/src/zen/cmds/info_cmd.cpp
new file mode 100644
index 000000000..aec8ca46b
--- /dev/null
+++ b/src/zen/cmds/info_cmd.cpp
@@ -0,0 +1,51 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "info_cmd.h"
+
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/string.h>
+#include <zenhttp/httpclient.h>
+
+using namespace std::literals;
+
+namespace zen {
+
+InfoCommand::InfoCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
+}
+
+InfoCommand::~InfoCommand()
+{
+}
+
+int
+InfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ ZEN_UNUSED(GlobalOptions);
+
+ if (!ParseOptions(argc, argv))
+ {
+ return 0;
+ }
+
+ m_HostName = ResolveTargetHostSpec(m_HostName);
+
+ if (m_HostName.empty())
+ {
+ throw OptionParseException("unable to resolve server specification");
+ }
+
+ HttpClient Http(m_HostName);
+
+ if (HttpClient::Response Result = Http.Get("/admin/info", HttpClient::Accept(ZenContentType::kJSON)))
+ {
+ ZEN_CONSOLE("{}", Result.AsText());
+ }
+
+ return 0;
+}
+
+} // namespace zen
diff --git a/src/zen/cmds/info_cmd.h b/src/zen/cmds/info_cmd.h
new file mode 100644
index 000000000..9723a075b
--- /dev/null
+++ b/src/zen/cmds/info_cmd.h
@@ -0,0 +1,24 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "../zen.h"
+
+namespace zen {
+
+class InfoCommand : public ZenCmdBase
+{
+public:
+ InfoCommand();
+ ~InfoCommand();
+
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options& Options() override { return m_Options; }
+ // virtual ZenCmdCategory& CommandCategory() const override { return g_UtilitiesCategory; }
+
+private:
+ cxxopts::Options m_Options{"info", "Show high level zen store information"};
+ std::string m_HostName;
+};
+
+} // namespace zen
diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp
index 409d3393e..d307ef0e8 100644
--- a/src/zen/cmds/rpcreplay_cmd.cpp
+++ b/src/zen/cmds/rpcreplay_cmd.cpp
@@ -201,6 +201,8 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw std::runtime_error(fmt::format("could not find recording at '{}'", m_RecordingPath));
}
+ m_ThreadCount = Max(m_ThreadCount, 1);
+
Stopwatch TotalTimer;
if (m_OnHost)
@@ -282,152 +284,156 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
uint64_t EntryIndex = EntryOffset.fetch_add(m_Stride);
while (EntryIndex < EntryCount)
{
- IoBuffer Payload;
- zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload);
-
- CbPackage RequestPackage;
- CbObject Request;
+ IoBuffer Payload;
+ const zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload);
- switch (RequestInfo.ContentType)
+ if (RequestInfo != zen::cache::RecordedRequestInfo::NullRequest)
{
- case ZenContentType::kCbPackage:
- {
- if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage))
+ CbPackage RequestPackage;
+ CbObject Request;
+
+ switch (RequestInfo.ContentType)
+ {
+ case ZenContentType::kCbPackage:
{
- Request = RequestPackage.GetObject();
+ if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage))
+ {
+ Request = RequestPackage.GetObject();
+ }
}
- }
- break;
- case ZenContentType::kCbObject:
- {
- Request = LoadCompactBinaryObject(Payload);
- }
- break;
- }
+ break;
+ case ZenContentType::kCbObject:
+ {
+ Request = LoadCompactBinaryObject(Payload);
+ }
+ break;
+ }
- RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u));
- int OriginalProcessPid = Request["Pid"sv].AsInt32(0);
+ RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u));
+ int OriginalProcessPid = Request["Pid"sv].AsInt32(0);
- int AdjustedPid = 0;
- RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone;
+ int AdjustedPid = 0;
+ RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone;
- if (!m_DisableLocalRefs)
- {
- if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || m_ForceAllowLocalRefs)
+ if (!m_DisableLocalRefs)
{
- AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences;
- if (!m_DisablePartialLocalRefs)
+ if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) ||
+ m_ForceAllowLocalRefs)
{
- if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) ||
- m_ForceAllowPartialLocalRefs)
+ AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences;
+ if (!m_DisablePartialLocalRefs)
{
- AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences;
+ if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) ||
+ m_ForceAllowPartialLocalRefs)
+ {
+ AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences;
+ }
}
- }
- if (!m_DisableLocalHandleRefs)
- {
- if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef)
+ if (!m_DisableLocalHandleRefs)
{
- AdjustedPid = GetCurrentProcessId();
+ if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef)
+ {
+ AdjustedPid = GetCurrentProcessId();
+ }
}
}
}
- }
- if (m_ShowMethodStats)
- {
- std::string MethodName = std::string(Request["Method"sv].AsString());
- if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end())
- {
- It->second++;
- }
- else
+ if (m_ShowMethodStats)
{
- LocalMethodTypes[MethodName] = 1;
+ std::string MethodName = std::string(Request["Method"sv].AsString());
+ if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end())
+ {
+ It->second++;
+ }
+ else
+ {
+ LocalMethodTypes[MethodName] = 1;
+ }
}
- }
- if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid)
- {
- CbObjectWriter RequestCopyWriter;
- for (const CbFieldView& Field : Request)
+ if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid)
{
- if (!Field.HasName())
+ CbObjectWriter RequestCopyWriter;
+ for (const CbFieldView& Field : Request)
{
- RequestCopyWriter.AddField(Field);
- continue;
+ if (!Field.HasName())
+ {
+ RequestCopyWriter.AddField(Field);
+ continue;
+ }
+ std::string_view FieldName = Field.GetName();
+ if (FieldName == "Pid"sv)
+ {
+ continue;
+ }
+ if (FieldName == "AcceptFlags"sv)
+ {
+ continue;
+ }
+ RequestCopyWriter.AddField(FieldName, Field);
}
- std::string_view FieldName = Field.GetName();
- if (FieldName == "Pid"sv)
+ if (AdjustedPid != 0)
{
- continue;
+ RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid);
}
- if (FieldName == "AcceptFlags"sv)
+ if (AdjustedAcceptOptions != RpcAcceptOptions::kNone)
{
- continue;
+ RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions));
}
- RequestCopyWriter.AddField(FieldName, Field);
- }
- if (AdjustedPid != 0)
- {
- RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid);
- }
- if (AdjustedAcceptOptions != RpcAcceptOptions::kNone)
- {
- RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions));
- }
- if (RequestInfo.ContentType == ZenContentType::kCbPackage)
- {
- RequestPackage.SetObject(RequestCopyWriter.Save());
- std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage);
- std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end());
- Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer();
- }
- else
- {
- RequestCopyWriter.Finalize();
- Payload = IoBuffer(RequestCopyWriter.GetSaveSize());
- RequestCopyWriter.Save(Payload.GetMutableView());
+ if (RequestInfo.ContentType == ZenContentType::kCbPackage)
+ {
+ RequestPackage.SetObject(RequestCopyWriter.Save());
+ std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage);
+ std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end());
+ Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer();
+ }
+ else
+ {
+ RequestCopyWriter.Finalize();
+ Payload = IoBuffer(RequestCopyWriter.GetSaveSize());
+ RequestCopyWriter.Save(Payload.GetMutableView());
+ }
}
- }
-
- if (!m_DryRun)
- {
- StringBuilder<32> SessionIdString;
- if (RequestInfo.SessionId != Oid::Zero)
+ if (!m_DryRun)
{
- RequestInfo.SessionId.ToString(SessionIdString);
- }
- else
- {
- GetSessionId().ToString(SessionIdString);
- }
+ StringBuilder<32> SessionIdString;
- Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestInfo.ContentType))},
- {"Accept", std::string(MapContentTypeToString(RequestInfo.AcceptType))},
- {"UE-Session", std::string(SessionIdString)}});
-
- uint64_t Offset = 0;
- auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, Payload.GetSize() - Offset);
- IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
- MutableMemoryView Data(buffer, size);
- Data.CopyFrom(PayloadRange.GetView());
- Offset += size;
- return true;
- };
- Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
- cpr::Response Response = Session.Post();
- BytesSent.fetch_add(Payload.GetSize());
- if (Response.error || !(IsHttpSuccessCode(Response.status_code) ||
- Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound)))
- {
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- break;
+ if (RequestInfo.SessionId != Oid::Zero)
+ {
+ RequestInfo.SessionId.ToString(SessionIdString);
+ }
+ else
+ {
+ GetSessionId().ToString(SessionIdString);
+ }
+
+ Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestInfo.ContentType))},
+ {"Accept", std::string(MapContentTypeToString(RequestInfo.AcceptType))},
+ {"UE-Session", std::string(SessionIdString)}});
+
+ uint64_t Offset = 0;
+ auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, Payload.GetSize() - Offset);
+ IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
+ MutableMemoryView Data(buffer, size);
+ Data.CopyFrom(PayloadRange.GetView());
+ Offset += size;
+ return true;
+ };
+ Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+ cpr::Response Response = Session.Post();
+ BytesSent.fetch_add(Payload.GetSize());
+ if (Response.error || !(IsHttpSuccessCode(Response.status_code) ||
+ Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound)))
+ {
+ ZEN_CONSOLE("{}", FormatHttpResponse(Response));
+ break;
+ }
+ BytesReceived.fetch_add(Response.downloaded_bytes);
}
- BytesReceived.fetch_add(Response.downloaded_bytes);
}
EntryIndex = EntryOffset.fetch_add(m_Stride);
diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp
index c949008ff..10d2f5593 100644
--- a/src/zen/zen.cpp
+++ b/src/zen/zen.cpp
@@ -10,6 +10,7 @@
#include "cmds/cache_cmd.h"
#include "cmds/copy_cmd.h"
#include "cmds/dedup_cmd.h"
+#include "cmds/info_cmd.h"
#include "cmds/print_cmd.h"
#include "cmds/projectstore_cmd.h"
#include "cmds/rpcreplay_cmd.h"
@@ -273,6 +274,7 @@ main(int argc, char** argv)
GcStatusCommand GcStatusCmd;
GcStopCommand GcStopCmd;
ImportOplogCommand ImportOplogCmd;
+ InfoCommand InfoCmd;
JobCommand JobCmd;
OplogMirrorCommand OplogMirrorCmd;
PrintCommand PrintCmd;
@@ -316,6 +318,7 @@ main(int argc, char** argv)
{"gc-status", &GcStatusCmd, "Garbage collect zen storage status check"},
{"gc-stop", &GcStopCmd, "Request cancel of running garbage collection in zen storage"},
{"gc", &GcCmd, "Garbage collect zen storage"},
+ {"info", &InfoCmd, "Show high level Zen server information"},
{"jobs", &JobCmd, "Show/cancel zen background jobs"},
{"logs", &LoggingCmd, "Show/control zen logging"},
{"oplog-create", &CreateOplogCmd, "Create a project oplog"},
diff --git a/src/zenbase/include/zenbase/refcount.h b/src/zenbase/include/zenbase/refcount.h
index 3afcf467c..6ad49cba2 100644
--- a/src/zenbase/include/zenbase/refcount.h
+++ b/src/zenbase/include/zenbase/refcount.h
@@ -107,6 +107,8 @@ public:
Rhs.m_Ref = nullptr;
}
+ inline void Swap(RefPtr& Rhs) noexcept { std::swap(m_Ref, Rhs.m_Ref); }
+
private:
T* m_Ref = nullptr;
template<typename U>
diff --git a/src/zencore/compactbinary.cpp b/src/zencore/compactbinary.cpp
index 9152a8bfc..6677b5a61 100644
--- a/src/zencore/compactbinary.cpp
+++ b/src/zencore/compactbinary.cpp
@@ -2463,6 +2463,58 @@ TEST_CASE("json.uson")
}
}
+//////////////////////////////////////////////////////////////////////////
+
+TEST_SUITE_BEGIN("core.datetime");
+
+TEST_CASE("core.datetime.compare")
+{
+ DateTime T1(2000, 12, 13);
+ DateTime T2(2000, 12, 14);
+ CHECK(T1 < T2);
+ CHECK(T2 > T1);
+ CHECK(T1 == T1);
+ CHECK(T1 != T2);
+ CHECK(T1 >= T1);
+ CHECK(T2 >= T1);
+ CHECK(T1 <= T1);
+ CHECK(T1 <= T2);
+}
+
+TEST_CASE("core.datetime.add")
+{
+ DateTime T1(2000, 12, 13);
+ DateTime T2(2000, 12, 14);
+ TimeSpan dT = T2 - T1;
+ TimeSpan dT1 = T1 - T1;
+
+ CHECK(T1 + dT == T2);
+ CHECK(dT + T1 == T2);
+ CHECK(dT + T1 - T2 == dT1);
+}
+
+TEST_SUITE_END();
+
+TEST_SUITE_BEGIN("core.timespan");
+
+TEST_CASE("core.timespan.compare")
+{
+ TimeSpan T1(1000);
+ TimeSpan T2(1001);
+ CHECK(T1 < T2);
+ CHECK(T2 > T1);
+ CHECK(T1 == T1);
+ CHECK(T1 != T2);
+ CHECK(T1 >= T1);
+ CHECK(T2 >= T1);
+ CHECK(T1 <= T1);
+ CHECK(T1 <= T2);
+}
+
+TEST_SUITE_END();
+
+//////////////////////////////////////////////////////////////////////////
+
#endif
} // namespace zen
diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp
index 2362d8e78..c41bdac42 100644
--- a/src/zencore/compress.cpp
+++ b/src/zencore/compress.cpp
@@ -1268,7 +1268,7 @@ CompressedBuffer::FromCompressed(SharedBuffer&& InCompressedData, IoHash& OutRaw
CompressedBuffer
CompressedBuffer::FromCompressedNoValidate(IoBuffer&& InCompressedData)
{
- if (InCompressedData.GetSize() <= sizeof(detail::BufferHeader))
+ if (InCompressedData.GetSize() < sizeof(detail::BufferHeader))
{
return CompressedBuffer();
}
@@ -1280,7 +1280,7 @@ CompressedBuffer::FromCompressedNoValidate(IoBuffer&& InCompressedData)
CompressedBuffer
CompressedBuffer::FromCompressedNoValidate(CompositeBuffer&& InCompressedData)
{
- if (InCompressedData.GetSize() <= sizeof(detail::BufferHeader))
+ if (InCompressedData.GetSize() < sizeof(detail::BufferHeader))
{
return CompressedBuffer();
}
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index a0ff3793e..36195f7c7 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -1287,12 +1287,12 @@ PathFromHandle(void* NativeHandle, std::error_code& Ec)
{
if (NativeHandle == nullptr)
{
- return std::filesystem::path();
+ return "<error handle 'nullptr'>";
}
#if ZEN_PLATFORM_WINDOWS
if (NativeHandle == INVALID_HANDLE_VALUE)
{
- return std::filesystem::path();
+ return "<error handle 'invalid handle'>";
}
auto GetFinalPathNameByHandleWRetry =
@@ -1329,7 +1329,7 @@ PathFromHandle(void* NativeHandle, std::error_code& Ec)
if (Error != ERROR_SUCCESS)
{
Ec = MakeErrorCodeFromLastError();
- return std::filesystem::path();
+ return fmt::format("<error handle '{}'>", Ec.message());
}
if (RequiredLengthIncludingNul < PathDataSize)
@@ -1346,7 +1346,7 @@ PathFromHandle(void* NativeHandle, std::error_code& Ec)
if (Error != ERROR_SUCCESS)
{
Ec = MakeErrorCodeFromLastError();
- return std::filesystem::path();
+ return fmt::format("<error handle '{}'>", Ec.message());
}
ZEN_UNUSED(FinalLength);
return FullPath;
@@ -1360,7 +1360,7 @@ PathFromHandle(void* NativeHandle, std::error_code& Ec)
if (BytesRead <= 0)
{
Ec = MakeErrorCodeFromLastError();
- return {};
+ return fmt::format("<error handle '{}'>", Ec.message());
}
Link[BytesRead] = '\0';
@@ -1371,7 +1371,7 @@ PathFromHandle(void* NativeHandle, std::error_code& Ec)
if (fcntl(Fd, F_GETPATH, Path) < 0)
{
Ec = MakeErrorCodeFromLastError();
- return {};
+ return fmt::format("<error handle '{}'>", Ec.message());
}
return Path;
diff --git a/src/zencore/include/zencore/compactbinary.h b/src/zencore/include/zencore/compactbinary.h
index cb032e34a..675e2a8d4 100644
--- a/src/zencore/include/zencore/compactbinary.h
+++ b/src/zencore/include/zencore/compactbinary.h
@@ -26,12 +26,13 @@
namespace zen {
-class CbObjectView;
class CbArrayView;
+class CbObjectView;
+class CbValue;
+class CompressedBuffer;
class BinaryReader;
class BinaryWriter;
-class CompressedBuffer;
-class CbValue;
+class TimeSpan;
class DateTime
{
@@ -58,7 +59,11 @@ public:
void GetDate(int& Year, int& Month, int& Day) const;
inline bool operator==(const DateTime& Rhs) const { return Ticks == Rhs.Ticks; }
- inline auto operator<=>(const DateTime& Rhs) const { return Ticks - Rhs.Ticks; }
+ inline auto operator<=>(const DateTime& Rhs) const = default;
+
+ friend inline TimeSpan operator-(const DateTime& Lhs, const DateTime& Rhs);
+ friend inline DateTime operator+(const DateTime& Lhs, const TimeSpan& Rhs);
+ friend inline DateTime operator+(const TimeSpan& Lhs, const DateTime& Rhs);
std::string ToString(const char* Format) const;
std::string ToIso8601() const;
@@ -78,7 +83,7 @@ public:
inline uint64_t GetTicks() const { return Ticks; }
inline bool operator==(const TimeSpan& Rhs) const { return Ticks == Rhs.Ticks; }
- inline auto operator<=>(const TimeSpan& Rhs) const { return Ticks - Rhs.Ticks; }
+ inline auto operator<=>(const TimeSpan& Rhs) const = default;
/**
* Time span related constants.
@@ -136,12 +141,33 @@ public:
ZENCORE_API std::string ToString(const char* Format) const;
ZENCORE_API std::string ToString() const;
+ friend inline DateTime operator+(const DateTime& Lhs, const TimeSpan& Rhs);
+ friend inline DateTime operator+(const TimeSpan& Lhs, const DateTime& Rhs);
+
private:
void Set(int Days, int Hours, int Minutes, int Seconds, int FractionNano);
uint64_t Ticks;
};
+inline TimeSpan
+operator-(const DateTime& Lhs, const DateTime& Rhs)
+{
+ return TimeSpan(Lhs.Ticks - Rhs.Ticks);
+}
+
+inline DateTime
+operator+(const DateTime& Lhs, const TimeSpan& Rhs)
+{
+ return DateTime(Lhs.Ticks + Rhs.Ticks);
+}
+
+inline DateTime
+operator+(const TimeSpan& Lhs, const DateTime& Rhs)
+{
+ return DateTime(Lhs.Ticks + Rhs.Ticks);
+}
+
//////////////////////////////////////////////////////////////////////////
/**
diff --git a/src/zencore/include/zencore/compactbinarybuilder.h b/src/zencore/include/zencore/compactbinarybuilder.h
index dcb767d96..9c81cf490 100644
--- a/src/zencore/include/zencore/compactbinarybuilder.h
+++ b/src/zencore/include/zencore/compactbinarybuilder.h
@@ -10,7 +10,6 @@
#include <zencore/enumflags.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
-#include <zencore/sha1.h>
#include <atomic>
#include <memory>
diff --git a/src/zencore/include/zencore/compactbinaryvalidation.h b/src/zencore/include/zencore/compactbinaryvalidation.h
index b23c6d51d..ddecc8a38 100644
--- a/src/zencore/include/zencore/compactbinaryvalidation.h
+++ b/src/zencore/include/zencore/compactbinaryvalidation.h
@@ -9,7 +9,6 @@
#include <zencore/enumflags.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
-#include <zencore/sha1.h>
#include <gsl/gsl-lite.hpp>
diff --git a/src/zencore/include/zencore/iobuffer.h b/src/zencore/include/zencore/iobuffer.h
index d891ed55b..b9e503354 100644
--- a/src/zencore/include/zencore/iobuffer.h
+++ b/src/zencore/include/zencore/iobuffer.h
@@ -337,11 +337,20 @@ public:
BorrowedFile
};
- inline IoBuffer() = default;
- inline IoBuffer(IoBuffer&& Rhs) noexcept = default;
- inline IoBuffer(const IoBuffer& Rhs) = default;
+ inline IoBuffer() = default;
+ inline IoBuffer(IoBuffer&& Rhs) noexcept
+ {
+ m_Core.Swap(Rhs.m_Core);
+ Rhs.m_Core = NullBufferCore;
+ }
+ inline IoBuffer(const IoBuffer& Rhs) = default;
inline IoBuffer& operator=(const IoBuffer& Rhs) = default;
- inline IoBuffer& operator=(IoBuffer&& Rhs) noexcept = default;
+ inline IoBuffer& operator =(IoBuffer&& Rhs) noexcept
+ {
+ m_Core.Swap(Rhs.m_Core);
+ Rhs.m_Core = NullBufferCore;
+ return *this;
+ }
/** Create an uninitialized buffer of the given size
*/
diff --git a/src/zencore/include/zencore/string.h b/src/zencore/include/zencore/string.h
index 3aec1647d..b0232d883 100644
--- a/src/zencore/include/zencore/string.h
+++ b/src/zencore/include/zencore/string.h
@@ -638,7 +638,12 @@ ToHexNumber(UnsignedIntegral auto Value, char* OutString)
bool
ParseHexNumber(const std::string_view HexString, UnsignedIntegral auto& OutValue)
{
- return ParseHexNumber(HexString.data(), sizeof(OutValue) * 2, (uint8_t*)&OutValue);
+ size_t ExpectedCharacterCount = sizeof(OutValue) * 2;
+ if (HexString.size() != ExpectedCharacterCount)
+ {
+ return false;
+ }
+ return ParseHexNumber(HexString.data(), ExpectedCharacterCount, (uint8_t*)&OutValue);
}
//////////////////////////////////////////////////////////////////////////
diff --git a/src/zencore/include/zencore/trace.h b/src/zencore/include/zencore/trace.h
index 2d4c1e610..89e4b76bf 100644
--- a/src/zencore/include/zencore/trace.h
+++ b/src/zencore/include/zencore/trace.h
@@ -35,6 +35,7 @@ bool TraceStop();
#else
#define ZEN_TRACE_CPU(x)
+#define ZEN_TRACE_CPU_FLUSH(x)
#endif // ZEN_WITH_TRACE
diff --git a/src/zencore/iobuffer.cpp b/src/zencore/iobuffer.cpp
index 912f9ce4e..80d0f4ee4 100644
--- a/src/zencore/iobuffer.cpp
+++ b/src/zencore/iobuffer.cpp
@@ -209,6 +209,8 @@ IoBufferExtendedCore::~IoBufferExtendedCore()
uint64_t MapSize = ~uint64_t(uintptr_t(m_MmapHandle));
munmap(m_MappedPointer, MapSize);
#endif
+
+ m_DataPtr = nullptr; // prevent any buffer deallocation attempts
}
const uint32_t LocalFlags = m_Flags.load(std::memory_order_relaxed);
@@ -244,8 +246,6 @@ IoBufferExtendedCore::~IoBufferExtendedCore()
ZEN_WARN("Error reported on file handle close, reason '{}'", GetLastErrorAsString());
}
}
-
- m_DataPtr = nullptr;
}
static constexpr size_t MappingLockCount = 128;
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp
index 1755b9fe9..4bcc5c885 100644
--- a/src/zencore/jobqueue.cpp
+++ b/src/zencore/jobqueue.cpp
@@ -422,8 +422,10 @@ TEST_CASE("JobQueue")
{
JobsLatch.AddCount(1);
Pool.ScheduleWork([&Queue, &JobsLatch, I]() {
- auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
- auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&](JobContext& Context) {
+ auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
+ JobsLatch.AddCount(1);
+ auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&JobsLatch, I](JobContext& Context) {
+ auto $ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
if (Context.IsCancelled())
{
return;
@@ -523,7 +525,6 @@ TEST_CASE("JobQueue")
}
JobsLatch.Wait();
}
-
#endif
} // namespace zen
diff --git a/src/zencore/memory.cpp b/src/zencore/memory.cpp
index 546296b10..808c9fcb6 100644
--- a/src/zencore/memory.cpp
+++ b/src/zencore/memory.cpp
@@ -7,13 +7,12 @@
#include <zencore/testing.h>
#include <zencore/zencore.h>
-#if ZEN_PLATFORM_WINDOWS
-# include <malloc.h>
+#include <cstdlib>
+
+#if ZEN_USE_MIMALLOC
ZEN_THIRD_PARTY_INCLUDES_START
# include <mimalloc.h>
ZEN_THIRD_PARTY_INCLUDES_END
-#else
-# include <cstdlib>
#endif
namespace zen {
@@ -23,16 +22,15 @@ namespace zen {
static void*
AlignedAllocImpl(size_t Size, size_t Alignment)
{
-#if ZEN_PLATFORM_WINDOWS
-# if ZEN_USE_MIMALLOC && 0 /* this path is not functional */
- return mi_aligned_alloc(Alignment, Size);
-# else
- return _aligned_malloc(Size, Alignment);
-# endif
-#else
// aligned_alloc() states that size must be a multiple of alignment. Some
// platforms return null if this requirement isn't met.
Size = (Size + Alignment - 1) & ~(Alignment - 1);
+
+#if ZEN_USE_MIMALLOC
+ return mi_aligned_alloc(Alignment, Size);
+#elif ZEN_PLATFORM_WINDOWS
+ return _aligned_malloc(Size, Alignment);
+#else
return std::aligned_alloc(Alignment, Size);
#endif
}
@@ -43,12 +41,10 @@ AlignedFreeImpl(void* ptr)
if (ptr == nullptr)
return;
-#if ZEN_PLATFORM_WINDOWS
-# if ZEN_USE_MIMALLOC && 0 /* this path is not functional */
+#if ZEN_USE_MIMALLOC
return mi_free(ptr);
-# else
+#elif ZEN_PLATFORM_WINDOWS
_aligned_free(ptr);
-# endif
#else
std::free(ptr);
#endif
diff --git a/src/zencore/thread.cpp b/src/zencore/thread.cpp
index 149a0d781..cb3aced33 100644
--- a/src/zencore/thread.cpp
+++ b/src/zencore/thread.cpp
@@ -156,6 +156,7 @@ Event::Event()
auto* Inner = new EventInner();
Inner->bSet = bInitialState;
m_EventHandle = Inner;
+ std::atomic_thread_fence(std::memory_order_release);
#endif
}
@@ -170,12 +171,13 @@ Event::Set()
#if ZEN_USE_WINDOWS_EVENTS
SetEvent(m_EventHandle);
#else
- auto* Inner = (EventInner*)m_EventHandle;
+ std::atomic_thread_fence(std::memory_order_acquire);
+ auto* Inner = (EventInner*)m_EventHandle;
{
std::unique_lock Lock(Inner->Mutex);
Inner->bSet.store(true);
+ Inner->CondVar.notify_all();
}
- Inner->CondVar.notify_all();
#endif
}
@@ -185,6 +187,7 @@ Event::Reset()
#if ZEN_USE_WINDOWS_EVENTS
ResetEvent(m_EventHandle);
#else
+ std::atomic_thread_fence(std::memory_order_acquire);
auto* Inner = (EventInner*)m_EventHandle;
{
std::unique_lock Lock(Inner->Mutex);
@@ -198,15 +201,18 @@ Event::Close()
{
#if ZEN_USE_WINDOWS_EVENTS
CloseHandle(m_EventHandle);
+ m_EventHandle = nullptr;
#else
+ std::atomic_thread_fence(std::memory_order_acquire);
auto* Inner = (EventInner*)m_EventHandle;
{
std::unique_lock Lock(Inner->Mutex);
Inner->bSet.store(true);
}
+ m_EventHandle = nullptr;
+ std::atomic_thread_fence(std::memory_order_release);
delete Inner;
#endif
- m_EventHandle = nullptr;
}
bool
@@ -226,6 +232,7 @@ Event::Wait(int TimeoutMs)
return (Result == WAIT_OBJECT_0);
#else
+ std::atomic_thread_fence(std::memory_order_acquire);
auto* Inner = reinterpret_cast<EventInner*>(m_EventHandle);
if (Inner->bSet.load())
diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp
index 6ff6463dd..16b2310ff 100644
--- a/src/zencore/workthreadpool.cpp
+++ b/src/zencore/workthreadpool.cpp
@@ -132,7 +132,9 @@ struct WorkerThreadPool::Impl
Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName)
{
+# if ZEN_WITH_TRACE
trace::ThreadGroupBegin(m_WorkerThreadBaseName.c_str());
+# endif
zen::Latch WorkerLatch{InThreadCount};
@@ -143,7 +145,9 @@ struct WorkerThreadPool::Impl
WorkerLatch.Wait();
+# if ZEN_WITH_TRACE
trace::ThreadGroupEnd();
+# endif
}
~Impl()
diff --git a/src/zencore/zencore.cpp b/src/zencore/zencore.cpp
index 3b938a6ef..8dd687fbd 100644
--- a/src/zencore/zencore.cpp
+++ b/src/zencore/zencore.cpp
@@ -36,6 +36,8 @@
#include <fmt/format.h>
+#include <atomic>
+
namespace zen::assert {
void
@@ -103,8 +105,8 @@ IsInteractiveSession()
//////////////////////////////////////////////////////////////////////////
-static int s_ApplicationExitCode = 0;
-static bool s_ApplicationExitRequested;
+static std::atomic_int s_ApplicationExitCode{0};
+static std::atomic_bool s_ApplicationExitRequested{false};
bool
IsApplicationExitRequested()
diff --git a/src/zenhttp/httpshared.cpp b/src/zenhttp/httpshared.cpp
index 5421fcba5..ca014bf1c 100644
--- a/src/zenhttp/httpshared.cpp
+++ b/src/zenhttp/httpshared.cpp
@@ -19,6 +19,10 @@
#include <span>
#include <vector>
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+#endif
+
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
ZEN_THIRD_PARTY_INCLUDES_END
diff --git a/src/zenhttp/servers/httpparser.h b/src/zenhttp/servers/httpparser.h
index 219ac351d..bdbcab4d9 100644
--- a/src/zenhttp/servers/httpparser.h
+++ b/src/zenhttp/servers/httpparser.h
@@ -9,6 +9,8 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <http_parser.h>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <atomic>
+
namespace zen {
class HttpRequestParserCallbacks
@@ -85,7 +87,7 @@ private:
int8_t m_ContentTypeHeaderIndex;
int8_t m_RangeHeaderIndex;
HttpVerb m_RequestVerb;
- bool m_KeepAlive = false;
+ std::atomic_bool m_KeepAlive{false};
bool m_Expect100Continue = false;
int m_RequestId = -1;
Oid m_SessionId{};
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index c2df847ad..cc1ffdcdc 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -3,6 +3,7 @@
#include "admin.h"
#include <zencore/compactbinarybuilder.h>
+#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/jobqueue.h>
#include <zencore/logging.h>
@@ -20,24 +21,86 @@
#include <zenstore/gc.h>
#include "cache/structuredcachestore.h"
+#include "config.h"
#include "projectstore/projectstore.h"
#include <chrono>
namespace zen {
-HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
- JobQueue& BackgroundJobQueue,
- ZenCacheStore* CacheStore,
- CidStore* CidStore,
- ProjectStore* ProjectStore,
- const LogPaths& LogPaths)
+struct DirStats
+{
+ uint64_t FileCount = 0;
+ uint64_t DirCount = 0;
+ uint64_t ByteCount = 0;
+};
+
+DirStats
+GetStatsForDirectory(std::filesystem::path Dir)
+{
+ if (!std::filesystem::exists(Dir))
+ return {};
+
+ FileSystemTraversal Traversal;
+
+ struct StatsTraversal : public FileSystemTraversal::TreeVisitor
+ {
+ virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override
+ {
+ ZEN_UNUSED(Parent, File);
+ ++TotalFileCount;
+ TotalBytes += FileSize;
+ }
+ virtual bool VisitDirectory(const std::filesystem::path&, const path_view&) override
+ {
+ ++TotalDirCount;
+ return true;
+ }
+
+ uint64_t TotalBytes = 0;
+ uint64_t TotalFileCount = 0;
+ uint64_t TotalDirCount = 0;
+
+ DirStats GetStats() { return {.FileCount = TotalFileCount, .DirCount = TotalDirCount, .ByteCount = TotalBytes}; }
+ };
+
+ StatsTraversal DirTraverser;
+ Traversal.TraverseFileSystem(Dir, DirTraverser);
+
+ return DirTraverser.GetStats();
+}
+
+struct StateDiskStats
+{
+ DirStats CacheStats;
+ DirStats CasStats;
+ DirStats ProjectStats;
+};
+
+StateDiskStats
+GetStatsForStateDirectory(std::filesystem::path StateDir)
+{
+ StateDiskStats Stats;
+ Stats.CacheStats = GetStatsForDirectory(StateDir / "cache");
+ Stats.CasStats = GetStatsForDirectory(StateDir / "cas");
+ Stats.ProjectStats = GetStatsForDirectory(StateDir / "projects");
+ return Stats;
+}
+
+HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
+ JobQueue& BackgroundJobQueue,
+ ZenCacheStore* CacheStore,
+ CidStore* CidStore,
+ ProjectStore* ProjectStore,
+ const LogPaths& LogPaths,
+ const ZenServerOptions& ServerOptions)
: m_GcScheduler(Scheduler)
, m_BackgroundJobQueue(BackgroundJobQueue)
, m_CacheStore(CacheStore)
, m_CidStore(CidStore)
, m_ProjectStore(ProjectStore)
, m_LogPaths(LogPaths)
+, m_ServerOptions(ServerOptions)
{
using namespace std::literals;
@@ -509,6 +572,60 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
#endif // ZEN_WITH_TRACE
m_Router.RegisterRoute(
+ "info",
+ [this](HttpRouterRequest& Req) {
+ CbObjectWriter Obj;
+
+ Obj << "root" << m_ServerOptions.SystemRootDir.generic_wstring();
+ Obj << "install" << (m_ServerOptions.SystemRootDir / "Install").generic_wstring();
+
+ Obj.BeginObject("primary");
+ Obj << "data" << m_ServerOptions.DataDir.generic_wstring();
+
+ try
+ {
+ auto Stats = GetStatsForStateDirectory(m_ServerOptions.DataDir);
+
+ auto EmitStats = [&](std::string_view Tag, const DirStats& Stats) {
+ Obj.BeginObject(Tag);
+ Obj << "bytes" << Stats.ByteCount;
+ Obj << "files" << Stats.FileCount;
+ Obj << "dirs" << Stats.DirCount;
+ Obj.EndObject();
+ };
+
+ EmitStats("cache", Stats.CacheStats);
+ EmitStats("cas", Stats.CasStats);
+ EmitStats("project", Stats.ProjectStats);
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("exception in disk stats gathering for '{}': {}", m_ServerOptions.DataDir, Ex.what());
+ }
+ Obj.EndObject();
+
+ try
+ {
+ std::vector<CbObject> Manifests = ReadAllCentralManifests(m_ServerOptions.SystemRootDir);
+
+ Obj.BeginArray("known");
+
+ for (const auto& Manifest : Manifests)
+ {
+ Obj.AddObject(Manifest);
+ }
+
+ Obj.EndArray();
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("exception in state gathering for '{}': {}", m_ServerOptions.SystemRootDir, Ex.what());
+ }
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
"logs",
[this](HttpRouterRequest& Req) {
CbObjectWriter Obj;
diff --git a/src/zenserver/admin/admin.h b/src/zenserver/admin/admin.h
index 9d8bdfe50..563c4f536 100644
--- a/src/zenserver/admin/admin.h
+++ b/src/zenserver/admin/admin.h
@@ -12,6 +12,7 @@ class JobQueue;
class ZenCacheStore;
class CidStore;
class ProjectStore;
+struct ZenServerOptions;
class HttpAdminService : public zen::HttpService
{
@@ -22,25 +23,27 @@ public:
std::filesystem::path HttpLogPath;
std::filesystem::path CacheLogPath;
};
- HttpAdminService(GcScheduler& Scheduler,
- JobQueue& BackgroundJobQueue,
- ZenCacheStore* CacheStore,
- CidStore* CidStore,
- ProjectStore* ProjectStore,
- const LogPaths& LogPaths);
+ HttpAdminService(GcScheduler& Scheduler,
+ JobQueue& BackgroundJobQueue,
+ ZenCacheStore* CacheStore,
+ CidStore* CidStore,
+ ProjectStore* ProjectStore,
+ const LogPaths& LogPaths,
+ const ZenServerOptions& ServerOptions);
~HttpAdminService();
virtual const char* BaseUri() const override;
virtual void HandleRequest(zen::HttpServerRequest& Request) override;
private:
- HttpRequestRouter m_Router;
- GcScheduler& m_GcScheduler;
- JobQueue& m_BackgroundJobQueue;
- ZenCacheStore* m_CacheStore;
- CidStore* m_CidStore;
- ProjectStore* m_ProjectStore;
- LogPaths m_LogPaths;
+ HttpRequestRouter m_Router;
+ GcScheduler& m_GcScheduler;
+ JobQueue& m_BackgroundJobQueue;
+ ZenCacheStore* m_CacheStore;
+ CidStore* m_CidStore;
+ ProjectStore* m_ProjectStore;
+ LogPaths m_LogPaths;
+ const ZenServerOptions& m_ServerOptions;
};
} // namespace zen
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index 13f3c9e58..8d046105d 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -209,9 +209,6 @@ namespace {
zen::Sleep(100);
} while (true);
}
-
- uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize) { return 8u + 32u + RoundUp(PayloadSize, 8u); }
-
} // namespace
namespace fs = std::filesystem;
@@ -507,6 +504,8 @@ BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& B
std::vector<AccessTime>& AccessTimes,
std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads)
{
+ ZEN_TRACE_CPU("Z$::ReadSidecarFile");
+
ZEN_ASSERT(AccessTimes.size() == Payloads.size());
std::error_code Ec;
@@ -593,6 +592,8 @@ BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&,
const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads,
const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas)
{
+ ZEN_TRACE_CPU("Z$::WriteSidecarFile");
+
BucketMetaHeader Header;
Header.EntryCount = m_ManifestEntryCount;
Header.LogPosition = SnapshotLogPosition;
@@ -701,7 +702,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
{
using namespace std::literals;
- ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate");
+ ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate");
ZEN_ASSERT(m_IsFlushing.load());
// We want to take the lock here since we register as a GC referencer a construction
@@ -768,7 +769,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
void
ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::WriteIndexSnapshot");
+ ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot");
const uint64_t LogCount = m_SlogFile.GetLogCount();
if (m_LogFlushPosition == LogCount)
@@ -878,7 +879,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uin
uint64_t
ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadIndexFile");
+ ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile");
if (!std::filesystem::is_regular_file(IndexPath))
{
@@ -967,7 +968,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const
uint64_t
ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadLog");
+ ZEN_TRACE_CPU("Z$::Bucket::ReadLog");
if (!std::filesystem::is_regular_file(LogPath))
{
@@ -1037,7 +1038,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::
void
ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockScope& IndexLock, const bool IsNew)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog");
+ ZEN_TRACE_CPU("Z$::Bucket::Initialize");
m_StandaloneSize = 0;
@@ -1139,7 +1140,7 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& H
IoBuffer
ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::GetInlineCacheValue");
+ ZEN_TRACE_CPU("Z$::Bucket::GetInlineCacheValue");
BlockStoreLocation Location = Loc.GetBlockLocation(m_Configuration.PayloadAlignment);
@@ -1155,7 +1156,7 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) con
IoBuffer
ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::GetStandaloneCacheValue");
+ ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue");
ExtendablePathBuilder<256> DataFilePath;
BuildPath(DataFilePath, HashKey);
@@ -1175,6 +1176,8 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentTy
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
+ ZEN_TRACE_CPU("Z$::Bucket::Get");
+
metrics::RequestStats::Scope StatsScope(m_GetOps, 0);
RwLock::SharedLockScope IndexLock(m_IndexLock);
@@ -1189,7 +1192,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
return false;
}
- size_t EntryIndex = It.value();
+ PayloadIndex EntryIndex = It.value();
m_AccessTimes[EntryIndex] = GcClock::TickCount();
DiskLocation Location = m_Payloads[EntryIndex].Location;
@@ -1206,7 +1209,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
if (Payload->MemCached)
{
- OutValue.Value = m_MemCachedPayloads[Payload->MemCached];
+ OutValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload;
Payload = nullptr;
IndexLock.ReleaseNow();
m_MemoryHitCount++;
@@ -1231,7 +1234,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
size_t ValueSize = OutValue.Value.GetSize();
if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MemCache");
+ ZEN_TRACE_CPU("Z$::Bucket::Get::MemCache");
OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end())
@@ -1240,7 +1243,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
// Only update if it has not already been updated by other thread
if (!WritePayload.MemCached)
{
- SetMemCachedData(UpdateIndexLock, WritePayload, OutValue.Value);
+ SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
}
}
}
@@ -1250,7 +1253,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
if (FillRawHashAndRawSize)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MetaData");
+ ZEN_TRACE_CPU("Z$::Bucket::Get::MetaData");
if (Location.IsFlagSet(DiskLocation::kCompressed))
{
if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
@@ -1293,6 +1296,8 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
void
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
+ ZEN_TRACE_CPU("Z$::Bucket::Put");
+
metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
@@ -1307,71 +1312,91 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
m_DiskWriteCount++;
}
-void
+uint64_t
ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime)
{
+ ZEN_TRACE_CPU("Z$::Bucket::MemCacheTrim");
+
+ uint64_t Trimmed = 0;
GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- if (m_MemCachedPayloads.empty())
+ uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size());
+ if (MemCachedCount == 0)
{
- return;
+ return 0;
}
- for (const auto& Kv : m_Index)
+
+ uint32_t WriteIndex = 0;
+ for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
{
- size_t Index = Kv.second;
- BucketPayload& Payload = m_Payloads[Index];
- if (!Payload.MemCached)
+ MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
+ if (!Data.Payload)
+ {
+ continue;
+ }
+ PayloadIndex Index = Data.OwnerIndex;
+ ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
+ GcClock::Tick AccessTime = m_AccessTimes[Index];
+ if (AccessTime < ExpireTicks)
{
+ size_t PayloadSize = Data.Payload.GetSize();
+ RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
+ Data = {};
+ m_Payloads[Index].MemCached = {};
+ Trimmed += PayloadSize;
continue;
}
- if (m_AccessTimes[Index] < ExpireTicks)
+ if (ReadIndex > WriteIndex)
{
- RemoveMemCachedData(IndexLock, Payload);
+ m_MemCachedPayloads[WriteIndex] = MemCacheData{.Payload = std::move(Data.Payload), .OwnerIndex = Index};
+ m_Payloads[Index].MemCached = MemCachedIndex(WriteIndex);
}
+ WriteIndex++;
}
+ m_MemCachedPayloads.resize(WriteIndex);
m_MemCachedPayloads.shrink_to_fit();
- m_FreeMemCachedPayloads.shrink_to_fit();
- m_FreeMetaDatas.shrink_to_fit();
+ zen::Reset(m_FreeMemCachedPayloads);
+ return Trimmed;
}
void
-ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart,
- GcClock::Duration SectionLength,
- std::vector<uint64_t>& InOutUsageSlots)
+ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots)
{
+ ZEN_TRACE_CPU("Z$::Bucket::GetUsageByAccess");
+
+ size_t SlotCount = InOutUsageSlots.capacity();
RwLock::SharedLockScope _(m_IndexLock);
- if (m_MemCachedPayloads.empty())
+ uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size());
+ if (MemCachedCount == 0)
{
return;
}
- for (const auto& It : m_Index)
+ for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
{
- size_t Index = It.second;
- BucketPayload& Payload = m_Payloads[Index];
- if (!Payload.MemCached)
+ MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
+ if (!Data.Payload)
{
continue;
}
+ PayloadIndex Index = Data.OwnerIndex;
+ ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index]));
- GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch();
- uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0);
- if (Slot >= InOutUsageSlots.capacity())
- {
- Slot = InOutUsageSlots.capacity() - 1;
- }
- if (Slot > InOutUsageSlots.size())
+ GcClock::Duration Age = Now > ItemAccessTime ? Now - ItemAccessTime : GcClock::Duration(0);
+ size_t Slot = Age < MaxAge ? gsl::narrow<size_t>((Age.count() * SlotCount) / MaxAge.count()) : (SlotCount - 1);
+ ZEN_ASSERT_SLOW(Slot < SlotCount);
+ if (Slot >= InOutUsageSlots.size())
{
- InOutUsageSlots.resize(uint64_t(Slot + 1), 0);
+ InOutUsageSlots.resize(Slot + 1, 0);
}
- InOutUsageSlots[Slot] += m_MemCachedPayloads[Payload.MemCached].GetSize();
+ InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(Data.Payload.GetSize());
}
}
bool
ZenCacheDiskLayer::CacheBucket::Drop()
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::Drop");
+ ZEN_TRACE_CPU("Z$::Bucket::Drop");
RwLock::ExclusiveLockScope _(m_IndexLock);
@@ -1407,7 +1432,7 @@ ZenCacheDiskLayer::CacheBucket::Drop()
void
ZenCacheDiskLayer::CacheBucket::Flush()
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::Flush");
+ ZEN_TRACE_CPU("Z$::Bucket::Flush");
bool Expected = false;
if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
{
@@ -1433,6 +1458,7 @@ ZenCacheDiskLayer::CacheBucket::Flush()
void
ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
+ ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot");
try
{
bool UseLegacyScheme = false;
@@ -1607,7 +1633,7 @@ ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer)
void
ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::Scrub");
+ ZEN_TRACE_CPU("Z$::Bucket::Scrub");
ZEN_INFO("scrubbing '{}'", m_BucketDir);
@@ -1823,7 +1849,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
std::vector<BucketMetaData> MetaDatas;
- std::vector<IoBuffer> MemCachedPayloads;
+ std::vector<MemCacheData> MemCachedPayloads;
std::vector<ReferenceIndex> FirstReferenceIndex;
IndexMap Index;
@@ -1847,7 +1873,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
void
ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::GatherReferences");
+ ZEN_TRACE_CPU("Z$::Bucket::GatherReferences");
#define CALCULATE_BLOCKING_TIME 0
@@ -1999,10 +2025,10 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
#endif // CALCULATE_BLOCKING_TIME
if (auto It = m_Index.find(Key); It != m_Index.end())
{
- const BucketPayload& CachedPayload = Payloads[It->second];
+ const BucketPayload& CachedPayload = m_Payloads[It->second];
if (CachedPayload.MemCached)
{
- Buffer = m_MemCachedPayloads[CachedPayload.MemCached];
+ Buffer = m_MemCachedPayloads[CachedPayload.MemCached].Payload;
ZEN_ASSERT_SLOW(Buffer);
}
else
@@ -2065,7 +2091,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
void
ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage");
+ ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage");
ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir);
@@ -2124,7 +2150,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
std::vector<BucketMetaData> MetaDatas;
- std::vector<IoBuffer> MemCachedPayloads;
+ std::vector<MemCacheData> MemCachedPayloads;
std::vector<ReferenceIndex> FirstReferenceIndex;
IndexMap Index;
{
@@ -2165,7 +2191,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State");
+ ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::State");
RwLock::SharedLockScope IndexLock(m_IndexLock);
Stopwatch Timer;
@@ -2213,7 +2239,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
if (GcCtx.IsDeletionMode())
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::Delete");
+ ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::Delete");
ExtendablePathBuilder<256> Path;
@@ -2281,7 +2307,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment);
size_t ChunkIndex = ChunkLocations.size();
ChunkLocations.push_back(Location);
- ChunkIndexToChunkHash[ChunkIndex] = Key;
+ ChunkIndexToChunkHash.push_back(Key);
if (ExpiredCacheKeys.contains(Key))
{
continue;
@@ -2453,7 +2479,7 @@ ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents(
void
ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
{
- ZEN_TRACE_CPU("Z$::Disk::CollectGarbage");
+ ZEN_TRACE_CPU("Z$::CollectGarbage");
std::vector<CacheBucket*> Buckets;
{
@@ -2468,13 +2494,16 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
{
Bucket->CollectGarbage(GcCtx);
}
- MemCacheTrim(Buckets, GcCtx.CacheExpireTime());
+ if (!m_IsMemCacheTrimming)
+ {
+ MemCacheTrim(Buckets, GcCtx.CacheExpireTime());
+ }
}
void
ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::PutStandaloneCacheValue");
+ ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue");
uint64_t NewFileSize = Value.Value.Size();
@@ -2671,16 +2700,17 @@ ZenCacheDiskLayer::CacheBucket::RemoveMetaData(RwLock::ExclusiveLockScope&, Buck
}
void
-ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload, IoBuffer& MemCachedData)
+ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData)
{
- uint64_t PayloadSize = MemCachedData.GetSize();
+ BucketPayload& Payload = m_Payloads[PayloadIndex];
+ uint64_t PayloadSize = MemCachedData.GetSize();
ZEN_ASSERT(PayloadSize != 0);
if (m_FreeMemCachedPayloads.empty())
{
if (m_MemCachedPayloads.size() != std::numeric_limits<uint32_t>::max())
{
Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(m_MemCachedPayloads.size()));
- m_MemCachedPayloads.push_back(MemCachedData);
+ m_MemCachedPayloads.emplace_back(MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex});
AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
m_MemoryWriteCount++;
}
@@ -2689,7 +2719,7 @@ ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, Bu
{
Payload.MemCached = m_FreeMemCachedPayloads.back();
m_FreeMemCachedPayloads.pop_back();
- m_MemCachedPayloads[Payload.MemCached] = MemCachedData;
+ m_MemCachedPayloads[Payload.MemCached] = MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex};
AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
m_MemoryWriteCount++;
}
@@ -2700,9 +2730,9 @@ ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(RwLock::ExclusiveLockScope&,
{
if (Payload.MemCached)
{
- size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].GetSize();
+ size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].Payload.GetSize();
RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
- m_MemCachedPayloads[Payload.MemCached] = IoBuffer{};
+ m_MemCachedPayloads[Payload.MemCached] = {};
m_FreeMemCachedPayloads.push_back(Payload.MemCached);
Payload.MemCached = {};
return PayloadSize;
@@ -2723,7 +2753,7 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck
void
ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::PutInlineCacheValue");
+ ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue");
uint8_t EntryFlags = 0;
@@ -2800,7 +2830,7 @@ public:
virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactStore");
+ ZEN_TRACE_CPU("Z$::Bucket::CompactStore");
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3023,7 +3053,7 @@ private:
GcStoreCompactor*
ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveExpiredData");
+ ZEN_TRACE_CPU("Z$::Bucket::RemoveExpiredData");
size_t TotalEntries = 0;
@@ -3117,7 +3147,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
std::vector<BucketMetaData> MetaDatas;
- std::vector<IoBuffer> MemCachedPayloads;
+ std::vector<MemCacheData> MemCachedPayloads;
std::vector<ReferenceIndex> FirstReferenceIndex;
IndexMap Index;
{
@@ -3164,7 +3194,7 @@ public:
virtual void PreCache(GcCtx& Ctx) override
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::PreCache");
+ ZEN_TRACE_CPU("Z$::Bucket::PreCache");
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3385,7 +3415,7 @@ public:
virtual void LockState(GcCtx& Ctx) override
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::LockState");
+ ZEN_TRACE_CPU("Z$::Bucket::LockState");
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3458,7 +3488,7 @@ public:
virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveUsedReferencesFromSet");
+ ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet");
ZEN_ASSERT(m_IndexLock);
size_t InitialCount = IoCids.size();
@@ -3505,7 +3535,7 @@ public:
std::vector<GcReferenceChecker*>
ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CreateReferenceCheckers");
+ ZEN_TRACE_CPU("Z$::Bucket::CreateReferenceCheckers");
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3530,7 +3560,7 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx)
void
ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactReferences");
+ ZEN_TRACE_CPU("Z$::Bucket::CompactReferences");
std::vector<ReferenceIndex> FirstReferenceIndex;
std::vector<IoHash> NewReferenceHashes;
@@ -3708,12 +3738,12 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
std::vector<BucketPayload>& Payloads,
std::vector<AccessTime>& AccessTimes,
std::vector<BucketMetaData>& MetaDatas,
- std::vector<IoBuffer>& MemCachedPayloads,
+ std::vector<MemCacheData>& MemCachedPayloads,
std::vector<ReferenceIndex>& FirstReferenceIndex,
IndexMap& Index,
RwLock::ExclusiveLockScope& IndexLock)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactState");
+ ZEN_TRACE_CPU("Z$::Bucket::CompactState");
size_t EntryCount = m_Index.size();
Payloads.reserve(EntryCount);
@@ -3738,7 +3768,8 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
}
if (Payload.MemCached)
{
- MemCachedPayloads.push_back(std::move(m_MemCachedPayloads[Payload.MemCached]));
+ MemCachedPayloads.emplace_back(
+ MemCacheData{.Payload = std::move(m_MemCachedPayloads[Payload.MemCached].Payload), .OwnerIndex = EntryIndex});
Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(MemCachedPayloads.size() - 1));
}
if (m_Configuration.EnableReferenceCaching)
@@ -3811,7 +3842,7 @@ ZenCacheDiskLayer::~ZenCacheDiskLayer()
ZenCacheDiskLayer::CacheBucket*
ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
{
- ZEN_TRACE_CPU("Z$::Disk::GetOrCreateBucket");
+ ZEN_TRACE_CPU("Z$::GetOrCreateBucket");
const auto BucketName = std::string(InBucket);
{
@@ -3858,7 +3889,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
bool
ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
- ZEN_TRACE_CPU("Z$::Disk::Get");
+ ZEN_TRACE_CPU("Z$::Get");
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
@@ -3874,7 +3905,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
void
ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
- ZEN_TRACE_CPU("Z$::Disk::Put");
+ ZEN_TRACE_CPU("Z$::Put");
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
@@ -3886,6 +3917,8 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
void
ZenCacheDiskLayer::DiscoverBuckets()
{
+ ZEN_TRACE_CPU("Z$::DiscoverBuckets");
+
DirectoryContent DirContent;
GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent);
@@ -3986,6 +4019,8 @@ ZenCacheDiskLayer::DiscoverBuckets()
bool
ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
{
+ ZEN_TRACE_CPU("Z$::DropBucket");
+
RwLock::ExclusiveLockScope _(m_Lock);
auto It = m_Buckets.find(std::string(InBucket));
@@ -4008,6 +4043,8 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
bool
ZenCacheDiskLayer::Drop()
{
+ ZEN_TRACE_CPU("Z$::Drop");
+
RwLock::ExclusiveLockScope _(m_Lock);
std::vector<std::unique_ptr<CacheBucket>> Buckets;
@@ -4029,6 +4066,8 @@ ZenCacheDiskLayer::Drop()
void
ZenCacheDiskLayer::Flush()
{
+ ZEN_TRACE_CPU("Z$::Flush");
+
std::vector<CacheBucket*> Buckets;
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -4070,6 +4109,8 @@ ZenCacheDiskLayer::Flush()
void
ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
{
+ ZEN_TRACE_CPU("Z$::ScrubStorage");
+
RwLock::SharedLockScope _(m_Lock);
{
std::vector<std::future<void>> Results;
@@ -4096,7 +4137,7 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
void
ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx)
{
- ZEN_TRACE_CPU("Z$::Disk::GatherReferences");
+ ZEN_TRACE_CPU("Z$::GatherReferences");
std::vector<CacheBucket*> Buckets;
{
@@ -4213,20 +4254,11 @@ ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const st
void
ZenCacheDiskLayer::MemCacheTrim()
{
- ZEN_TRACE_CPU("Z$::Disk::MemCacheTrim");
+ ZEN_TRACE_CPU("Z$::MemCacheTrim");
ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0);
-
- const GcClock::TimePoint Now = GcClock::Now();
-
- const GcClock::Tick NowTick = Now.time_since_epoch().count();
- const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
- GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim;
- const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count();
- if (NowTick < NextAllowedTrimTick)
- {
- return;
- }
+ ZEN_ASSERT(m_Configuration.MemCacheMaxAgeSeconds != 0);
+ ZEN_ASSERT(m_Configuration.MemCacheTrimIntervalSeconds != 0);
bool Expected = false;
if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true))
@@ -4234,75 +4266,90 @@ ZenCacheDiskLayer::MemCacheTrim()
return;
}
- // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong
- const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
- m_LastTickMemCacheTrim.store(NextTrimTick);
+ try
+ {
+ m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) {
+ ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]");
+
+ const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
+ uint64_t TrimmedSize = 0;
+ Stopwatch Timer;
+ const auto Guard = MakeGuard([&] {
+ ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
+ NiceBytes(TrimmedSize),
+ NiceBytes(m_TotalMemCachedSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+
+ const GcClock::Tick NowTick = GcClock::TickCount();
+ const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
+ m_NextAllowedTrimTick.store(NextTrimTick);
+ m_IsMemCacheTrimming.store(false);
+ });
- m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this, Now, TrimInterval](JobContext&) {
- ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]");
+ const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds);
- uint64_t StartSize = m_TotalMemCachedSize.load();
- Stopwatch Timer;
- const auto Guard = MakeGuard([&] {
- uint64_t EndSize = m_TotalMemCachedSize.load();
- ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
- NiceBytes(StartSize > EndSize ? StartSize - EndSize : 0),
- NiceBytes(m_TotalMemCachedSize),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- m_IsMemCacheTrimming.store(false);
- });
+ static const size_t UsageSlotCount = 2048;
+ std::vector<uint64_t> UsageSlots;
+ UsageSlots.reserve(UsageSlotCount);
- const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds);
-
- std::vector<uint64_t> UsageSlots;
- UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count());
+ std::vector<CacheBucket*> Buckets;
+ {
+ RwLock::SharedLockScope __(m_Lock);
+ Buckets.reserve(m_Buckets.size());
+ for (auto& Kv : m_Buckets)
+ {
+ Buckets.push_back(Kv.second.get());
+ }
+ }
- std::vector<CacheBucket*> Buckets;
- {
- RwLock::SharedLockScope __(m_Lock);
- Buckets.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
+ const GcClock::TimePoint Now = GcClock::Now();
{
- Buckets.push_back(Kv.second.get());
+ ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess");
+ for (CacheBucket* Bucket : Buckets)
+ {
+ Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots);
+ }
}
- }
- for (CacheBucket* Bucket : Buckets)
- {
- Bucket->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots);
- }
- uint64_t TotalSize = 0;
- for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
- {
- TotalSize += UsageSlots[Index];
- if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes)
+ uint64_t TotalSize = 0;
+ for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
{
- GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index);
- MemCacheTrim(Buckets, ExpireTime);
- break;
+ TotalSize += UsageSlots[Index];
+ if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes)
+ {
+ GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount);
+ TrimmedSize = MemCacheTrim(Buckets, ExpireTime);
+ break;
+ }
}
- }
- });
+ });
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what());
+ m_IsMemCacheTrimming.store(false);
+ }
}
-void
+uint64_t
ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime)
{
if (m_Configuration.MemCacheTargetFootprintBytes == 0)
{
- return;
+ return 0;
}
- RwLock::SharedLockScope __(m_Lock);
+ uint64_t TrimmedSize = 0;
for (CacheBucket* Bucket : Buckets)
{
- Bucket->MemCacheTrim(ExpireTime);
+ TrimmedSize += Bucket->MemCacheTrim(ExpireTime);
}
const GcClock::TimePoint Now = GcClock::Now();
const GcClock::Tick NowTick = Now.time_since_epoch().count();
const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
- GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim;
+ GcClock::Tick LastTrimTick = m_NextAllowedTrimTick;
const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
- m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
+ m_NextAllowedTrimTick.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
+ return TrimmedSize;
}
#if ZEN_WITH_TESTS
diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h
index 277371f2c..6997a12e4 100644
--- a/src/zenserver/cache/cachedisklayer.h
+++ b/src/zenserver/cache/cachedisklayer.h
@@ -197,15 +197,15 @@ public:
CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config);
~CacheBucket();
- bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
- bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
- void MemCacheTrim(GcClock::TimePoint ExpireTime);
- bool Drop();
- void Flush();
- void ScrubStorage(ScrubContext& Ctx);
- void GatherReferences(GcContext& GcCtx);
- void CollectGarbage(GcContext& GcCtx);
+ bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
+ bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
+ bool Drop();
+ void Flush();
+ void ScrubStorage(ScrubContext& Ctx);
+ void GatherReferences(GcContext& GcCtx);
+ void CollectGarbage(GcContext& GcCtx);
inline GcStorageSize StorageSize() const
{
@@ -218,7 +218,7 @@ public:
CacheValueDetails::BucketDetails GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const;
void EnumerateBucketContents(std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const;
- void GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector<uint64_t>& InOutUsageSlots);
+ void GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots);
#if ZEN_WITH_TESTS
void SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time);
#endif // ZEN_WITH_TESTS
@@ -286,6 +286,11 @@ public:
operator bool() const { return RawSize != 0 || RawHash != IoHash::Zero; };
};
+ struct MemCacheData
+ {
+ IoBuffer Payload;
+ PayloadIndex OwnerIndex;
+ };
#pragma pack(pop)
static_assert(sizeof(BucketPayload) == 20u);
static_assert(sizeof(BucketMetaData) == 28u);
@@ -323,7 +328,7 @@ public:
std::vector<BucketPayload> m_Payloads;
std::vector<BucketMetaData> m_MetaDatas;
std::vector<MetaDataIndex> m_FreeMetaDatas;
- std::vector<IoBuffer> m_MemCachedPayloads;
+ std::vector<MemCacheData> m_MemCachedPayloads;
std::vector<MemCachedIndex> m_FreeMemCachedPayloads;
std::vector<ReferenceIndex> m_FirstReferenceIndex;
std::vector<IoHash> m_ReferenceHashes;
@@ -364,7 +369,7 @@ public:
const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData);
void RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload);
BucketMetaData GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const;
- void SetMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload, IoBuffer& MemCachedData);
+ void SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData);
size_t RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload);
void InitializeIndexFromDisk(RwLock::ExclusiveLockScope&, bool IsNew);
@@ -390,7 +395,7 @@ public:
std::vector<BucketPayload>& Payloads,
std::vector<AccessTime>& AccessTimes,
std::vector<BucketMetaData>& MetaDatas,
- std::vector<IoBuffer>& MemCachedPayloads,
+ std::vector<MemCacheData>& MemCachedPayloads,
std::vector<ReferenceIndex>& FirstReferenceIndex,
IndexMap& Index,
RwLock::ExclusiveLockScope& IndexLock);
@@ -405,6 +410,10 @@ public:
m_MemCachedSize.fetch_sub(ValueSize, std::memory_order::relaxed);
m_OuterCacheMemoryUsage.fetch_sub(ValueSize, std::memory_order::relaxed);
}
+ static inline uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize)
+ {
+ return sizeof(MemCacheData) + sizeof(IoBufferCore) + RoundUp(PayloadSize, 8u);
+ }
// These locks are here to avoid contention on file creation, therefore it's sufficient
// that we take the same lock for the same hash
@@ -436,10 +445,21 @@ private:
{
return;
}
+ if (m_IsMemCacheTrimming)
+ {
+ return;
+ }
+
+ const GcClock::Tick NowTick = GcClock::TickCount();
+ if (NowTick < m_NextAllowedTrimTick)
+ {
+ return;
+ }
+
MemCacheTrim();
}
- void MemCacheTrim();
- void MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime);
+ void MemCacheTrim();
+ uint64_t MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime);
GcManager& m_Gc;
JobQueue& m_JobQueue;
@@ -447,7 +467,7 @@ private:
Configuration m_Configuration;
std::atomic_uint64_t m_TotalMemCachedSize{};
std::atomic_bool m_IsMemCacheTrimming = false;
- std::atomic<GcClock::Tick> m_LastTickMemCacheTrim;
+ std::atomic<GcClock::Tick> m_NextAllowedTrimTick;
mutable RwLock m_Lock;
std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets;
std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets;
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 8db96f914..f61fbd8bc 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -338,7 +338,11 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach
HttpStructuredCacheService::~HttpStructuredCacheService()
{
ZEN_INFO("closing structured cache");
- m_RequestRecorder.reset();
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+ }
m_StatsService.UnregisterHandler("z$", *this);
m_StatusService.UnregisterHandler("z$", *this);
@@ -615,24 +619,44 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
if (Key == HttpZCacheUtilStartRecording)
{
- m_RequestRecorder.reset();
HttpServerRequest::QueryParams Params = Request.GetQueryParams();
std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
- m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath);
+
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+
+ m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath);
+ m_RequestRecordingEnabled.store(true);
+ }
+ ZEN_INFO("cache RPC recording STARTED -> '{}'", RecordPath);
Request.WriteResponse(HttpResponseCode::OK);
return;
}
+
if (Key == HttpZCacheUtilStopRecording)
{
- m_RequestRecorder.reset();
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+ }
+ ZEN_INFO("cache RPC recording STOPPED");
Request.WriteResponse(HttpResponseCode::OK);
return;
}
+
if (Key == HttpZCacheUtilReplayRecording)
{
CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
- m_RequestRecorder.reset();
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+ }
+
HttpServerRequest::QueryParams Params = Request.GetQueryParams();
std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
uint32_t ThreadCount = std::thread::hardware_concurrency();
@@ -643,11 +667,18 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
ThreadCount = gsl::narrow<uint32_t>(Value.value());
}
}
+
+ ZEN_INFO("initiating cache RPC replay using {} threads, from '{}'", ThreadCount, RecordPath);
+
std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false));
ReplayRequestRecorder(RequestContext, *Replayer, ThreadCount < 1 ? 1 : ThreadCount);
+
+ ZEN_INFO("cache RPC replay STARTED");
+
Request.WriteResponse(HttpResponseCode::OK);
return;
}
+
if (Key.starts_with(HttpZCacheDetailsPrefix))
{
HandleDetailsRequest(Request);
@@ -1776,11 +1807,15 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
[this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable {
uint64_t RequestIndex = ~0ull;
- if (m_RequestRecorder)
+ if (m_RequestRecordingEnabled)
{
- RequestIndex = m_RequestRecorder->RecordRequest(
- {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId},
- Body);
+ RwLock::SharedLockScope _(m_RequestRecordingLock);
+ if (m_RequestRecorder)
+ {
+ RequestIndex = m_RequestRecorder->RecordRequest(
+ {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId},
+ Body);
+ }
}
uint32_t AcceptMagic = 0;
@@ -1816,8 +1851,11 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle);
if (RequestIndex != ~0ull)
{
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
+ RwLock::SharedLockScope _(m_RequestRecordingLock);
+ if (m_RequestRecorder)
+ {
+ m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
}
AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
@@ -1828,10 +1866,13 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
if (RequestIndex != ~0ull)
{
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->RecordResponse(RequestIndex,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ RwLock::SharedLockScope _(m_RequestRecordingLock);
+ if (m_RequestRecorder)
+ {
+ m_RequestRecorder->RecordResponse(RequestIndex,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
}
AsyncRequest.WriteResponse(HttpResponseCode::OK,
HttpContentType::kCbPackage,
diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h
index 57a533029..2feaaead8 100644
--- a/src/zenserver/cache/httpstructuredcache.h
+++ b/src/zenserver/cache/httpstructuredcache.h
@@ -190,6 +190,12 @@ private:
void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount);
+ // This exists to avoid taking locks when recording is not enabled
+ std::atomic_bool m_RequestRecordingEnabled{false};
+
+ // This lock should be taken in SHARED mode when calling into the recorder,
+ // and taken in EXCLUSIVE mode whenever the recorder is created or destroyed
+ RwLock m_RequestRecordingLock;
std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder;
};
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index 5f2c3351e..012925b51 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -5,6 +5,8 @@
#include "config/luaconfig.h"
#include "diag/logging.h"
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
#include <zencore/crypto.h>
#include <zencore/except.h>
#include <zencore/fmtutils.h>
@@ -41,7 +43,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
std::filesystem::path
-PickDefaultStateDirectory()
+PickDefaultSystemRootDirectory()
{
// Pick sensible default
PWSTR ProgramDataDir = nullptr;
@@ -50,7 +52,7 @@ PickDefaultStateDirectory()
if (SUCCEEDED(hRes))
{
std::filesystem::path FinalPath(ProgramDataDir);
- FinalPath /= L"Epic\\Zen\\Data";
+ FinalPath /= L"Epic\\Zen";
::CoTaskMemFree(ProgramDataDir);
return FinalPath;
@@ -66,7 +68,7 @@ PickDefaultStateDirectory()
namespace zen {
std::filesystem::path
-PickDefaultStateDirectory()
+PickDefaultSystemRootDirectory()
{
int UserId = getuid();
const passwd* Passwd = getpwuid(UserId);
@@ -79,6 +81,62 @@ PickDefaultStateDirectory()
namespace zen {
+std::filesystem::path
+PickDefaultStateDirectory(std::filesystem::path SystemRoot)
+{
+ if (SystemRoot.empty())
+ return SystemRoot;
+
+ return SystemRoot / "Data";
+}
+
+void
+EmitCentralManifest(const std::filesystem::path& SystemRoot, Oid Identifier, CbObject Manifest, std::filesystem::path ManifestPath)
+{
+ CbObjectWriter Cbo;
+ Cbo << "path" << ManifestPath.generic_wstring();
+ Cbo << "manifest" << Manifest;
+
+ const std::filesystem::path StatesPath = SystemRoot / "States";
+
+ CreateDirectories(StatesPath);
+ WriteFile(StatesPath / fmt::format("{}", Identifier), Cbo.Save().GetBuffer().AsIoBuffer());
+}
+
+std::vector<CbObject>
+ReadAllCentralManifests(const std::filesystem::path& SystemRoot)
+{
+ std::vector<CbObject> Manifests;
+
+ DirectoryContent Content;
+ GetDirectoryContent(SystemRoot / "States", DirectoryContent::IncludeFilesFlag, Content);
+
+ for (std::filesystem::path& File : Content.Files)
+ {
+ try
+ {
+ FileContents FileData = ReadFile(File);
+ IoBuffer DataBuffer = FileData.Flatten();
+ CbValidateError ValidateError = ValidateCompactBinary(DataBuffer, CbValidateMode::All);
+
+ if (ValidateError == CbValidateError::None)
+ {
+ Manifests.push_back(LoadCompactBinaryObject(DataBuffer));
+ }
+ else
+ {
+ ZEN_WARN("failed to load manifest '{}': {}", File, ToString(ValidateError));
+ }
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("failed to load manifest '{}': {}", File, Ex.what());
+ }
+ }
+
+ return Manifests;
+}
+
void
ValidateOptions(ZenServerOptions& ServerOptions)
{
@@ -343,6 +401,7 @@ ParseConfigFile(const std::filesystem::path& Path,
LuaOptions.AddOption("server.logid"sv, ServerOptions.LogId, "log-id"sv);
LuaOptions.AddOption("server.sentry.disable"sv, ServerOptions.NoSentry, "no-sentry"sv);
LuaOptions.AddOption("server.sentry.allowpersonalinfo"sv, ServerOptions.SentryAllowPII, "sentry-allow-personal-info"sv);
+ LuaOptions.AddOption("server.systemrootdir"sv, ServerOptions.SystemRootDir, "system-dir"sv);
LuaOptions.AddOption("server.datadir"sv, ServerOptions.DataDir, "data-dir"sv);
LuaOptions.AddOption("server.contentdir"sv, ServerOptions.ContentDir, "content-dir"sv);
LuaOptions.AddOption("server.abslog"sv, ServerOptions.AbsLogFile, "abslog"sv);
@@ -370,9 +429,11 @@ ParseConfigFile(const std::filesystem::path& Path,
ServerOptions.HttpServerConfig.HttpSys.IsRequestLoggingEnabled,
"httpsys-enable-request-logging"sv);
+#if ZEN_WITH_TRACE
////// trace
LuaOptions.AddOption("trace.host"sv, ServerOptions.TraceHost, "tracehost"sv);
LuaOptions.AddOption("trace.file"sv, ServerOptions.TraceFile, "tracefile"sv);
+#endif
////// stats
LuaOptions.AddOption("stats.enable"sv, ServerOptions.StatsConfig.Enabled);
@@ -503,6 +564,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
// stream operator to convert argv value into the options type. std::fs::path
// expects paths in streams to be quoted but argv paths are unquoted. By
// going into a std::string first, paths with whitespace parse correctly.
+ std::string SystemRootDir;
std::string DataDir;
std::string ContentDir;
std::string AbsLogFile;
@@ -525,6 +587,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
options.add_options()("help", "Show command line help");
options.add_options()("t, test", "Enable test mode", cxxopts::value<bool>(ServerOptions.IsTest)->default_value("false"));
options.add_options()("data-dir", "Specify persistence root", cxxopts::value<std::string>(DataDir));
+ options.add_options()("system-dir", "Specify system root", cxxopts::value<std::string>(SystemRootDir));
options.add_options()("snapshot-dir",
"Specify a snapshot of server state to mirror into the persistence root at startup",
cxxopts::value<std::string>(BaseSnapshotDir));
@@ -975,6 +1038,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
}
logging::RefreshLogLevels();
+ ServerOptions.SystemRootDir = MakeSafePath(SystemRootDir);
ServerOptions.DataDir = MakeSafePath(DataDir);
ServerOptions.BaseSnapshotDir = MakeSafePath(BaseSnapshotDir);
ServerOptions.ContentDir = MakeSafePath(ContentDir);
@@ -1022,9 +1086,14 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
throw;
}
+ if (ServerOptions.SystemRootDir.empty())
+ {
+ ServerOptions.SystemRootDir = PickDefaultSystemRootDirectory();
+ }
+
if (ServerOptions.DataDir.empty())
{
- ServerOptions.DataDir = PickDefaultStateDirectory();
+ ServerOptions.DataDir = PickDefaultStateDirectory(ServerOptions.SystemRootDir);
}
if (ServerOptions.AbsLogFile.empty())
diff --git a/src/zenserver/config.h b/src/zenserver/config.h
index cd2d92523..b5314b600 100644
--- a/src/zenserver/config.h
+++ b/src/zenserver/config.h
@@ -128,6 +128,7 @@ struct ZenServerOptions
zen::HttpServerConfig HttpServerConfig;
ZenStructuredCacheConfig StructuredCacheConfig;
ZenStatsConfig StatsConfig;
+ std::filesystem::path SystemRootDir; // System root directory (used for machine level config)
std::filesystem::path DataDir; // Root directory for state (used for testing)
std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental)
std::filesystem::path AbsLogFile; // Absolute path to main log file
@@ -162,4 +163,7 @@ struct ZenServerOptions
void ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions);
+void EmitCentralManifest(const std::filesystem::path& SystemRoot, Oid Identifier, CbObject Manifest, std::filesystem::path ManifestPath);
+std::vector<CbObject> ReadAllCentralManifests(const std::filesystem::path& SystemRoot);
+
} // namespace zen
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 73cb35fb8..b7507bd17 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1176,8 +1176,6 @@ ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
const OplogEntryMapping& OpMapping,
const OplogEntry& OpEntry)
{
- ZEN_TRACE_CPU("Store::Oplog::RegisterOplogEntry");
-
// For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing
// too many problems. Longer term we'll probably want to ensure we can do concurrent updates however
@@ -3662,11 +3660,11 @@ namespace testutils {
return Result;
}
- uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset)
+ uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64_t RawOffset)
{
if (RawOffset > 0)
{
- uint64 BlockSize = 0;
+ uint64_t BlockSize = 0;
OodleCompressor Compressor;
OodleCompressionLevel CompressionLevel;
if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
diff --git a/src/zenserver/upstream/zen.cpp b/src/zenserver/upstream/zen.cpp
index 8ae33597a..2d52236b3 100644
--- a/src/zenserver/upstream/zen.cpp
+++ b/src/zenserver/upstream/zen.cpp
@@ -59,6 +59,11 @@ ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClien
ZenStructuredCacheClient::~ZenStructuredCacheClient()
{
+ RwLock::ExclusiveLockScope _(m_SessionStateLock);
+ for (auto& CacheEntry : m_SessionStateCache)
+ {
+ delete CacheEntry;
+ }
}
detail::ZenCacheSessionState*
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 336f715f4..f80f95f8e 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -305,7 +305,8 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
m_ProjectStore,
HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile,
.HttpLogPath = ServerOptions.DataDir / "logs" / "http.log",
- .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"});
+ .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"},
+ ServerOptions);
m_Http->RegisterService(*m_AdminService);
return EffectiveBasePort;
@@ -329,6 +330,8 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions)
bool UpdateManifest = false;
std::filesystem::path ManifestPath = m_DataRoot / "root_manifest";
+ Oid StateId = Oid::Zero;
+ DateTime CreatedWhen{0};
if (!WipeState)
{
@@ -365,6 +368,8 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions)
m_RootManifest = LoadCompactBinaryObject(Manifest);
const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0);
+ StateId = m_RootManifest["state_id"].AsObjectId();
+ CreatedWhen = m_RootManifest["created"].AsDateTime();
if (ManifestVersion != ZEN_CFG_SCHEMA_VERSION)
{
@@ -391,6 +396,20 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions)
}
}
+ if (StateId == Oid::Zero)
+ {
+ StateId = Oid::NewOid();
+ UpdateManifest = true;
+ }
+
+ const DateTime Now = DateTime::Now();
+
+ if (CreatedWhen.GetTicks() == 0)
+ {
+ CreatedWhen = Now;
+ UpdateManifest = true;
+ }
+
// Handle any state wipe
if (WipeState)
@@ -418,19 +437,36 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions)
UpdateManifest = true;
}
- if (UpdateManifest)
- {
- // Write new manifest
-
- const DateTime Now = DateTime::Now();
+ // Write manifest
+ {
CbObjectWriter Cbo;
- Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "created" << Now << "updated" << Now << "state_id" << Oid::NewOid();
+ Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "created" << CreatedWhen << "updated" << Now << "state_id" << StateId;
m_RootManifest = Cbo.Save();
- WriteFile(ManifestPath, m_RootManifest.GetBuffer().AsIoBuffer());
+ if (UpdateManifest)
+ {
+ IoBuffer ManifestBuffer = m_RootManifest.GetBuffer().AsIoBuffer();
+
+ WriteFile(ManifestPath, ManifestBuffer);
+ }
+
+ if (!ServerOptions.IsTest)
+ {
+ try
+ {
+ EmitCentralManifest(ServerOptions.SystemRootDir, StateId, m_RootManifest, ManifestPath);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Unable to emit central manifest: ", Ex.what());
+ }
+ }
}
+
+ // Write state marker
+
{
std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker";
static const std::string_view StateMarkerContent = "deleting this file will cause " ZEN_APP_NAME " to exit"sv;
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 71e306eca..73a8ad538 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -51,6 +51,7 @@ BlockStoreFile::GetPath() const
void
BlockStoreFile::Open()
{
+ ZEN_TRACE_CPU("BlockStoreFile::Open");
uint32_t RetriesLeft = 3;
m_File.Open(m_Path, BasicFile::Mode::kDelete, [&](std::error_code& Ec) {
if (RetriesLeft == 0)
@@ -149,6 +150,11 @@ BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::functio
{
m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun));
}
+bool
+BlockStoreFile::IsOpen() const
+{
+ return !!m_IoBuffer;
+}
constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024;
@@ -285,6 +291,7 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations)
BlockStore::BlockEntryCountMap
BlockStore::GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent)
{
+ ZEN_TRACE_CPU("BlockStoreFile::GetBlocksToCompact");
BlockEntryCountMap Result;
{
RwLock::SharedLockScope InsertLock(m_InsertLock);
@@ -345,6 +352,7 @@ BlockStore::GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUs
void
BlockStore::Close()
{
+ ZEN_TRACE_CPU("BlockStore::Close");
RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
m_WriteBlock = nullptr;
m_CurrentInsertOffset = 0;
@@ -666,7 +674,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
ZEN_TRACE_CPU("BlockStore::ReclaimSpace::Compact");
Ref<BlockStoreFile> NewBlockFile;
auto NewBlockFileGuard = MakeGuard([&]() {
- if (NewBlockFile)
+ if (NewBlockFile && NewBlockFile->IsOpen())
{
ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath());
m_TotalSize.fetch_sub(NewBlockFile->FileSize(), std::memory_order::relaxed);
@@ -754,10 +762,26 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
if (OldBlockFile)
{
ZEN_TRACE_CPU("BlockStore::ReclaimSpace::MoveBlock");
+
+ ZEN_INFO("Moving {} chunks from '{}' to new block", KeepMap.size(), GetBlockPath(m_BlocksBasePath, BlockIndex));
+
+ uint64_t OldBlockSize = OldBlockFile->FileSize();
std::vector<uint8_t> Chunk;
for (const size_t& ChunkIndex : KeepMap)
{
const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
+ if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize)
+ {
+ ZEN_WARN(
+ "ReclaimSpace skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block "
+ "size {}",
+ m_BlocksBasePath,
+ ChunkLocation.Offset,
+ ChunkLocation.Size,
+ OldBlockFile->GetPath(),
+ OldBlockSize);
+ continue;
+ }
Chunk.resize(ChunkLocation.Size);
OldBlockFile->Read(Chunk.data(), ChunkLocation.Size, ChunkLocation.Offset);
@@ -767,6 +791,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
if (NewBlockFile)
{
+ ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
NewBlockFile->Flush();
NewBlockFile = nullptr;
}
@@ -820,8 +845,8 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
});
ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
+ ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen());
m_ChunkBlocks.erase(NextBlockIndex);
- NewBlockFile->MarkAsDeleteOnClose();
return;
}
@@ -846,6 +871,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
Chunk.clear();
if (NewBlockFile)
{
+ ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
NewBlockFile->Flush();
}
}
@@ -880,6 +906,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
}
if (NewBlockFile)
{
+ ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
NewBlockFile->Flush();
NewBlockFile = nullptr;
}
@@ -1044,6 +1071,8 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
const CompactCallback& ChangeCallback,
const ClaimDiskReserveCallback& DiskReserveCallback)
{
+ ZEN_TRACE_CPU("BlockStore::CompactBlocks");
+
uint64_t DeletedSize = 0;
uint64_t MovedCount = 0;
uint64_t MovedSize = 0;
@@ -1069,7 +1098,6 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
auto NewBlockFileGuard = MakeGuard([&]() {
if (NewBlockFile)
{
- ZEN_DEBUG("Dropping incomplete cas block store file '{}'", NewBlockFile->GetPath());
{
RwLock::ExclusiveLockScope _l(m_InsertLock);
if (m_ChunkBlocks[NewBlockIndex] == NewBlockFile)
@@ -1077,7 +1105,11 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
m_ChunkBlocks.erase(NewBlockIndex);
}
}
- NewBlockFile->MarkAsDeleteOnClose();
+ if (NewBlockFile->IsOpen())
+ {
+ ZEN_DEBUG("Dropping incomplete cas block store file '{}'", NewBlockFile->GetPath());
+ NewBlockFile->MarkAsDeleteOnClose();
+ }
}
});
@@ -1100,6 +1132,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
CompactState.IterateBlocks([&](uint32_t BlockIndex,
const std::vector<size_t>& KeepChunkIndexes,
const std::vector<BlockStoreLocation>& ChunkLocations) -> bool {
+ ZEN_TRACE_CPU("BlockStore::CompactBlock");
Ref<BlockStoreFile> OldBlockFile;
{
RwLock::SharedLockScope _(m_InsertLock);
@@ -1125,6 +1158,8 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
}
ZEN_ASSERT(OldBlockFile);
+ ZEN_INFO("Moving {} chunks from '{}' to new block", KeepChunkIndexes.size(), GetBlockPath(m_BlocksBasePath, BlockIndex));
+
uint64_t OldBlockSize = OldBlockFile->FileSize();
std::vector<uint8_t> Chunk;
@@ -1151,6 +1186,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
{
if (NewBlockFile)
{
+ ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
NewBlockFile->Flush();
MovedSize += NewBlockFile->FileSize();
NewBlockFile = nullptr;
@@ -1179,7 +1215,6 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
NewBlockFile = new BlockStoreFile(NewBlockPath);
m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
}
- ZEN_ASSERT(NewBlockFile);
std::error_code Error;
DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error);
@@ -1191,7 +1226,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
m_ChunkBlocks.erase(NextBlockIndex);
}
- NewBlockFile->MarkAsDeleteOnClose();
+ ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen());
NewBlockFile = nullptr;
return false;
}
@@ -1210,7 +1245,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
m_ChunkBlocks.erase(NextBlockIndex);
}
- NewBlockFile->MarkAsDeleteOnClose();
+ ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen());
NewBlockFile = nullptr;
return false;
}
@@ -1251,6 +1286,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
if (NewBlockFile)
{
+ ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
NewBlockFile->Flush();
MovedSize += NewBlockFile->FileSize();
NewBlockFile = nullptr;
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index d38099117..b20f2049a 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -91,7 +91,7 @@ CasImpl::~CasImpl()
void
CasImpl::Initialize(const CidStoreConfiguration& InConfig)
{
- ZEN_TRACE_CPU("Cas::Initialize");
+ ZEN_TRACE_CPU("CAS::Initialize");
m_Config = InConfig;
@@ -127,6 +127,7 @@ CasImpl::Initialize(const CidStoreConfiguration& InConfig)
bool
CasImpl::OpenOrCreateManifest()
{
+ ZEN_TRACE_CPU("CAS::OpenOrCreateManifest");
bool IsNewStore = false;
std::filesystem::path ManifestPath = m_Config.RootDirectory;
@@ -189,6 +190,7 @@ CasImpl::OpenOrCreateManifest()
void
CasImpl::UpdateManifest()
{
+ ZEN_TRACE_CPU("CAS::UpdateManifest");
if (!m_ManifestObject)
{
CbObjectWriter Cbo;
@@ -266,6 +268,7 @@ CasImpl::ContainsChunk(const IoHash& ChunkHash)
void
CasImpl::FilterChunks(HashKeySet& InOutChunks)
{
+ ZEN_TRACE_CPU("CAS::FilterChunks");
m_SmallStrategy.FilterChunks(InOutChunks);
m_TinyStrategy.FilterChunks(InOutChunks);
m_LargeStrategy.FilterChunks(InOutChunks);
@@ -274,6 +277,7 @@ CasImpl::FilterChunks(HashKeySet& InOutChunks)
void
CasImpl::Flush()
{
+ ZEN_TRACE_CPU("CAS::Flush");
ZEN_INFO("flushing CAS pool at '{}'", m_Config.RootDirectory);
m_SmallStrategy.Flush();
m_TinyStrategy.Flush();
diff --git a/src/zenstore/caslog.cpp b/src/zenstore/caslog.cpp
index cf3bd76da..2c26e522f 100644
--- a/src/zenstore/caslog.cpp
+++ b/src/zenstore/caslog.cpp
@@ -229,7 +229,8 @@ CasLogFile::Append(const void* DataPointer, uint64_t DataSize)
if (Ec)
{
- throw std::system_error(Ec, fmt::format("Failed to write to log file '{}'", PathFromHandle(m_File.Handle())));
+ std::error_code Dummy;
+ throw std::system_error(Ec, fmt::format("Failed to write to log file '{}'", PathFromHandle(m_File.Handle(), Dummy)));
}
}
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index b21f9f8d8..64c1dadf8 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -244,6 +244,7 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks)
void
CasContainerStrategy::Flush()
{
+ ZEN_TRACE_CPU("CasContainer::Flush");
m_BlockStore.Flush(/*ForceNewBlock*/ false);
m_CasLog.Flush();
MakeIndexSnapshot();
@@ -470,7 +471,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
size_t ChunkIndex = ChunkLocations.size();
ChunkLocations.push_back(Location);
- ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
+ ChunkIndexToChunkHash.push_back(ChunkHash);
if (Keep)
{
KeepChunkIndexes.push_back(ChunkIndex);
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index de653b0e3..4cc2c3ed1 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -1876,18 +1876,17 @@ GcScheduler::SchedulerThread()
NextTriggerStatus = Sb;
}
- ZEN_INFO(
- "{} used{}. '{}': {} in use, {} free. Disk writes last {} per {} [{}], peak {}/s.{}",
- NiceBytes(TotalSize.DiskSize),
- DiskSizeSoftLimit == 0 ? "" : fmt::format(", {} soft limit", NiceBytes(DiskSizeSoftLimit)),
- m_Config.RootDirectory,
- NiceBytes(Space.Total - Space.Free),
- NiceBytes(Space.Free),
- NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(LoadGraphTime).count())),
- NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(LoadGraphTime).count() / PressureGraphLength)),
- LoadGraph,
- NiceBytes(MaxLoad * uint64_t(std::chrono::seconds(1).count()) / uint64_t(std::chrono::seconds(LoadGraphTime).count())),
- NextTriggerStatus);
+ ZEN_INFO("{} used{}. '{}': {} in use, {} free. Disk writes last {} per {} [{}], peak {}/s.{}",
+ NiceBytes(TotalSize.DiskSize),
+ DiskSizeSoftLimit == 0 ? "" : fmt::format(", {} soft limit", NiceBytes(DiskSizeSoftLimit)),
+ m_Config.RootDirectory,
+ NiceBytes(Space.Total - Space.Free),
+ NiceBytes(Space.Free),
+ NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(LoadGraphTime).count())),
+ NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(LoadGraphTime).count() / PressureGraphLength)),
+ LoadGraph,
+ NiceBytes(MaxLoad / uint64_t(std::chrono::seconds(m_Config.MonitorInterval).count())),
+ NextTriggerStatus);
if (!DiskSpaceGCTriggered && !TimeBasedGCTriggered)
{
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index 786780b5e..bb36cb3cd 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -101,6 +101,7 @@ struct BlockStoreFile : public RefCounted
void Flush();
BasicFile& GetBasicFile();
void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
+ bool IsOpen() const;
private:
const std::filesystem::path m_Path;
diff --git a/src/zenutil/basicfile.cpp b/src/zenutil/basicfile.cpp
index 173b22449..024b1e5bf 100644
--- a/src/zenutil/basicfile.cpp
+++ b/src/zenutil/basicfile.cpp
@@ -241,7 +241,8 @@ BasicFile::Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset)
if (!Success)
{
- ThrowLastError(fmt::format("Failed to read from file '{}'", zen::PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ ThrowLastError(fmt::format("Failed to read from file '{}'", zen::PathFromHandle(m_FileHandle, Dummy)));
}
BytesToRead -= NumberOfBytesToRead;
@@ -374,7 +375,8 @@ BasicFile::Write(const void* Data, uint64_t Size, uint64_t Offset)
if (Ec)
{
- throw std::system_error(Ec, fmt::format("Failed to write to file '{}'", zen::PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ throw std::system_error(Ec, fmt::format("Failed to write to file '{}'", zen::PathFromHandle(m_FileHandle, Dummy)));
}
}
@@ -426,7 +428,8 @@ BasicFile::FileSize()
int Error = zen::GetLastError();
if (Error)
{
- ThrowSystemError(Error, fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ ThrowSystemError(Error, fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy)));
}
}
return uint64_t(liFileSize.QuadPart);
@@ -436,7 +439,8 @@ BasicFile::FileSize()
struct stat Stat;
if (fstat(Fd, &Stat) == -1)
{
- ThrowSystemError(GetLastError(), fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ ThrowSystemError(GetLastError(), fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy)));
}
return uint64_t(Stat.st_size);
#endif
@@ -483,7 +487,9 @@ BasicFile::SetFileSize(uint64_t FileSize)
int Error = zen::GetLastError();
if (Error)
{
- ThrowSystemError(Error, fmt::format("Failed to set file pointer to {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ ThrowSystemError(Error,
+ fmt::format("Failed to set file pointer to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy)));
}
}
OK = ::SetEndOfFile(m_FileHandle);
@@ -492,7 +498,9 @@ BasicFile::SetFileSize(uint64_t FileSize)
int Error = zen::GetLastError();
if (Error)
{
- ThrowSystemError(Error, fmt::format("Failed to set end of file to {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ ThrowSystemError(Error,
+ fmt::format("Failed to set end of file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy)));
}
}
#elif ZEN_PLATFORM_MAC
@@ -502,7 +510,9 @@ BasicFile::SetFileSize(uint64_t FileSize)
int Error = zen::GetLastError();
if (Error)
{
- ThrowSystemError(Error, fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ ThrowSystemError(Error,
+ fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy)));
}
}
#else
@@ -512,7 +522,9 @@ BasicFile::SetFileSize(uint64_t FileSize)
int Error = zen::GetLastError();
if (Error)
{
- ThrowSystemError(Error, fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ ThrowSystemError(Error,
+ fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy)));
}
}
if (FileSize > 0)
@@ -520,7 +532,9 @@ BasicFile::SetFileSize(uint64_t FileSize)
int Error = posix_fallocate64(Fd, 0, (off64_t)FileSize);
if (Error)
{
- ThrowSystemError(Error, fmt::format("Failed to allocate space of {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ ThrowSystemError(Error,
+ fmt::format("Failed to allocate space of {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy)));
}
}
#endif
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
index 054ac0e56..b8f9d65ef 100644
--- a/src/zenutil/cache/rpcrecording.cpp
+++ b/src/zenutil/cache/rpcrecording.cpp
@@ -2,8 +2,12 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/session.h>
#include <zencore/system.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
#include <zenutil/basicfile.h>
#include <zenutil/cache/rpcrecording.h>
@@ -229,7 +233,6 @@ public:
}
virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
-private:
virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
@@ -238,6 +241,7 @@ private:
}
virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
+private:
std::uint64_t m_RequestCount;
RecordedRequestsReader m_RequestBuffer;
};
@@ -256,15 +260,22 @@ struct RecordedRequest
uint32_t Length; // 4 bytes
ZenContentType ContentType; // 1 byte
ZenContentType AcceptType; // 1 byte
- uint8_t Padding; // 1 byte
+ uint8_t OffsetHigh; // 1 byte
uint8_t Padding2; // 1 byte
Oid SessionId; // 12 bytes
+
+ inline uint64_t GetOffset() const { return uint64_t(Offset) + (uint64_t(OffsetHigh) << 32); }
+ inline void SetOffset(uint64_t NewOffset)
+ {
+ Offset = gsl::narrow_cast<uint32_t>(NewOffset & 0xffff'ffff);
+ OffsetHigh = gsl::narrow_cast<uint8_t>(NewOffset >> 32);
+ }
};
static_assert(sizeof(RecordedRequest) == 24);
const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB
-const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; // 1MiB
+const uint64_t StandaloneFileSizeThreshold = 16 * 1024 * 1024ull; // 16MiB
const uint64_t SegmentRequestCount = 10 * 1000 * 1000;
const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the
// number of files in a directory below this level
@@ -272,6 +283,12 @@ const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try
const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024;
const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0};
+std::string
+MakeSegmentPath(uint64_t SegmentIndex)
+{
+ return fmt::format("segment_{:06}", SegmentIndex);
+}
+
struct RecordedRequestsSegmentWriter
{
RecordedRequestsSegmentWriter() = default;
@@ -291,6 +308,7 @@ struct RecordedRequestsSegmentWriter
return m_RequestCount;
}
+ RwLock::SharedLockScope _(m_Lock);
return m_Entries.size();
}
inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; }
@@ -321,6 +339,7 @@ struct RecordedRequestsWriter
RecordedRequestsSegmentWriter& EnsureCurrentSegment();
void CommitCurrentSegment(RwLock::ExclusiveLockScope&);
void EndWrite();
+ void WriteRecordingMetadata();
uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
private:
@@ -434,50 +453,58 @@ uint64_t
RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
{
const uint64_t RequestBufferSize = RequestBuffer.GetSize();
+ uint64_t RequestIndex = ~0ull;
- RwLock::ExclusiveLockScope Lock(m_Lock);
- uint64_t RequestIndex = m_Entries.size();
- RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u,
- .Length = uint32_t(RequestBufferSize & 0xffffFFFFu),
- .ContentType = RequestInfo.ContentType,
- .AcceptType = RequestInfo.AcceptType,
- .Padding = 0,
- .Padding2 = 0,
- .SessionId = RequestInfo.SessionId});
-
- if (Entry.Length < StandaloneFileSizeThreshold)
{
- const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ RequestIndex = m_Entries.size();
+ RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u,
+ .Length = uint32_t(RequestBufferSize & 0xffffFFFFu),
+ .ContentType = RequestInfo.ContentType,
+ .AcceptType = RequestInfo.AcceptType,
+ .OffsetHigh = 0,
+ .Padding2 = 0,
+ .SessionId = RequestInfo.SessionId});
- if (BlockIndex == m_BlockFiles.size())
+ if (Entry.Length < StandaloneFileSizeThreshold)
{
- std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
- NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
- m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
- ++m_FileCount;
- }
+ const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
- ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
- BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
- ZEN_ASSERT(BlockFile != nullptr);
+ if (BlockIndex == m_BlockFiles.size())
+ {
+ std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
+ NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
+ m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
+ ++m_FileCount;
+ }
- Entry.Offset = uint32_t(m_ChunkOffset & 0xffffFFFF);
- m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u);
- Lock.ReleaseNow();
+ ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
+ BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
+ ZEN_ASSERT(BlockFile != nullptr);
- std::error_code Ec;
- BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec);
- if (Ec)
- {
- Entry.Length = 0;
- return ~0ull;
- }
+ // Note that this is the overall logical offset, not the offset within a single file
+ const uint64_t ChunkWriteOffset = m_ChunkOffset;
+ m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u);
+ Entry.SetOffset(ChunkWriteOffset);
+ Lock.ReleaseNow();
- m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
+ std::error_code Ec;
+ BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec);
+ if (Ec)
+ {
+ // We cannot simply use `Entry` here because the vector may
+ // have been reallocated causing the entry to be in a different
+ // location
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = 0;
+ return ~0ull;
+ }
- return RequestIndex;
+ m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
+
+ return RequestIndex;
+ }
}
- Lock.ReleaseNow();
// Write request data to standalone file
@@ -491,7 +518,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
// The exact value of the entry is not important, we will use
// the size of the standalone file regardless when performing
// the read
- Entry.Length = std::numeric_limits<uint32_t>::max();
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = std::numeric_limits<uint32_t>::max();
}
++m_FileCount;
@@ -501,7 +529,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
if (Ec)
{
- Entry.Length = 0;
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = 0;
return ~0ull;
}
@@ -511,7 +540,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
}
catch (std::exception&)
{
- Entry.Length = 0;
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = 0;
return ~0ull;
}
}
@@ -529,7 +559,7 @@ RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath,
{
if (R.Offset != ~0u)
{
- MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length);
+ MaxChunkPosition = Max(MaxChunkPosition, R.GetOffset() + R.Length);
}
}
uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1;
@@ -547,6 +577,7 @@ RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath,
}
return m_Entries.size();
}
+
void
RecordedRequestsSegmentReader::EndRead()
{
@@ -571,9 +602,10 @@ RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutB
if (Entry.Offset != ~0u)
{
// Inline in block file
- uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
- uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
- OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
+ const uint64_t EntryOffset = Entry.GetOffset();
+ const uint32_t BlockIndex = gsl::narrow<uint32_t>((EntryOffset + Entry.Length) / RecordedRequestBlockSize);
+ const uint64_t ChunkOffset = EntryOffset - (BlockIndex * RecordedRequestBlockSize);
+ OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
return RequestInfo;
}
@@ -600,6 +632,8 @@ RecordedRequestsWriter::EnsureCurrentSegment()
{
bool StartNewSegment = false;
+ TimeSpan SegmentAge(DateTime::NowTicks() - m_CurrentWriter->GetStartTime().GetTicks());
+
if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount)
{
ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount);
@@ -615,6 +649,12 @@ RecordedRequestsWriter::EnsureCurrentSegment()
ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold);
StartNewSegment = true;
}
+ else if (SegmentAge >= SegmentTimeThreshold)
+ {
+ ZEN_DEBUG("starting new RPC recording segment due to age >= {}",
+ NiceTimeSpanMs(SegmentTimeThreshold.GetTicks() / TimeSpan::TicksPerMillisecond));
+ StartNewSegment = true;
+ }
if (StartNewSegment)
{
@@ -627,7 +667,7 @@ RecordedRequestsWriter::EnsureCurrentSegment()
const uint64_t SegmentIndex = m_FinishedSegments.size();
m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>();
- m_CurrentWriter->BeginWrite(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex);
+ m_CurrentWriter->BeginWrite(m_BasePath / MakeSegmentPath(SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex);
}
return *m_CurrentWriter;
@@ -654,8 +694,22 @@ RecordedRequestsWriter::EndWrite()
CommitCurrentSegment(_);
- // Emit some metadata alongside the recording
+ WriteRecordingMetadata();
+}
+
+uint64_t
+RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
+{
+ RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
+
+ const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer);
+
+ return Writer.GetBaseRequestIndex() + SegmentLocalIndex;
+}
+void
+RecordedRequestsWriter::WriteRecordingMetadata()
+{
try
{
DateTime EndTime = DateTime::Now();
@@ -702,16 +756,6 @@ RecordedRequestsWriter::EndWrite()
}
}
-uint64_t
-RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
-{
- RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
-
- const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer);
-
- return Writer.GetBaseRequestIndex() + SegmentLocalIndex;
-}
-
//////////////////////////////////////////////////////////////////////////
uint64_t
@@ -720,26 +764,89 @@ RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool In
m_InMemory = InMemory;
m_BasePath = BasePath;
- BasicFile InfoFile;
- InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead);
- CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll());
+ std::error_code Ec;
+ BasicFile InfoFile;
+ InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead, Ec);
+
+ if (!Ec)
+ {
+ try
+ {
+ CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll());
+
+ uint64_t TotalRequestCount = 0;
+ uint64_t MaxSegmentIndex = 0;
+
+ for (auto SegmentElement : CbInfo["segments"])
+ {
+ CbObjectView Segment = SegmentElement.AsObjectView();
+
+ const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(),
+ .BaseRequestIndex = Segment["base_index"sv].AsUInt64(),
+ .RequestCount = Segment["request_count"sv].AsUInt64(),
+ .RequestBytes = Segment["request_bytes"sv].AsUInt64(),
+ .StartTime = Segment["start_time"sv].AsDateTime(),
+ .EndTime = Segment["end_time"sv].AsDateTime()});
+
+ TotalRequestCount += Info.RequestCount;
+ MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
+ }
+
+ m_SegmentReaders.resize(MaxSegmentIndex + 1);
+
+ return TotalRequestCount;
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("could not read metadata file: {}", Ex.what());
+ }
+ }
+
+ ZEN_INFO("recovering segment info for '{}'", BasePath);
uint64_t TotalRequestCount = 0;
uint64_t MaxSegmentIndex = 0;
- for (auto SegmentElement : CbInfo["segments"])
+ try
+ {
+ for (int SegmentIndex = 0;; ++SegmentIndex)
+ {
+ const std::filesystem::path ZcbPath = BasePath / MakeSegmentPath(SegmentIndex) / "rpc_segment_info.zcb";
+ FileContents Fc = ReadFile(ZcbPath);
+
+ if (Fc.ErrorCode)
+ break;
+
+ if (IoBuffer SegmentInfoBuffer = Fc.Flatten())
+ {
+ CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer);
+
+ const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(),
+ .BaseRequestIndex = 0,
+ .RequestCount = Segment["entry_count"sv].AsUInt64(),
+ .RequestBytes = 0,
+ .StartTime = Segment["time_start"sv].AsDateTime(),
+ .EndTime = Segment["time_end"sv].AsDateTime()});
+
+ TotalRequestCount += Info.RequestCount;
+ MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
+ }
+ }
+ }
+ catch (std::exception&)
{
- CbObjectView Segment = SegmentElement.AsObjectView();
+ }
+
+ std::sort(begin(m_KnownSegments), end(m_KnownSegments), [](const auto& Lhs, const auto& Rhs) {
+ return Lhs.SegmentIndex < Rhs.SegmentIndex;
+ });
- const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(),
- .BaseRequestIndex = Segment["base_index"sv].AsUInt64(),
- .RequestCount = Segment["request_count"sv].AsUInt64(),
- .RequestBytes = Segment["request_bytes"sv].AsUInt64(),
- .StartTime = Segment["start_time"sv].AsDateTime(),
- .EndTime = Segment["end_time"sv].AsDateTime()});
+ uint64_t SegmentRequestOffset = 0;
- TotalRequestCount += Info.RequestCount;
- MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
+ for (SegmentInfo& Info : m_KnownSegments)
+ {
+ Info.BaseRequestIndex = SegmentRequestOffset;
+ SegmentRequestOffset += Info.RequestCount;
}
m_SegmentReaders.resize(MaxSegmentIndex + 1);
@@ -776,7 +883,7 @@ RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer)
if (!SegmentReaderPtr)
{
RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader;
- NewSegment->BeginRead(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), m_InMemory);
+ NewSegment->BeginRead(m_BasePath / MakeSegmentPath(SegmentIndex), m_InMemory);
SegmentReaderPtr.reset(NewSegment);
}
@@ -806,7 +913,6 @@ public:
DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
-private:
virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
{
return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
@@ -814,6 +920,7 @@ private:
virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
+private:
RecordedRequestsWriter m_RecordedRequests;
};
@@ -822,23 +929,41 @@ class DiskRequestReplayer : public IRpcRequestReplayer
public:
DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
{
- m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory);
+ m_RequestCount = m_RequestReader.BeginRead(BasePath, InMemory);
}
- virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
+ virtual ~DiskRequestReplayer() { m_RequestReader.EndRead(); }
- static bool IsCompatible(const std::filesystem::path& BasePath) { return std::filesystem::exists(BasePath / "rpc_recording_info.zcb"); }
+ static bool IsCompatible(const std::filesystem::path& BasePath)
+ {
+ if (std::filesystem::exists(BasePath / "rpc_recording_info.zcb"))
+ {
+ return true;
+ }
+
+ const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0);
+
+ if (std::filesystem::exists(SegmentZero / "rpc_segment_info.zcb") && std::filesystem::exists(SegmentZero / "index.bin"))
+ {
+ // top-level metadata is missing, possibly because of premature exit
+ // on the recording side
+
+ return true;
+ }
+
+ return false;
+ }
-private:
virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
{
- return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
+ return m_RequestReader.ReadRequest(RequestIndex, OutBuffer);
}
virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
+private:
std::uint64_t m_RequestCount;
- RecordedRequestsReader m_RequestBuffer;
+ RecordedRequestsReader m_RequestReader;
};
} // namespace zen::cache::v2
@@ -866,4 +991,66 @@ MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
}
}
+#if ZEN_WITH_TESTS
+
+void
+rpcrecord_forcelink()
+{
+}
+
+TEST_SUITE_BEGIN("rpc.recording");
+
+TEST_CASE("rpc.record")
+{
+ ScopedTemporaryDirectory TempDir;
+ auto Path = TempDir.Path();
+
+ const Oid SessionId = GetSessionId();
+
+ using namespace std::literals;
+
+ {
+ cache::v2::DiskRequestRecorder Recorder{Path};
+
+ for (int i = 0; i < 1000; ++i)
+ {
+ RecordedRequestInfo RequestInfo{.ContentType = ZenContentType::kCbObject,
+ .AcceptType = ZenContentType::kCbObject,
+ .SessionId = SessionId};
+
+ CbObjectWriter RequestPayload;
+ RequestPayload << "test"sv << true;
+ RequestPayload << "index"sv << i;
+ CbObject Req = RequestPayload.Save();
+ IoBuffer RequestBuffer = Req.GetBuffer().AsIoBuffer();
+
+ const uint64_t Index = Recorder.RecordRequest(RequestInfo, RequestBuffer);
+
+ CHECK(Index == i);
+ }
+ }
+
+ {
+ cache::v2::DiskRequestReplayer Replayer{Path, false};
+
+ for (int i = 0; i < 1000; ++i)
+ {
+ IoBuffer RequestBuffer;
+ RecordedRequestInfo RequestInfo = Replayer.GetRequest(i, RequestBuffer);
+
+ CHECK(RequestInfo.AcceptType == ZenContentType::kCbObject);
+ CHECK(RequestInfo.ContentType == ZenContentType::kCbObject);
+ CHECK(RequestInfo.SessionId == SessionId);
+
+ CbObject Req = LoadCompactBinaryObject(RequestBuffer);
+ CHECK_EQ(Req["index"sv].AsInt32(), i);
+ CHECK_EQ(Req["test"sv].AsBool(), true);
+ }
+ }
+}
+
+TEST_SUITE_END();
+
+#endif
+
} // namespace zen::cache
diff --git a/src/zenutil/include/zenutil/cache/rpcrecording.h b/src/zenutil/include/zenutil/cache/rpcrecording.h
index fd5df26ad..ab9b92dd3 100644
--- a/src/zenutil/include/zenutil/cache/rpcrecording.h
+++ b/src/zenutil/include/zenutil/cache/rpcrecording.h
@@ -4,6 +4,9 @@
#include <zencore/compositebuffer.h>
#include <zencore/iobuffer.h>
+#include <zencore/uid.h>
+
+#include <compare>
namespace zen::cache {
@@ -13,6 +16,7 @@ struct RecordedRequestInfo
ZenContentType AcceptType;
Oid SessionId;
+ inline std::strong_ordering operator<=>(const RecordedRequestInfo& Rhs) const = default;
static const RecordedRequestInfo NullRequest;
};
@@ -37,4 +41,6 @@ public:
std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath);
std::unique_ptr<cache::IRpcRequestReplayer> MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory);
+void rpcrecord_forcelink();
+
} // namespace zen::cache
diff --git a/src/zenutil/logging.cpp b/src/zenutil/logging.cpp
index fedfdc7e8..64230ea81 100644
--- a/src/zenutil/logging.cpp
+++ b/src/zenutil/logging.cpp
@@ -23,6 +23,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
+static bool g_IsLoggingInitialized;
spdlog::sink_ptr g_FileSink;
spdlog::sink_ptr
@@ -83,6 +84,14 @@ BeginInitializeLogging(const LoggingOptions& LogOptions)
/* max size */ 128 * 1024 * 1024,
/* max files */ 16,
/* rotate on open */ true);
+ if (LogOptions.AbsLogFile.extension() == ".json")
+ {
+ FileSink->set_formatter(std::make_unique<logging::json_formatter>(LogOptions.LogId));
+ }
+ else
+ {
+ FileSink->set_formatter(std::make_unique<logging::full_formatter>(LogOptions.LogId)); // this will have a date prefix
+ }
}
std::set_terminate([]() { ZEN_CRITICAL("Program exited abnormally via std::terminate()"); });
@@ -173,31 +182,25 @@ FinishInitializeLogging(const LoggingOptions& LogOptions)
spdlog::set_formatter(
std::make_unique<logging::full_formatter>(LogOptions.LogId, std::chrono::system_clock::now())); // default to duration prefix
- if (g_FileSink)
- {
- if (LogOptions.AbsLogFile.extension() == ".json")
- {
- g_FileSink->set_formatter(std::make_unique<logging::json_formatter>(LogOptions.LogId));
- }
- else
- {
- g_FileSink->set_formatter(std::make_unique<logging::full_formatter>(LogOptions.LogId)); // this will have a date prefix
- }
- }
-
const std::string StartLogTime = zen::DateTime::Now().ToIso8601();
spdlog::apply_all([&](auto Logger) { Logger->info("log starting at {}", StartLogTime); });
+
+ g_IsLoggingInitialized = true;
}
void
ShutdownLogging()
{
- g_FileSink.reset();
+ if (g_IsLoggingInitialized)
+ {
+ auto DefaultLogger = zen::logging::Default();
+ ZEN_LOG_INFO(DefaultLogger, "log ending at {}", zen::DateTime::Now().ToIso8601());
+ }
- auto DefaultLogger = zen::logging::Default();
- ZEN_LOG_INFO(DefaultLogger, "log ending at {}", zen::DateTime::Now().ToIso8601());
zen::logging::ShutdownLogging();
+
+ g_FileSink.reset();
}
} // namespace zen
diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp
index 00db5a25b..eba3613f1 100644
--- a/src/zenutil/zenutil.cpp
+++ b/src/zenutil/zenutil.cpp
@@ -6,6 +6,7 @@
# include <zenutil/basicfile.h>
# include <zenutil/process.h>
+# include <zenutil/cache/rpcrecording.h>
namespace zen {
@@ -14,6 +15,7 @@ zenutil_forcelinktests()
{
basicfile_forcelink();
process_forcelink();
+ cache::rpcrecord_forcelink();
}
} // namespace zen