diff options
36 files changed, 1701 insertions, 835 deletions
diff --git a/prepare_commit.bat b/prepare_commit.bat index 97bcea6ed..8e621c96c 100644 --- a/prepare_commit.bat +++ b/prepare_commit.bat @@ -1 +1 @@ -python %~dp0\\scripts\formatcode.py +python %~dp0scripts\formatcode.py %* diff --git a/scripts/deploybuild.py b/scripts/deploybuild.py index 4327d14f7..37518fd0c 100644 --- a/scripts/deploybuild.py +++ b/scripts/deploybuild.py @@ -30,12 +30,10 @@ origcwd = os.getcwd() parser = argparse.ArgumentParser(description='Deploy a zen build to an UE tree') parser.add_argument("root", help="Path to an UE5 root directory") parser.add_argument("--sentry", action="store_true", help="Whether to upload symobls to Sentry") -parser.add_argument("--xmake", action="store_true", help="Build with XMake") args = parser.parse_args() engineroot = args.root upload_symbols = args.sentry -use_xmake = args.xmake if not os.path.isfile(os.path.join(engineroot, "RunUAT.bat")): print(f"{Fore.RED}Not a valid UE5 engine root directory: '{engineroot}'") @@ -53,16 +51,9 @@ jazz_print("Zen root:", zenroot) # Build fresh binaries try: - if use_xmake: - subprocess.run(["xmake.exe", "config", "-p", "windows", "-a", "x64", "-m", "release"], check=True) - build_cmd = ["xmake.exe", "build", "--rebuild", "zenserver"] - build_output_dir = r'build\windows\x64\release' - else: - vs_path = vswhere.get_latest_path() # can also specify prerelease=True - jazz_print("BUILDING CODE", f"using VS root: {vs_path}") - devenv_path = os.path.join(vs_path, "Common7\\IDE\\devenv.com") - build_cmd = [devenv_path, "/build", "Release", "vsxmake2019\\zen.sln"] - build_output_dir = r'x64\Release' + subprocess.run(["xmake.exe", "config", "-p", "windows", "-a", "x64", "-m", "release"], check=True) + build_cmd = ["xmake.exe", "build", "--rebuild", "zenserver"] + build_output_dir = r'build\windows\x64\release' subprocess.run(build_cmd, check=True) except: diff --git a/scripts/formatcode.py b/scripts/formatcode.py index 1a214380d..423d2b4e7 100644 --- a/scripts/formatcode.py +++ b/scripts/formatcode.py @@ -1,5 +1,12 @@ +import argparse import os import fileinput +import pathlib +import re + +match_expressions = [] +valid_extensions = [] +root_dir = '' def is_header_missing(f): with open(f) as reader: @@ -20,9 +27,11 @@ def scan_tree(root): for entry in dirs: if entry.is_dir(): scan_tree(os.path.join(root, entry.name)) - elif entry.name.endswith(".cpp") or entry.name.endswith(".h"): - print("... formatting: %s"%(entry.name)) - full_path = os.path.join(root, entry.name) + continue + full_path = os.path.join(root, entry.name) + relative_root_path = os.path.relpath(full_path, start=root_dir) + if is_matching_filename(relative_root_path): + print("... formatting: {}".format(relative_root_path)) files.append(full_path) if is_header_missing(full_path): header_files.append(full_path) @@ -38,12 +47,65 @@ def scan_zen(root): if entry.is_dir() and entry.name.startswith("zen"): scan_tree(os.path.join(root, entry.name)) -while True: - if (os.path.isfile(".clang-format")): - scan_zen(".") - quit() - else: - cwd = os.getcwd() - if os.path.dirname(cwd) == cwd: +def is_matching_filename(relative_root_path): + global match_expressions + global root_dir + global valid_extensions + + if os.path.splitext(relative_root_path)[1].lower() not in valid_extensions: + return False + if not match_expressions: + return True + relative_root_path = relative_root_path.replace('\\', '/') + for regex in match_expressions: + if regex.fullmatch(relative_root_path): + return True + return False + +def parse_match_expressions(wildcards, matches): + global match_expressions + global valid_extensions + + valid_extensions = ['.cpp', '.h'] + + for wildcard in wildcards: + regex = wildcard.replace('*', '%FORMAT_STAR%').replace('\\', '/') + regex = re.escape(regex) + regex = '.*' + regex.replace('%FORMAT_STAR%', '.*') + '.*' + try: + match_expressions.append(re.compile(regex, re.IGNORECASE)) + except Exception as ex: + print('Could not parse input filename expression \'{}\': {}'.format(wildcard, str(ex))) + quit() + for regex in matches: + try: + match_expressions.append(re.compile(regex, re.IGNORECASE)) + except Exception as ex: + print('Could not parse input --match expression \'{}\': {}'.format(regex, str(ex))) quit() - os.chdir("..") + +def _main(): + global root_dir + + parser = argparse.ArgumentParser() + parser.add_argument('filenames', nargs='*', help="Match text for filenames. If fullpath contains text it is a match, " +\ + "* is a wildcard. Directory separators are matched by either / or \\. Case insensitive.") + parser.add_argument('--match', action='append', default=[], help="Match regular expression for filenames. " +\ + "Relative path from the root zen directory must be a complete match. Directory separators are matched only by /. Case insensitive.") + options = parser.parse_args() + parse_match_expressions(options.filenames, options.match) + root_dir = pathlib.Path(__file__).parent.parent.resolve() + + while True: + if (os.path.isfile(".clang-format")): + scan_zen(".") + quit() + else: + cwd = os.getcwd() + if os.path.dirname(cwd) == cwd: + quit() + os.chdir("..") + + +if __name__ == '__main__': + _main()
\ No newline at end of file diff --git a/scripts/generateprojects.bat b/scripts/generateprojects.bat new file mode 100644 index 000000000..cc6732aaa --- /dev/null +++ b/scripts/generateprojects.bat @@ -0,0 +1 @@ +xmake project -k vsxmake2022 -y diff --git a/scripts/upload_syms.bat b/scripts/upload_syms.bat new file mode 100644 index 000000000..663d64e06 --- /dev/null +++ b/scripts/upload_syms.bat @@ -0,0 +1,3 @@ +rem This is a temporary hack until everyone has access to Sentry + +scripts\sentry-cli upload-dif --org to --project zen-server \ue5-main\Engine\Binaries\Win64\zenserver.exe \ue5-main\engine\Binaries\Win64\zenserver.pdb diff --git a/thirdparty/trace/trace.h b/thirdparty/trace/trace.h index fb538b3af..caa862ffe 100644 --- a/thirdparty/trace/trace.h +++ b/thirdparty/trace/trace.h @@ -347,6 +347,7 @@ struct FInitializeDesc };
typedef void* AllocFunc(SIZE_T, uint32);
typedef void FreeFunc(void*, SIZE_T);
+typedef void ChannelIterFunc(const ANSICHAR*, bool, void*);
struct FStatistics
{
uint64 BytesSent;
@@ -357,6 +358,7 @@ struct FStatistics };
UE_TRACE_API void SetMemoryHooks(AllocFunc Alloc, FreeFunc Free) UE_TRACE_IMPL();
UE_TRACE_API void Initialize(const FInitializeDesc& Desc) UE_TRACE_IMPL();
+UE_TRACE_API void StartWorkerThread() UE_TRACE_IMPL();
UE_TRACE_API void Shutdown() UE_TRACE_IMPL();
UE_TRACE_API void Update() UE_TRACE_IMPL();
UE_TRACE_API void GetStatistics(FStatistics& Out) UE_TRACE_IMPL();
@@ -366,6 +368,7 @@ UE_TRACE_API bool IsTracing() UE_TRACE_IMPL(false); UE_TRACE_API bool Stop() UE_TRACE_IMPL(false);
UE_TRACE_API bool IsChannel(const TCHAR* ChanneName) UE_TRACE_IMPL(false);
UE_TRACE_API bool ToggleChannel(const TCHAR* ChannelName, bool bEnabled) UE_TRACE_IMPL(false);
+UE_TRACE_API void EnumerateChannels(ChannelIterFunc IterFunc, void* User);
UE_TRACE_API void ThreadRegister(const TCHAR* Name, uint32 SystemId, int32 SortHint) UE_TRACE_IMPL();
UE_TRACE_API void ThreadGroupBegin(const TCHAR* Name) UE_TRACE_IMPL();
UE_TRACE_API void ThreadGroupEnd() UE_TRACE_IMPL();
@@ -388,6 +391,7 @@ UE_TRACE_API void ThreadGroupEnd() UE_TRACE_IMPL(); #if UE_TRACE_ENABLED
namespace UE {
namespace Trace {
+typedef void ChannelIterFunc(const ANSICHAR*, bool, void*);
/*
A named channel which can be used to filter trace events. Channels can be
combined using the '|' operator which allows expressions like
@@ -421,6 +425,7 @@ public: static bool Toggle(const ANSICHAR* ChannelName, bool bEnabled);
static void ToggleAll(bool bEnabled);
static FChannel* FindChannel(const ANSICHAR* ChannelName);
+ static void EnumerateChannels(ChannelIterFunc Func, void* User);
bool Toggle(bool bEnabled);
bool IsEnabled() const;
explicit operator bool () const;
@@ -2043,7 +2048,7 @@ FChannel* FChannel::FindChannel(const ANSICHAR* ChannelName) };
for (FChannel* Channel : ChannelLists)
{
- for (; Channel != nullptr; Channel = (FChannel*)(Channel->Next))
+ for (; Channel != nullptr; Channel = Channel->Next)
{
if (Channel->Name.Hash == ChannelNameHash)
{
@@ -2053,10 +2058,26 @@ FChannel* FChannel::FindChannel(const ANSICHAR* ChannelName) }
return nullptr;
}
+void FChannel::EnumerateChannels(ChannelIterFunc Func, void* User)
+{
+ using namespace Private;
+ FChannel* ChannelLists[] =
+ {
+ AtomicLoadAcquire(&GNewChannelList),
+ AtomicLoadAcquire(&GHeadChannel),
+ };
+ for (FChannel* Channel : ChannelLists)
+ {
+ for (; Channel != nullptr; Channel = Channel->Next)
+ {
+ Func(Channel->Name.Ptr, Channel->IsEnabled(), User);
+ }
+ }
+}
bool FChannel::Toggle(bool bEnabled)
{
using namespace Private;
- AtomicAddRelaxed(&Enabled, bEnabled ? 1 : -1);
+ AtomicStoreRelaxed(&Enabled, bEnabled ? 1 : -1);
UE_TRACE_LOG(Trace, ChannelToggle, TraceLogChannel)
<< ChannelToggle.Id(Name.Hash)
<< ChannelToggle.IsEnabled(IsEnabled());
@@ -3055,6 +3076,7 @@ namespace Private {
void Writer_MemorySetHooks(AllocFunc, FreeFunc);
void Writer_Initialize(const FInitializeDesc&);
+void Writer_WorkerCreate();
void Writer_Shutdown();
void Writer_Update();
bool Writer_SendTo(const ANSICHAR*, uint32);
@@ -3132,6 +3154,14 @@ bool ToggleChannel(const TCHAR* ChannelName, bool bEnabled) ToAnsiCheap(ChannelNameA, ChannelName);
return FChannel::Toggle(ChannelNameA, bEnabled);
}
+void EnumerateChannels(ChannelIterFunc IterFunc, void* User)
+{
+ FChannel::EnumerateChannels(IterFunc, User);
+}
+void StartWorkerThread()
+{
+ Private::Writer_WorkerCreate();
+}
UE_TRACE_CHANNEL_EXTERN(TraceLogChannel)
UE_TRACE_EVENT_BEGIN($Trace, ThreadInfo, NoSync|Important)
UE_TRACE_EVENT_FIELD(uint32, ThreadId)
@@ -3533,14 +3563,18 @@ static bool Writer_UpdateConnection() Writer_DescribeEvents();
Writer_CacheOnConnect();
Writer_TailOnConnect();
- Writer_SendSync();
GSyncPacketCountdown = GNumSyncPackets;
return true;
}
static UPTRINT GWorkerThread; // = 0;
static volatile bool GWorkerThreadQuit; // = false;
+static volatile unsigned int GUpdateInProgress = 1; // Don't allow updates until initalized
static void Writer_WorkerUpdate()
{
+ if (!AtomicCompareExchangeAcquire(&GUpdateInProgress, 1u, 0u))
+ {
+ return;
+ }
Writer_UpdateControl();
Writer_UpdateConnection();
Writer_DescribeAnnounce();
@@ -3554,6 +3588,7 @@ static void Writer_WorkerUpdate() Writer_FlushSendBuffer();
}
#endif
+ AtomicExchangeRelease(&GUpdateInProgress, 0u);
}
static void Writer_WorkerThread()
{
@@ -3565,7 +3600,7 @@ static void Writer_WorkerThread() ThreadSleep(SleepMs);
}
}
-static void Writer_WorkerCreate()
+void Writer_WorkerCreate()
{
if (GWorkerThread)
{
@@ -3664,6 +3699,7 @@ void Writer_Initialize(const FInitializeDesc& Desc) {
Writer_WorkerCreate();
}
+ GUpdateInProgress = 0;
}
void Writer_Shutdown()
{
@@ -3709,7 +3745,7 @@ bool Writer_WriteTo(const ANSICHAR* Path) }
bool Writer_IsTracing()
{
- return (GDataHandle != 0);
+ return GDataHandle != 0 || GPendingDataHandle != 0;
}
bool Writer_Stop()
{
@@ -4359,7 +4395,13 @@ UPTRINT TcpSocketConnect(const ANSICHAR* Host, uint16 Port) {
struct FAddrInfoPtr
{
- ~FAddrInfoPtr() { freeaddrinfo(Value); }
+ ~FAddrInfoPtr()
+ {
+ if (Value != nullptr)
+ {
+ freeaddrinfo(Value);
+ }
+ }
addrinfo* operator -> () { return Value; }
addrinfo** operator & () { return &Value; }
addrinfo* Value;
@@ -4371,6 +4413,7 @@ UPTRINT TcpSocketConnect(const ANSICHAR* Host, uint16 Port) Hints.ai_protocol = IPPROTO_TCP;
if (getaddrinfo(Host, nullptr, &Hints, &Info))
{
+ Info.Value = nullptr;
return 0;
}
if (&Info == nullptr)
@@ -4962,6 +5005,8 @@ void Writer_ShutdownSharedBuffers() // Copyright Epic Games, Inc. All Rights Reserved.
+// {{{1 amalgamation-tail //////////////////////////////////////////////////////
+
#if PLATFORM_WINDOWS
# pragma warning(pop)
#endif
@@ -4978,6 +5023,10 @@ void Writer_ShutdownSharedBuffers() #endif // TRACE_UE_COMPAT_LAYER
+
+
+// {{{1 library-setup //////////////////////////////////////////////////////////
+
#include <string_view>
#define TRACE_EVENT_DEFINE UE_TRACE_EVENT_DEFINE
@@ -4998,15 +5047,108 @@ namespace trace = UE::Trace; #define TRACE_PRIVATE_CONCAT(x, y) TRACE_PRIVATE_CONCAT_(x, y)
#define TRACE_PRIVATE_UNIQUE_VAR(name) TRACE_PRIVATE_CONCAT($trace_##name, __LINE__)
+
+
+// {{{1 session-header /////////////////////////////////////////////////////////
+
+namespace UE {
+namespace Trace {
+
+enum class Build
+{
+ Unknown,
+ Debug,
+ DebugGame,
+ Development,
+ Shipping,
+ Test
+};
+
+void DescribeSession(
+ const std::string_view& AppName,
+ Build Variant=Build::Unknown,
+ const std::string_view& CommandLine="",
+ const std::string_view& BuildVersion="unknown_ver");
+
+} // namespace Trace
+} // namespace UE
+
+// {{{1 session-source /////////////////////////////////////////////////////////
+
+#if TRACE_IMPLEMENT
+
+namespace UE {
+namespace Trace {
+namespace Private {
+
+TRACE_EVENT_BEGIN(Diagnostics, Session2, NoSync|Important)
+ TRACE_EVENT_FIELD(uint8, ConfigurationType)
+ TRACE_EVENT_FIELD(trace::AnsiString, AppName)
+ TRACE_EVENT_FIELD(trace::AnsiString, BuildVersion)
+ TRACE_EVENT_FIELD(trace::AnsiString, Platform)
+ TRACE_EVENT_FIELD(trace::AnsiString, CommandLine)
+TRACE_EVENT_END()
+
+} // namespace Private
+
+////////////////////////////////////////////////////////////////////////////////
+void DescribeSession(
+ const std::string_view& AppName,
+ Build Variant,
+ const std::string_view& CommandLine,
+ const std::string_view& BuildVersion)
+{
+ using namespace Private;
+ using namespace std::literals;
+
+ std::string_view Platform;
+#if PLATFORM_WINDOWS
+ Platform = "Windows"sv;
+#elif PLATFORM_UNIX
+ Platform = "Linux"sv;
+#elif PLATFORM_MAC
+ Platform = "Mac"sv;
+#else
+ Platform = "Unknown"sv;
+#endif
+
+ int DataSize = 0;
+ DataSize += AppName.size();
+ DataSize += BuildVersion.size();
+ DataSize += Platform.size();
+ DataSize += CommandLine.size();
+
+ TRACE_LOG(Diagnostics, Session2, true, DataSize)
+ << Session2.AppName(AppName.data(), int(AppName.size()))
+ << Session2.BuildVersion(BuildVersion.data(), int(BuildVersion.size()))
+ << Session2.Platform(Platform.data(), int(Platform.size()))
+ << Session2.CommandLine(CommandLine.data(), int(CommandLine.size()))
+ << Session2.ConfigurationType(uint8(Variant));
+}
+
+} // namespace Trace
+} // namespace UE
+
+#endif // TRACE_IMPLEMENT
+
+
+
+// {{{1 cpu-header /////////////////////////////////////////////////////////////
+
TRACE_CHANNEL_EXTERN(CpuChannel)
namespace UE {
namespace Trace {
+enum CpuScopeFlags : int
+{
+ CpuFlush = 1 << 0,
+};
+
struct TraceCpuScope
{
~TraceCpuScope();
- void Enter(int ScopeId);
+ void Enter(int ScopeId, int Flags=0);
int _ScopeId = 0;
};
@@ -5015,54 +5157,44 @@ int ScopeNew(const std::string_view& Name); } // namespace Trace
} // namespace UE
-#define TRACE_CPU_SCOPE(name) \
- using namespace std::literals; \
+#define TRACE_CPU_SCOPE(name, ...) \
trace::TraceCpuScope TRACE_PRIVATE_UNIQUE_VAR(cpu_scope); \
if (CpuChannel) { \
+ using namespace std::literals; \
static int TRACE_PRIVATE_UNIQUE_VAR(scope_id); \
if (0 == TRACE_PRIVATE_UNIQUE_VAR(scope_id)) \
TRACE_PRIVATE_UNIQUE_VAR(scope_id) = trace::ScopeNew(name##sv); \
- TRACE_PRIVATE_UNIQUE_VAR(cpu_scope).Enter(TRACE_PRIVATE_UNIQUE_VAR(scope_id)); \
+ TRACE_PRIVATE_UNIQUE_VAR(cpu_scope).Enter(TRACE_PRIVATE_UNIQUE_VAR(scope_id), ##__VA_ARGS__); \
} \
do {} while (0)
+// {{{1 cpu-source /////////////////////////////////////////////////////////////
+
#if TRACE_IMPLEMENT
-////////////////////////////////////////////////////////////////////////////////
TRACE_CHANNEL_DEFINE(CpuChannel)
+namespace UE {
+namespace Trace {
+namespace Private {
+
TRACE_EVENT_BEGIN(CpuProfiler, EventSpec, NoSync|Important)
TRACE_EVENT_FIELD(uint32, Id)
- TRACE_EVENT_FIELD(UE::Trace::AnsiString, Name)
+ TRACE_EVENT_FIELD(trace::AnsiString, Name)
TRACE_EVENT_END()
TRACE_EVENT_BEGIN(CpuProfiler, EventBatch, NoSync)
TRACE_EVENT_FIELD(uint8[], Data)
TRACE_EVENT_END()
-namespace UE {
-namespace Trace {
-namespace Private {
-
+////////////////////////////////////////////////////////////////////////////////
static int32_t encode32_7bit(int32_t value, void* __restrict out)
{
// Calculate the number of bytes
-#if 0
- int32_t msb_test = (value << sizeof(value)) | 0x10;
-#if _MSC_VER
- unsigned long bit_index;
- _BitScanReverse(&bit_index, msb_test);
-#else
- int32_t leading_zeros = __builtin_clz(msb_test);
- int32_t bit_index = ((sizeof(value) * 8) - 1) - leading_zeros;
-#endif
- int32_t length = (bit_index + 3) / 7;
-#else
int32_t length = 1;
length += (value >= (1 << 7));
length += (value >= (1 << 14));
length += (value >= (1 << 21));
-#endif
// Add a gap every eigth bit for the continuations
int32_t ret = value;
@@ -5079,20 +5211,10 @@ static int32_t encode32_7bit(int32_t value, void* __restrict out) return length;
}
+////////////////////////////////////////////////////////////////////////////////
static int32_t encode64_7bit(int64_t value, void* __restrict out)
{
// Calculate the output length
-#if 0
- int64_t msb_test = (value << sizeof(value)) | 0x100ull;
-#if _MSC_VER
- unsigned long bit_index;
- _BitScanReverse64(&bit_index, msb_test);
-#else
- int32_t leading_zeros = __builtin_clzll(msb_test);
- int32_t bit_index = ((sizeof(value) * 8) - 1) - leading_zeros;
-#endif
- int32_t length = (bit_index - 1) / 7;
-#else
uint32_t length = 1;
length += (value >= (1ll << 7));
length += (value >= (1ll << 14));
@@ -5101,7 +5223,6 @@ static int32_t encode64_7bit(int64_t value, void* __restrict out) length += (value >= (1ll << 35));
length += (value >= (1ll << 42));
length += (value >= (1ll << 49));
-#endif
// Add a gap every eigth bit for the continuations
int64_t ret = value;
@@ -5123,13 +5244,13 @@ static int32_t encode64_7bit(int64_t value, void* __restrict out) class ThreadBuffer
{
public:
- static void Enter(uint64_t Timestamp, uint32_t ScopeId) { TlsInstance.EnterImpl(Timestamp, ScopeId); }
- static void Leave(uint64_t Timestamp) { TlsInstance.LeaveImpl(Timestamp); }
+ static void Enter(uint64_t Timestamp, uint32_t ScopeId, int Flags);
+ static void Leave(uint64_t Timestamp);
private:
~ThreadBuffer();
void Flush(bool Force);
- void EnterImpl(uint64_t Timestamp, uint32_t ScopeId);
+ void EnterImpl(uint64_t Timestamp, uint32_t ScopeId, int Flags);
void LeaveImpl(uint64_t Timestamp);
enum
{
@@ -5148,6 +5269,18 @@ private: thread_local ThreadBuffer ThreadBuffer::TlsInstance;
////////////////////////////////////////////////////////////////////////////////
+inline void ThreadBuffer::Enter(uint64_t Timestamp, uint32_t ScopeId, int Flags)
+{
+ TlsInstance.EnterImpl(Timestamp, ScopeId, Flags);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+inline void ThreadBuffer::Leave(uint64_t Timestamp)
+{
+ TlsInstance.LeaveImpl(Timestamp);
+}
+
+////////////////////////////////////////////////////////////////////////////////
ThreadBuffer::~ThreadBuffer()
{
Flush(true);
@@ -5156,6 +5289,8 @@ ThreadBuffer::~ThreadBuffer() ////////////////////////////////////////////////////////////////////////////////
void ThreadBuffer::Flush(bool Force)
{
+ using namespace Private;
+
if (!Force && (Cursor <= (Buffer + BufferSize - Overflow)))
return;
@@ -5167,14 +5302,15 @@ void ThreadBuffer::Flush(bool Force) }
////////////////////////////////////////////////////////////////////////////////
-void ThreadBuffer::EnterImpl(uint64_t Timestamp, uint32_t ScopeId)
+void ThreadBuffer::EnterImpl(uint64_t Timestamp, uint32_t ScopeId, int Flags)
{
Timestamp -= PrevTimestamp;
PrevTimestamp += Timestamp;
Cursor += encode64_7bit((Timestamp) << 1 | EnterLsb, Cursor);
Cursor += encode32_7bit(ScopeId, Cursor);
- Flush(false);
+ bool ShouldFlush = (Flags & CpuScopeFlags::CpuFlush);
+ Flush(ShouldFlush);
}
////////////////////////////////////////////////////////////////////////////////
@@ -5194,8 +5330,10 @@ void ThreadBuffer::LeaveImpl(uint64_t Timestamp) ////////////////////////////////////////////////////////////////////////////////
int ScopeNew(const std::string_view& Name)
{
+ using namespace Private;
+
static int volatile NextSpecId = 1;
- int SpecId = Private::AtomicAddRelaxed(&NextSpecId, 1);
+ int SpecId = AtomicAddRelaxed(&NextSpecId, 1);
uint32 NameSize = uint32(Name.size());
TRACE_LOG(CpuProfiler, EventSpec, true, NameSize)
@@ -5220,15 +5358,124 @@ TraceCpuScope::~TraceCpuScope() }
////////////////////////////////////////////////////////////////////////////////
-void TraceCpuScope::Enter(int ScopeId)
+void TraceCpuScope::Enter(int ScopeId, int Flags)
{
using namespace Private;
_ScopeId = ScopeId;
uint64 Timestamp = TimeGetTimestamp();
- ThreadBuffer::Enter(Timestamp, ScopeId);
+ ThreadBuffer::Enter(Timestamp, ScopeId, Flags);
+}
+
+} // namespace Trace
+} // namespace UE
+
+#endif // TRACE_IMPLEMENT
+
+
+
+// {{{1 log-header /////////////////////////////////////////////////////////////
+
+TRACE_CHANNEL_EXTERN(LogChannel)
+
+namespace UE {
+namespace Trace {
+namespace Private {
+
+void LogMessageImpl(int Id, const void* ParamBuffer, int ParamSize);
+int LogMessageNew(const std::string_view& Format, const std::string_view& File, int Line);
+
+template <typename... ARGS>
+void LogMessage(int Id, ARGS&&... Args)
+{
+ LogMessageImpl(Id, nullptr, 0);
+}
+
+} // namespace Private
+} // namespace Trace
+} // namespace UE
+
+#define TRACE_LOG_MESSAGE(format, ...) \
+ if (LogChannel) { \
+ using namespace std::literals; \
+ static int message_id; \
+ if (message_id == 0) \
+ message_id = trace::Private::LogMessageNew( \
+ format##sv, \
+ TRACE_PRIVATE_CONCAT(__FILE__, sv), \
+ __LINE__); \
+ trace::Private::LogMessage(message_id, ##__VA_ARGS__); \
+ } \
+ do {} while (0)
+
+// {{{1 log-source /////////////////////////////////////////////////////////////
+
+#if TRACE_IMPLEMENT
+
+TRACE_CHANNEL_DEFINE(LogChannel)
+
+namespace UE {
+namespace Trace {
+namespace Private {
+
+#if 0
+TRACE_EVENT_BEGIN(Logging, LogCategory, NoSync|Important)
+ TRACE_EVENT_FIELD(const void*, CategoryPointer)
+ TRACE_EVENT_FIELD(uint8, DefaultVerbosity)
+ TRACE_EVENT_FIELD(UE::Trace::AnsiString, Name)
+TRACE_EVENT_END()
+#endif
+
+TRACE_EVENT_BEGIN(Logging, LogMessageSpec, NoSync|Important)
+ TRACE_EVENT_FIELD(uint32, LogPoint)
+ //TRACE_EVENT_FIELD(uint16, CategoryPointer)
+ TRACE_EVENT_FIELD(uint16, Line)
+ TRACE_EVENT_FIELD(UE::Trace::AnsiString, FileName)
+ TRACE_EVENT_FIELD(UE::Trace::AnsiString, FormatString)
+TRACE_EVENT_END()
+
+TRACE_EVENT_BEGIN(Logging, LogMessage, NoSync)
+ TRACE_EVENT_FIELD(uint32, LogPoint)
+ TRACE_EVENT_FIELD(uint64, Cycle)
+ TRACE_EVENT_FIELD(uint8[], FormatArgs)
+TRACE_EVENT_END()
+
+////////////////////////////////////////////////////////////////////////////////
+void LogMessageImpl(int Id, const void* ParamBuffer, int ParamSize)
+{
+ (void)ParamBuffer;
+ (void)ParamSize;
+
+ uint64 Timestamp = TimeGetTimestamp();
+
+ TRACE_LOG(Logging, LogMessage, true)
+ << LogMessage.LogPoint(Id)
+ << LogMessage.Cycle(Timestamp);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int LogMessageNew(
+ const std::string_view& Format,
+ const std::string_view& File,
+ int Line)
+{
+ static int volatile NextId = 1;
+ int Id = AtomicAddRelaxed(&NextId, 1);
+
+ int DataSize = 0;
+ DataSize += Format.size();
+ DataSize += File.size();
+
+ TRACE_LOG(Logging, LogMessageSpec, true, DataSize)
+ << LogMessageSpec.LogPoint(Id)
+ << LogMessageSpec.Line(uint16(Line))
+ << LogMessageSpec.FileName(File.data(), int(File.size()))
+ << LogMessageSpec.FormatString(Format.data(), int(Format.size()));
+
+ return Id;
}
+} // namespace Private
} // namespace Trace
} // namespace UE
diff --git a/vs-chromium-project.txt b/vs-chromium-project.txt index 0a1075662..1e948baad 100644 --- a/vs-chromium-project.txt +++ b/vs-chromium-project.txt @@ -7,3 +7,5 @@ vcpkg_installed/ x64/ *.suo **/x64/ +.test/ +vsxmake*/ diff --git a/zen/cmds/top.cpp b/zen/cmds/top.cpp index 315d8cb38..f5b9d654a 100644 --- a/zen/cmds/top.cpp +++ b/zen/cmds/top.cpp @@ -43,7 +43,7 @@ TopCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) { - ZEN_CONSOLE("{:5} {:6} {:24}", Entry.ListenPort, Entry.Pid, Entry.GetSessionId()); + ZEN_CONSOLE("{:5} {:6} {:24}", Entry.EffectiveListenPort, Entry.Pid, Entry.GetSessionId()); }); zen::Sleep(1000); @@ -78,7 +78,7 @@ PsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return 0; } - State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) { ZEN_CONSOLE("Port {} : pid {}", Entry.ListenPort, Entry.Pid); }); + State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) { ZEN_CONSOLE("Port {} : pid {}", Entry.EffectiveListenPort, Entry.Pid); }); return 0; } diff --git a/zencore/include/zencore/refcount.h b/zencore/include/zencore/refcount.h index 0324b94cc..254a22db5 100644 --- a/zencore/include/zencore/refcount.h +++ b/zencore/include/zencore/refcount.h @@ -94,10 +94,28 @@ public: } return *this; } + template<typename OtherType> + inline RefPtr& operator=(RefPtr<OtherType>&& Rhs) noexcept + { + if ((RefPtr*)&Rhs != this) + { + m_Ref && m_Ref->Release(); + m_Ref = Rhs.m_Ref; + Rhs.m_Ref = nullptr; + } + return *this; + } inline RefPtr(RefPtr&& Rhs) noexcept : m_Ref(Rhs.m_Ref) { Rhs.m_Ref = nullptr; } + template<typename OtherType> + explicit inline RefPtr(RefPtr<OtherType>&& Rhs) noexcept : m_Ref(Rhs.m_Ref) + { + Rhs.m_Ref = nullptr; + } private: T* m_Ref = nullptr; + template <typename U> + friend class RefPtr; }; /** diff --git a/zencore/include/zencore/string.h b/zencore/include/zencore/string.h index 1e8907906..4c378730f 100644 --- a/zencore/include/zencore/string.h +++ b/zencore/include/zencore/string.h @@ -413,6 +413,17 @@ private: char m_StringBuffer[N]; }; +template<size_t N> +class WriteToString : public ExtendableStringBuilder<N> +{ +public: + template<typename... ArgTypes> + explicit WriteToString(ArgTypes&&... Args) + { + (*this << ... << std::forward<ArgTypes>(Args)); + } +}; + ////////////////////////////////////////////////////////////////////////// extern template class StringBuilderImpl<wchar_t>; @@ -454,6 +465,17 @@ private: wchar_t m_Buffer[N]; }; +template<size_t N> +class WriteToWideString : public ExtendableWideStringBuilder<N> +{ +public: + template<typename... ArgTypes> + explicit WriteToWideString(ArgTypes&&... Args) + { + (*this << ... << Forward<ArgTypes>(Args)); + } +}; + ////////////////////////////////////////////////////////////////////////// void Utf8ToWide(const char8_t* str, WideStringBuilderBase& out); diff --git a/zenhttp/httpasio.cpp b/zenhttp/httpasio.cpp index 801bb51ac..f2d48200e 100644 --- a/zenhttp/httpasio.cpp +++ b/zenhttp/httpasio.cpp @@ -64,7 +64,7 @@ public: HttpAsioServerImpl(); ~HttpAsioServerImpl(); - void Start(uint16_t Port, int ThreadCount); + int Start(uint16_t Port, int ThreadCount); void Stop(); void RegisterService(const char* UrlPath, HttpService& Service); HttpService* RouteRequest(std::string_view Url); @@ -934,7 +934,12 @@ struct HttpAcceptor m_Acceptor.set_option(asio::ip::v6_only(false)); m_Acceptor.set_option(asio::socket_base::reuse_address(true)); m_Acceptor.set_option(asio::ip::tcp::no_delay(true)); - m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v6::any(), Port)); + asio::error_code BindErrorCode; + m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v6::any(), Port), BindErrorCode); + if (BindErrorCode == asio::error::access_denied) + { + m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v6::any(), 0)); + } m_Acceptor.listen(); } @@ -980,6 +985,8 @@ struct HttpAcceptor }); } + int GetAcceptPort() { return m_Acceptor.local_endpoint().port(); } + private: HttpAsioServerImpl& m_Server; asio::io_service& m_IoService; @@ -1119,7 +1126,7 @@ HttpAsioServerImpl::~HttpAsioServerImpl() { } -void +int HttpAsioServerImpl::Start(uint16_t Port, int ThreadCount) { ZEN_ASSERT(ThreadCount > 0); @@ -1142,6 +1149,8 @@ HttpAsioServerImpl::Start(uint16_t Port, int ThreadCount) } }); } + + return m_Acceptor->GetAcceptPort(); } void @@ -1212,12 +1221,11 @@ HttpAsioServer::RegisterService(HttpService& Service) m_Impl->RegisterService(Service.BaseUri(), Service); } -void +int HttpAsioServer::Initialize(int BasePort) { - m_BasePort = BasePort; - - m_Impl->Start(gsl::narrow<uint16_t>(m_BasePort), Max(std::thread::hardware_concurrency(), 8u)); + m_BasePort = m_Impl->Start(gsl::narrow<uint16_t>(BasePort), Max(std::thread::hardware_concurrency(), 8u)); + return m_BasePort; } void diff --git a/zenhttp/httpasio.h b/zenhttp/httpasio.h index 08834ba21..716145955 100644 --- a/zenhttp/httpasio.h +++ b/zenhttp/httpasio.h @@ -22,7 +22,7 @@ public: ~HttpAsioServer(); virtual void RegisterService(HttpService& Service) override; - virtual void Initialize(int BasePort) override; + virtual int Initialize(int BasePort) override; virtual void Run(bool IsInteractiveSession) override; virtual void RequestExit() override; diff --git a/zenhttp/httpnull.cpp b/zenhttp/httpnull.cpp index 31b13a6ce..a6e1d3567 100644 --- a/zenhttp/httpnull.cpp +++ b/zenhttp/httpnull.cpp @@ -24,10 +24,10 @@ HttpNullServer::RegisterService(HttpService& Service) ZEN_UNUSED(Service); } -void +int HttpNullServer::Initialize(int BasePort) { - ZEN_UNUSED(BasePort); + return BasePort; } void diff --git a/zenhttp/httpnull.h b/zenhttp/httpnull.h index 867bbe4d2..74f021f6b 100644 --- a/zenhttp/httpnull.h +++ b/zenhttp/httpnull.h @@ -18,7 +18,7 @@ public: ~HttpNullServer(); virtual void RegisterService(HttpService& Service) override; - virtual void Initialize(int BasePort) override; + virtual int Initialize(int BasePort) override; virtual void Run(bool IsInteractiveSession) override; virtual void RequestExit() override; diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp index b3d109b6a..3c57f7ce3 100644 --- a/zenhttp/httpsys.cpp +++ b/zenhttp/httpsys.cpp @@ -746,7 +746,7 @@ HttpSysServer::~HttpSysServer() } } -void +int HttpSysServer::InitializeServer(int BasePort) { using namespace std::literals; @@ -762,7 +762,7 @@ HttpSysServer::InitializeServer(int BasePort) { ZEN_ERROR("Failed to create server session for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result); - return; + return BasePort; } Result = HttpCreateUrlGroup(m_HttpSessionId, &m_HttpUrlGroupId, 0); @@ -771,17 +771,29 @@ HttpSysServer::InitializeServer(int BasePort) { ZEN_ERROR("Failed to create URL group for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result); - return; + return BasePort; } + int EffectivePort = BasePort; + Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, WildcardUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0); + // Sharing violation implies the port is being used by another process + for (int PortOffset = 1; (Result == ERROR_SHARING_VIOLATION) && (PortOffset < 10); ++PortOffset) + { + EffectivePort = BasePort + (PortOffset * 100); + WildcardUrlPath.Reset(); + WildcardUrlPath << u8"http://*:"sv << int64_t(EffectivePort) << u8"/"sv; + + Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, WildcardUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0); + } + m_BaseUris.clear(); if (Result == NO_ERROR) { m_BaseUris.push_back(WildcardUrlPath.c_str()); } - else + else if (Result == ERROR_ACCESS_DENIED) { // If we can't register the wildcard path, we fall back to local paths // This local paths allow requests originating locally to function, but will not allow @@ -792,14 +804,26 @@ HttpSysServer::InitializeServer(int BasePort) const std::u8string_view Hosts[] = {u8"[::1]"sv, u8"localhost"sv, u8"127.0.0.1"sv}; - for (const std::u8string_view Host : Hosts) + ULONG InternalResult = ERROR_SHARING_VIOLATION; + for (int PortOffset = 0; (InternalResult == ERROR_SHARING_VIOLATION) && (PortOffset < 10); ++PortOffset) { - WideStringBuilder<64> LocalUrlPath; - LocalUrlPath << u8"http://"sv << Host << u8":"sv << int64_t(BasePort) << u8"/"sv; + EffectivePort = BasePort + (PortOffset * 100); - if (HttpAddUrlToUrlGroup(m_HttpUrlGroupId, LocalUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0) == NO_ERROR) + for (const std::u8string_view Host : Hosts) { - m_BaseUris.push_back(LocalUrlPath.c_str()); + WideStringBuilder<64> LocalUrlPath; + LocalUrlPath << u8"http://"sv << Host << u8":"sv << int64_t(EffectivePort) << u8"/"sv; + + InternalResult = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, LocalUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0); + + if (InternalResult == NO_ERROR) + { + m_BaseUris.push_back(LocalUrlPath.c_str()); + } + else + { + break; + } } } } @@ -808,7 +832,7 @@ HttpSysServer::InitializeServer(int BasePort) { ZEN_ERROR("Failed to add base URL to URL group for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result); - return; + return BasePort; } HTTP_BINDING_INFO HttpBindingInfo = {{0}, 0}; @@ -823,7 +847,7 @@ HttpSysServer::InitializeServer(int BasePort) { ZEN_ERROR("Failed to create request queue for '{}': {:#x}", WideToUtf8(m_BaseUris.front()), Result); - return; + return EffectivePort; } HttpBindingInfo.Flags.Present = 1; @@ -835,7 +859,7 @@ HttpSysServer::InitializeServer(int BasePort) { ZEN_ERROR("Failed to set server binding property for '{}': {:#x}", WideToUtf8(m_BaseUris.front()), Result); - return; + return EffectivePort; } // Create I/O completion port @@ -853,6 +877,8 @@ HttpSysServer::InitializeServer(int BasePort) ZEN_INFO("Started http.sys server at '{}'", WideToUtf8(m_BaseUris.front())); } + + return EffectivePort; } void @@ -1603,11 +1629,12 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT // HttpServer interface implementation // -void +int HttpSysServer::Initialize(int BasePort) { - InitializeServer(BasePort); + int EffectivePort = InitializeServer(BasePort); StartServer(); + return EffectivePort; } void diff --git a/zenhttp/httpsys.h b/zenhttp/httpsys.h index 06bad99c3..0453b8740 100644 --- a/zenhttp/httpsys.h +++ b/zenhttp/httpsys.h @@ -41,7 +41,7 @@ public: // HttpServer interface implementation - virtual void Initialize(int BasePort) override; + virtual int Initialize(int BasePort) override; virtual void Run(bool TestMode) override; virtual void RequestExit() override; virtual void RegisterService(HttpService& Service) override; @@ -52,7 +52,7 @@ public: inline bool IsAsyncResponseEnabled() const { return m_IsAsyncResponseEnabled; } private: - void InitializeServer(int BasePort); + int InitializeServer(int BasePort); void Cleanup(); void StartServer(); diff --git a/zenhttp/include/zenhttp/httpserver.h b/zenhttp/include/zenhttp/httpserver.h index 902310f04..545b96db2 100644 --- a/zenhttp/include/zenhttp/httpserver.h +++ b/zenhttp/include/zenhttp/httpserver.h @@ -169,7 +169,7 @@ class HttpServer : public RefCounted { public: virtual void RegisterService(HttpService& Service) = 0; - virtual void Initialize(int BasePort) = 0; + virtual int Initialize(int BasePort) = 0; virtual void Run(bool IsInteractiveSession) = 0; virtual void RequestExit() = 0; }; diff --git a/zenserver-test/cachepolicy-tests.cpp b/zenserver-test/cachepolicy-tests.cpp new file mode 100644 index 000000000..686ff818c --- /dev/null +++ b/zenserver-test/cachepolicy-tests.cpp @@ -0,0 +1,164 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zencore/zencore.h> + +#if ZEN_WITH_TESTS + +# include <zencore/compactbinary.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/string.h> +# include <zencore/testing.h> +# include <zencore/uid.h> +# include <zenutil/cache/cachepolicy.h> + +namespace zen::tests { + +using namespace std::literals; + +TEST_CASE("cachepolicy") +{ + SUBCASE("atomics serialization") + { + CachePolicy SomeAtomics[] = {CachePolicy::None, + CachePolicy::QueryLocal, + CachePolicy::StoreRemote, + CachePolicy::SkipData, + CachePolicy::KeepAlive, + CachePolicy::Disable}; + for (CachePolicy Atomic : SomeAtomics) + { + CHECK(ParseCachePolicy(WriteToString<128>(Atomic)) == Atomic); + } + // Also verify that we ignore unrecognized bits + for (CachePolicy Atomic : SomeAtomics) + { + CHECK(ParseCachePolicy(WriteToString<128>(Atomic | (CachePolicy)0x10000000)) == Atomic); + } + } + SUBCASE("aliases serialization") + { + CachePolicy SomeAliases[] = {CachePolicy::Query, CachePolicy::Local}; + for (CachePolicy Alias : SomeAliases) + { + CHECK(ParseCachePolicy(WriteToString<128>(Alias)) == Alias); + } + // Also verify that we ignore unrecognized bits + for (CachePolicy Alias : SomeAliases) + { + CHECK(ParseCachePolicy(WriteToString<128>(Alias | (CachePolicy)0x10000000)) == Alias); + } + } + SUBCASE("aliases take priority over atomics") + { + CHECK(WriteToString<128>(CachePolicy::Default).ToView() == "Default"sv); + CHECK(WriteToString<128>(CachePolicy::Query).ToView() == "Query"sv); + CHECK(WriteToString<128>(CachePolicy::Local).ToView() == "Local"sv); + } + SUBCASE("policies requiring multiple strings work") + { + char Delimiter = ','; + CachePolicy Combination = CachePolicy::SkipData | CachePolicy::QueryLocal; + CHECK(WriteToString<128>(Combination).ToView().find(Delimiter) != std::string_view::npos); + CHECK(ParseCachePolicy(WriteToString<128>(Combination)) == Combination); + } + SUBCASE("parsing invalid text") + { + CHECK(ParseCachePolicy(",,,") == CachePolicy::None); + CHECK(ParseCachePolicy("fee,fie,foo,fum") == CachePolicy::None); + CHECK(ParseCachePolicy("fee,KeepAlive,foo,fum") == CachePolicy::KeepAlive); + } +} + +TEST_CASE("cacherecordpolicy") +{ + SUBCASE("policy with no values") + { + CachePolicy Policy = CachePolicy::SkipData | CachePolicy::QueryLocal; + CacheRecordPolicy RecordPolicy; + CacheRecordPolicyBuilder Builder(Policy); + RecordPolicy = Builder.Build(); + SUBCASE("construct") + { + CHECK(RecordPolicy.IsUniform()); + CHECK(RecordPolicy.GetRecordPolicy() == Policy); + CHECK(RecordPolicy.GetDefaultValuePolicy() == Policy); + CHECK(RecordPolicy.GetValuePolicy(Oid::NewOid()) == Policy); + CHECK(RecordPolicy.GetValuePolicies().size() == 0); + } + SUBCASE("saveload") + { + CbWriter Writer; + RecordPolicy.Save(Writer); + CbObject Saved = Writer.Save()->AsObject(); + CacheRecordPolicy Loaded = CacheRecordPolicy::Load(Saved); + CHECK(Loaded.IsUniform()); + CHECK(Loaded.GetRecordPolicy() == Policy); + CHECK(Loaded.GetDefaultValuePolicy() == Policy); + CHECK(Loaded.GetValuePolicy(Oid::NewOid()) == Policy); + CHECK(Loaded.GetValuePolicies().size() == 0); + } + } + + SUBCASE("policy with values") + { + CachePolicy DefaultPolicy = CachePolicy::StoreRemote | CachePolicy::QueryLocal; + CachePolicy PartialOverlap = CachePolicy::StoreRemote; + CachePolicy NoOverlap = CachePolicy::QueryRemote; + CachePolicy UnionPolicy = DefaultPolicy | PartialOverlap | NoOverlap; + + CacheRecordPolicy RecordPolicy; + CacheRecordPolicyBuilder Builder(DefaultPolicy); + Oid PartialOid = Oid::NewOid(); + Oid NoOverlapOid = Oid::NewOid(); + Oid OtherOid = Oid::NewOid(); + Builder.AddValuePolicy(PartialOid, PartialOverlap); + Builder.AddValuePolicy(NoOverlapOid, NoOverlap); + RecordPolicy = Builder.Build(); + SUBCASE("construct") + { + CHECK(!RecordPolicy.IsUniform()); + CHECK(RecordPolicy.GetRecordPolicy() == UnionPolicy); + CHECK(RecordPolicy.GetDefaultValuePolicy() == DefaultPolicy); + CHECK(RecordPolicy.GetValuePolicy(PartialOid) == PartialOverlap); + CHECK(RecordPolicy.GetValuePolicy(NoOverlapOid) == NoOverlap); + CHECK(RecordPolicy.GetValuePolicy(OtherOid) == DefaultPolicy); + CHECK(RecordPolicy.GetValuePolicies().size() == 2); + } + SUBCASE("saveload") + { + CbWriter Writer; + RecordPolicy.Save(Writer); + CbObject Saved = Writer.Save()->AsObject(); + CacheRecordPolicy Loaded = CacheRecordPolicy::Load(Saved); + CHECK(!RecordPolicy.IsUniform()); + CHECK(RecordPolicy.GetRecordPolicy() == UnionPolicy); + CHECK(RecordPolicy.GetDefaultValuePolicy() == DefaultPolicy); + CHECK(RecordPolicy.GetValuePolicy(PartialOid) == PartialOverlap); + CHECK(RecordPolicy.GetValuePolicy(NoOverlapOid) == NoOverlap); + CHECK(RecordPolicy.GetValuePolicy(OtherOid) == DefaultPolicy); + CHECK(RecordPolicy.GetValuePolicies().size() == 2); + } + } + + SUBCASE("parsing invalid text") + { + CacheRecordPolicy Loaded = CacheRecordPolicy::Load(CbObject()); + CHECK(Loaded.IsUniform()); + CHECK(Loaded.GetRecordPolicy() == CachePolicy::Default); + CHECK(Loaded.GetDefaultValuePolicy() == CachePolicy::Default); + CHECK(Loaded.GetValuePolicy(Oid::NewOid()) == CachePolicy::Default); + CHECK(Loaded.GetValuePolicies().size() == 0); + + CachePolicy Policy = CachePolicy::SkipData; + Loaded = CacheRecordPolicy::Load(CbObject(), Policy); + CHECK(Loaded.IsUniform()); + CHECK(Loaded.GetRecordPolicy() == Policy); + CHECK(Loaded.GetDefaultValuePolicy() == Policy); + CHECK(Loaded.GetValuePolicy(Oid::NewOid()) == Policy); + CHECK(Loaded.GetValuePolicies().size() == 0); + } +} + +} // namespace zen::tests + +#endif diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 75aae6321..85393aed2 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -1536,13 +1536,13 @@ TEST_CASE("zcache.policy") } { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?query=local", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=QueryLocal,Store", LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); CHECK(Result.status_code == 404); } { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?query=local,remote", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); CHECK(Result.status_code == 200); } @@ -1564,7 +1564,7 @@ TEST_CASE("zcache.policy") // Store binary cache value locally { - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?store=local", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,StoreLocal", LocalCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, cpr::Header{{"Content-Type", "application/octet-stream"}}); CHECK(Result.status_code == 201); @@ -1599,7 +1599,7 @@ TEST_CASE("zcache.policy") // Store binary cache value locally and upstream { - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?store=local,remote", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, cpr::Header{{"Content-Type", "application/octet-stream"}}); CHECK(Result.status_code == 201); @@ -1643,13 +1643,13 @@ TEST_CASE("zcache.policy") } { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?query=local", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=QueryLocal,Store", LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 404); } { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?query=local,remote", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); } @@ -1673,7 +1673,7 @@ TEST_CASE("zcache.policy") // Store packge locally { - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?store=local", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,StoreLocal", LocalCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 201); @@ -1710,7 +1710,7 @@ TEST_CASE("zcache.policy") // Store package locally and upstream { - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?store=local,remote", LocalCfg.BaseUri, Bucket, Key)}, + cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 201); @@ -1729,130 +1729,7 @@ TEST_CASE("zcache.policy") } } - SUBCASE("skip - 'attachments' does not return attachments") - { - ZenConfig LocalCfg = ZenConfig::New(); - ZenServerInstance LocalInst(TestEnv); - const auto Bucket = "texture"sv; - - LocalCfg.Spawn(LocalInst); - - zen::IoHash Key; - zen::IoHash PayloadId; - - // Store package locally - { - zen::CbPackage Package = GeneratePackage(Key, PayloadId); - auto Buf = ToBuffer(Package); - - CHECK(Package.GetAttachments().size() != 0); - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 201); - } - - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?skip=attachments", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 200); - - zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); - zen::CbPackage Package; - const bool Ok = Package.TryLoad(Body); - CHECK(Ok); - - CHECK(Ok); - - CbObject CacheRecord = Package.GetObject(); - std::vector<IoHash> AttachmentKeys; - - CacheRecord.IterateAttachments( - [&AttachmentKeys](CbFieldView AttachmentKey) { AttachmentKeys.push_back(AttachmentKey.AsHash()); }); - - CHECK(AttachmentKeys.size() != 0); - CHECK(Package.GetAttachments().size() == 0); - } - - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 200); - - zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); - zen::CbPackage Package; - const bool Ok = Package.TryLoad(Body); - CHECK(Ok); - - CHECK(Ok); - CHECK(Package.GetAttachments().size() != 0); - } - } - - SUBCASE("skip - 'attachments' does not return attachments when retrieved from upstream") - { - ZenConfig UpstreamCfg = ZenConfig::New(13338); - ZenServerInstance UpstreamInst(TestEnv); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); - ZenServerInstance LocalInst(TestEnv); - const auto Bucket = "texture"sv; - - UpstreamCfg.Spawn(UpstreamInst); - LocalCfg.Spawn(LocalInst); - - zen::IoHash Key; - zen::IoHash PayloadId; - - // Store package upstream - { - zen::CbPackage Package = GeneratePackage(Key, PayloadId); - auto Buf = ToBuffer(Package); - - CHECK(Package.GetAttachments().size() != 0); - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)}, - cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 201); - } - - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?skip=attachments", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 200); - - zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); - zen::CbPackage Package; - const bool Ok = Package.TryLoad(Body); - CHECK(Ok); - - CHECK(Ok); - - CbObject CacheRecord = Package.GetObject(); - std::vector<IoHash> AttachmentKeys; - - CacheRecord.IterateAttachments( - [&AttachmentKeys](CbFieldView AttachmentKey) { AttachmentKeys.push_back(AttachmentKey.AsHash()); }); - - CHECK(AttachmentKeys.size() != 0); - CHECK(Package.GetAttachments().size() == 0); - } - - { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.status_code == 200); - - zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); - zen::CbPackage Package; - const bool Ok = Package.TryLoad(Body); - CHECK(Ok); - - CHECK(Ok); - CHECK(Package.GetAttachments().size() != 0); - } - } - - SUBCASE("skip - 'data' returns empty cache record/payload") + SUBCASE("skip - 'data' returns cache record without attachments/empty payload") { ZenConfig Cfg = ZenConfig::New(); ZenServerInstance Instance(TestEnv); @@ -1875,24 +1752,30 @@ TEST_CASE("zcache.policy") // Get package { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?skip=data", Cfg.BaseUri, Bucket, Key)}, + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(IsHttpSuccessCode(Result.status_code)); - CHECK(Result.text.size() == 0); + IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size()); + CbPackage ResponsePackage; + CHECK(ResponsePackage.TryLoad(Buffer)); + CHECK(ResponsePackage.GetAttachments().size() == 0); } // Get record { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?skip=data", Cfg.BaseUri, Bucket, Key)}, + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbobject"}}); CHECK(IsHttpSuccessCode(Result.status_code)); - CHECK(Result.text.size() == 0); + IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size()); + CbObject ResponseObject = zen::LoadCompactBinaryObject(Buffer); + CHECK((bool)ResponseObject); } // Get payload { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}/{}?skip=data", Cfg.BaseUri, Bucket, Key, PayloadId)}, - cpr::Header{{"Accept", "application/x-ue-comp"}}); + cpr::Response Result = + cpr::Get(cpr::Url{fmt::format("{}/{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key, PayloadId)}, + cpr::Header{{"Accept", "application/x-ue-comp"}}); CHECK(IsHttpSuccessCode(Result.status_code)); CHECK(Result.text.size() == 0); } @@ -1919,7 +1802,7 @@ TEST_CASE("zcache.policy") // Get package { - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?skip=data", Cfg.BaseUri, Bucket, Key)}, + cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); CHECK(IsHttpSuccessCode(Result.status_code)); CHECK(Result.text.size() == 0); @@ -1931,7 +1814,12 @@ TEST_CASE("zcache.rpc") { using namespace std::literals; - auto CreateCacheRecord = [](const zen::CacheKey& CacheKey, size_t PayloadSize) -> zen::CbPackage { + auto AppendCacheRecord = [](CbPackage& Package, + CbWriter& Writer, + const zen::CacheKey& CacheKey, + size_t PayloadSize, + CachePolicy /* BatchDefaultPolicy */, + CachePolicy RecordPolicy) { std::vector<uint8_t> Data; Data.resize(PayloadSize); for (size_t Idx = 0; Idx < PayloadSize; ++Idx) @@ -1941,17 +1829,24 @@ TEST_CASE("zcache.rpc") zen::CbAttachment Attachment(zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()))); - zen::CbObjectWriter CacheRecord; - CacheRecord.BeginObject("CacheKey"sv); - CacheRecord << "Bucket"sv << CacheKey.Bucket << "Hash"sv << CacheKey.Hash; - CacheRecord.EndObject(); - CacheRecord << "Data"sv << Attachment; + Writer.BeginObject(); + { + Writer.BeginObject("Record"sv); + { + Writer.BeginObject("Key"sv); + { + Writer << "Bucket"sv << CacheKey.Bucket << "Hash"sv << CacheKey.Hash; + } + Writer.EndObject(); + Writer << "Data"sv << Attachment; + } + Writer.EndObject(); + Writer.SetName("Policy"sv); + CacheRecordPolicy(RecordPolicy).Save(Writer); + } + Writer.EndObject(); - zen::CbPackage Package; - Package.SetObject(CacheRecord.Save()); Package.AddAttachment(Attachment); - - return Package; }; auto ToIoBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { @@ -1960,27 +1855,46 @@ TEST_CASE("zcache.rpc") return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); }; - auto PutCacheRecords = [&CreateCacheRecord, &ToIoBuffer](std::string_view BaseUri, - std::string_view Query, - std::string_view Bucket, - size_t Num, - size_t PayloadSize = 1024) -> std::vector<CacheKey> { + auto PutCacheRecords = + [&AppendCacheRecord, + &ToIoBuffer](std::string_view BaseUri, std::string_view Bucket, size_t Num, size_t PayloadSize = 1024) -> std::vector<CacheKey> { std::vector<zen::CacheKey> OutKeys; for (uint32_t Key = 1; Key <= Num; ++Key) { - const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, zen::IoHash::HashBuffer(&Key, sizeof(uint32_t))); - CbPackage CacheRecord = CreateCacheRecord(CacheKey, PayloadSize); + zen::IoHash KeyHash; + ((uint32_t*)(KeyHash.Hash))[0] = Key; + const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); + CbPackage Package; + CbWriter Writer; - OutKeys.push_back(CacheKey); + Writer.BeginObject(); + { + Writer << "Method"sv + << "PutCacheRecords"sv; + Writer.BeginObject("Params"sv); + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer.BeginArray("Requests"sv); + { + AppendCacheRecord(Package, Writer, CacheKey, PayloadSize, BatchDefaultPolicy, CachePolicy::Default); + } + Writer.EndArray(); + } + Writer.EndObject(); + } + Writer.EndObject(); + Package.SetObject(Writer.Save().AsObject()); - IoBuffer Payload = ToIoBuffer(CacheRecord); + OutKeys.push_back(CacheKey); - cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}{}", BaseUri, CacheKey.Bucket, CacheKey.Hash, Query)}, - cpr::Body{(const char*)Payload.Data(), Payload.Size()}, - cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); - CHECK(Result.status_code == 201); + CHECK(Result.status_code == 200); } return OutKeys; @@ -2011,9 +1925,8 @@ TEST_CASE("zcache.rpc") } Request.EndArray(); - Request.BeginObject("Policy"); - CacheRecordPolicy::Save(Policy, Request); - Request.EndObject(); + Request.SetName("Policy"sv); + Policy.Save(Request); Request.EndObject(); @@ -2066,7 +1979,7 @@ TEST_CASE("zcache.rpc") Inst.WaitUntilReady(); CacheRecordPolicy Policy; - std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 128); + std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "mastodon"sv, 128); GetCacheRecordResult Result = GetCacheRecords(BaseUri, Keys, Policy); CHECK(Result.Records.size() == Keys.size()); @@ -2076,7 +1989,7 @@ TEST_CASE("zcache.rpc") const CacheKey& ExpectedKey = Keys[Index++]; CbObjectView RecordObj = RecordView.AsObjectView(); - CbObjectView KeyObj = RecordObj["CacheKey"sv].AsObjectView(); + CbObjectView KeyObj = RecordObj["Key"sv].AsObjectView(); const CacheKey Key = CacheKey::Create(KeyObj["Bucket"sv].AsString(), KeyObj["Hash"].AsHash()); const IoHash AttachmentHash = RecordObj["Data"sv].AsHash(); const CbAttachment* Attachment = Result.Response.FindAttachment(AttachmentHash); @@ -2098,7 +2011,7 @@ TEST_CASE("zcache.rpc") Inst.WaitUntilReady(); CacheRecordPolicy Policy; - std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 128); + std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "mastodon"sv, 128); std::vector<zen::CacheKey> Keys; for (const zen::CacheKey& Key : ExistingKeys) @@ -2124,7 +2037,7 @@ TEST_CASE("zcache.rpc") { const CacheKey& ExpectedKey = ExistingKeys[KeyIndex++]; CbObjectView RecordObj = RecordView.AsObjectView(); - zen::CacheKey Key = LoadKey(RecordObj["CacheKey"sv]); + zen::CacheKey Key = LoadKey(RecordObj["Key"sv]); const IoHash AttachmentHash = RecordObj["Data"sv].AsHash(); const CbAttachment* Attachment = Result.Response.FindAttachment(AttachmentHash); @@ -2134,38 +2047,6 @@ TEST_CASE("zcache.rpc") } } - SUBCASE("policy - 'SkipAttachments' does not return any record attachments") - { - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - const uint16_t PortNumber = 13337; - const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); - - ZenServerInstance Inst(TestEnv); - Inst.SetTestDir(TestDir); - Inst.SpawnServer(PortNumber); - Inst.WaitUntilReady(); - - CacheRecordPolicy Policy(CachePolicy::QueryLocal | CachePolicy::SkipAttachments); - std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 4); - GetCacheRecordResult Result = GetCacheRecords(BaseUri, Keys, Policy); - - CHECK(Result.Records.size() == Keys.size()); - - std::span<const zen::CbAttachment> Attachments = Result.Response.GetAttachments(); - CHECK(Attachments.empty()); - - for (size_t Index = 0; CbFieldView RecordView : Result.Records) - { - const CacheKey& ExpectedKey = Keys[Index++]; - - CbObjectView RecordObj = RecordView.AsObjectView(); - CbObjectView KeyObj = RecordObj["CacheKey"sv].AsObjectView(); - const CacheKey Key = CacheKey::Create(KeyObj["Bucket"sv].AsString(), KeyObj["Hash"].AsHash()); - - CHECK(Key == ExpectedKey); - } - } - SUBCASE("policy - 'QueryLocal' does not query upstream") { using namespace utils; @@ -2178,7 +2059,7 @@ TEST_CASE("zcache.rpc") SpawnServer(UpstreamServer, UpstreamCfg); SpawnServer(LocalServer, LocalCfg); - std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, ""sv, "mastodon"sv, 4); + std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "mastodon"sv, 4); CacheRecordPolicy Policy(CachePolicy::QueryLocal); GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, Keys, Policy); @@ -2203,7 +2084,7 @@ TEST_CASE("zcache.rpc") SpawnServer(UpstreamServer, UpstreamCfg); SpawnServer(LocalServer, LocalCfg); - std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, ""sv, "mastodon"sv, 4); + std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "mastodon"sv, 4); CacheRecordPolicy Policy(CachePolicy::QueryLocal | CachePolicy::QueryRemote); GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, Keys, Policy); @@ -2214,7 +2095,7 @@ TEST_CASE("zcache.rpc") { const zen::CacheKey& ExpectedKey = Keys[Index++]; CbObjectView RecordObj = RecordView.AsObjectView(); - zen::CacheKey Key = LoadKey(RecordObj["CacheKey"sv]); + zen::CacheKey Key = LoadKey(RecordObj["Key"sv]); CHECK(Key == ExpectedKey); } } diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 5918d5178..d39b95a1e 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -7,6 +7,7 @@ #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> +#include <zencore/enumflags.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> @@ -14,6 +15,7 @@ #include <zencore/timer.h> #include <zencore/trace.h> #include <zenhttp/httpserver.h> +#include <zenhttp/httpshared.h> #include <zenstore/cas.h> #include <zenutil/cache/cache.h> @@ -42,11 +44,8 @@ using namespace std::literals; CachePolicy ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) { - const CachePolicy QueryPolicy = zen::ParseQueryCachePolicy(QueryParams.GetValue("query"sv)); - const CachePolicy StorePolicy = zen::ParseStoreCachePolicy(QueryParams.GetValue("store"sv)); - const CachePolicy SkipPolicy = zen::ParseSkipCachePolicy(QueryParams.GetValue("skip"sv)); - - return QueryPolicy | StorePolicy | SkipPolicy; + std::string_view PolicyText = QueryParams.GetValue("Policy"sv); + return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default; } struct AttachmentCount @@ -57,6 +56,13 @@ struct AttachmentCount uint32_t Total = 0; }; +struct PutRequestData +{ + CacheKey Key; + CbObjectView RecordObject; + CacheRecordPolicy Policy; +}; + ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, @@ -134,16 +140,15 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } - const auto QueryParams = Request.GetQueryParams(); - CachePolicy Policy = ParseCachePolicy(QueryParams); + CachePolicy PolicyFromURL = ParseCachePolicy(Request.GetQueryParams()); - if (Ref.PayloadId == IoHash::Zero) + if (Ref.ValueContentId == IoHash::Zero) { - return HandleCacheRecordRequest(Request, Ref, Policy); + return HandleCacheRecordRequest(Request, Ref, PolicyFromURL); } else { - return HandleCachePayloadRequest(Request, Ref, Policy); + return HandleCacheValueRequest(Request, Ref, PolicyFromURL); } return; @@ -180,19 +185,19 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, } void -HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: { - HandleGetCacheRecord(Request, Ref, Policy); + HandleGetCacheRecord(Request, Ref, PolicyFromURL); } break; case HttpVerb::kPut: - HandlePutCacheRecord(Request, Ref, Policy); + HandlePutCacheRecord(Request, Ref, PolicyFromURL); break; default: break; @@ -200,18 +205,20 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, } void -HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { - const ZenContentType AcceptType = Request.AcceptContentType(); - const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData; - const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; - const bool PartialOnError = (Policy & CachePolicy::PartialOnError) == CachePolicy::PartialOnError; - const bool QueryUpstream = (Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote; + const ZenContentType AcceptType = Request.AcceptContentType(); + const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData); + const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord); bool Success = false; - ZenCacheValue LocalCacheValue; + ZenCacheValue ClientResultValue; + if (!EnumHasAnyFlags(PolicyFromURL, CachePolicy::Query)) + { + return Request.WriteResponse(HttpResponseCode::OK); + } - if (m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, LocalCacheValue)) + if (EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal) && m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, ClientResultValue)) { Success = true; @@ -220,14 +227,11 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request CbPackage Package; uint32_t MissingCount = 0; - CbObjectView CacheRecord(LocalCacheValue.Value.Data()); - CacheRecord.IterateAttachments([this, SkipAttachments, &MissingCount, &Package](CbFieldView AttachmentHash) { - if (SkipAttachments && MissingCount == 0) + CbObjectView CacheRecord(ClientResultValue.Value.Data()); + CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { + if (SkipData) { - if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) - { - MissingCount++; - } + MissingCount += m_CidStore.ContainsChunk(AttachmentHash.AsHash()) ? 0 : 1; } else { @@ -242,17 +246,17 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } }); - Success = MissingCount == 0 || PartialOnError; + Success = MissingCount == 0 || PartialRecord; if (Success) { - Package.SetObject(LoadCompactBinaryObject(LocalCacheValue.Value)); + Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value)); BinaryWriter MemStream; Package.Save(MemStream); - LocalCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - LocalCacheValue.Value.SetContentType(HttpContentType::kCbPackage); + ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage); } } } @@ -262,21 +266,21 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", Ref.BucketSegment, Ref.HashKey, - NiceBytes(LocalCacheValue.Value.Size()), - ToString(LocalCacheValue.Value.GetContentType())); + NiceBytes(ClientResultValue.Value.Size()), + ToString(ClientResultValue.Value.GetContentType())); m_CacheStats.HitCount++; - - if (SkipData) + if (SkipData && AcceptType == ZenContentType::kBinary) { return Request.WriteResponse(HttpResponseCode::OK); } else { - return Request.WriteResponse(HttpResponseCode::OK, LocalCacheValue.Value.GetContentType(), LocalCacheValue.Value); + // Other types handled SkipData when constructing the ClientResultValue + return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); } } - else if (!QueryUpstream) + else if (!EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryRemote)) { ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); m_CacheStats.MissCount++; @@ -286,9 +290,13 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request // Issue upstream query asynchronously in order to keep requests flowing without // hogging I/O servicing threads with blocking work - Request.WriteResponseAsync([this, AcceptType, SkipData, SkipAttachments, PartialOnError, Ref](HttpServerRequest& AsyncRequest) { - bool Success = false; - ZenCacheValue UpstreamCacheValue; + Request.WriteResponseAsync([this, AcceptType, PolicyFromURL, Ref](HttpServerRequest& AsyncRequest) { + bool Success = false; + const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord); + const bool QueryLocal = EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal); + const bool StoreLocal = EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreLocal); + const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData); + ZenCacheValue ClientResultValue; metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); @@ -297,8 +305,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request { Success = true; - UpstreamCacheValue.Value = UpstreamResult.Value; - UpstreamCacheValue.Value.SetContentType(AcceptType); + ClientResultValue.Value = UpstreamResult.Value; + ClientResultValue.Value.SetContentType(AcceptType); if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject) { @@ -313,72 +321,82 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request Ref.HashKey, ToString(AcceptType)); } + + // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data } - if (Success) + if (Success && StoreLocal) { - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, UpstreamCacheValue); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, ClientResultValue); } } else if (AcceptType == ZenContentType::kCbPackage) { CbPackage Package; - if (Package.TryLoad(UpstreamCacheValue.Value)) + if (Package.TryLoad(ClientResultValue.Value)) { CbObject CacheRecord = Package.GetObject(); AttachmentCount Count; - CacheRecord.IterateAttachments([this, &Package, &Ref, &Count, SkipAttachments](CbFieldView HashView) { + CacheRecord.IterateAttachments([this, &Package, &Ref, &Count, QueryLocal, StoreLocal](CbFieldView HashView) { if (const CbAttachment* Attachment = Package.FindAttachment(HashView.AsHash())) { if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { - auto InsertResult = m_CidStore.AddChunk(Compressed); - if (InsertResult.New) + if (StoreLocal) { - Count.New++; + auto InsertResult = m_CidStore.AddChunk(Compressed); + if (InsertResult.New) + { + Count.New++; + } } Count.Valid++; } else { - ZEN_WARN("Uncompressed payload '{}' from upstream cache record '{}/{}'", + ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'", HashView.AsHash(), Ref.BucketSegment, Ref.HashKey); Count.Invalid++; } } - else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash())) + else if (QueryLocal) { - if (!SkipAttachments) + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash())) { Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + Count.Valid++; } - Count.Valid++; } Count.Total++; }); - if ((Count.Valid == Count.Total) || PartialOnError) + if ((Count.Valid == Count.Total) || PartialRecord) { ZenCacheValue CacheValue; CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); - - if (SkipAttachments) + if (StoreLocal) { - Package.Reset(); - Package.SetObject(CacheRecord); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); } BinaryWriter MemStream; - Package.Save(MemStream); + if (SkipData) + { + // Save a package containing only the object. + CbPackage(Package.GetObject()).Save(MemStream); + } + else + { + Package.Save(MemStream); + } - UpstreamCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - UpstreamCacheValue.Value.SetContentType(ZenContentType::kCbPackage); + ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage); } else { @@ -402,19 +420,21 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)", Ref.BucketSegment, Ref.HashKey, - NiceBytes(UpstreamCacheValue.Value.Size()), - ToString(UpstreamCacheValue.Value.GetContentType())); + NiceBytes(ClientResultValue.Value.Size()), + ToString(ClientResultValue.Value.GetContentType())); m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; - if (SkipData) + if (SkipData && AcceptType == ZenContentType::kBinary) { AsyncRequest.WriteResponse(HttpResponseCode::OK); } else { - AsyncRequest.WriteResponse(HttpResponseCode::OK, UpstreamCacheValue.Value.GetContentType(), UpstreamCacheValue.Value); + // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally + // metadata. + AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); } } else @@ -427,7 +447,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } void -HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { IoBuffer Body = Request.ReadPayload(); @@ -436,8 +456,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request return Request.WriteResponse(HttpResponseCode::BadRequest); } - const HttpContentType ContentType = Request.RequestContentType(); - const bool StoreUpstream = (Policy & CachePolicy::StoreRemote) == CachePolicy::StoreRemote; + const HttpContentType ContentType = Request.RequestContentType(); Body.SetContentType(ContentType); @@ -446,7 +465,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request ZEN_DEBUG("PUT - '{}/{}' {} '{}'", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType)); m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); - if (StoreUpstream) + if (EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreRemote)) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kBinary, .Key = {Ref.BucketSegment, Ref.HashKey}}); } @@ -463,6 +482,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); } + CachePolicy Policy = PolicyFromURL; CbObjectView CacheRecord(Body.Data()); std::vector<IoHash> ValidAttachments; int32_t TotalCount = 0; @@ -489,10 +509,11 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size()); - if (StoreUpstream && !IsPartialRecord) + if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCbObject, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); @@ -506,6 +527,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request ZEN_WARN("PUT - '{}/{}' '{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey, ToString(ContentType)); return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); } + CachePolicy Policy = PolicyFromURL; CbObject CacheRecord = Package.GetObject(); AttachmentCount Count; @@ -570,10 +592,11 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request const bool IsPartialRecord = Count.Valid != Count.Total; - if (StoreUpstream && !IsPartialRecord) + if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCbPackage, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); @@ -585,18 +608,16 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request } void -HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: - { - HandleGetCachePayload(Request, Ref, Policy); - } + HandleGetCacheValue(Request, Ref, PolicyFromURL); break; case HttpVerb::kPut: - HandlePutCachePayload(Request, Ref, Policy); + HandlePutCacheValue(Request, Ref, PolicyFromURL); break; default: break; @@ -604,15 +625,17 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request } void -HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { - IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); - bool InUpstreamCache = false; - const bool QueryUpstream = !Payload && (Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote; + IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); + bool InUpstreamCache = false; + CachePolicy Policy = PolicyFromURL; + const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); if (QueryUpstream) { - if (auto UpstreamResult = m_UpstreamCache.GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success) + if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); + UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { @@ -621,14 +644,14 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques } else { - ZEN_WARN("got uncompressed upstream cache payload"); + ZEN_WARN("got uncompressed upstream cache value"); } } } - if (!Payload) + if (!Value) { - ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, ToString(Request.AcceptContentType())); + ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, ToString(Request.AcceptContentType())); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } @@ -636,9 +659,9 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", Ref.BucketSegment, Ref.HashKey, - Ref.PayloadId, - NiceBytes(Payload.Size()), - ToString(Payload.GetContentType()), + Ref.ValueContentId, + NiceBytes(Value.Size()), + ToString(Value.GetContentType()), InUpstreamCache ? "UPSTREAM" : "LOCAL"); m_CacheStats.HitCount++; @@ -647,21 +670,21 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques m_CacheStats.UpstreamHitCount++; } - if ((Policy & CachePolicy::SkipData) == CachePolicy::SkipData) + if (EnumHasAllFlags(Policy, CachePolicy::SkipData)) { Request.WriteResponse(HttpResponseCode::OK); } else { - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); } } void -HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { - // Note: Individual cache payloads are not propagated upstream until a valid cache record has been stored - ZEN_UNUSED(Policy); + // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored + ZEN_UNUSED(PolicyFromURL); IoBuffer Body = Request.ReadPayload(); @@ -679,9 +702,11 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); } - if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) + if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId) { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Payload ID does not match attachment hash"sv); + return Request.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "ValueContentId does not match attachment hash"sv); } CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); @@ -689,7 +714,7 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})", Ref.BucketSegment, Ref.HashKey, - Ref.PayloadId, + Ref.ValueContentId, NiceBytes(Body.Size()), ToString(Body.GetContentType()), Result.New ? "NEW" : "OLD"); @@ -718,22 +743,22 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& } std::string_view HashSegment; - std::string_view PayloadSegment; + std::string_view ValueSegment; - std::string_view::size_type PayloadSplitOffset = Key.find_last_of('/'); + std::string_view::size_type ValueSplitOffset = Key.find_last_of('/'); // We know there is a slash so no need to check for npos return - if (PayloadSplitOffset == BucketSplitOffset) + if (ValueSplitOffset == BucketSplitOffset) { // Basic cache record lookup HashSegment = Key.substr(BucketSplitOffset + 1); } else { - // Cache record + payload lookup - HashSegment = Key.substr(BucketSplitOffset + 1, PayloadSplitOffset - BucketSplitOffset - 1); - PayloadSegment = Key.substr(PayloadSplitOffset + 1); + // Cache record + valueid lookup + HashSegment = Key.substr(BucketSplitOffset + 1, ValueSplitOffset - BucketSplitOffset - 1); + ValueSegment = Key.substr(ValueSplitOffset + 1); } if (HashSegment.size() != IoHash::StringLength) @@ -741,9 +766,9 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& return false; } - if (!PayloadSegment.empty() && PayloadSegment.size() == IoHash::StringLength) + if (!ValueSegment.empty() && ValueSegment.size() == IoHash::StringLength) { - const bool IsOk = ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); + const bool IsOk = ParseHexBytes(ValueSegment.data(), ValueSegment.size(), OutRef.ValueContentId.Hash); if (!IsOk) { @@ -752,7 +777,7 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& } else { - OutRef.PayloadId = IoHash::Zero; + OutRef.ValueContentId = IoHash::Zero; } const bool IsOk = ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); @@ -775,27 +800,44 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) const HttpContentType ContentType = Request.RequestContentType(); const HttpContentType AcceptType = Request.AcceptContentType(); - if (ContentType != HttpContentType::kCbObject || AcceptType != HttpContentType::kCbPackage) + if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) || + AcceptType != HttpContentType::kCbPackage) { return Request.WriteResponse(HttpResponseCode::BadRequest); } - Request.WriteResponseAsync( - [this, RpcRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) { - const std::string_view Method = RpcRequest["Method"sv].AsString(); - if (Method == "GetCacheRecords"sv) - { - HandleRpcGetCacheRecords(AsyncRequest, RpcRequest); - } - else if (Method == "GetCachePayloads"sv) - { - HandleRpcGetCachePayloads(AsyncRequest, RpcRequest); - } - else - { - AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); - } - }); + Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable { + CbPackage Package; + CbObjectView Object; + CbObject ObjectBuffer; + if (ContentType == HttpContentType::kCbObject) + { + ObjectBuffer = zen::LoadCompactBinaryObject(std::move(Body)); + Object = ObjectBuffer; + } + else + { + Package = ParsePackageMessage(Body); + Object = Package.GetObject(); + } + const std::string_view Method = Object["Method"sv].AsString(); + if (Method == "PutCacheRecords"sv) + { + HandleRpcPutCacheRecords(AsyncRequest, Package); + } + else if (Method == "GetCacheRecords"sv) + { + HandleRpcGetCacheRecords(AsyncRequest, Object); + } + else if (Method == "GetCacheValues"sv) + { + HandleRpcGetCacheValues(AsyncRequest, Object); + } + else + { + AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + } + }); } break; default: @@ -805,25 +847,154 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) } void +HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest) +{ + ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); + CbObjectView BatchObject = BatchRequest.GetObject(); + + CbObjectView Params = BatchObject["Params"sv].AsObjectView(); + CachePolicy DefaultPolicy; + + ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); + + std::string_view PolicyText = Params["DefaultPolicy"].AsString(); + DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::vector<bool> Results; + for (CbFieldView RequestField : Params["Requests"sv]) + { + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView(); + CbObjectView KeyView = RecordObject["Key"sv].AsObjectView(); + CbFieldView BucketField = KeyView["Bucket"sv]; + CbFieldView HashField = KeyView["Hash"sv]; + CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); + if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + CacheRecordPolicy Policy = CacheRecordPolicy::Load(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); + PutRequestData PutRequest{std::move(Key), RecordObject, std::move(Policy)}; + + PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); + + if (Result == PutResult::Invalid) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + Results.push_back(Result == PutResult::Success); + } + if (Results.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + CbObjectWriter ResponseObject; + ResponseObject.BeginArray("Result"sv); + for (bool Value : Results) + { + ResponseObject.AddBool(Value); + } + ResponseObject.EndArray(); + + CbPackage RpcResponse; + RpcResponse.SetObject(ResponseObject.Save()); + + BinaryWriter MemStream; + RpcResponse.Save(MemStream); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} + +HttpStructuredCacheService::PutResult +HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) +{ + std::vector<IoHash> ValidAttachments; + AttachmentCount Count; + CbObjectView Record = Request.RecordObject; + uint64_t RecordObjectSize = Record.GetSize(); + uint64_t TransferredSize = RecordObjectSize; + + Request.RecordObject.IterateAttachments([this, &Request, Package, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) + { + if (Attachment->IsCompressedBinary()) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + + ValidAttachments.emplace_back(InsertResult.DecompressedId); + + if (InsertResult.New) + { + Count.New++; + } + Count.Valid++; + TransferredSize += Chunk.GetCompressedSize(); + } + else + { + ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Request.Key.Bucket, + Request.Key.Hash, + ToString(HttpContentType::kCbPackage), + ValueHash); + Count.Invalid++; + } + } + else if (m_CidStore.ContainsChunk(ValueHash)) + { + ValidAttachments.emplace_back(ValueHash); + Count.Valid++; + } + Count.Total++; + }); + + if (Count.Invalid > 0) + { + return PutResult::Invalid; + } + + ZEN_DEBUG("PUT - '{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)", + Request.Key.Bucket, + Request.Key.Hash, + NiceBytes(TransferredSize), + Count.New, + Count.Valid, + Count.Total); + + ZenCacheValue CacheValue; + CacheValue.Value = IoBuffer(Record.GetSize()); + Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); + CacheValue.Value.SetContentType(ZenContentType::kCbObject); + m_CacheStore.Put(Request.Key.Bucket, Request.Key.Hash, CacheValue); + + const bool IsPartialRecord = Count.Valid != Count.Total; + + if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) + { + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)}); + } + return PutResult::Success; +} + +void HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest) { ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); CbPackage RpcResponse; - CacheRecordPolicy Policy; - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + CacheRecordPolicy BatchPolicy = CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView()); std::vector<CacheKey> CacheKeys; std::vector<IoBuffer> CacheValues; std::vector<size_t> UpstreamRequests; ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); - CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy); - - const bool PartialOnError = Policy.HasRecordPolicy(CachePolicy::PartialOnError); - const bool SkipAttachments = Policy.HasRecordPolicy(CachePolicy::SkipAttachments); - const bool QueryRemote = Policy.HasRecordPolicy(CachePolicy::QueryRemote); - for (CbFieldView KeyView : Params["CacheKeys"sv]) { CbObjectView KeyObject = KeyView.AsObjectView(); @@ -840,54 +1011,84 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys) { ZenCacheValue CacheValue; - uint32_t MissingCount = 0; + uint32_t MissingCount = 0; + uint32_t MissingReadFromUpstreamCount = 0; - if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) + if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) { CbObjectView CacheRecord(CacheValue.Value.Data()); - CacheRecord.IterateAttachments([this, SkipAttachments, &MissingCount, &RpcResponse](CbFieldView AttachmentHash) { - if (SkipAttachments && MissingCount == 0) - { - if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) + CacheRecord.IterateAttachments( + [this, &MissingCount, &MissingReadFromUpstreamCount, &RpcResponse, &BatchPolicy](CbFieldView AttachmentHash) { + CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy(); + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal)) { - MissingCount++; + // A value that is requested without the Query flag (such as None/Disable) does not count as missing, because we + // didn't ask for it and thus the record is complete in its absence. + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + MissingReadFromUpstreamCount++; + MissingCount++; + } } - } - else - { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { - ZEN_ASSERT(Chunk.GetSize() > 0); - RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) + { + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + MissingReadFromUpstreamCount++; + } + MissingCount++; + } } else { - MissingCount++; + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + ZEN_ASSERT(Chunk.GetSize() > 0); + RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + } + else + { + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + MissingReadFromUpstreamCount++; + } + MissingCount++; + } } - } - }); + }); } - if (CacheValue.Value && (MissingCount == 0 || PartialOnError)) + if ((!CacheValue.Value && EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryRemote)) || + MissingReadFromUpstreamCount != 0) + { + UpstreamRequests.push_back(KeyIndex); + } + else if (CacheValue.Value && (MissingCount == 0 || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) { ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL) {}", Key.Bucket, Key.Hash, NiceBytes(CacheValue.Value.Size()), ToString(CacheValue.Value.GetContentType()), - MissingCount ? "(PARTIAl)" : ""sv); + MissingCount ? "(PARTIAL)" : ""sv); CacheValues[KeyIndex] = std::move(CacheValue.Value); m_CacheStats.HitCount++; } - else if (QueryRemote) - { - UpstreamRequests.push_back(KeyIndex); - } else { - ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAl)"sv : ""sv); - m_CacheStats.MissCount++; + if (!EnumHasAnyFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::Query)) + { + // If they requested no query, do not record this as a miss + ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash); + } + else + { + ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAL)"sv : ""sv); + m_CacheStats.MissCount++; + } } ++KeyIndex; @@ -895,82 +1096,95 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req if (!UpstreamRequests.empty()) { - const auto OnCacheRecordGetComplete = - [this, &CacheValues, &RpcResponse, PartialOnError, SkipAttachments](CacheRecordGetCompleteParams&& Params) { - ZEN_ASSERT(Params.KeyIndex < CacheValues.size()); + const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, &BatchPolicy](CacheRecordGetCompleteParams&& Params) { + ZEN_ASSERT(Params.KeyIndex < CacheValues.size()); - IoBuffer CacheValue; - AttachmentCount Count; + IoBuffer CacheValue; + AttachmentCount Count; - if (Params.Record) - { - Params.Record.IterateAttachments([this, &RpcResponse, SkipAttachments, &Params, &Count](CbFieldView HashView) { + if (Params.Record) + { + Params.Record.IterateAttachments([this, &RpcResponse, &Params, &Count, &BatchPolicy](CbFieldView HashView) { + CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy(); + bool FoundInUpstream = false; + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash())) { + FoundInUpstream = true; if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { - auto InsertResult = m_CidStore.AddChunk(Compressed); - if (InsertResult.New) + FoundInUpstream = true; + if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) { - Count.New++; + auto InsertResult = m_CidStore.AddChunk(Compressed); + if (InsertResult.New) + { + Count.New++; + } } Count.Valid++; - if (!SkipAttachments) + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { RpcResponse.AddAttachment(CbAttachment(Compressed)); } } else { - ZEN_DEBUG("Uncompressed payload '{}' from upstream cache record '{}/{}'", + ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'", HashView.AsHash(), Params.Key.Bucket, Params.Key.Hash); Count.Invalid++; } } - else if (m_CidStore.ContainsChunk(HashView.AsHash())) - { - Count.Valid++; - } - Count.Total++; - }); - - if ((Count.Valid == Count.Total) || PartialOnError) + } + if (!FoundInUpstream && EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal) && + m_CidStore.ContainsChunk(HashView.AsHash())) { - CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer(); + // We added the attachment for this Value in the local loop before calling m_UpstreamCache + Count.Valid++; } - } + Count.Total++; + }); - if (CacheValue) + if ((Count.Valid == Count.Total) || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)) { - ZEN_DEBUG("HIT - '{}/{}' {} '{}' attachments '{}/{}/{}' (new/valid/total) (UPSTREAM)", - Params.Key.Bucket, - Params.Key.Hash, - NiceBytes(CacheValue.GetSize()), - ToString(HttpContentType::kCbPackage), - Count.New, - Count.Valid, - Count.Total); - - CacheValue.SetContentType(ZenContentType::kCbObject); - - CacheValues[Params.KeyIndex] = CacheValue; - m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue}); - - m_CacheStats.HitCount++; - m_CacheStats.UpstreamHitCount++; + CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer(); } - else + } + + if (CacheValue) + { + ZEN_DEBUG("HIT - '{}/{}' {} '{}' attachments '{}/{}/{}' (new/valid/total) (UPSTREAM)", + Params.Key.Bucket, + Params.Key.Hash, + NiceBytes(CacheValue.GetSize()), + ToString(HttpContentType::kCbPackage), + Count.New, + Count.Valid, + Count.Total); + + CacheValue.SetContentType(ZenContentType::kCbObject); + CacheValues[Params.KeyIndex] = CacheValue; + if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) { - const bool IsPartial = Count.Valid != Count.Total; - ZEN_DEBUG("MISS - '{}/{}' {}", Params.Key.Bucket, Params.Key.Hash, IsPartial ? "(partial)"sv : ""sv); - m_CacheStats.MissCount++; + m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue}); } - }; - m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete)); + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount++; + } + else + { + const bool IsPartial = Count.Valid != Count.Total; + ZEN_DEBUG("MISS - '{}/{}' {}", Params.Key.Bucket, Params.Key.Hash, IsPartial ? "(partial)"sv : ""sv); + m_CacheStats.MissCount++; + } + }; + + m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, BatchPolicy, std::move(OnCacheRecordGetComplete)); } CbObjectWriter ResponseObject; @@ -1001,11 +1215,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req } void -HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView RpcRequest) +HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView RpcRequest) { - ZEN_TRACE_CPU("Z$::RpcGetCachePayloads"); + ZEN_TRACE_CPU("Z$::RpcGetCacheValues"); - ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCachePayloads"sv); + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); std::vector<CacheChunkRequest> ChunkRequests; std::vector<size_t> UpstreamRequests; @@ -1014,19 +1228,20 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re for (CbFieldView RequestView : Params["ChunkRequests"sv]) { - CbObjectView RequestObject = RequestView.AsObjectView(); - CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); - const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash(); - const Oid PayloadId = RequestObject["PayloadId"sv].AsObjectId(); - const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64(); - const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); - const uint32_t ChunkPolicy = RequestObject["Policy"sv].AsUInt32(); + CbObjectView RequestObject = RequestView.AsObjectView(); + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); + const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash(); + const Oid ValueId = RequestObject["ValueId"sv].AsObjectId(); + const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64(); + const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); + std::string_view PolicyText = RequestObject["Policy"sv].AsString(); + const CachePolicy ChunkPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; // Note we could use emplace_back here but [Apple] LLVM-12's C++ library // can't infer a constructor like other platforms (or can't handle an // initializer list like others do). - ChunkRequests.push_back({Key, ChunkId, PayloadId, RawOffset, RawSize, static_cast<CachePolicy>(ChunkPolicy)}); + ChunkRequests.push_back({Key, ChunkId, ValueId, RawOffset, RawSize, ChunkPolicy}); } if (ChunkRequests.empty()) @@ -1036,20 +1251,20 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re Chunks.resize(ChunkRequests.size()); - // Unreal uses a 12 byte ID to address cache record payloads. When the uncompressed hash (ChunkId) - // is missing, load the cache record and try to find the raw hash from the payload ID. + // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) + // is missing, load the cache record and try to find the raw hash from the ValueId. { - const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash { - if (PayloadId) + const auto GetChunkIdFromValueId = [](CbObjectView Record, const Oid& ValueId) -> IoHash { + if (ValueId) { - // A valid PayloadId indicates that the caller is searching for a Payload in a Record + // A valid ValueId indicates that the caller is searching for a Value in a Record // that was Put with ICacheStore::Put for (CbFieldView ValueView : Record["Values"sv]) { CbObjectView ValueObject = ValueView.AsObjectView(); const Oid Id = ValueObject["Id"sv].AsObjectId(); - if (Id == PayloadId) + if (Id == ValueId) { return ValueObject["RawHash"sv].AsHash(); } @@ -1059,7 +1274,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re if (CbObjectView ValueObject = Record["Value"sv].AsObjectView()) { const Oid Id = ValueObject["Id"sv].AsObjectId(); - if (Id == PayloadId) + if (Id == ValueId) { return ValueObject["RawHash"sv].AsHash(); } @@ -1070,7 +1285,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re CbObjectView AttachmentObject = AttachmentView.AsObjectView(); const Oid Id = AttachmentObject["Id"sv].AsObjectId(); - if (Id == PayloadId) + if (Id == ValueId) { return AttachmentObject["RawHash"sv].AsHash(); } @@ -1079,7 +1294,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re } else { - // An invalid PayloadId indicates that the caller is requesting a Value that + // An invalid ValueId indicates that the caller is requesting a Value that // was Put with ICacheStore::PutValue return Record["RawHash"sv].AsHash(); } @@ -1108,15 +1323,15 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re if (CurrentRecordBuffer) { - ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId); + ChunkRequest.ChunkId = GetChunkIdFromValueId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.ValueId); } } } for (size_t RequestIndex = 0; const CacheChunkRequest& ChunkRequest : ChunkRequests) { - const bool QueryLocal = (ChunkRequest.Policy & CachePolicy::QueryLocal) == CachePolicy::QueryLocal; - const bool QueryRemote = (ChunkRequest.Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote; + const bool QueryLocal = EnumHasAllFlags(ChunkRequest.Policy, CachePolicy::QueryLocal); + const bool QueryRemote = EnumHasAllFlags(ChunkRequest.Policy, CachePolicy::QueryRemote); if (QueryLocal) { @@ -1155,8 +1370,8 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re if (!UpstreamRequests.empty()) { - const auto OnCachePayloadGetComplete = [this, &Chunks](CachePayloadGetCompleteParams&& Params) { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload))) + const auto OnCacheValueGetComplete = [this, &Chunks](CacheValueGetCompleteParams&& Params) { + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value))) { m_CidStore.AddChunk(Compressed); @@ -1164,11 +1379,11 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re Params.Request.Key.Bucket, Params.Request.Key.Hash, Params.Request.ChunkId, - NiceBytes(Params.Payload.GetSize()), + NiceBytes(Params.Value.GetSize()), "UPSTREAM"); ZEN_ASSERT(Params.RequestIndex < Chunks.size()); - Chunks[Params.RequestIndex] = std::move(Params.Payload); + Chunks[Params.RequestIndex] = std::move(Params.Value); m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; @@ -1180,7 +1395,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re } }; - m_UpstreamCache.GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete)); + m_UpstreamCache.GetCacheValues(ChunkRequests, UpstreamRequests, std::move(OnCacheValueGetComplete)); } CbPackage RpcResponse; diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 1bf3940e7..88bf6cda1 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -19,6 +19,7 @@ namespace zen { class CasStore; class CidStore; class CbObjectView; +struct PutRequestData; class ScrubContext; class UpstreamCache; class ZenCacheStore; @@ -41,7 +42,7 @@ enum class CachePolicy : uint32_t; * * Additionally, attachments may be addressed as: * - * {BucketId}/{KeyHash}/{PayloadHash} + * {BucketId}/{KeyHash}/{ValueHash} * * Where the two initial components are the same as for the main endpoint * @@ -73,7 +74,7 @@ private: { std::string BucketSegment; IoHash HashKey; - IoHash PayloadId; + IoHash ValueContentId; }; struct CacheStats @@ -82,20 +83,28 @@ private: std::atomic_uint64_t UpstreamHitCount{}; std::atomic_uint64_t MissCount{}; }; + enum class PutResult + { + Success, + Fail, + Invalid, + }; [[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef); - void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); - void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); - void HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); - void HandleCachePayloadRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); - void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); - void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); + void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); + void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); + void HandleCacheValueRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); + void HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); + void HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); void HandleRpcRequest(zen::HttpServerRequest& Request); + void HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest); void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest); - void HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest); + void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; + PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 2bba2e8e6..de5bccc3a 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -614,6 +614,10 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& { EntryFlags |= DiskLocation::kStructured; } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } RwLock::ExclusiveLockScope _(m_IndexLock); diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index 8b015d0fb..4f162c0ba 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -84,11 +84,12 @@ struct DiskLocation } static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; - static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; + static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize) static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; - static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; - static const uint64_t kStructured = 0x4000'0000'0000'0000ull; - static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; + static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file + static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary + static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value + static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } @@ -103,6 +104,11 @@ struct DiskLocation { ContentType = ZenContentType::kCbObject; } + + if (IsFlagSet(DiskLocation::kCompressed)) + { + ContentType = ZenContentType::kCompressedBinary; + } return ContentType; } diff --git a/zenserver/config.cpp b/zenserver/config.cpp index a4439d914..6fd1c3bea 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -91,7 +91,15 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) const char* DefaultHttp = "asio"; #endif - std::string DataDir; + // Note to those adding future options; std::filesystem::path-type options + // must be read into a std::string first. As of cxxopts-3.0.0 it uses a >> + // stream operator to convert argv value into the options type. std::fs::path + // expects paths in streams to be quoted but argv paths are unquoted. By + // going into a std::string first, paths with whitespace parse correctly. + std::string DataDir; + std::string ContentDir; + std::string AbsLogFile; + std::string ConfigFile; cxxopts::Options options("zenserver", "Zen Server"); options.add_options()("dedicated", @@ -102,9 +110,9 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) options.add_options()("t, test", "Enable test mode", cxxopts::value<bool>(ServerOptions.IsTest)->default_value("false")); options.add_options()("log-id", "Specify id for adding context to log output", cxxopts::value<std::string>(ServerOptions.LogId)); options.add_options()("data-dir", "Specify persistence root", cxxopts::value<std::string>(DataDir)); - options.add_options()("content-dir", "Frontend content directory", cxxopts::value<std::filesystem::path>(ServerOptions.ContentDir)); - options.add_options()("abslog", "Path to log file", cxxopts::value<std::filesystem::path>(ServerOptions.AbsLogFile)); - options.add_options()("config", "Path to Lua config file", cxxopts::value<std::filesystem::path>(ServerOptions.ConfigFile)); + options.add_options()("content-dir", "Frontend content directory", cxxopts::value<std::string>(ContentDir)); + options.add_options()("abslog", "Path to log file", cxxopts::value<std::string>(AbsLogFile)); + options.add_options()("config", "Path to Lua config file", cxxopts::value<std::string>(ConfigFile)); options.add_options()("no-sentry", "Disable Sentry crash handler", cxxopts::value<bool>(ServerOptions.NoSentry)->default_value("false")); @@ -330,6 +338,10 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) exit(0); } + ServerOptions.DataDir = DataDir; + ServerOptions.ContentDir = ContentDir; + ServerOptions.AbsLogFile = AbsLogFile; + ServerOptions.ConfigFile = ConfigFile; ServerOptions.UpstreamCacheConfig.CachePolicy = ParseUpstreamCachePolicy(UpstreamCachePolicyOptions); if (!ServerOptions.ConfigFile.empty()) @@ -468,6 +480,9 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio if (auto JupiterConfig = UpstreamConfig->get<sol::optional<sol::table>>("jupiter")) { UpdateStringValueFromConfig(JupiterConfig.value(), + std::string_view("name"), + ServerOptions.UpstreamCacheConfig.JupiterConfig.Name); + UpdateStringValueFromConfig(JupiterConfig.value(), std::string_view("url"), ServerOptions.UpstreamCacheConfig.JupiterConfig.Url); UpdateStringValueFromConfig(JupiterConfig.value(), @@ -496,6 +511,8 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio if (auto ZenConfig = UpstreamConfig->get<sol::optional<sol::table>>("zen")) { + ServerOptions.UpstreamCacheConfig.ZenConfig.Name = ZenConfig.value().get_or("name", std::string("Zen")); + if (auto Url = ZenConfig.value().get<sol::optional<std::string>>("url")) { ServerOptions.UpstreamCacheConfig.ZenConfig.Urls.push_back(Url.value()); diff --git a/zenserver/config.h b/zenserver/config.h index 2c31e7bd9..e38b5c704 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -21,6 +21,7 @@ struct ZenUpstreamJupiterConfig { + std::string Name; std::string Url; std::string OAuthProvider; std::string OAuthClientId; @@ -34,6 +35,7 @@ struct ZenUpstreamJupiterConfig struct ZenUpstreamZenConfig { + std::string Name; std::vector<std::string> Urls; std::vector<std::string> Dns; }; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 68e7edfab..206787bf7 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -272,14 +272,14 @@ namespace detail { return Result; } - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey&, const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::Horde::GetSingleCachePayload"); + ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue"); try { CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId); + const CloudCacheResult Result = Session.GetCompressedBlob(ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -303,11 +303,11 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCachePayloadGetComplete&& OnComplete) override final + virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCacheValueGetComplete&& OnComplete) override final { - ZEN_TRACE_CPU("Upstream::Horde::GetCachePayloads"); + ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues"); CloudCacheSession Session(m_Client); GetUpstreamCacheResult Result; @@ -327,7 +327,7 @@ namespace detail { m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); } - OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload}); + OnComplete({.Request = Request, .RequestIndex = Index, .Value = Payload}); } return Result; @@ -335,11 +335,11 @@ namespace detail { virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, - std::span<IoBuffer const> Payloads) override + std::span<IoBuffer const> Values) override { ZEN_TRACE_CPU("Upstream::Horde::PutCacheRecord"); - ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); + ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size()); const int32_t MaxAttempts = 3; try @@ -373,30 +373,31 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - const auto PutBlobs = [&](std::span<IoHash> PayloadIds, std::string& OutReason) -> bool { - for (const IoHash& PayloadId : PayloadIds) + const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { + for (const IoHash& ValueContentId : ValueContentIds) { - const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId); + const auto It = + std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId); - if (It == std::end(CacheRecord.PayloadIds)) + if (It == std::end(CacheRecord.ValueContentIds)) { - OutReason = fmt::format("payload '{}' MISSING from local cache", PayloadId); + OutReason = fmt::format("value '{}' MISSING from local cache", ValueContentId); return false; } - const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It); + const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It); CloudCacheResult BlobResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { - BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + BlobResult = Session.PutCompressedBlob(CacheRecord.ValueContentIds[Idx], Values[Idx]); } m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); if (!BlobResult.Success) { - OutReason = fmt::format("upload payload '{}' FAILED, reason '{}'", PayloadId, BlobResult.Reason); + OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason); return false; } @@ -475,7 +476,7 @@ namespace detail { Sb << MissingHash.ToHexString() << ","; } - return {.Reason = fmt::format("finalize '{}/{}' FAILED, still needs payload(s) '{}'", + return {.Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Sb.ToString()), @@ -538,10 +539,12 @@ namespace detail { public: ZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options) : m_Log(zen::logging::Get("upstream")) - , m_Info({.Name = std::string(Options.Name)}) , m_ConnectTimeout(Options.ConnectTimeout) , m_Timeout(Options.Timeout) { + ZEN_ASSERT(!Options.Name.empty()); + m_Info.Name = Options.Name; + for (const auto& Url : Options.Urls) { m_Endpoints.push_back({.Url = Url}); @@ -649,9 +652,8 @@ namespace detail { } BatchRequest.EndArray(); - BatchRequest.BeginObject("Policy"sv); - CacheRecordPolicy::Save(Policy, BatchRequest); - BatchRequest.EndObject(); + BatchRequest.SetName("Policy"sv); + Policy.Save(BatchRequest); } BatchRequest.EndObject(); @@ -687,14 +689,14 @@ namespace detail { return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::Zen::GetSingleCachePayload"); + ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue"); try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId); + const ZenCacheResult Result = Session.GetCacheValue(CacheKey.Bucket, CacheKey.Hash, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -718,18 +720,18 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCachePayloadGetComplete&& OnComplete) override final + virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCacheValueGetComplete&& OnComplete) override final { - ZEN_TRACE_CPU("Upstream::Zen::GetCachePayloads"); + ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues"); std::vector<size_t> IndexMap; IndexMap.reserve(RequestIndex.size()); CbObjectWriter BatchRequest; BatchRequest << "Method"sv - << "GetCachePayloads"; + << "GetCacheValues"; BatchRequest.BeginObject("Params"sv); { @@ -746,12 +748,11 @@ namespace detail { BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); - - BatchRequest.AddObjectId("PayloadId"sv, Request.PayloadId); + BatchRequest.AddObjectId("ValueId"sv, Request.ValueId); BatchRequest << "ChunkId"sv << Request.ChunkId; BatchRequest << "RawOffset"sv << Request.RawOffset; BatchRequest << "RawSize"sv << Request.RawSize; - BatchRequest << "Policy"sv << static_cast<uint32_t>(Request.Policy); + BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView(); } BatchRequest.EndObject(); } @@ -787,7 +788,7 @@ namespace detail { } } - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)}); + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = std::move(Payload)}); } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; @@ -796,7 +797,7 @@ namespace detail { for (size_t Index : RequestIndex) { - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()}); } return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; @@ -804,11 +805,11 @@ namespace detail { virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, - std::span<IoBuffer const> Payloads) override + std::span<IoBuffer const> Values) override { ZEN_TRACE_CPU("Upstream::Zen::PutCacheRecord"); - ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); + ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size()); const int32_t MaxAttempts = 3; try @@ -823,15 +824,15 @@ namespace detail { CbPackage Package; Package.SetObject(CbObject(SharedBuffer(RecordValue))); - for (const IoBuffer& Payload : Payloads) + for (const IoBuffer& Value : Values) { - if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload))) + if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Value))) { Package.AddAttachment(CbAttachment(AttachmentBuffer)); } else { - return {.Reason = std::string("invalid payload buffer"), .Success = false}; + return {.Reason = std::string("invalid value buffer"), .Success = false}; } } @@ -851,15 +852,15 @@ namespace detail { } else { - for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) + for (size_t Idx = 0, Count = Values.size(); Idx < Count; Idx++) { Result.Success = false; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCachePayload(CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - CacheRecord.PayloadIds[Idx], - Payloads[Idx]); + Result = Session.PutCacheValue(CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + CacheRecord.ValueContentIds[Idx], + Values[Idx]); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -869,7 +870,7 @@ namespace detail { if (!Result.Success) { - return {.Reason = "Failed to upload payload", + return {.Reason = "Failed to upload value", .Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = false}; @@ -1049,7 +1050,7 @@ public: virtual void GetCacheRecords(std::span<CacheKey> CacheKeys, std::span<size_t> KeyIndex, - const CacheRecordPolicy& Policy, + const CacheRecordPolicy& DownstreamPolicy, OnCacheRecordGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheRecords"); @@ -1060,6 +1061,8 @@ public: if (m_Options.ReadUpstream) { + CacheRecordPolicy UpstreamPolicy = DownstreamPolicy.ConvertToUpstream(); + for (auto& Endpoint : m_Endpoints) { if (RemainingKeys.empty()) @@ -1078,18 +1081,19 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) { - if (Params.Record) - { - OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); + Result = + Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, UpstreamPolicy, [&](CacheRecordGetCompleteParams&& Params) { + if (Params.Record) + { + OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); - Stats.CacheHitCount.Increment(1); - } - else - { - Missing.push_back(Params.KeyIndex); - } - }); + Stats.CacheHitCount.Increment(1); + } + else + { + Missing.push_back(Params.KeyIndex); + } + }); } Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size())); @@ -1115,11 +1119,11 @@ public: } } - virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCachePayloadGetComplete&& OnComplete) override final + virtual void GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCacheValueGetComplete&& OnComplete) override final { - ZEN_TRACE_CPU("Upstream::GetCachePayloads"); + ZEN_TRACE_CPU("Upstream::GetCacheValues"); std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); @@ -1145,10 +1149,10 @@ public: { metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); - Result = Endpoint->GetCachePayloads(CacheChunkRequests, RemainingKeys, [&](CachePayloadGetCompleteParams&& Params) { - if (Params.Payload) + Result = Endpoint->GetCacheValues(CacheChunkRequests, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { + if (Params.Value) { - OnComplete(std::forward<CachePayloadGetCompleteParams>(Params)); + OnComplete(std::forward<CacheValueGetCompleteParams>(Params)); Stats.CacheHitCount.Increment(1); } @@ -1166,7 +1170,7 @@ public: { Stats.CacheErrorCount.Increment(1); - ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + ZEN_ERROR("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); @@ -1178,13 +1182,13 @@ public: for (size_t Index : RemainingKeys) { - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()}); } } - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::GetCachePayload"); + ZEN_TRACE_CPU("Upstream::GetCacheValue"); if (m_Options.ReadUpstream) { @@ -1200,7 +1204,7 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCachePayload(CacheKey, PayloadId); + Result = Endpoint->GetCacheValue(CacheKey, ValueContentId); } Stats.CacheGetCount.Increment(1); @@ -1217,7 +1221,7 @@ public: { Stats.CacheErrorCount.Increment(1); - ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'", + ZEN_ERROR("get cache value FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); @@ -1302,18 +1306,18 @@ private: return; } - for (const IoHash& PayloadId : CacheRecord.PayloadIds) + for (const IoHash& ValueContentId : CacheRecord.ValueContentIds) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId)) + if (IoBuffer Payload = m_CidStore.FindChunkByCid(ValueContentId)) { Payloads.push_back(Payload); } else { - ZEN_WARN("process upstream FAILED, '{}/{}/{}', payload doesn't exist in CAS", + ZEN_WARN("process upstream FAILED, '{}/{}/{}', ValueContentId doesn't exist in CAS", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, - PayloadId); + ValueContentId); return; } } diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 805786cdf..5bc9f58d7 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -28,7 +28,7 @@ struct UpstreamCacheRecord { ZenContentType Type = ZenContentType::kBinary; CacheKey Key; - std::vector<IoHash> PayloadIds; + std::vector<IoHash> ValueContentIds; }; struct UpstreamCacheOptions @@ -74,14 +74,14 @@ struct CacheRecordGetCompleteParams using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>; -struct CachePayloadGetCompleteParams +struct CacheValueGetCompleteParams { const CacheChunkRequest& Request; size_t RequestIndex{~size_t(0)}; - IoBuffer Payload; + IoBuffer Value; }; -using OnCachePayloadGetComplete = std::function<void(CachePayloadGetCompleteParams&&)>; +using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>; struct UpstreamEndpointStats { @@ -157,11 +157,11 @@ public: const CacheRecordPolicy& Policy, OnCacheRecordGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; - virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCachePayloadGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCacheValueGetComplete&& OnComplete) = 0; virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, @@ -191,11 +191,11 @@ public: const CacheRecordPolicy& RecordPolicy, OnCacheRecordGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) = 0; - virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCachePayloadGetComplete&& OnComplete) = 0; + virtual void GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCacheValueGetComplete&& OnComplete) = 0; virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index cd7f48334..a2666ac02 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -433,10 +433,10 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas } ZenCacheResult -ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId) +ZenStructuredCacheSession::GetCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); @@ -486,10 +486,10 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas } ZenCacheResult -ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload) +ZenStructuredCacheSession::PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index c2be2165a..8cc4c121d 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -123,9 +123,9 @@ public: ZenCacheResult CheckHealth(); ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type); - ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId); + ZenCacheResult GetCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId); ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type); - ZenCacheResult PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload); + ZenCacheResult PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload); ZenCacheResult InvokeRpc(const CbObjectView& Request); private: diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index e96a4ceaa..bd4a3c1cf 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -156,7 +156,7 @@ namespace utils { class ZenServer : public IHttpStatusProvider { public: - void Initialize(const ZenServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry) + int Initialize(const ZenServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry) { m_UseSentry = ServerOptions.NoSentry == false; m_ServerEntry = ServerEntry; @@ -201,8 +201,8 @@ public: // Ok so now we're configured, let's kick things off - m_Http = zen::CreateHttpServer(ServerOptions.HttpServerClass); - m_Http->Initialize(ServerOptions.BasePort); + m_Http = zen::CreateHttpServer(ServerOptions.HttpServerClass); + int EffectiveBasePort = m_Http->Initialize(ServerOptions.BasePort); m_AuthMgr = MakeAuthMgr({.RootDirectory = m_DataRoot / "auth"}); m_AuthService = std::make_unique<zen::HttpAuthService>(*m_AuthMgr); @@ -267,7 +267,7 @@ public: #if ZEN_ENABLE_MESH if (ServerOptions.MeshEnabled) { - StartMesh(BasePort); + StartMesh(EffectiveBasePort); } else { @@ -314,6 +314,8 @@ public: .Enabled = ServerOptions.GcConfig.Enabled, }; m_GcScheduler.Initialize(GcConfig); + + return EffectiveBasePort; } void InitializeState(const ZenServerOptions& ServerOptions); @@ -684,7 +686,6 @@ void ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) { using namespace std::literals; - auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; }; ZEN_INFO("instantiating structured cache service"); m_CacheStore = std::make_unique<ZenCacheStore>(m_CasGc, m_DataRoot / "cache"); @@ -728,8 +729,10 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) if (!ZenUrls.empty()) { + const auto ZenEndpointName = UpstreamConfig.ZenConfig.Name.empty() ? "Zen"sv : UpstreamConfig.ZenConfig.Name; + std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = - zen::MakeZenUpstreamEndpoint({.Name = "Zen"sv, + zen::MakeZenUpstreamEndpoint({.Name = ZenEndpointName, .Urls = ZenUrls, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}); @@ -743,7 +746,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) if (UpstreamConfig.JupiterConfig.UseProductionSettings) { Options = - zen::CloudCacheClientOptions{.Name = "Horde"sv, + zen::CloudCacheClientOptions{.Name = "Jupiter-Prod"sv, .ServiceUrl = "https://jupiter.devtools.epicgames.com"sv, .DdcNamespace = "ue.ddc"sv, .BlobStoreNamespace = "ue.ddc"sv, @@ -757,7 +760,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings) { Options = - zen::CloudCacheClientOptions{.Name = "Horde"sv, + zen::CloudCacheClientOptions{.Name = "Jupiter-Dev"sv, .ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv, .DdcNamespace = "ue.ddc"sv, .BlobStoreNamespace = "ue.ddc"sv, @@ -768,14 +771,23 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds), .UseLegacyDdc = false}; } + else + { + const auto JupiterEndpointName = + UpstreamConfig.JupiterConfig.Name.empty() ? "Jupiter"sv : UpstreamConfig.JupiterConfig.Name; - Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl); - Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace); - Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace); - Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider); - Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId); - Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret); - Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc; + Options = + zen::CloudCacheClientOptions{.Name = JupiterEndpointName, + .ServiceUrl = UpstreamConfig.JupiterConfig.Url, + .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace, + .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace, + .OAuthProvider = UpstreamConfig.JupiterConfig.OAuthProvider, + .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId, + .OAuthSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds), + .UseLegacyDdc = false}; + } if (!Options.ServiceUrl.empty()) { @@ -885,6 +897,20 @@ ZenEntryPoint::Run() Entry->AddSponsorProcess(ServerOptions.OwnerPid); } + ZenServer Server; + Server.SetDataRoot(ServerOptions.DataDir); + Server.SetContentRoot(ServerOptions.ContentDir); + Server.SetTestMode(ServerOptions.IsTest); + Server.SetDedicatedMode(ServerOptions.IsDedicated); + int EffectiveBasePort = Server.Initialize(ServerOptions, Entry); + + Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); + if (EffectiveBasePort != ServerOptions.BasePort) + { + ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); + ServerOptions.BasePort = EffectiveBasePort; + } + std::unique_ptr<std::thread> ShutdownThread; std::unique_ptr<zen::NamedEvent> ShutdownEvent; @@ -892,13 +918,6 @@ ZenEntryPoint::Run() ShutdownEventName << "Zen_" << ServerOptions.BasePort << "_Shutdown"; ShutdownEvent.reset(new zen::NamedEvent{ShutdownEventName}); - ZenServer Server; - Server.SetDataRoot(ServerOptions.DataDir); - Server.SetContentRoot(ServerOptions.ContentDir); - Server.SetTestMode(ServerOptions.IsTest); - Server.SetDedicatedMode(ServerOptions.IsDedicated); - Server.Initialize(ServerOptions, Entry); - // Monitor shutdown signals ShutdownThread.reset(new std::thread{[&] { diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp index 0e93f1c3d..7be93b4af 100644 --- a/zenstore/gc.cpp +++ b/zenstore/gc.cpp @@ -404,7 +404,10 @@ GcScheduler::Shutdown() m_Status = static_cast<uint32_t>(GcSchedulerStatus::kStopped); m_GcSignal.notify_one(); - m_GcThread.join(); + if (m_GcThread.joinable()) + { + m_GcThread.join(); + } } } diff --git a/zenutil/cache/cachepolicy.cpp b/zenutil/cache/cachepolicy.cpp index e1c31d885..3bf7a0c67 100644 --- a/zenutil/cache/cachepolicy.cpp +++ b/zenutil/cache/cachepolicy.cpp @@ -4,64 +4,115 @@ #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> +#include <zencore/enumflags.h> #include <zencore/string.h> +#include <algorithm> +#include <unordered_map> + namespace zen { using namespace std::literals; -namespace detail { namespace cacheopt { - constexpr std::string_view Local = "local"sv; - constexpr std::string_view Remote = "remote"sv; - constexpr std::string_view Data = "data"sv; - constexpr std::string_view Meta = "meta"sv; - constexpr std::string_view Value = "value"sv; - constexpr std::string_view Attachments = "attachments"sv; -}} // namespace detail::cacheopt +namespace detail::CachePolicyImpl { + constexpr char DelimiterChar = ','; + constexpr std::string_view None = "None"sv; + constexpr std::string_view QueryLocal = "QueryLocal"sv; + constexpr std::string_view QueryRemote = "QueryRemote"sv; + constexpr std::string_view Query = "Query"sv; + constexpr std::string_view StoreLocal = "StoreLocal"sv; + constexpr std::string_view StoreRemote = "StoreRemote"sv; + constexpr std::string_view Store = "Store"sv; + constexpr std::string_view SkipMeta = "SkipMeta"sv; + constexpr std::string_view SkipData = "SkipData"sv; + constexpr std::string_view PartialRecord = "PartialRecord"sv; + constexpr std::string_view KeepAlive = "KeepAlive"sv; + constexpr std::string_view Local = "Local"sv; + constexpr std::string_view Remote = "Remote"sv; + constexpr std::string_view Default = "Default"sv; + constexpr std::string_view Disable = "Disable"sv; -CachePolicy -ParseQueryCachePolicy(std::string_view QueryPolicy, CachePolicy Default) -{ - if (QueryPolicy.empty()) - { - return Default; - } + using TextToPolicyMap = std::unordered_map<std::string_view, CachePolicy>; + const TextToPolicyMap TextToPolicy = {{None, CachePolicy::None}, + {QueryLocal, CachePolicy::QueryLocal}, + {QueryRemote, CachePolicy::QueryRemote}, + {Query, CachePolicy::Query}, + {StoreLocal, CachePolicy::StoreLocal}, + {StoreRemote, CachePolicy::StoreRemote}, + {Store, CachePolicy::Store}, + {SkipMeta, CachePolicy::SkipMeta}, + {SkipData, CachePolicy::SkipData}, + {PartialRecord, CachePolicy::PartialRecord}, + {KeepAlive, CachePolicy::KeepAlive}, + {Local, CachePolicy::Local}, + {Remote, CachePolicy::Remote}, + {Default, CachePolicy::Default}, + {Disable, CachePolicy::Disable}}; - CachePolicy Result = CachePolicy::None; + using PolicyTextPair = std::pair<CachePolicy, std::string_view>; + const PolicyTextPair FlagsToString[]{ + // Order of these Flags is important: we want the aliases before the atomic values, + // and the bigger aliases first, to reduce the number of tokens we add + {CachePolicy::Default, Default}, + {CachePolicy::Remote, Remote}, + {CachePolicy::Local, Local}, + {CachePolicy::Store, Store}, + {CachePolicy::Query, Query}, - ForEachStrTok(QueryPolicy, ',', [&Result](const std::string_view& Token) { - if (Token == detail::cacheopt::Local) - { - Result |= CachePolicy::QueryLocal; - } - if (Token == detail::cacheopt::Remote) + // Order of Atomics doesn't matter, so arbitrarily we list them in enum order + {CachePolicy::QueryLocal, QueryLocal}, + {CachePolicy::QueryRemote, QueryRemote}, + {CachePolicy::StoreLocal, StoreLocal}, + {CachePolicy::StoreRemote, StoreRemote}, + {CachePolicy::SkipMeta, SkipMeta}, + {CachePolicy::SkipData, SkipData}, + {CachePolicy::PartialRecord, PartialRecord}, + {CachePolicy::KeepAlive, KeepAlive}, + + // None must come at the end of the array, to write out only if no others exist + {CachePolicy::None, None}, + }; + constexpr CachePolicy KnownFlags = + CachePolicy::Default | CachePolicy::SkipMeta | CachePolicy::SkipData | CachePolicy::KeepAlive | CachePolicy::PartialRecord; +} // namespace detail::CachePolicyImpl + +StringBuilderBase& +AppendToBuilderImpl(StringBuilderBase& Builder, CachePolicy Policy) +{ + // Remove any bits we don't recognize; write None if there are not any bits we recognize + Policy = Policy & detail::CachePolicyImpl::KnownFlags; + for (const detail::CachePolicyImpl::PolicyTextPair& Pair : detail::CachePolicyImpl::FlagsToString) + { + if (EnumHasAllFlags(Policy, Pair.first)) { - Result |= CachePolicy::QueryRemote; + EnumRemoveFlags(Policy, Pair.first); + Builder << Pair.second << detail::CachePolicyImpl::DelimiterChar; + if (Policy == CachePolicy::None) + { + break; + } } - return true; - }); - - return Result; + } + Builder.RemoveSuffix(1); // Text will have been added by CachePolicy::None if not by anything else + return Builder; +} +StringBuilderBase& +operator<<(StringBuilderBase& Builder, CachePolicy Policy) +{ + return AppendToBuilderImpl(Builder, Policy); } CachePolicy -ParseStoreCachePolicy(std::string_view StorePolicy, CachePolicy Default) +ParseCachePolicy(std::string_view Text) { - if (StorePolicy.empty()) - { - return Default; - } + ZEN_ASSERT(!Text.empty()); // Empty string is not valid input to ParseCachePolicy CachePolicy Result = CachePolicy::None; - - ForEachStrTok(StorePolicy, ',', [&Result](const std::string_view& Token) { - if (Token == detail::cacheopt::Local) - { - Result |= CachePolicy::StoreLocal; - } - if (Token == detail::cacheopt::Remote) + ForEachStrTok(Text, detail::CachePolicyImpl::DelimiterChar, [&Result](const std::string_view& Token) { + auto it = detail::CachePolicyImpl::TextToPolicy.find(Token); + if (it != detail::CachePolicyImpl::TextToPolicy.end()) { - Result |= CachePolicy::StoreRemote; + Result |= it->second; } return true; }); @@ -69,101 +120,139 @@ ParseStoreCachePolicy(std::string_view StorePolicy, CachePolicy Default) return Result; } -CachePolicy -ParseSkipCachePolicy(std::string_view SkipPolicy, CachePolicy Default) -{ - if (SkipPolicy.empty()) +namespace Private { + + class CacheRecordPolicyShared final : public ICacheRecordPolicyShared { - return Default; - } + public: + inline std::span<const CacheValuePolicy> GetValuePolicies() const final { return Values; } - CachePolicy Result = CachePolicy::None; + inline void AddValuePolicy(const CacheValuePolicy& Policy) final { Values.push_back(Policy); } - ForEachStrTok(SkipPolicy, ',', [&Result](const std::string_view& Token) { - if (Token == detail::cacheopt::Meta) - { - Result |= CachePolicy::SkipMeta; - } - if (Token == detail::cacheopt::Value) + inline void Build() final { - Result |= CachePolicy::SkipValue; + std::sort(Values.begin(), Values.end(), [](const CacheValuePolicy& A, const CacheValuePolicy& B) { return A.Id < B.Id; }); } - if (Token == detail::cacheopt::Attachments) - { - Result |= CachePolicy::SkipAttachments; - } - if (Token == detail::cacheopt::Data) - { - Result |= CachePolicy::SkipData; - } - return true; - }); - return Result; -} + private: + std::vector<CacheValuePolicy> Values; + }; -CacheRecordPolicy::CacheRecordPolicy(const CachePolicy RecordPolicy, const CachePolicy PayloadPolicy) -: m_RecordPolicy(RecordPolicy) -, m_DefaultPayloadPolicy(PayloadPolicy) -{ -} +} // namespace Private CachePolicy -CacheRecordPolicy::GetPayloadPolicy(const Oid& PayloadId) const +CacheRecordPolicy::GetValuePolicy(const Oid& Id) const { - if (const auto It = m_PayloadPolicies.find(PayloadId); It != m_PayloadPolicies.end()) + if (Shared) { - return It->second; + if (std::span<const CacheValuePolicy> Values = Shared->GetValuePolicies(); !Values.empty()) + { + auto Iter = + std::lower_bound(Values.begin(), Values.end(), Id, [](const CacheValuePolicy& A, const Oid& B) { return A.Id < B; }); + if (Iter != Values.end() && Iter->Id == Id) + { + return Iter->Policy; + } + } } + return DefaultValuePolicy; +} - return m_DefaultPayloadPolicy; +void +CacheRecordPolicy::Save(CbWriter& Writer) const +{ + Writer.BeginObject(); + { + // The RecordPolicy is calculated from the ValuePolicies and does not need to be saved separately. + Writer << "DefaultValuePolicy"sv << WriteToString<128>(GetDefaultValuePolicy()); + if (!IsUniform()) + { + // FCacheRecordPolicyBuilder guarantees IsUniform -> non-empty GetValuePolicies. Small size penalty here if not. + Writer.BeginArray("ValuePolicies"sv); + { + for (const CacheValuePolicy& ValuePolicy : GetValuePolicies()) + { + // FCacheRecordPolicyBuilder is responsible for ensuring that each ValuePolicy != DefaultValuePolicy + // If it lets any duplicates through we will incur a small serialization size penalty here + Writer.BeginObject(); + Writer << "Id"sv << ValuePolicy.Id; + Writer << "Policy"sv << WriteToString<128>(ValuePolicy.Policy); + Writer.EndObject(); + } + } + Writer.EndArray(); + } + } + Writer.EndObject(); } -bool -CacheRecordPolicy::Load(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy) +CacheRecordPolicy +CacheRecordPolicy::Load(CbObjectView Object, CachePolicy DefaultPolicy) { - using namespace std::literals; + std::string_view PolicyText = Object["DefaultValuePolicy"sv].AsString(); + CachePolicy DefaultValuePolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - const uint32_t RecordPolicy = RecordPolicyObject["RecordPolicy"sv].AsUInt32(static_cast<uint32_t>(CachePolicy::Default)); - const uint32_t DefaultPayloadPolicy = - RecordPolicyObject["DefaultPayloadPolicy"sv].AsUInt32(static_cast<uint32_t>(CachePolicy::Default)); + CacheRecordPolicyBuilder Builder(DefaultValuePolicy); + for (CbFieldView ValueObjectField : Object["ValuePolicies"sv]) + { + CbObjectView ValueObject = ValueObjectField.AsObjectView(); + const Oid ValueId = ValueObject["Id"sv].AsObjectId(); + PolicyText = ValueObject["Policy"sv].AsString(); + CachePolicy ValuePolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultValuePolicy; + // FCacheRecordPolicyBuilder should guarantee that FValueId(ValueId).IsValid and ValuePolicy != DefaultValuePolicy + // If it lets any through we will have unused data in the record we create. + Builder.AddValuePolicy(ValueId, ValuePolicy); + } - OutRecordPolicy.m_RecordPolicy = static_cast<CachePolicy>(RecordPolicy); - OutRecordPolicy.m_DefaultPayloadPolicy = static_cast<CachePolicy>(DefaultPayloadPolicy); + return Builder.Build(); +} - for (CbFieldView PayloadPolicyView : RecordPolicyObject["PayloadPolicies"sv]) +CacheRecordPolicy +CacheRecordPolicy::ConvertToUpstream() const +{ + auto DownstreamToUpstream = [](CachePolicy P) { + // Remote|Local -> Set Remote + // Delete Skip Flags + // Maintain Remaining Flags + return (EnumHasAllFlags(P, CachePolicy::QueryRemote) ? CachePolicy::QueryLocal : CachePolicy::None) | + (EnumHasAllFlags(P, CachePolicy::StoreRemote) ? CachePolicy::StoreLocal : CachePolicy::None) | + (P & ~(CachePolicy::SkipData | CachePolicy::SkipMeta)); + }; + CacheRecordPolicyBuilder Builder(DownstreamToUpstream(GetDefaultValuePolicy())); + for (const CacheValuePolicy& ValuePolicy : GetValuePolicies()) { - CbObjectView PayloadPolicyObject = PayloadPolicyView.AsObjectView(); - const Oid PayloadId = PayloadPolicyObject["Id"sv].AsObjectId(); - const uint32_t PayloadPolicy = PayloadPolicyObject["Policy"sv].AsUInt32(); - - if (PayloadId != Oid::Zero && PayloadPolicy != 0) - { - OutRecordPolicy.m_PayloadPolicies.emplace(PayloadId, static_cast<CachePolicy>(PayloadPolicy)); - } + Builder.AddValuePolicy(ValuePolicy.Id, DownstreamToUpstream(ValuePolicy.Policy)); } - - return true; + return Builder.Build(); } void -CacheRecordPolicy::Save(const CacheRecordPolicy& Policy, CbWriter& Writer) +CacheRecordPolicyBuilder::AddValuePolicy(const CacheValuePolicy& Policy) { - Writer << "RecordPolicy"sv << static_cast<uint32_t>(Policy.GetRecordPolicy()); - Writer << "DefaultPayloadPolicy"sv << static_cast<uint32_t>(Policy.GetDefaultPayloadPolicy()); + if (!Shared) + { + Shared = new Private::CacheRecordPolicyShared; + } + Shared->AddValuePolicy(Policy); +} - if (!Policy.m_PayloadPolicies.empty()) +CacheRecordPolicy +CacheRecordPolicyBuilder::Build() +{ + CacheRecordPolicy Policy(BasePolicy); + if (Shared) { - Writer.BeginArray("PayloadPolicies"sv); - for (const auto& Kv : Policy.m_PayloadPolicies) + Shared->Build(); + const auto PolicyOr = [](CachePolicy A, CachePolicy B) { return A | (B & ~CachePolicy::SkipData); }; + const std::span<const CacheValuePolicy> Values = Shared->GetValuePolicies(); + Policy.RecordPolicy = BasePolicy; + for (const CacheValuePolicy& ValuePolicy : Values) { - Writer.BeginObject(); - Writer.AddObjectId("Id"sv, Kv.first); - Writer << "Policy"sv << static_cast<uint32_t>(Kv.second); - Writer.EndObject(); + Policy.RecordPolicy = PolicyOr(Policy.RecordPolicy, ValuePolicy.Policy); } - Writer.EndArray(); + Policy.Shared = std::move(Shared); } + return Policy; } } // namespace zen diff --git a/zenutil/include/zenutil/cache/cachekey.h b/zenutil/include/zenutil/cache/cachekey.h index fb36c7759..a0a83a883 100644 --- a/zenutil/include/zenutil/cache/cachekey.h +++ b/zenutil/include/zenutil/cache/cachekey.h @@ -44,7 +44,7 @@ struct CacheChunkRequest { CacheKey Key; IoHash ChunkId; - Oid PayloadId; + Oid ValueId; uint64_t RawOffset = 0ull; uint64_t RawSize = ~uint64_t(0); CachePolicy Policy = CachePolicy::Default; @@ -69,11 +69,11 @@ operator<(const CacheChunkRequest& A, const CacheChunkRequest& B) { return false; } - if (A.PayloadId < B.PayloadId) + if (A.ValueId < B.ValueId) { return true; } - if (B.PayloadId < A.PayloadId) + if (B.ValueId < A.ValueId) { return false; } diff --git a/zenutil/include/zenutil/cache/cachepolicy.h b/zenutil/include/zenutil/cache/cachepolicy.h index 5675ccf4d..b3602edbd 100644 --- a/zenutil/include/zenutil/cache/cachepolicy.h +++ b/zenutil/include/zenutil/cache/cachepolicy.h @@ -2,10 +2,14 @@ #pragma once +#include <zencore/compactbinary.h> +#include <zencore/enumflags.h> +#include <zencore/refcount.h> #include <zencore/string.h> #include <zencore/uid.h> #include <gsl/gsl-lite.hpp> +#include <span> #include <unordered_map> namespace zen { @@ -34,16 +38,21 @@ enum class CachePolicy : uint32_t /** Skip fetching the metadata for record requests. */ SkipMeta = 1 << 4, - /** Skip fetching the value for record, chunk, or value requests. */ - SkipValue = 1 << 5, - /** Skip fetching the attachments for record requests. */ - SkipAttachments = 1 << 6, + /** Skip fetching the data for values. */ + SkipData = 1 << 5, + /** - * Skip fetching the data for any requests. + * Partial output will be provided with the error status when a required value is missing. + * + * This is meant for cases when the missing values can be individually recovered, or rebuilt, + * without rebuilding the whole record. The cache automatically adds this flag when there are + * other cache stores that it may be able to recover missing values from. + * + * Missing values will be returned in the records or chunks, but with only the hash and size. * - * Put requests with skip flags may assume that record existence implies payload existence. + * Applying this flag for a put of a record allows a partial record to be stored. */ - SkipData = SkipMeta | SkipValue | SkipAttachments, + PartialRecord = 1 << 6, /** * Keep records in the cache for at least the duration of the session. @@ -53,18 +62,6 @@ enum class CachePolicy : uint32_t */ KeepAlive = 1 << 7, - /** - * Partial output will be provided with the error status when a required payload is missing. - * - * This is meant for cases when the missing payloads can be individually recovered or rebuilt - * without rebuilding the whole record. The cache automatically adds this flag when there are - * other cache stores that it may be able to recover missing payloads from. - * - * Requests for records would return records where the missing payloads have a hash and size, - * but no data. Requests for chunks or values would return the hash and size, but no data. - */ - PartialOnError = 1 << 8, - /** Allow cache requests to query and store records and values in local caches. */ Local = QueryLocal | StoreLocal, /** Allow cache requests to query and store records and values in remote caches. */ @@ -78,35 +75,107 @@ enum class CachePolicy : uint32_t }; gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy); +/** Serialize Policy to text and append to Builder. Appended text will not be empty. */ +StringBuilderBase& operator<<(StringBuilderBase& Builder, CachePolicy Policy); +/** Parse text written by operator<< back into an ECachePolicy. Text must not be empty. */ +CachePolicy ParseCachePolicy(std::string_view Text); -CachePolicy ParseQueryCachePolicy(std::string_view QueryPolicy, CachePolicy Default = CachePolicy::Query); - -CachePolicy ParseStoreCachePolicy(std::string_view StorePolicy, CachePolicy Default = CachePolicy::Store); - -CachePolicy ParseSkipCachePolicy(std::string_view SkipPolicy, CachePolicy Default = CachePolicy::None); +/** A value ID and the cache policy to use for that value. */ +struct CacheValuePolicy +{ + Oid Id; + CachePolicy Policy = CachePolicy::Default; +}; +namespace Private { + /** Interface for the private implementation of the cache record policy. */ + class ICacheRecordPolicyShared : public RefCounted + { + public: + virtual ~ICacheRecordPolicyShared() = default; + virtual std::span<const CacheValuePolicy> GetValuePolicies() const = 0; + virtual void AddValuePolicy(const CacheValuePolicy& Policy) = 0; + virtual void Build() = 0; + }; +} // namespace Private + +/** + * Flags to control the behavior of cache record requests, with optional overrides by value. + * + * Examples: + * - A base policy of Disable, with value policy overrides of Default, will fetch those values if + * they exist in the record, and skip data for any other values. + * - A base policy of Default, with value policy overrides of (Query | SkipData), will skip those + * values, but still check if they exist, and will load any other values. + */ class CacheRecordPolicy { public: + /** Construct a cache record policy that uses the default policy. */ CacheRecordPolicy() = default; - CacheRecordPolicy(const CachePolicy RecordPolicy, const CachePolicy DefaultPayloadPolicy = CachePolicy::Default); - CachePolicy GetRecordPolicy() const { return m_RecordPolicy; } - CachePolicy GetPayloadPolicy(const Oid& PayloadId) const; - CachePolicy GetDefaultPayloadPolicy() const { return m_DefaultPayloadPolicy; } + /** Construct a cache record policy with a uniform policy for the record and every value. */ + inline CacheRecordPolicy(CachePolicy Policy) : RecordPolicy(Policy), DefaultValuePolicy(Policy) {} + + /** Returns true if the record and every value use the same cache policy. */ + inline bool IsUniform() const { return !Shared && RecordPolicy == DefaultValuePolicy; } + + /** Returns the cache policy to use for the record. */ + inline CachePolicy GetRecordPolicy() const { return RecordPolicy; } - bool HasRecordPolicy(const CachePolicy Policy) const { return (m_RecordPolicy & Policy) == Policy; } - bool HasPayloadPolicy(const Oid& PayloadId, const CachePolicy Policy) const { return (GetPayloadPolicy(PayloadId) & Policy) == Policy; } + /** Returns the cache policy to use for the value. */ + CachePolicy GetValuePolicy(const Oid& Id) const; - static bool Load(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy); - static void Save(const CacheRecordPolicy& Policy, CbWriter& Writer); + /** Returns the cache policy to use for values with no override. */ + inline CachePolicy GetDefaultValuePolicy() const { return DefaultValuePolicy; } + + /** Returns the array of cache policy overrides for values, sorted by ID. */ + inline std::span<const CacheValuePolicy> GetValuePolicies() const + { + return Shared ? Shared->GetValuePolicies() : std::span<const CacheValuePolicy>(); + } + + /** Save the values from *this into the given writer. */ + void Save(CbWriter& Writer) const; + + /** + * Returns a policy loaded from values on Object. + * Invalid data will result in a uniform CacheRecordPolicy with defaultValuePolicy == DefaultPolicy. + */ + static CacheRecordPolicy Load(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default); + + /** Return *this converted into the equivalent policy that the upstream should use when forwarding a put or get to an upstream server. + */ + CacheRecordPolicy ConvertToUpstream() const; private: - using PayloadPolicyMap = std::unordered_map<Oid, CachePolicy, Oid::Hasher>; + friend class CacheRecordPolicyBuilder; + + CachePolicy RecordPolicy = CachePolicy::Default; + CachePolicy DefaultValuePolicy = CachePolicy::Default; + RefPtr<const Private::ICacheRecordPolicyShared> Shared; +}; + +/** A cache record policy builder is used to construct a cache record policy. */ +class CacheRecordPolicyBuilder +{ +public: + /** Construct a policy builder that uses the default policy as its base policy. */ + CacheRecordPolicyBuilder() = default; - CachePolicy m_RecordPolicy = CachePolicy::Default; - CachePolicy m_DefaultPayloadPolicy = CachePolicy::Default; - PayloadPolicyMap m_PayloadPolicies; + /** Construct a policy builder that uses the provided policy for the record and values with no override. */ + inline explicit CacheRecordPolicyBuilder(CachePolicy Policy) : BasePolicy(Policy) {} + + /** Adds a cache policy override for a value. */ + void AddValuePolicy(const CacheValuePolicy& Policy); + inline void AddValuePolicy(const Oid& Id, CachePolicy Policy) { AddValuePolicy({Id, Policy}); } + + /** Build a cache record policy, which makes this builder subsequently unusable. */ + CacheRecordPolicy Build(); + +private: + CachePolicy BasePolicy = CachePolicy::Default; + RefPtr<Private::ICacheRecordPolicyShared> Shared; }; } // namespace zen diff --git a/zenutil/include/zenutil/zenserverprocess.h b/zenutil/include/zenutil/zenserverprocess.h index 55b9a50cd..2a3146e2d 100644 --- a/zenutil/include/zenutil/zenserverprocess.h +++ b/zenutil/include/zenutil/zenserverprocess.h @@ -100,11 +100,12 @@ public: // additional state. For example, you can use the session ID // to introduce additional named objects std::atomic<uint32_t> Pid; - std::atomic<uint16_t> ListenPort; + std::atomic<uint16_t> DesiredListenPort; std::atomic<uint16_t> Flags; uint8_t SessionId[12]; std::atomic<uint32_t> SponsorPids[8]; - uint8_t Padding[12]; + std::atomic<uint16_t> EffectiveListenPort; + uint8_t Padding[10]; enum class FlagsEnum : uint16_t { @@ -125,8 +126,8 @@ public: void Initialize(); [[nodiscard]] bool InitializeReadOnly(); - [[nodiscard]] ZenServerEntry* Lookup(int ListenPort); - ZenServerEntry* Register(int ListenPort); + [[nodiscard]] ZenServerEntry* Lookup(int DesiredListenPort); + ZenServerEntry* Register(int DesiredListenPort); void Sweep(); void Snapshot(std::function<void(const ZenServerEntry&)>&& Callback); inline bool IsReadOnly() const { return m_IsReadOnly; } diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp index fe6236d18..5bddc72bc 100644 --- a/zenutil/zenserverprocess.cpp +++ b/zenutil/zenserverprocess.cpp @@ -230,11 +230,11 @@ ZenServerState::InitializeReadOnly() } ZenServerState::ZenServerEntry* -ZenServerState::Lookup(int ListenPort) +ZenServerState::Lookup(int DesiredListenPort) { for (int i = 0; i < m_MaxEntryCount; ++i) { - if (m_Data[i].ListenPort == ListenPort) + if (m_Data[i].DesiredListenPort == DesiredListenPort) { return &m_Data[i]; } @@ -244,7 +244,7 @@ ZenServerState::Lookup(int ListenPort) } ZenServerState::ZenServerEntry* -ZenServerState::Register(int ListenPort) +ZenServerState::Register(int DesiredListenPort) { if (m_Data == nullptr) { @@ -259,17 +259,18 @@ ZenServerState::Register(int ListenPort) { ZenServerEntry& Entry = m_Data[i]; - if (Entry.ListenPort.load(std::memory_order_relaxed) == 0) + if (Entry.DesiredListenPort.load(std::memory_order_relaxed) == 0) { uint16_t Expected = 0; - if (Entry.ListenPort.compare_exchange_strong(Expected, uint16_t(ListenPort))) + if (Entry.DesiredListenPort.compare_exchange_strong(Expected, uint16_t(DesiredListenPort))) { // Successfully allocated entry m_OurEntry = &Entry; - Entry.Pid = Pid; - Entry.Flags = 0; + Entry.Pid = Pid; + Entry.EffectiveListenPort = 0; + Entry.Flags = 0; const Oid SesId = GetSessionId(); memcpy(Entry.SessionId, &SesId, sizeof SesId); @@ -296,11 +297,11 @@ ZenServerState::Sweep() { ZenServerEntry& Entry = m_Data[i]; - if (Entry.ListenPort) + if (Entry.DesiredListenPort) { if (IsProcessRunning(Entry.Pid) == false) { - ZEN_DEBUG("Sweep - pid {} not running, reclaiming entry (port {})", Entry.Pid, Entry.ListenPort); + ZEN_DEBUG("Sweep - pid {} not running, reclaiming entry (port {})", Entry.Pid, Entry.DesiredListenPort); Entry.Reset(); } @@ -320,7 +321,7 @@ ZenServerState::Snapshot(std::function<void(const ZenServerEntry&)>&& Callback) { ZenServerEntry& Entry = m_Data[i]; - if (Entry.ListenPort) + if (Entry.DesiredListenPort) { Callback(Entry); } @@ -330,9 +331,10 @@ ZenServerState::Snapshot(std::function<void(const ZenServerEntry&)>&& Callback) void ZenServerState::ZenServerEntry::Reset() { - Pid = 0; - ListenPort = 0; - Flags = 0; + Pid = 0; + DesiredListenPort = 0; + Flags = 0; + EffectiveListenPort = 0; } void |