aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--prepare_commit.bat2
-rw-r--r--scripts/deploybuild.py15
-rw-r--r--scripts/formatcode.py84
-rw-r--r--scripts/generateprojects.bat1
-rw-r--r--scripts/upload_syms.bat3
-rw-r--r--thirdparty/trace/trace.h343
-rw-r--r--vs-chromium-project.txt2
-rw-r--r--zen/cmds/top.cpp4
-rw-r--r--zencore/include/zencore/refcount.h18
-rw-r--r--zencore/include/zencore/string.h22
-rw-r--r--zenhttp/httpasio.cpp22
-rw-r--r--zenhttp/httpasio.h2
-rw-r--r--zenhttp/httpnull.cpp4
-rw-r--r--zenhttp/httpnull.h2
-rw-r--r--zenhttp/httpsys.cpp55
-rw-r--r--zenhttp/httpsys.h4
-rw-r--r--zenhttp/include/zenhttp/httpserver.h2
-rw-r--r--zenserver-test/cachepolicy-tests.cpp164
-rw-r--r--zenserver-test/zenserver-test.cpp289
-rw-r--r--zenserver/cache/structuredcache.cpp683
-rw-r--r--zenserver/cache/structuredcache.h27
-rw-r--r--zenserver/cache/structuredcachestore.cpp4
-rw-r--r--zenserver/cache/structuredcachestore.h14
-rw-r--r--zenserver/config.cpp25
-rw-r--r--zenserver/config.h2
-rw-r--r--zenserver/upstream/upstreamcache.cpp156
-rw-r--r--zenserver/upstream/upstreamcache.h24
-rw-r--r--zenserver/upstream/zen.cpp8
-rw-r--r--zenserver/upstream/zen.h4
-rw-r--r--zenserver/zenserver.cpp63
-rw-r--r--zenstore/gc.cpp5
-rw-r--r--zenutil/cache/cachepolicy.cpp299
-rw-r--r--zenutil/include/zenutil/cache/cachekey.h6
-rw-r--r--zenutil/include/zenutil/cache/cachepolicy.h141
-rw-r--r--zenutil/include/zenutil/zenserverprocess.h9
-rw-r--r--zenutil/zenserverprocess.cpp28
36 files changed, 1701 insertions, 835 deletions
diff --git a/prepare_commit.bat b/prepare_commit.bat
index 97bcea6ed..8e621c96c 100644
--- a/prepare_commit.bat
+++ b/prepare_commit.bat
@@ -1 +1 @@
-python %~dp0\\scripts\formatcode.py
+python %~dp0scripts\formatcode.py %*
diff --git a/scripts/deploybuild.py b/scripts/deploybuild.py
index 4327d14f7..37518fd0c 100644
--- a/scripts/deploybuild.py
+++ b/scripts/deploybuild.py
@@ -30,12 +30,10 @@ origcwd = os.getcwd()
parser = argparse.ArgumentParser(description='Deploy a zen build to an UE tree')
parser.add_argument("root", help="Path to an UE5 root directory")
parser.add_argument("--sentry", action="store_true", help="Whether to upload symobls to Sentry")
-parser.add_argument("--xmake", action="store_true", help="Build with XMake")
args = parser.parse_args()
engineroot = args.root
upload_symbols = args.sentry
-use_xmake = args.xmake
if not os.path.isfile(os.path.join(engineroot, "RunUAT.bat")):
print(f"{Fore.RED}Not a valid UE5 engine root directory: '{engineroot}'")
@@ -53,16 +51,9 @@ jazz_print("Zen root:", zenroot)
# Build fresh binaries
try:
- if use_xmake:
- subprocess.run(["xmake.exe", "config", "-p", "windows", "-a", "x64", "-m", "release"], check=True)
- build_cmd = ["xmake.exe", "build", "--rebuild", "zenserver"]
- build_output_dir = r'build\windows\x64\release'
- else:
- vs_path = vswhere.get_latest_path() # can also specify prerelease=True
- jazz_print("BUILDING CODE", f"using VS root: {vs_path}")
- devenv_path = os.path.join(vs_path, "Common7\\IDE\\devenv.com")
- build_cmd = [devenv_path, "/build", "Release", "vsxmake2019\\zen.sln"]
- build_output_dir = r'x64\Release'
+ subprocess.run(["xmake.exe", "config", "-p", "windows", "-a", "x64", "-m", "release"], check=True)
+ build_cmd = ["xmake.exe", "build", "--rebuild", "zenserver"]
+ build_output_dir = r'build\windows\x64\release'
subprocess.run(build_cmd, check=True)
except:
diff --git a/scripts/formatcode.py b/scripts/formatcode.py
index 1a214380d..423d2b4e7 100644
--- a/scripts/formatcode.py
+++ b/scripts/formatcode.py
@@ -1,5 +1,12 @@
+import argparse
import os
import fileinput
+import pathlib
+import re
+
+match_expressions = []
+valid_extensions = []
+root_dir = ''
def is_header_missing(f):
with open(f) as reader:
@@ -20,9 +27,11 @@ def scan_tree(root):
for entry in dirs:
if entry.is_dir():
scan_tree(os.path.join(root, entry.name))
- elif entry.name.endswith(".cpp") or entry.name.endswith(".h"):
- print("... formatting: %s"%(entry.name))
- full_path = os.path.join(root, entry.name)
+ continue
+ full_path = os.path.join(root, entry.name)
+ relative_root_path = os.path.relpath(full_path, start=root_dir)
+ if is_matching_filename(relative_root_path):
+ print("... formatting: {}".format(relative_root_path))
files.append(full_path)
if is_header_missing(full_path):
header_files.append(full_path)
@@ -38,12 +47,65 @@ def scan_zen(root):
if entry.is_dir() and entry.name.startswith("zen"):
scan_tree(os.path.join(root, entry.name))
-while True:
- if (os.path.isfile(".clang-format")):
- scan_zen(".")
- quit()
- else:
- cwd = os.getcwd()
- if os.path.dirname(cwd) == cwd:
+def is_matching_filename(relative_root_path):
+ global match_expressions
+ global root_dir
+ global valid_extensions
+
+ if os.path.splitext(relative_root_path)[1].lower() not in valid_extensions:
+ return False
+ if not match_expressions:
+ return True
+ relative_root_path = relative_root_path.replace('\\', '/')
+ for regex in match_expressions:
+ if regex.fullmatch(relative_root_path):
+ return True
+ return False
+
+def parse_match_expressions(wildcards, matches):
+ global match_expressions
+ global valid_extensions
+
+ valid_extensions = ['.cpp', '.h']
+
+ for wildcard in wildcards:
+ regex = wildcard.replace('*', '%FORMAT_STAR%').replace('\\', '/')
+ regex = re.escape(regex)
+ regex = '.*' + regex.replace('%FORMAT_STAR%', '.*') + '.*'
+ try:
+ match_expressions.append(re.compile(regex, re.IGNORECASE))
+ except Exception as ex:
+ print('Could not parse input filename expression \'{}\': {}'.format(wildcard, str(ex)))
+ quit()
+ for regex in matches:
+ try:
+ match_expressions.append(re.compile(regex, re.IGNORECASE))
+ except Exception as ex:
+ print('Could not parse input --match expression \'{}\': {}'.format(regex, str(ex)))
quit()
- os.chdir("..")
+
+def _main():
+ global root_dir
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('filenames', nargs='*', help="Match text for filenames. If fullpath contains text it is a match, " +\
+ "* is a wildcard. Directory separators are matched by either / or \\. Case insensitive.")
+ parser.add_argument('--match', action='append', default=[], help="Match regular expression for filenames. " +\
+ "Relative path from the root zen directory must be a complete match. Directory separators are matched only by /. Case insensitive.")
+ options = parser.parse_args()
+ parse_match_expressions(options.filenames, options.match)
+ root_dir = pathlib.Path(__file__).parent.parent.resolve()
+
+ while True:
+ if (os.path.isfile(".clang-format")):
+ scan_zen(".")
+ quit()
+ else:
+ cwd = os.getcwd()
+ if os.path.dirname(cwd) == cwd:
+ quit()
+ os.chdir("..")
+
+
+if __name__ == '__main__':
+ _main() \ No newline at end of file
diff --git a/scripts/generateprojects.bat b/scripts/generateprojects.bat
new file mode 100644
index 000000000..cc6732aaa
--- /dev/null
+++ b/scripts/generateprojects.bat
@@ -0,0 +1 @@
+xmake project -k vsxmake2022 -y
diff --git a/scripts/upload_syms.bat b/scripts/upload_syms.bat
new file mode 100644
index 000000000..663d64e06
--- /dev/null
+++ b/scripts/upload_syms.bat
@@ -0,0 +1,3 @@
+rem This is a temporary hack until everyone has access to Sentry
+
+scripts\sentry-cli upload-dif --org to --project zen-server \ue5-main\Engine\Binaries\Win64\zenserver.exe \ue5-main\engine\Binaries\Win64\zenserver.pdb
diff --git a/thirdparty/trace/trace.h b/thirdparty/trace/trace.h
index fb538b3af..caa862ffe 100644
--- a/thirdparty/trace/trace.h
+++ b/thirdparty/trace/trace.h
@@ -347,6 +347,7 @@ struct FInitializeDesc
};
typedef void* AllocFunc(SIZE_T, uint32);
typedef void FreeFunc(void*, SIZE_T);
+typedef void ChannelIterFunc(const ANSICHAR*, bool, void*);
struct FStatistics
{
uint64 BytesSent;
@@ -357,6 +358,7 @@ struct FStatistics
};
UE_TRACE_API void SetMemoryHooks(AllocFunc Alloc, FreeFunc Free) UE_TRACE_IMPL();
UE_TRACE_API void Initialize(const FInitializeDesc& Desc) UE_TRACE_IMPL();
+UE_TRACE_API void StartWorkerThread() UE_TRACE_IMPL();
UE_TRACE_API void Shutdown() UE_TRACE_IMPL();
UE_TRACE_API void Update() UE_TRACE_IMPL();
UE_TRACE_API void GetStatistics(FStatistics& Out) UE_TRACE_IMPL();
@@ -366,6 +368,7 @@ UE_TRACE_API bool IsTracing() UE_TRACE_IMPL(false);
UE_TRACE_API bool Stop() UE_TRACE_IMPL(false);
UE_TRACE_API bool IsChannel(const TCHAR* ChanneName) UE_TRACE_IMPL(false);
UE_TRACE_API bool ToggleChannel(const TCHAR* ChannelName, bool bEnabled) UE_TRACE_IMPL(false);
+UE_TRACE_API void EnumerateChannels(ChannelIterFunc IterFunc, void* User);
UE_TRACE_API void ThreadRegister(const TCHAR* Name, uint32 SystemId, int32 SortHint) UE_TRACE_IMPL();
UE_TRACE_API void ThreadGroupBegin(const TCHAR* Name) UE_TRACE_IMPL();
UE_TRACE_API void ThreadGroupEnd() UE_TRACE_IMPL();
@@ -388,6 +391,7 @@ UE_TRACE_API void ThreadGroupEnd() UE_TRACE_IMPL();
#if UE_TRACE_ENABLED
namespace UE {
namespace Trace {
+typedef void ChannelIterFunc(const ANSICHAR*, bool, void*);
/*
A named channel which can be used to filter trace events. Channels can be
combined using the '|' operator which allows expressions like
@@ -421,6 +425,7 @@ public:
static bool Toggle(const ANSICHAR* ChannelName, bool bEnabled);
static void ToggleAll(bool bEnabled);
static FChannel* FindChannel(const ANSICHAR* ChannelName);
+ static void EnumerateChannels(ChannelIterFunc Func, void* User);
bool Toggle(bool bEnabled);
bool IsEnabled() const;
explicit operator bool () const;
@@ -2043,7 +2048,7 @@ FChannel* FChannel::FindChannel(const ANSICHAR* ChannelName)
};
for (FChannel* Channel : ChannelLists)
{
- for (; Channel != nullptr; Channel = (FChannel*)(Channel->Next))
+ for (; Channel != nullptr; Channel = Channel->Next)
{
if (Channel->Name.Hash == ChannelNameHash)
{
@@ -2053,10 +2058,26 @@ FChannel* FChannel::FindChannel(const ANSICHAR* ChannelName)
}
return nullptr;
}
+void FChannel::EnumerateChannels(ChannelIterFunc Func, void* User)
+{
+ using namespace Private;
+ FChannel* ChannelLists[] =
+ {
+ AtomicLoadAcquire(&GNewChannelList),
+ AtomicLoadAcquire(&GHeadChannel),
+ };
+ for (FChannel* Channel : ChannelLists)
+ {
+ for (; Channel != nullptr; Channel = Channel->Next)
+ {
+ Func(Channel->Name.Ptr, Channel->IsEnabled(), User);
+ }
+ }
+}
bool FChannel::Toggle(bool bEnabled)
{
using namespace Private;
- AtomicAddRelaxed(&Enabled, bEnabled ? 1 : -1);
+ AtomicStoreRelaxed(&Enabled, bEnabled ? 1 : -1);
UE_TRACE_LOG(Trace, ChannelToggle, TraceLogChannel)
<< ChannelToggle.Id(Name.Hash)
<< ChannelToggle.IsEnabled(IsEnabled());
@@ -3055,6 +3076,7 @@ namespace Private
{
void Writer_MemorySetHooks(AllocFunc, FreeFunc);
void Writer_Initialize(const FInitializeDesc&);
+void Writer_WorkerCreate();
void Writer_Shutdown();
void Writer_Update();
bool Writer_SendTo(const ANSICHAR*, uint32);
@@ -3132,6 +3154,14 @@ bool ToggleChannel(const TCHAR* ChannelName, bool bEnabled)
ToAnsiCheap(ChannelNameA, ChannelName);
return FChannel::Toggle(ChannelNameA, bEnabled);
}
+void EnumerateChannels(ChannelIterFunc IterFunc, void* User)
+{
+ FChannel::EnumerateChannels(IterFunc, User);
+}
+void StartWorkerThread()
+{
+ Private::Writer_WorkerCreate();
+}
UE_TRACE_CHANNEL_EXTERN(TraceLogChannel)
UE_TRACE_EVENT_BEGIN($Trace, ThreadInfo, NoSync|Important)
UE_TRACE_EVENT_FIELD(uint32, ThreadId)
@@ -3533,14 +3563,18 @@ static bool Writer_UpdateConnection()
Writer_DescribeEvents();
Writer_CacheOnConnect();
Writer_TailOnConnect();
- Writer_SendSync();
GSyncPacketCountdown = GNumSyncPackets;
return true;
}
static UPTRINT GWorkerThread; // = 0;
static volatile bool GWorkerThreadQuit; // = false;
+static volatile unsigned int GUpdateInProgress = 1; // Don't allow updates until initalized
static void Writer_WorkerUpdate()
{
+ if (!AtomicCompareExchangeAcquire(&GUpdateInProgress, 1u, 0u))
+ {
+ return;
+ }
Writer_UpdateControl();
Writer_UpdateConnection();
Writer_DescribeAnnounce();
@@ -3554,6 +3588,7 @@ static void Writer_WorkerUpdate()
Writer_FlushSendBuffer();
}
#endif
+ AtomicExchangeRelease(&GUpdateInProgress, 0u);
}
static void Writer_WorkerThread()
{
@@ -3565,7 +3600,7 @@ static void Writer_WorkerThread()
ThreadSleep(SleepMs);
}
}
-static void Writer_WorkerCreate()
+void Writer_WorkerCreate()
{
if (GWorkerThread)
{
@@ -3664,6 +3699,7 @@ void Writer_Initialize(const FInitializeDesc& Desc)
{
Writer_WorkerCreate();
}
+ GUpdateInProgress = 0;
}
void Writer_Shutdown()
{
@@ -3709,7 +3745,7 @@ bool Writer_WriteTo(const ANSICHAR* Path)
}
bool Writer_IsTracing()
{
- return (GDataHandle != 0);
+ return GDataHandle != 0 || GPendingDataHandle != 0;
}
bool Writer_Stop()
{
@@ -4359,7 +4395,13 @@ UPTRINT TcpSocketConnect(const ANSICHAR* Host, uint16 Port)
{
struct FAddrInfoPtr
{
- ~FAddrInfoPtr() { freeaddrinfo(Value); }
+ ~FAddrInfoPtr()
+ {
+ if (Value != nullptr)
+ {
+ freeaddrinfo(Value);
+ }
+ }
addrinfo* operator -> () { return Value; }
addrinfo** operator & () { return &Value; }
addrinfo* Value;
@@ -4371,6 +4413,7 @@ UPTRINT TcpSocketConnect(const ANSICHAR* Host, uint16 Port)
Hints.ai_protocol = IPPROTO_TCP;
if (getaddrinfo(Host, nullptr, &Hints, &Info))
{
+ Info.Value = nullptr;
return 0;
}
if (&Info == nullptr)
@@ -4962,6 +5005,8 @@ void Writer_ShutdownSharedBuffers()
// Copyright Epic Games, Inc. All Rights Reserved.
+// {{{1 amalgamation-tail //////////////////////////////////////////////////////
+
#if PLATFORM_WINDOWS
# pragma warning(pop)
#endif
@@ -4978,6 +5023,10 @@ void Writer_ShutdownSharedBuffers()
#endif // TRACE_UE_COMPAT_LAYER
+
+
+// {{{1 library-setup //////////////////////////////////////////////////////////
+
#include <string_view>
#define TRACE_EVENT_DEFINE UE_TRACE_EVENT_DEFINE
@@ -4998,15 +5047,108 @@ namespace trace = UE::Trace;
#define TRACE_PRIVATE_CONCAT(x, y) TRACE_PRIVATE_CONCAT_(x, y)
#define TRACE_PRIVATE_UNIQUE_VAR(name) TRACE_PRIVATE_CONCAT($trace_##name, __LINE__)
+
+
+// {{{1 session-header /////////////////////////////////////////////////////////
+
+namespace UE {
+namespace Trace {
+
+enum class Build
+{
+ Unknown,
+ Debug,
+ DebugGame,
+ Development,
+ Shipping,
+ Test
+};
+
+void DescribeSession(
+ const std::string_view& AppName,
+ Build Variant=Build::Unknown,
+ const std::string_view& CommandLine="",
+ const std::string_view& BuildVersion="unknown_ver");
+
+} // namespace Trace
+} // namespace UE
+
+// {{{1 session-source /////////////////////////////////////////////////////////
+
+#if TRACE_IMPLEMENT
+
+namespace UE {
+namespace Trace {
+namespace Private {
+
+TRACE_EVENT_BEGIN(Diagnostics, Session2, NoSync|Important)
+ TRACE_EVENT_FIELD(uint8, ConfigurationType)
+ TRACE_EVENT_FIELD(trace::AnsiString, AppName)
+ TRACE_EVENT_FIELD(trace::AnsiString, BuildVersion)
+ TRACE_EVENT_FIELD(trace::AnsiString, Platform)
+ TRACE_EVENT_FIELD(trace::AnsiString, CommandLine)
+TRACE_EVENT_END()
+
+} // namespace Private
+
+////////////////////////////////////////////////////////////////////////////////
+void DescribeSession(
+ const std::string_view& AppName,
+ Build Variant,
+ const std::string_view& CommandLine,
+ const std::string_view& BuildVersion)
+{
+ using namespace Private;
+ using namespace std::literals;
+
+ std::string_view Platform;
+#if PLATFORM_WINDOWS
+ Platform = "Windows"sv;
+#elif PLATFORM_UNIX
+ Platform = "Linux"sv;
+#elif PLATFORM_MAC
+ Platform = "Mac"sv;
+#else
+ Platform = "Unknown"sv;
+#endif
+
+ int DataSize = 0;
+ DataSize += AppName.size();
+ DataSize += BuildVersion.size();
+ DataSize += Platform.size();
+ DataSize += CommandLine.size();
+
+ TRACE_LOG(Diagnostics, Session2, true, DataSize)
+ << Session2.AppName(AppName.data(), int(AppName.size()))
+ << Session2.BuildVersion(BuildVersion.data(), int(BuildVersion.size()))
+ << Session2.Platform(Platform.data(), int(Platform.size()))
+ << Session2.CommandLine(CommandLine.data(), int(CommandLine.size()))
+ << Session2.ConfigurationType(uint8(Variant));
+}
+
+} // namespace Trace
+} // namespace UE
+
+#endif // TRACE_IMPLEMENT
+
+
+
+// {{{1 cpu-header /////////////////////////////////////////////////////////////
+
TRACE_CHANNEL_EXTERN(CpuChannel)
namespace UE {
namespace Trace {
+enum CpuScopeFlags : int
+{
+ CpuFlush = 1 << 0,
+};
+
struct TraceCpuScope
{
~TraceCpuScope();
- void Enter(int ScopeId);
+ void Enter(int ScopeId, int Flags=0);
int _ScopeId = 0;
};
@@ -5015,54 +5157,44 @@ int ScopeNew(const std::string_view& Name);
} // namespace Trace
} // namespace UE
-#define TRACE_CPU_SCOPE(name) \
- using namespace std::literals; \
+#define TRACE_CPU_SCOPE(name, ...) \
trace::TraceCpuScope TRACE_PRIVATE_UNIQUE_VAR(cpu_scope); \
if (CpuChannel) { \
+ using namespace std::literals; \
static int TRACE_PRIVATE_UNIQUE_VAR(scope_id); \
if (0 == TRACE_PRIVATE_UNIQUE_VAR(scope_id)) \
TRACE_PRIVATE_UNIQUE_VAR(scope_id) = trace::ScopeNew(name##sv); \
- TRACE_PRIVATE_UNIQUE_VAR(cpu_scope).Enter(TRACE_PRIVATE_UNIQUE_VAR(scope_id)); \
+ TRACE_PRIVATE_UNIQUE_VAR(cpu_scope).Enter(TRACE_PRIVATE_UNIQUE_VAR(scope_id), ##__VA_ARGS__); \
} \
do {} while (0)
+// {{{1 cpu-source /////////////////////////////////////////////////////////////
+
#if TRACE_IMPLEMENT
-////////////////////////////////////////////////////////////////////////////////
TRACE_CHANNEL_DEFINE(CpuChannel)
+namespace UE {
+namespace Trace {
+namespace Private {
+
TRACE_EVENT_BEGIN(CpuProfiler, EventSpec, NoSync|Important)
TRACE_EVENT_FIELD(uint32, Id)
- TRACE_EVENT_FIELD(UE::Trace::AnsiString, Name)
+ TRACE_EVENT_FIELD(trace::AnsiString, Name)
TRACE_EVENT_END()
TRACE_EVENT_BEGIN(CpuProfiler, EventBatch, NoSync)
TRACE_EVENT_FIELD(uint8[], Data)
TRACE_EVENT_END()
-namespace UE {
-namespace Trace {
-namespace Private {
-
+////////////////////////////////////////////////////////////////////////////////
static int32_t encode32_7bit(int32_t value, void* __restrict out)
{
// Calculate the number of bytes
-#if 0
- int32_t msb_test = (value << sizeof(value)) | 0x10;
-#if _MSC_VER
- unsigned long bit_index;
- _BitScanReverse(&bit_index, msb_test);
-#else
- int32_t leading_zeros = __builtin_clz(msb_test);
- int32_t bit_index = ((sizeof(value) * 8) - 1) - leading_zeros;
-#endif
- int32_t length = (bit_index + 3) / 7;
-#else
int32_t length = 1;
length += (value >= (1 << 7));
length += (value >= (1 << 14));
length += (value >= (1 << 21));
-#endif
// Add a gap every eigth bit for the continuations
int32_t ret = value;
@@ -5079,20 +5211,10 @@ static int32_t encode32_7bit(int32_t value, void* __restrict out)
return length;
}
+////////////////////////////////////////////////////////////////////////////////
static int32_t encode64_7bit(int64_t value, void* __restrict out)
{
// Calculate the output length
-#if 0
- int64_t msb_test = (value << sizeof(value)) | 0x100ull;
-#if _MSC_VER
- unsigned long bit_index;
- _BitScanReverse64(&bit_index, msb_test);
-#else
- int32_t leading_zeros = __builtin_clzll(msb_test);
- int32_t bit_index = ((sizeof(value) * 8) - 1) - leading_zeros;
-#endif
- int32_t length = (bit_index - 1) / 7;
-#else
uint32_t length = 1;
length += (value >= (1ll << 7));
length += (value >= (1ll << 14));
@@ -5101,7 +5223,6 @@ static int32_t encode64_7bit(int64_t value, void* __restrict out)
length += (value >= (1ll << 35));
length += (value >= (1ll << 42));
length += (value >= (1ll << 49));
-#endif
// Add a gap every eigth bit for the continuations
int64_t ret = value;
@@ -5123,13 +5244,13 @@ static int32_t encode64_7bit(int64_t value, void* __restrict out)
class ThreadBuffer
{
public:
- static void Enter(uint64_t Timestamp, uint32_t ScopeId) { TlsInstance.EnterImpl(Timestamp, ScopeId); }
- static void Leave(uint64_t Timestamp) { TlsInstance.LeaveImpl(Timestamp); }
+ static void Enter(uint64_t Timestamp, uint32_t ScopeId, int Flags);
+ static void Leave(uint64_t Timestamp);
private:
~ThreadBuffer();
void Flush(bool Force);
- void EnterImpl(uint64_t Timestamp, uint32_t ScopeId);
+ void EnterImpl(uint64_t Timestamp, uint32_t ScopeId, int Flags);
void LeaveImpl(uint64_t Timestamp);
enum
{
@@ -5148,6 +5269,18 @@ private:
thread_local ThreadBuffer ThreadBuffer::TlsInstance;
////////////////////////////////////////////////////////////////////////////////
+inline void ThreadBuffer::Enter(uint64_t Timestamp, uint32_t ScopeId, int Flags)
+{
+ TlsInstance.EnterImpl(Timestamp, ScopeId, Flags);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+inline void ThreadBuffer::Leave(uint64_t Timestamp)
+{
+ TlsInstance.LeaveImpl(Timestamp);
+}
+
+////////////////////////////////////////////////////////////////////////////////
ThreadBuffer::~ThreadBuffer()
{
Flush(true);
@@ -5156,6 +5289,8 @@ ThreadBuffer::~ThreadBuffer()
////////////////////////////////////////////////////////////////////////////////
void ThreadBuffer::Flush(bool Force)
{
+ using namespace Private;
+
if (!Force && (Cursor <= (Buffer + BufferSize - Overflow)))
return;
@@ -5167,14 +5302,15 @@ void ThreadBuffer::Flush(bool Force)
}
////////////////////////////////////////////////////////////////////////////////
-void ThreadBuffer::EnterImpl(uint64_t Timestamp, uint32_t ScopeId)
+void ThreadBuffer::EnterImpl(uint64_t Timestamp, uint32_t ScopeId, int Flags)
{
Timestamp -= PrevTimestamp;
PrevTimestamp += Timestamp;
Cursor += encode64_7bit((Timestamp) << 1 | EnterLsb, Cursor);
Cursor += encode32_7bit(ScopeId, Cursor);
- Flush(false);
+ bool ShouldFlush = (Flags & CpuScopeFlags::CpuFlush);
+ Flush(ShouldFlush);
}
////////////////////////////////////////////////////////////////////////////////
@@ -5194,8 +5330,10 @@ void ThreadBuffer::LeaveImpl(uint64_t Timestamp)
////////////////////////////////////////////////////////////////////////////////
int ScopeNew(const std::string_view& Name)
{
+ using namespace Private;
+
static int volatile NextSpecId = 1;
- int SpecId = Private::AtomicAddRelaxed(&NextSpecId, 1);
+ int SpecId = AtomicAddRelaxed(&NextSpecId, 1);
uint32 NameSize = uint32(Name.size());
TRACE_LOG(CpuProfiler, EventSpec, true, NameSize)
@@ -5220,15 +5358,124 @@ TraceCpuScope::~TraceCpuScope()
}
////////////////////////////////////////////////////////////////////////////////
-void TraceCpuScope::Enter(int ScopeId)
+void TraceCpuScope::Enter(int ScopeId, int Flags)
{
using namespace Private;
_ScopeId = ScopeId;
uint64 Timestamp = TimeGetTimestamp();
- ThreadBuffer::Enter(Timestamp, ScopeId);
+ ThreadBuffer::Enter(Timestamp, ScopeId, Flags);
+}
+
+} // namespace Trace
+} // namespace UE
+
+#endif // TRACE_IMPLEMENT
+
+
+
+// {{{1 log-header /////////////////////////////////////////////////////////////
+
+TRACE_CHANNEL_EXTERN(LogChannel)
+
+namespace UE {
+namespace Trace {
+namespace Private {
+
+void LogMessageImpl(int Id, const void* ParamBuffer, int ParamSize);
+int LogMessageNew(const std::string_view& Format, const std::string_view& File, int Line);
+
+template <typename... ARGS>
+void LogMessage(int Id, ARGS&&... Args)
+{
+ LogMessageImpl(Id, nullptr, 0);
+}
+
+} // namespace Private
+} // namespace Trace
+} // namespace UE
+
+#define TRACE_LOG_MESSAGE(format, ...) \
+ if (LogChannel) { \
+ using namespace std::literals; \
+ static int message_id; \
+ if (message_id == 0) \
+ message_id = trace::Private::LogMessageNew( \
+ format##sv, \
+ TRACE_PRIVATE_CONCAT(__FILE__, sv), \
+ __LINE__); \
+ trace::Private::LogMessage(message_id, ##__VA_ARGS__); \
+ } \
+ do {} while (0)
+
+// {{{1 log-source /////////////////////////////////////////////////////////////
+
+#if TRACE_IMPLEMENT
+
+TRACE_CHANNEL_DEFINE(LogChannel)
+
+namespace UE {
+namespace Trace {
+namespace Private {
+
+#if 0
+TRACE_EVENT_BEGIN(Logging, LogCategory, NoSync|Important)
+ TRACE_EVENT_FIELD(const void*, CategoryPointer)
+ TRACE_EVENT_FIELD(uint8, DefaultVerbosity)
+ TRACE_EVENT_FIELD(UE::Trace::AnsiString, Name)
+TRACE_EVENT_END()
+#endif
+
+TRACE_EVENT_BEGIN(Logging, LogMessageSpec, NoSync|Important)
+ TRACE_EVENT_FIELD(uint32, LogPoint)
+ //TRACE_EVENT_FIELD(uint16, CategoryPointer)
+ TRACE_EVENT_FIELD(uint16, Line)
+ TRACE_EVENT_FIELD(UE::Trace::AnsiString, FileName)
+ TRACE_EVENT_FIELD(UE::Trace::AnsiString, FormatString)
+TRACE_EVENT_END()
+
+TRACE_EVENT_BEGIN(Logging, LogMessage, NoSync)
+ TRACE_EVENT_FIELD(uint32, LogPoint)
+ TRACE_EVENT_FIELD(uint64, Cycle)
+ TRACE_EVENT_FIELD(uint8[], FormatArgs)
+TRACE_EVENT_END()
+
+////////////////////////////////////////////////////////////////////////////////
+void LogMessageImpl(int Id, const void* ParamBuffer, int ParamSize)
+{
+ (void)ParamBuffer;
+ (void)ParamSize;
+
+ uint64 Timestamp = TimeGetTimestamp();
+
+ TRACE_LOG(Logging, LogMessage, true)
+ << LogMessage.LogPoint(Id)
+ << LogMessage.Cycle(Timestamp);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int LogMessageNew(
+ const std::string_view& Format,
+ const std::string_view& File,
+ int Line)
+{
+ static int volatile NextId = 1;
+ int Id = AtomicAddRelaxed(&NextId, 1);
+
+ int DataSize = 0;
+ DataSize += Format.size();
+ DataSize += File.size();
+
+ TRACE_LOG(Logging, LogMessageSpec, true, DataSize)
+ << LogMessageSpec.LogPoint(Id)
+ << LogMessageSpec.Line(uint16(Line))
+ << LogMessageSpec.FileName(File.data(), int(File.size()))
+ << LogMessageSpec.FormatString(Format.data(), int(Format.size()));
+
+ return Id;
}
+} // namespace Private
} // namespace Trace
} // namespace UE
diff --git a/vs-chromium-project.txt b/vs-chromium-project.txt
index 0a1075662..1e948baad 100644
--- a/vs-chromium-project.txt
+++ b/vs-chromium-project.txt
@@ -7,3 +7,5 @@ vcpkg_installed/
x64/
*.suo
**/x64/
+.test/
+vsxmake*/
diff --git a/zen/cmds/top.cpp b/zen/cmds/top.cpp
index 315d8cb38..f5b9d654a 100644
--- a/zen/cmds/top.cpp
+++ b/zen/cmds/top.cpp
@@ -43,7 +43,7 @@ TopCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) {
- ZEN_CONSOLE("{:5} {:6} {:24}", Entry.ListenPort, Entry.Pid, Entry.GetSessionId());
+ ZEN_CONSOLE("{:5} {:6} {:24}", Entry.EffectiveListenPort, Entry.Pid, Entry.GetSessionId());
});
zen::Sleep(1000);
@@ -78,7 +78,7 @@ PsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return 0;
}
- State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) { ZEN_CONSOLE("Port {} : pid {}", Entry.ListenPort, Entry.Pid); });
+ State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) { ZEN_CONSOLE("Port {} : pid {}", Entry.EffectiveListenPort, Entry.Pid); });
return 0;
}
diff --git a/zencore/include/zencore/refcount.h b/zencore/include/zencore/refcount.h
index 0324b94cc..254a22db5 100644
--- a/zencore/include/zencore/refcount.h
+++ b/zencore/include/zencore/refcount.h
@@ -94,10 +94,28 @@ public:
}
return *this;
}
+ template<typename OtherType>
+ inline RefPtr& operator=(RefPtr<OtherType>&& Rhs) noexcept
+ {
+ if ((RefPtr*)&Rhs != this)
+ {
+ m_Ref && m_Ref->Release();
+ m_Ref = Rhs.m_Ref;
+ Rhs.m_Ref = nullptr;
+ }
+ return *this;
+ }
inline RefPtr(RefPtr&& Rhs) noexcept : m_Ref(Rhs.m_Ref) { Rhs.m_Ref = nullptr; }
+ template<typename OtherType>
+ explicit inline RefPtr(RefPtr<OtherType>&& Rhs) noexcept : m_Ref(Rhs.m_Ref)
+ {
+ Rhs.m_Ref = nullptr;
+ }
private:
T* m_Ref = nullptr;
+ template <typename U>
+ friend class RefPtr;
};
/**
diff --git a/zencore/include/zencore/string.h b/zencore/include/zencore/string.h
index 1e8907906..4c378730f 100644
--- a/zencore/include/zencore/string.h
+++ b/zencore/include/zencore/string.h
@@ -413,6 +413,17 @@ private:
char m_StringBuffer[N];
};
+template<size_t N>
+class WriteToString : public ExtendableStringBuilder<N>
+{
+public:
+ template<typename... ArgTypes>
+ explicit WriteToString(ArgTypes&&... Args)
+ {
+ (*this << ... << std::forward<ArgTypes>(Args));
+ }
+};
+
//////////////////////////////////////////////////////////////////////////
extern template class StringBuilderImpl<wchar_t>;
@@ -454,6 +465,17 @@ private:
wchar_t m_Buffer[N];
};
+template<size_t N>
+class WriteToWideString : public ExtendableWideStringBuilder<N>
+{
+public:
+ template<typename... ArgTypes>
+ explicit WriteToWideString(ArgTypes&&... Args)
+ {
+ (*this << ... << Forward<ArgTypes>(Args));
+ }
+};
+
//////////////////////////////////////////////////////////////////////////
void Utf8ToWide(const char8_t* str, WideStringBuilderBase& out);
diff --git a/zenhttp/httpasio.cpp b/zenhttp/httpasio.cpp
index 801bb51ac..f2d48200e 100644
--- a/zenhttp/httpasio.cpp
+++ b/zenhttp/httpasio.cpp
@@ -64,7 +64,7 @@ public:
HttpAsioServerImpl();
~HttpAsioServerImpl();
- void Start(uint16_t Port, int ThreadCount);
+ int Start(uint16_t Port, int ThreadCount);
void Stop();
void RegisterService(const char* UrlPath, HttpService& Service);
HttpService* RouteRequest(std::string_view Url);
@@ -934,7 +934,12 @@ struct HttpAcceptor
m_Acceptor.set_option(asio::ip::v6_only(false));
m_Acceptor.set_option(asio::socket_base::reuse_address(true));
m_Acceptor.set_option(asio::ip::tcp::no_delay(true));
- m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v6::any(), Port));
+ asio::error_code BindErrorCode;
+ m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v6::any(), Port), BindErrorCode);
+ if (BindErrorCode == asio::error::access_denied)
+ {
+ m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v6::any(), 0));
+ }
m_Acceptor.listen();
}
@@ -980,6 +985,8 @@ struct HttpAcceptor
});
}
+ int GetAcceptPort() { return m_Acceptor.local_endpoint().port(); }
+
private:
HttpAsioServerImpl& m_Server;
asio::io_service& m_IoService;
@@ -1119,7 +1126,7 @@ HttpAsioServerImpl::~HttpAsioServerImpl()
{
}
-void
+int
HttpAsioServerImpl::Start(uint16_t Port, int ThreadCount)
{
ZEN_ASSERT(ThreadCount > 0);
@@ -1142,6 +1149,8 @@ HttpAsioServerImpl::Start(uint16_t Port, int ThreadCount)
}
});
}
+
+ return m_Acceptor->GetAcceptPort();
}
void
@@ -1212,12 +1221,11 @@ HttpAsioServer::RegisterService(HttpService& Service)
m_Impl->RegisterService(Service.BaseUri(), Service);
}
-void
+int
HttpAsioServer::Initialize(int BasePort)
{
- m_BasePort = BasePort;
-
- m_Impl->Start(gsl::narrow<uint16_t>(m_BasePort), Max(std::thread::hardware_concurrency(), 8u));
+ m_BasePort = m_Impl->Start(gsl::narrow<uint16_t>(BasePort), Max(std::thread::hardware_concurrency(), 8u));
+ return m_BasePort;
}
void
diff --git a/zenhttp/httpasio.h b/zenhttp/httpasio.h
index 08834ba21..716145955 100644
--- a/zenhttp/httpasio.h
+++ b/zenhttp/httpasio.h
@@ -22,7 +22,7 @@ public:
~HttpAsioServer();
virtual void RegisterService(HttpService& Service) override;
- virtual void Initialize(int BasePort) override;
+ virtual int Initialize(int BasePort) override;
virtual void Run(bool IsInteractiveSession) override;
virtual void RequestExit() override;
diff --git a/zenhttp/httpnull.cpp b/zenhttp/httpnull.cpp
index 31b13a6ce..a6e1d3567 100644
--- a/zenhttp/httpnull.cpp
+++ b/zenhttp/httpnull.cpp
@@ -24,10 +24,10 @@ HttpNullServer::RegisterService(HttpService& Service)
ZEN_UNUSED(Service);
}
-void
+int
HttpNullServer::Initialize(int BasePort)
{
- ZEN_UNUSED(BasePort);
+ return BasePort;
}
void
diff --git a/zenhttp/httpnull.h b/zenhttp/httpnull.h
index 867bbe4d2..74f021f6b 100644
--- a/zenhttp/httpnull.h
+++ b/zenhttp/httpnull.h
@@ -18,7 +18,7 @@ public:
~HttpNullServer();
virtual void RegisterService(HttpService& Service) override;
- virtual void Initialize(int BasePort) override;
+ virtual int Initialize(int BasePort) override;
virtual void Run(bool IsInteractiveSession) override;
virtual void RequestExit() override;
diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp
index b3d109b6a..3c57f7ce3 100644
--- a/zenhttp/httpsys.cpp
+++ b/zenhttp/httpsys.cpp
@@ -746,7 +746,7 @@ HttpSysServer::~HttpSysServer()
}
}
-void
+int
HttpSysServer::InitializeServer(int BasePort)
{
using namespace std::literals;
@@ -762,7 +762,7 @@ HttpSysServer::InitializeServer(int BasePort)
{
ZEN_ERROR("Failed to create server session for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result);
- return;
+ return BasePort;
}
Result = HttpCreateUrlGroup(m_HttpSessionId, &m_HttpUrlGroupId, 0);
@@ -771,17 +771,29 @@ HttpSysServer::InitializeServer(int BasePort)
{
ZEN_ERROR("Failed to create URL group for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result);
- return;
+ return BasePort;
}
+ int EffectivePort = BasePort;
+
Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, WildcardUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0);
+ // Sharing violation implies the port is being used by another process
+ for (int PortOffset = 1; (Result == ERROR_SHARING_VIOLATION) && (PortOffset < 10); ++PortOffset)
+ {
+ EffectivePort = BasePort + (PortOffset * 100);
+ WildcardUrlPath.Reset();
+ WildcardUrlPath << u8"http://*:"sv << int64_t(EffectivePort) << u8"/"sv;
+
+ Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, WildcardUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0);
+ }
+
m_BaseUris.clear();
if (Result == NO_ERROR)
{
m_BaseUris.push_back(WildcardUrlPath.c_str());
}
- else
+ else if (Result == ERROR_ACCESS_DENIED)
{
// If we can't register the wildcard path, we fall back to local paths
// This local paths allow requests originating locally to function, but will not allow
@@ -792,14 +804,26 @@ HttpSysServer::InitializeServer(int BasePort)
const std::u8string_view Hosts[] = {u8"[::1]"sv, u8"localhost"sv, u8"127.0.0.1"sv};
- for (const std::u8string_view Host : Hosts)
+ ULONG InternalResult = ERROR_SHARING_VIOLATION;
+ for (int PortOffset = 0; (InternalResult == ERROR_SHARING_VIOLATION) && (PortOffset < 10); ++PortOffset)
{
- WideStringBuilder<64> LocalUrlPath;
- LocalUrlPath << u8"http://"sv << Host << u8":"sv << int64_t(BasePort) << u8"/"sv;
+ EffectivePort = BasePort + (PortOffset * 100);
- if (HttpAddUrlToUrlGroup(m_HttpUrlGroupId, LocalUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0) == NO_ERROR)
+ for (const std::u8string_view Host : Hosts)
{
- m_BaseUris.push_back(LocalUrlPath.c_str());
+ WideStringBuilder<64> LocalUrlPath;
+ LocalUrlPath << u8"http://"sv << Host << u8":"sv << int64_t(EffectivePort) << u8"/"sv;
+
+ InternalResult = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, LocalUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0);
+
+ if (InternalResult == NO_ERROR)
+ {
+ m_BaseUris.push_back(LocalUrlPath.c_str());
+ }
+ else
+ {
+ break;
+ }
}
}
}
@@ -808,7 +832,7 @@ HttpSysServer::InitializeServer(int BasePort)
{
ZEN_ERROR("Failed to add base URL to URL group for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result);
- return;
+ return BasePort;
}
HTTP_BINDING_INFO HttpBindingInfo = {{0}, 0};
@@ -823,7 +847,7 @@ HttpSysServer::InitializeServer(int BasePort)
{
ZEN_ERROR("Failed to create request queue for '{}': {:#x}", WideToUtf8(m_BaseUris.front()), Result);
- return;
+ return EffectivePort;
}
HttpBindingInfo.Flags.Present = 1;
@@ -835,7 +859,7 @@ HttpSysServer::InitializeServer(int BasePort)
{
ZEN_ERROR("Failed to set server binding property for '{}': {:#x}", WideToUtf8(m_BaseUris.front()), Result);
- return;
+ return EffectivePort;
}
// Create I/O completion port
@@ -853,6 +877,8 @@ HttpSysServer::InitializeServer(int BasePort)
ZEN_INFO("Started http.sys server at '{}'", WideToUtf8(m_BaseUris.front()));
}
+
+ return EffectivePort;
}
void
@@ -1603,11 +1629,12 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT
// HttpServer interface implementation
//
-void
+int
HttpSysServer::Initialize(int BasePort)
{
- InitializeServer(BasePort);
+ int EffectivePort = InitializeServer(BasePort);
StartServer();
+ return EffectivePort;
}
void
diff --git a/zenhttp/httpsys.h b/zenhttp/httpsys.h
index 06bad99c3..0453b8740 100644
--- a/zenhttp/httpsys.h
+++ b/zenhttp/httpsys.h
@@ -41,7 +41,7 @@ public:
// HttpServer interface implementation
- virtual void Initialize(int BasePort) override;
+ virtual int Initialize(int BasePort) override;
virtual void Run(bool TestMode) override;
virtual void RequestExit() override;
virtual void RegisterService(HttpService& Service) override;
@@ -52,7 +52,7 @@ public:
inline bool IsAsyncResponseEnabled() const { return m_IsAsyncResponseEnabled; }
private:
- void InitializeServer(int BasePort);
+ int InitializeServer(int BasePort);
void Cleanup();
void StartServer();
diff --git a/zenhttp/include/zenhttp/httpserver.h b/zenhttp/include/zenhttp/httpserver.h
index 902310f04..545b96db2 100644
--- a/zenhttp/include/zenhttp/httpserver.h
+++ b/zenhttp/include/zenhttp/httpserver.h
@@ -169,7 +169,7 @@ class HttpServer : public RefCounted
{
public:
virtual void RegisterService(HttpService& Service) = 0;
- virtual void Initialize(int BasePort) = 0;
+ virtual int Initialize(int BasePort) = 0;
virtual void Run(bool IsInteractiveSession) = 0;
virtual void RequestExit() = 0;
};
diff --git a/zenserver-test/cachepolicy-tests.cpp b/zenserver-test/cachepolicy-tests.cpp
new file mode 100644
index 000000000..686ff818c
--- /dev/null
+++ b/zenserver-test/cachepolicy-tests.cpp
@@ -0,0 +1,164 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zencore/zencore.h>
+
+#if ZEN_WITH_TESTS
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/string.h>
+# include <zencore/testing.h>
+# include <zencore/uid.h>
+# include <zenutil/cache/cachepolicy.h>
+
+namespace zen::tests {
+
+using namespace std::literals;
+
+TEST_CASE("cachepolicy")
+{
+ SUBCASE("atomics serialization")
+ {
+ CachePolicy SomeAtomics[] = {CachePolicy::None,
+ CachePolicy::QueryLocal,
+ CachePolicy::StoreRemote,
+ CachePolicy::SkipData,
+ CachePolicy::KeepAlive,
+ CachePolicy::Disable};
+ for (CachePolicy Atomic : SomeAtomics)
+ {
+ CHECK(ParseCachePolicy(WriteToString<128>(Atomic)) == Atomic);
+ }
+ // Also verify that we ignore unrecognized bits
+ for (CachePolicy Atomic : SomeAtomics)
+ {
+ CHECK(ParseCachePolicy(WriteToString<128>(Atomic | (CachePolicy)0x10000000)) == Atomic);
+ }
+ }
+ SUBCASE("aliases serialization")
+ {
+ CachePolicy SomeAliases[] = {CachePolicy::Query, CachePolicy::Local};
+ for (CachePolicy Alias : SomeAliases)
+ {
+ CHECK(ParseCachePolicy(WriteToString<128>(Alias)) == Alias);
+ }
+ // Also verify that we ignore unrecognized bits
+ for (CachePolicy Alias : SomeAliases)
+ {
+ CHECK(ParseCachePolicy(WriteToString<128>(Alias | (CachePolicy)0x10000000)) == Alias);
+ }
+ }
+ SUBCASE("aliases take priority over atomics")
+ {
+ CHECK(WriteToString<128>(CachePolicy::Default).ToView() == "Default"sv);
+ CHECK(WriteToString<128>(CachePolicy::Query).ToView() == "Query"sv);
+ CHECK(WriteToString<128>(CachePolicy::Local).ToView() == "Local"sv);
+ }
+ SUBCASE("policies requiring multiple strings work")
+ {
+ char Delimiter = ',';
+ CachePolicy Combination = CachePolicy::SkipData | CachePolicy::QueryLocal;
+ CHECK(WriteToString<128>(Combination).ToView().find(Delimiter) != std::string_view::npos);
+ CHECK(ParseCachePolicy(WriteToString<128>(Combination)) == Combination);
+ }
+ SUBCASE("parsing invalid text")
+ {
+ CHECK(ParseCachePolicy(",,,") == CachePolicy::None);
+ CHECK(ParseCachePolicy("fee,fie,foo,fum") == CachePolicy::None);
+ CHECK(ParseCachePolicy("fee,KeepAlive,foo,fum") == CachePolicy::KeepAlive);
+ }
+}
+
+TEST_CASE("cacherecordpolicy")
+{
+ SUBCASE("policy with no values")
+ {
+ CachePolicy Policy = CachePolicy::SkipData | CachePolicy::QueryLocal;
+ CacheRecordPolicy RecordPolicy;
+ CacheRecordPolicyBuilder Builder(Policy);
+ RecordPolicy = Builder.Build();
+ SUBCASE("construct")
+ {
+ CHECK(RecordPolicy.IsUniform());
+ CHECK(RecordPolicy.GetRecordPolicy() == Policy);
+ CHECK(RecordPolicy.GetDefaultValuePolicy() == Policy);
+ CHECK(RecordPolicy.GetValuePolicy(Oid::NewOid()) == Policy);
+ CHECK(RecordPolicy.GetValuePolicies().size() == 0);
+ }
+ SUBCASE("saveload")
+ {
+ CbWriter Writer;
+ RecordPolicy.Save(Writer);
+ CbObject Saved = Writer.Save()->AsObject();
+ CacheRecordPolicy Loaded = CacheRecordPolicy::Load(Saved);
+ CHECK(Loaded.IsUniform());
+ CHECK(Loaded.GetRecordPolicy() == Policy);
+ CHECK(Loaded.GetDefaultValuePolicy() == Policy);
+ CHECK(Loaded.GetValuePolicy(Oid::NewOid()) == Policy);
+ CHECK(Loaded.GetValuePolicies().size() == 0);
+ }
+ }
+
+ SUBCASE("policy with values")
+ {
+ CachePolicy DefaultPolicy = CachePolicy::StoreRemote | CachePolicy::QueryLocal;
+ CachePolicy PartialOverlap = CachePolicy::StoreRemote;
+ CachePolicy NoOverlap = CachePolicy::QueryRemote;
+ CachePolicy UnionPolicy = DefaultPolicy | PartialOverlap | NoOverlap;
+
+ CacheRecordPolicy RecordPolicy;
+ CacheRecordPolicyBuilder Builder(DefaultPolicy);
+ Oid PartialOid = Oid::NewOid();
+ Oid NoOverlapOid = Oid::NewOid();
+ Oid OtherOid = Oid::NewOid();
+ Builder.AddValuePolicy(PartialOid, PartialOverlap);
+ Builder.AddValuePolicy(NoOverlapOid, NoOverlap);
+ RecordPolicy = Builder.Build();
+ SUBCASE("construct")
+ {
+ CHECK(!RecordPolicy.IsUniform());
+ CHECK(RecordPolicy.GetRecordPolicy() == UnionPolicy);
+ CHECK(RecordPolicy.GetDefaultValuePolicy() == DefaultPolicy);
+ CHECK(RecordPolicy.GetValuePolicy(PartialOid) == PartialOverlap);
+ CHECK(RecordPolicy.GetValuePolicy(NoOverlapOid) == NoOverlap);
+ CHECK(RecordPolicy.GetValuePolicy(OtherOid) == DefaultPolicy);
+ CHECK(RecordPolicy.GetValuePolicies().size() == 2);
+ }
+ SUBCASE("saveload")
+ {
+ CbWriter Writer;
+ RecordPolicy.Save(Writer);
+ CbObject Saved = Writer.Save()->AsObject();
+ CacheRecordPolicy Loaded = CacheRecordPolicy::Load(Saved);
+ CHECK(!RecordPolicy.IsUniform());
+ CHECK(RecordPolicy.GetRecordPolicy() == UnionPolicy);
+ CHECK(RecordPolicy.GetDefaultValuePolicy() == DefaultPolicy);
+ CHECK(RecordPolicy.GetValuePolicy(PartialOid) == PartialOverlap);
+ CHECK(RecordPolicy.GetValuePolicy(NoOverlapOid) == NoOverlap);
+ CHECK(RecordPolicy.GetValuePolicy(OtherOid) == DefaultPolicy);
+ CHECK(RecordPolicy.GetValuePolicies().size() == 2);
+ }
+ }
+
+ SUBCASE("parsing invalid text")
+ {
+ CacheRecordPolicy Loaded = CacheRecordPolicy::Load(CbObject());
+ CHECK(Loaded.IsUniform());
+ CHECK(Loaded.GetRecordPolicy() == CachePolicy::Default);
+ CHECK(Loaded.GetDefaultValuePolicy() == CachePolicy::Default);
+ CHECK(Loaded.GetValuePolicy(Oid::NewOid()) == CachePolicy::Default);
+ CHECK(Loaded.GetValuePolicies().size() == 0);
+
+ CachePolicy Policy = CachePolicy::SkipData;
+ Loaded = CacheRecordPolicy::Load(CbObject(), Policy);
+ CHECK(Loaded.IsUniform());
+ CHECK(Loaded.GetRecordPolicy() == Policy);
+ CHECK(Loaded.GetDefaultValuePolicy() == Policy);
+ CHECK(Loaded.GetValuePolicy(Oid::NewOid()) == Policy);
+ CHECK(Loaded.GetValuePolicies().size() == 0);
+ }
+}
+
+} // namespace zen::tests
+
+#endif
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 75aae6321..85393aed2 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -1536,13 +1536,13 @@ TEST_CASE("zcache.policy")
}
{
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?query=local", LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=QueryLocal,Store", LocalCfg.BaseUri, Bucket, Key)},
cpr::Header{{"Accept", "application/octet-stream"}});
CHECK(Result.status_code == 404);
}
{
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?query=local,remote", LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)},
cpr::Header{{"Accept", "application/octet-stream"}});
CHECK(Result.status_code == 200);
}
@@ -1564,7 +1564,7 @@ TEST_CASE("zcache.policy")
// Store binary cache value locally
{
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?store=local", LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,StoreLocal", LocalCfg.BaseUri, Bucket, Key)},
cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
cpr::Header{{"Content-Type", "application/octet-stream"}});
CHECK(Result.status_code == 201);
@@ -1599,7 +1599,7 @@ TEST_CASE("zcache.policy")
// Store binary cache value locally and upstream
{
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?store=local,remote", LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)},
cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
cpr::Header{{"Content-Type", "application/octet-stream"}});
CHECK(Result.status_code == 201);
@@ -1643,13 +1643,13 @@ TEST_CASE("zcache.policy")
}
{
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?query=local", LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=QueryLocal,Store", LocalCfg.BaseUri, Bucket, Key)},
cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
CHECK(Result.status_code == 404);
}
{
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?query=local,remote", LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)},
cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
CHECK(Result.status_code == 200);
}
@@ -1673,7 +1673,7 @@ TEST_CASE("zcache.policy")
// Store packge locally
{
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?store=local", LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,StoreLocal", LocalCfg.BaseUri, Bucket, Key)},
cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
CHECK(Result.status_code == 201);
@@ -1710,7 +1710,7 @@ TEST_CASE("zcache.policy")
// Store package locally and upstream
{
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?store=local,remote", LocalCfg.BaseUri, Bucket, Key)},
+ cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)},
cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
CHECK(Result.status_code == 201);
@@ -1729,130 +1729,7 @@ TEST_CASE("zcache.policy")
}
}
- SUBCASE("skip - 'attachments' does not return attachments")
- {
- ZenConfig LocalCfg = ZenConfig::New();
- ZenServerInstance LocalInst(TestEnv);
- const auto Bucket = "texture"sv;
-
- LocalCfg.Spawn(LocalInst);
-
- zen::IoHash Key;
- zen::IoHash PayloadId;
-
- // Store package locally
- {
- zen::CbPackage Package = GeneratePackage(Key, PayloadId);
- auto Buf = ToBuffer(Package);
-
- CHECK(Package.GetAttachments().size() != 0);
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 201);
- }
-
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?skip=attachments", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 200);
-
- zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
- zen::CbPackage Package;
- const bool Ok = Package.TryLoad(Body);
- CHECK(Ok);
-
- CHECK(Ok);
-
- CbObject CacheRecord = Package.GetObject();
- std::vector<IoHash> AttachmentKeys;
-
- CacheRecord.IterateAttachments(
- [&AttachmentKeys](CbFieldView AttachmentKey) { AttachmentKeys.push_back(AttachmentKey.AsHash()); });
-
- CHECK(AttachmentKeys.size() != 0);
- CHECK(Package.GetAttachments().size() == 0);
- }
-
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 200);
-
- zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
- zen::CbPackage Package;
- const bool Ok = Package.TryLoad(Body);
- CHECK(Ok);
-
- CHECK(Ok);
- CHECK(Package.GetAttachments().size() != 0);
- }
- }
-
- SUBCASE("skip - 'attachments' does not return attachments when retrieved from upstream")
- {
- ZenConfig UpstreamCfg = ZenConfig::New(13338);
- ZenServerInstance UpstreamInst(TestEnv);
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
- ZenServerInstance LocalInst(TestEnv);
- const auto Bucket = "texture"sv;
-
- UpstreamCfg.Spawn(UpstreamInst);
- LocalCfg.Spawn(LocalInst);
-
- zen::IoHash Key;
- zen::IoHash PayloadId;
-
- // Store package upstream
- {
- zen::CbPackage Package = GeneratePackage(Key, PayloadId);
- auto Buf = ToBuffer(Package);
-
- CHECK(Package.GetAttachments().size() != 0);
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
- cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 201);
- }
-
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?skip=attachments", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 200);
-
- zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
- zen::CbPackage Package;
- const bool Ok = Package.TryLoad(Body);
- CHECK(Ok);
-
- CHECK(Ok);
-
- CbObject CacheRecord = Package.GetObject();
- std::vector<IoHash> AttachmentKeys;
-
- CacheRecord.IterateAttachments(
- [&AttachmentKeys](CbFieldView AttachmentKey) { AttachmentKeys.push_back(AttachmentKey.AsHash()); });
-
- CHECK(AttachmentKeys.size() != 0);
- CHECK(Package.GetAttachments().size() == 0);
- }
-
- {
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.status_code == 200);
-
- zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
- zen::CbPackage Package;
- const bool Ok = Package.TryLoad(Body);
- CHECK(Ok);
-
- CHECK(Ok);
- CHECK(Package.GetAttachments().size() != 0);
- }
- }
-
- SUBCASE("skip - 'data' returns empty cache record/payload")
+ SUBCASE("skip - 'data' returns cache record without attachments/empty payload")
{
ZenConfig Cfg = ZenConfig::New();
ZenServerInstance Instance(TestEnv);
@@ -1875,24 +1752,30 @@ TEST_CASE("zcache.policy")
// Get package
{
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?skip=data", Cfg.BaseUri, Bucket, Key)},
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)},
cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
CHECK(IsHttpSuccessCode(Result.status_code));
- CHECK(Result.text.size() == 0);
+ IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size());
+ CbPackage ResponsePackage;
+ CHECK(ResponsePackage.TryLoad(Buffer));
+ CHECK(ResponsePackage.GetAttachments().size() == 0);
}
// Get record
{
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?skip=data", Cfg.BaseUri, Bucket, Key)},
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)},
cpr::Header{{"Accept", "application/x-ue-cbobject"}});
CHECK(IsHttpSuccessCode(Result.status_code));
- CHECK(Result.text.size() == 0);
+ IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size());
+ CbObject ResponseObject = zen::LoadCompactBinaryObject(Buffer);
+ CHECK((bool)ResponseObject);
}
// Get payload
{
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}/{}?skip=data", Cfg.BaseUri, Bucket, Key, PayloadId)},
- cpr::Header{{"Accept", "application/x-ue-comp"}});
+ cpr::Response Result =
+ cpr::Get(cpr::Url{fmt::format("{}/{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key, PayloadId)},
+ cpr::Header{{"Accept", "application/x-ue-comp"}});
CHECK(IsHttpSuccessCode(Result.status_code));
CHECK(Result.text.size() == 0);
}
@@ -1919,7 +1802,7 @@ TEST_CASE("zcache.policy")
// Get package
{
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?skip=data", Cfg.BaseUri, Bucket, Key)},
+ cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)},
cpr::Header{{"Accept", "application/octet-stream"}});
CHECK(IsHttpSuccessCode(Result.status_code));
CHECK(Result.text.size() == 0);
@@ -1931,7 +1814,12 @@ TEST_CASE("zcache.rpc")
{
using namespace std::literals;
- auto CreateCacheRecord = [](const zen::CacheKey& CacheKey, size_t PayloadSize) -> zen::CbPackage {
+ auto AppendCacheRecord = [](CbPackage& Package,
+ CbWriter& Writer,
+ const zen::CacheKey& CacheKey,
+ size_t PayloadSize,
+ CachePolicy /* BatchDefaultPolicy */,
+ CachePolicy RecordPolicy) {
std::vector<uint8_t> Data;
Data.resize(PayloadSize);
for (size_t Idx = 0; Idx < PayloadSize; ++Idx)
@@ -1941,17 +1829,24 @@ TEST_CASE("zcache.rpc")
zen::CbAttachment Attachment(zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())));
- zen::CbObjectWriter CacheRecord;
- CacheRecord.BeginObject("CacheKey"sv);
- CacheRecord << "Bucket"sv << CacheKey.Bucket << "Hash"sv << CacheKey.Hash;
- CacheRecord.EndObject();
- CacheRecord << "Data"sv << Attachment;
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Record"sv);
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << CacheKey.Bucket << "Hash"sv << CacheKey.Hash;
+ }
+ Writer.EndObject();
+ Writer << "Data"sv << Attachment;
+ }
+ Writer.EndObject();
+ Writer.SetName("Policy"sv);
+ CacheRecordPolicy(RecordPolicy).Save(Writer);
+ }
+ Writer.EndObject();
- zen::CbPackage Package;
- Package.SetObject(CacheRecord.Save());
Package.AddAttachment(Attachment);
-
- return Package;
};
auto ToIoBuffer = [](zen::CbPackage Package) -> zen::IoBuffer {
@@ -1960,27 +1855,46 @@ TEST_CASE("zcache.rpc")
return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size());
};
- auto PutCacheRecords = [&CreateCacheRecord, &ToIoBuffer](std::string_view BaseUri,
- std::string_view Query,
- std::string_view Bucket,
- size_t Num,
- size_t PayloadSize = 1024) -> std::vector<CacheKey> {
+ auto PutCacheRecords =
+ [&AppendCacheRecord,
+ &ToIoBuffer](std::string_view BaseUri, std::string_view Bucket, size_t Num, size_t PayloadSize = 1024) -> std::vector<CacheKey> {
std::vector<zen::CacheKey> OutKeys;
for (uint32_t Key = 1; Key <= Num; ++Key)
{
- const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, zen::IoHash::HashBuffer(&Key, sizeof(uint32_t)));
- CbPackage CacheRecord = CreateCacheRecord(CacheKey, PayloadSize);
+ zen::IoHash KeyHash;
+ ((uint32_t*)(KeyHash.Hash))[0] = Key;
+ const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash);
+ CbPackage Package;
+ CbWriter Writer;
- OutKeys.push_back(CacheKey);
+ Writer.BeginObject();
+ {
+ Writer << "Method"sv
+ << "PutCacheRecords"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("Requests"sv);
+ {
+ AppendCacheRecord(Package, Writer, CacheKey, PayloadSize, BatchDefaultPolicy, CachePolicy::Default);
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save().AsObject());
- IoBuffer Payload = ToIoBuffer(CacheRecord);
+ OutKeys.push_back(CacheKey);
- cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}{}", BaseUri, CacheKey.Bucket, CacheKey.Hash, Query)},
- cpr::Body{(const char*)Payload.Data(), Payload.Size()},
- cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
- CHECK(Result.status_code == 201);
+ CHECK(Result.status_code == 200);
}
return OutKeys;
@@ -2011,9 +1925,8 @@ TEST_CASE("zcache.rpc")
}
Request.EndArray();
- Request.BeginObject("Policy");
- CacheRecordPolicy::Save(Policy, Request);
- Request.EndObject();
+ Request.SetName("Policy"sv);
+ Policy.Save(Request);
Request.EndObject();
@@ -2066,7 +1979,7 @@ TEST_CASE("zcache.rpc")
Inst.WaitUntilReady();
CacheRecordPolicy Policy;
- std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 128);
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "mastodon"sv, 128);
GetCacheRecordResult Result = GetCacheRecords(BaseUri, Keys, Policy);
CHECK(Result.Records.size() == Keys.size());
@@ -2076,7 +1989,7 @@ TEST_CASE("zcache.rpc")
const CacheKey& ExpectedKey = Keys[Index++];
CbObjectView RecordObj = RecordView.AsObjectView();
- CbObjectView KeyObj = RecordObj["CacheKey"sv].AsObjectView();
+ CbObjectView KeyObj = RecordObj["Key"sv].AsObjectView();
const CacheKey Key = CacheKey::Create(KeyObj["Bucket"sv].AsString(), KeyObj["Hash"].AsHash());
const IoHash AttachmentHash = RecordObj["Data"sv].AsHash();
const CbAttachment* Attachment = Result.Response.FindAttachment(AttachmentHash);
@@ -2098,7 +2011,7 @@ TEST_CASE("zcache.rpc")
Inst.WaitUntilReady();
CacheRecordPolicy Policy;
- std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 128);
+ std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "mastodon"sv, 128);
std::vector<zen::CacheKey> Keys;
for (const zen::CacheKey& Key : ExistingKeys)
@@ -2124,7 +2037,7 @@ TEST_CASE("zcache.rpc")
{
const CacheKey& ExpectedKey = ExistingKeys[KeyIndex++];
CbObjectView RecordObj = RecordView.AsObjectView();
- zen::CacheKey Key = LoadKey(RecordObj["CacheKey"sv]);
+ zen::CacheKey Key = LoadKey(RecordObj["Key"sv]);
const IoHash AttachmentHash = RecordObj["Data"sv].AsHash();
const CbAttachment* Attachment = Result.Response.FindAttachment(AttachmentHash);
@@ -2134,38 +2047,6 @@ TEST_CASE("zcache.rpc")
}
}
- SUBCASE("policy - 'SkipAttachments' does not return any record attachments")
- {
- std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
- const uint16_t PortNumber = 13337;
- const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);
-
- ZenServerInstance Inst(TestEnv);
- Inst.SetTestDir(TestDir);
- Inst.SpawnServer(PortNumber);
- Inst.WaitUntilReady();
-
- CacheRecordPolicy Policy(CachePolicy::QueryLocal | CachePolicy::SkipAttachments);
- std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 4);
- GetCacheRecordResult Result = GetCacheRecords(BaseUri, Keys, Policy);
-
- CHECK(Result.Records.size() == Keys.size());
-
- std::span<const zen::CbAttachment> Attachments = Result.Response.GetAttachments();
- CHECK(Attachments.empty());
-
- for (size_t Index = 0; CbFieldView RecordView : Result.Records)
- {
- const CacheKey& ExpectedKey = Keys[Index++];
-
- CbObjectView RecordObj = RecordView.AsObjectView();
- CbObjectView KeyObj = RecordObj["CacheKey"sv].AsObjectView();
- const CacheKey Key = CacheKey::Create(KeyObj["Bucket"sv].AsString(), KeyObj["Hash"].AsHash());
-
- CHECK(Key == ExpectedKey);
- }
- }
-
SUBCASE("policy - 'QueryLocal' does not query upstream")
{
using namespace utils;
@@ -2178,7 +2059,7 @@ TEST_CASE("zcache.rpc")
SpawnServer(UpstreamServer, UpstreamCfg);
SpawnServer(LocalServer, LocalCfg);
- std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, ""sv, "mastodon"sv, 4);
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "mastodon"sv, 4);
CacheRecordPolicy Policy(CachePolicy::QueryLocal);
GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, Keys, Policy);
@@ -2203,7 +2084,7 @@ TEST_CASE("zcache.rpc")
SpawnServer(UpstreamServer, UpstreamCfg);
SpawnServer(LocalServer, LocalCfg);
- std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, ""sv, "mastodon"sv, 4);
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "mastodon"sv, 4);
CacheRecordPolicy Policy(CachePolicy::QueryLocal | CachePolicy::QueryRemote);
GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, Keys, Policy);
@@ -2214,7 +2095,7 @@ TEST_CASE("zcache.rpc")
{
const zen::CacheKey& ExpectedKey = Keys[Index++];
CbObjectView RecordObj = RecordView.AsObjectView();
- zen::CacheKey Key = LoadKey(RecordObj["CacheKey"sv]);
+ zen::CacheKey Key = LoadKey(RecordObj["Key"sv]);
CHECK(Key == ExpectedKey);
}
}
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 5918d5178..d39b95a1e 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -7,6 +7,7 @@
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/compress.h>
+#include <zencore/enumflags.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
@@ -14,6 +15,7 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zenhttp/httpserver.h>
+#include <zenhttp/httpshared.h>
#include <zenstore/cas.h>
#include <zenutil/cache/cache.h>
@@ -42,11 +44,8 @@ using namespace std::literals;
CachePolicy
ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams)
{
- const CachePolicy QueryPolicy = zen::ParseQueryCachePolicy(QueryParams.GetValue("query"sv));
- const CachePolicy StorePolicy = zen::ParseStoreCachePolicy(QueryParams.GetValue("store"sv));
- const CachePolicy SkipPolicy = zen::ParseSkipCachePolicy(QueryParams.GetValue("skip"sv));
-
- return QueryPolicy | StorePolicy | SkipPolicy;
+ std::string_view PolicyText = QueryParams.GetValue("Policy"sv);
+ return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default;
}
struct AttachmentCount
@@ -57,6 +56,13 @@ struct AttachmentCount
uint32_t Total = 0;
};
+struct PutRequestData
+{
+ CacheKey Key;
+ CbObjectView RecordObject;
+ CacheRecordPolicy Policy;
+};
+
//////////////////////////////////////////////////////////////////////////
HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
@@ -134,16 +140,15 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
}
- const auto QueryParams = Request.GetQueryParams();
- CachePolicy Policy = ParseCachePolicy(QueryParams);
+ CachePolicy PolicyFromURL = ParseCachePolicy(Request.GetQueryParams());
- if (Ref.PayloadId == IoHash::Zero)
+ if (Ref.ValueContentId == IoHash::Zero)
{
- return HandleCacheRecordRequest(Request, Ref, Policy);
+ return HandleCacheRecordRequest(Request, Ref, PolicyFromURL);
}
else
{
- return HandleCachePayloadRequest(Request, Ref, Policy);
+ return HandleCacheValueRequest(Request, Ref, PolicyFromURL);
}
return;
@@ -180,19 +185,19 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
}
void
-HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
+HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
{
switch (Request.RequestVerb())
{
case HttpVerb::kHead:
case HttpVerb::kGet:
{
- HandleGetCacheRecord(Request, Ref, Policy);
+ HandleGetCacheRecord(Request, Ref, PolicyFromURL);
}
break;
case HttpVerb::kPut:
- HandlePutCacheRecord(Request, Ref, Policy);
+ HandlePutCacheRecord(Request, Ref, PolicyFromURL);
break;
default:
break;
@@ -200,18 +205,20 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request,
}
void
-HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
+HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
{
- const ZenContentType AcceptType = Request.AcceptContentType();
- const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData;
- const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments;
- const bool PartialOnError = (Policy & CachePolicy::PartialOnError) == CachePolicy::PartialOnError;
- const bool QueryUpstream = (Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote;
+ const ZenContentType AcceptType = Request.AcceptContentType();
+ const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData);
+ const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord);
bool Success = false;
- ZenCacheValue LocalCacheValue;
+ ZenCacheValue ClientResultValue;
+ if (!EnumHasAnyFlags(PolicyFromURL, CachePolicy::Query))
+ {
+ return Request.WriteResponse(HttpResponseCode::OK);
+ }
- if (m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, LocalCacheValue))
+ if (EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal) && m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, ClientResultValue))
{
Success = true;
@@ -220,14 +227,11 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
CbPackage Package;
uint32_t MissingCount = 0;
- CbObjectView CacheRecord(LocalCacheValue.Value.Data());
- CacheRecord.IterateAttachments([this, SkipAttachments, &MissingCount, &Package](CbFieldView AttachmentHash) {
- if (SkipAttachments && MissingCount == 0)
+ CbObjectView CacheRecord(ClientResultValue.Value.Data());
+ CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) {
+ if (SkipData)
{
- if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash()))
- {
- MissingCount++;
- }
+ MissingCount += m_CidStore.ContainsChunk(AttachmentHash.AsHash()) ? 0 : 1;
}
else
{
@@ -242,17 +246,17 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
}
});
- Success = MissingCount == 0 || PartialOnError;
+ Success = MissingCount == 0 || PartialRecord;
if (Success)
{
- Package.SetObject(LoadCompactBinaryObject(LocalCacheValue.Value));
+ Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value));
BinaryWriter MemStream;
Package.Save(MemStream);
- LocalCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- LocalCacheValue.Value.SetContentType(HttpContentType::kCbPackage);
+ ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage);
}
}
}
@@ -262,21 +266,21 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)",
Ref.BucketSegment,
Ref.HashKey,
- NiceBytes(LocalCacheValue.Value.Size()),
- ToString(LocalCacheValue.Value.GetContentType()));
+ NiceBytes(ClientResultValue.Value.Size()),
+ ToString(ClientResultValue.Value.GetContentType()));
m_CacheStats.HitCount++;
-
- if (SkipData)
+ if (SkipData && AcceptType == ZenContentType::kBinary)
{
return Request.WriteResponse(HttpResponseCode::OK);
}
else
{
- return Request.WriteResponse(HttpResponseCode::OK, LocalCacheValue.Value.GetContentType(), LocalCacheValue.Value);
+ // Other types handled SkipData when constructing the ClientResultValue
+ return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value);
}
}
- else if (!QueryUpstream)
+ else if (!EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryRemote))
{
ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
m_CacheStats.MissCount++;
@@ -286,9 +290,13 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
// Issue upstream query asynchronously in order to keep requests flowing without
// hogging I/O servicing threads with blocking work
- Request.WriteResponseAsync([this, AcceptType, SkipData, SkipAttachments, PartialOnError, Ref](HttpServerRequest& AsyncRequest) {
- bool Success = false;
- ZenCacheValue UpstreamCacheValue;
+ Request.WriteResponseAsync([this, AcceptType, PolicyFromURL, Ref](HttpServerRequest& AsyncRequest) {
+ bool Success = false;
+ const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord);
+ const bool QueryLocal = EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal);
+ const bool StoreLocal = EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreLocal);
+ const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData);
+ ZenCacheValue ClientResultValue;
metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming);
@@ -297,8 +305,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
{
Success = true;
- UpstreamCacheValue.Value = UpstreamResult.Value;
- UpstreamCacheValue.Value.SetContentType(AcceptType);
+ ClientResultValue.Value = UpstreamResult.Value;
+ ClientResultValue.Value.SetContentType(AcceptType);
if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject)
{
@@ -313,72 +321,82 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
Ref.HashKey,
ToString(AcceptType));
}
+
+ // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data
}
- if (Success)
+ if (Success && StoreLocal)
{
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, UpstreamCacheValue);
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, ClientResultValue);
}
}
else if (AcceptType == ZenContentType::kCbPackage)
{
CbPackage Package;
- if (Package.TryLoad(UpstreamCacheValue.Value))
+ if (Package.TryLoad(ClientResultValue.Value))
{
CbObject CacheRecord = Package.GetObject();
AttachmentCount Count;
- CacheRecord.IterateAttachments([this, &Package, &Ref, &Count, SkipAttachments](CbFieldView HashView) {
+ CacheRecord.IterateAttachments([this, &Package, &Ref, &Count, QueryLocal, StoreLocal](CbFieldView HashView) {
if (const CbAttachment* Attachment = Package.FindAttachment(HashView.AsHash()))
{
if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
{
- auto InsertResult = m_CidStore.AddChunk(Compressed);
- if (InsertResult.New)
+ if (StoreLocal)
{
- Count.New++;
+ auto InsertResult = m_CidStore.AddChunk(Compressed);
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
}
Count.Valid++;
}
else
{
- ZEN_WARN("Uncompressed payload '{}' from upstream cache record '{}/{}'",
+ ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'",
HashView.AsHash(),
Ref.BucketSegment,
Ref.HashKey);
Count.Invalid++;
}
}
- else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash()))
+ else if (QueryLocal)
{
- if (!SkipAttachments)
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash()))
{
Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ Count.Valid++;
}
- Count.Valid++;
}
Count.Total++;
});
- if ((Count.Valid == Count.Total) || PartialOnError)
+ if ((Count.Valid == Count.Total) || PartialRecord)
{
ZenCacheValue CacheValue;
CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
-
- if (SkipAttachments)
+ if (StoreLocal)
{
- Package.Reset();
- Package.SetObject(CacheRecord);
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
}
BinaryWriter MemStream;
- Package.Save(MemStream);
+ if (SkipData)
+ {
+ // Save a package containing only the object.
+ CbPackage(Package.GetObject()).Save(MemStream);
+ }
+ else
+ {
+ Package.Save(MemStream);
+ }
- UpstreamCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- UpstreamCacheValue.Value.SetContentType(ZenContentType::kCbPackage);
+ ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage);
}
else
{
@@ -402,19 +420,21 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)",
Ref.BucketSegment,
Ref.HashKey,
- NiceBytes(UpstreamCacheValue.Value.Size()),
- ToString(UpstreamCacheValue.Value.GetContentType()));
+ NiceBytes(ClientResultValue.Value.Size()),
+ ToString(ClientResultValue.Value.GetContentType()));
m_CacheStats.HitCount++;
m_CacheStats.UpstreamHitCount++;
- if (SkipData)
+ if (SkipData && AcceptType == ZenContentType::kBinary)
{
AsyncRequest.WriteResponse(HttpResponseCode::OK);
}
else
{
- AsyncRequest.WriteResponse(HttpResponseCode::OK, UpstreamCacheValue.Value.GetContentType(), UpstreamCacheValue.Value);
+ // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally
+ // metadata.
+ AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value);
}
}
else
@@ -427,7 +447,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
}
void
-HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
+HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
{
IoBuffer Body = Request.ReadPayload();
@@ -436,8 +456,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
- const HttpContentType ContentType = Request.RequestContentType();
- const bool StoreUpstream = (Policy & CachePolicy::StoreRemote) == CachePolicy::StoreRemote;
+ const HttpContentType ContentType = Request.RequestContentType();
Body.SetContentType(ContentType);
@@ -446,7 +465,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
ZEN_DEBUG("PUT - '{}/{}' {} '{}'", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType));
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
- if (StoreUpstream)
+ if (EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreRemote))
{
m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kBinary, .Key = {Ref.BucketSegment, Ref.HashKey}});
}
@@ -463,6 +482,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv);
}
+ CachePolicy Policy = PolicyFromURL;
CbObjectView CacheRecord(Body.Data());
std::vector<IoHash> ValidAttachments;
int32_t TotalCount = 0;
@@ -489,10 +509,11 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size());
- if (StoreUpstream && !IsPartialRecord)
+ if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
- m_UpstreamCache.EnqueueUpstream(
- {.Type = ZenContentType::kCbObject, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)});
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ .Key = {Ref.BucketSegment, Ref.HashKey},
+ .ValueContentIds = std::move(ValidAttachments)});
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -506,6 +527,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
ZEN_WARN("PUT - '{}/{}' '{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey, ToString(ContentType));
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv);
}
+ CachePolicy Policy = PolicyFromURL;
CbObject CacheRecord = Package.GetObject();
AttachmentCount Count;
@@ -570,10 +592,11 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
const bool IsPartialRecord = Count.Valid != Count.Total;
- if (StoreUpstream && !IsPartialRecord)
+ if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
- m_UpstreamCache.EnqueueUpstream(
- {.Type = ZenContentType::kCbPackage, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)});
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
+ .Key = {Ref.BucketSegment, Ref.HashKey},
+ .ValueContentIds = std::move(ValidAttachments)});
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -585,18 +608,16 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
}
void
-HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
+HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
{
switch (Request.RequestVerb())
{
case HttpVerb::kHead:
case HttpVerb::kGet:
- {
- HandleGetCachePayload(Request, Ref, Policy);
- }
+ HandleGetCacheValue(Request, Ref, PolicyFromURL);
break;
case HttpVerb::kPut:
- HandlePutCachePayload(Request, Ref, Policy);
+ HandlePutCacheValue(Request, Ref, PolicyFromURL);
break;
default:
break;
@@ -604,15 +625,17 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request
}
void
-HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
+HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
{
- IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId);
- bool InUpstreamCache = false;
- const bool QueryUpstream = !Payload && (Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote;
+ IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId);
+ bool InUpstreamCache = false;
+ CachePolicy Policy = PolicyFromURL;
+ const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote);
if (QueryUpstream)
{
- if (auto UpstreamResult = m_UpstreamCache.GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success)
+ if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
+ UpstreamResult.Success)
{
if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
{
@@ -621,14 +644,14 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
}
else
{
- ZEN_WARN("got uncompressed upstream cache payload");
+ ZEN_WARN("got uncompressed upstream cache value");
}
}
}
- if (!Payload)
+ if (!Value)
{
- ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, ToString(Request.AcceptContentType()));
+ ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, ToString(Request.AcceptContentType()));
m_CacheStats.MissCount++;
return Request.WriteResponse(HttpResponseCode::NotFound);
}
@@ -636,9 +659,9 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
Ref.BucketSegment,
Ref.HashKey,
- Ref.PayloadId,
- NiceBytes(Payload.Size()),
- ToString(Payload.GetContentType()),
+ Ref.ValueContentId,
+ NiceBytes(Value.Size()),
+ ToString(Value.GetContentType()),
InUpstreamCache ? "UPSTREAM" : "LOCAL");
m_CacheStats.HitCount++;
@@ -647,21 +670,21 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
m_CacheStats.UpstreamHitCount++;
}
- if ((Policy & CachePolicy::SkipData) == CachePolicy::SkipData)
+ if (EnumHasAllFlags(Policy, CachePolicy::SkipData))
{
Request.WriteResponse(HttpResponseCode::OK);
}
else
{
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload);
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value);
}
}
void
-HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
+HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
{
- // Note: Individual cache payloads are not propagated upstream until a valid cache record has been stored
- ZEN_UNUSED(Policy);
+ // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored
+ ZEN_UNUSED(PolicyFromURL);
IoBuffer Body = Request.ReadPayload();
@@ -679,9 +702,11 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv);
}
- if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId)
+ if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId)
{
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Payload ID does not match attachment hash"sv);
+ return Request.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "ValueContentId does not match attachment hash"sv);
}
CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed);
@@ -689,7 +714,7 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques
ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})",
Ref.BucketSegment,
Ref.HashKey,
- Ref.PayloadId,
+ Ref.ValueContentId,
NiceBytes(Body.Size()),
ToString(Body.GetContentType()),
Result.New ? "NEW" : "OLD");
@@ -718,22 +743,22 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
}
std::string_view HashSegment;
- std::string_view PayloadSegment;
+ std::string_view ValueSegment;
- std::string_view::size_type PayloadSplitOffset = Key.find_last_of('/');
+ std::string_view::size_type ValueSplitOffset = Key.find_last_of('/');
// We know there is a slash so no need to check for npos return
- if (PayloadSplitOffset == BucketSplitOffset)
+ if (ValueSplitOffset == BucketSplitOffset)
{
// Basic cache record lookup
HashSegment = Key.substr(BucketSplitOffset + 1);
}
else
{
- // Cache record + payload lookup
- HashSegment = Key.substr(BucketSplitOffset + 1, PayloadSplitOffset - BucketSplitOffset - 1);
- PayloadSegment = Key.substr(PayloadSplitOffset + 1);
+ // Cache record + valueid lookup
+ HashSegment = Key.substr(BucketSplitOffset + 1, ValueSplitOffset - BucketSplitOffset - 1);
+ ValueSegment = Key.substr(ValueSplitOffset + 1);
}
if (HashSegment.size() != IoHash::StringLength)
@@ -741,9 +766,9 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
return false;
}
- if (!PayloadSegment.empty() && PayloadSegment.size() == IoHash::StringLength)
+ if (!ValueSegment.empty() && ValueSegment.size() == IoHash::StringLength)
{
- const bool IsOk = ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash);
+ const bool IsOk = ParseHexBytes(ValueSegment.data(), ValueSegment.size(), OutRef.ValueContentId.Hash);
if (!IsOk)
{
@@ -752,7 +777,7 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
}
else
{
- OutRef.PayloadId = IoHash::Zero;
+ OutRef.ValueContentId = IoHash::Zero;
}
const bool IsOk = ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash);
@@ -775,27 +800,44 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
const HttpContentType ContentType = Request.RequestContentType();
const HttpContentType AcceptType = Request.AcceptContentType();
- if (ContentType != HttpContentType::kCbObject || AcceptType != HttpContentType::kCbPackage)
+ if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) ||
+ AcceptType != HttpContentType::kCbPackage)
{
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
- Request.WriteResponseAsync(
- [this, RpcRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) {
- const std::string_view Method = RpcRequest["Method"sv].AsString();
- if (Method == "GetCacheRecords"sv)
- {
- HandleRpcGetCacheRecords(AsyncRequest, RpcRequest);
- }
- else if (Method == "GetCachePayloads"sv)
- {
- HandleRpcGetCachePayloads(AsyncRequest, RpcRequest);
- }
- else
- {
- AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
- }
- });
+ Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable {
+ CbPackage Package;
+ CbObjectView Object;
+ CbObject ObjectBuffer;
+ if (ContentType == HttpContentType::kCbObject)
+ {
+ ObjectBuffer = zen::LoadCompactBinaryObject(std::move(Body));
+ Object = ObjectBuffer;
+ }
+ else
+ {
+ Package = ParsePackageMessage(Body);
+ Object = Package.GetObject();
+ }
+ const std::string_view Method = Object["Method"sv].AsString();
+ if (Method == "PutCacheRecords"sv)
+ {
+ HandleRpcPutCacheRecords(AsyncRequest, Package);
+ }
+ else if (Method == "GetCacheRecords"sv)
+ {
+ HandleRpcGetCacheRecords(AsyncRequest, Object);
+ }
+ else if (Method == "GetCacheValues"sv)
+ {
+ HandleRpcGetCacheValues(AsyncRequest, Object);
+ }
+ else
+ {
+ AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ });
}
break;
default:
@@ -805,25 +847,154 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
}
void
+HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
+ CbObjectView BatchObject = BatchRequest.GetObject();
+
+ CbObjectView Params = BatchObject["Params"sv].AsObjectView();
+ CachePolicy DefaultPolicy;
+
+ ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
+
+ std::string_view PolicyText = Params["DefaultPolicy"].AsString();
+ DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::vector<bool> Results;
+ for (CbFieldView RequestField : Params["Requests"sv])
+ {
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView();
+ CbObjectView KeyView = RecordObject["Key"sv].AsObjectView();
+ CbFieldView BucketField = KeyView["Bucket"sv];
+ CbFieldView HashField = KeyView["Hash"sv];
+ CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash());
+ if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ CacheRecordPolicy Policy = CacheRecordPolicy::Load(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
+ PutRequestData PutRequest{std::move(Key), RecordObject, std::move(Policy)};
+
+ PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
+
+ if (Result == PutResult::Invalid)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ Results.push_back(Result == PutResult::Success);
+ }
+ if (Results.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ CbObjectWriter ResponseObject;
+ ResponseObject.BeginArray("Result"sv);
+ for (bool Value : Results)
+ {
+ ResponseObject.AddBool(Value);
+ }
+ ResponseObject.EndArray();
+
+ CbPackage RpcResponse;
+ RpcResponse.SetObject(ResponseObject.Save());
+
+ BinaryWriter MemStream;
+ RpcResponse.Save(MemStream);
+
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+}
+
+HttpStructuredCacheService::PutResult
+HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
+{
+ std::vector<IoHash> ValidAttachments;
+ AttachmentCount Count;
+ CbObjectView Record = Request.RecordObject;
+ uint64_t RecordObjectSize = Record.GetSize();
+ uint64_t TransferredSize = RecordObjectSize;
+
+ Request.RecordObject.IterateAttachments([this, &Request, Package, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
+ {
+ if (Attachment->IsCompressedBinary())
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+
+ ValidAttachments.emplace_back(InsertResult.DecompressedId);
+
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ Count.Valid++;
+ TransferredSize += Chunk.GetCompressedSize();
+ }
+ else
+ {
+ ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ToString(HttpContentType::kCbPackage),
+ ValueHash);
+ Count.Invalid++;
+ }
+ }
+ else if (m_CidStore.ContainsChunk(ValueHash))
+ {
+ ValidAttachments.emplace_back(ValueHash);
+ Count.Valid++;
+ }
+ Count.Total++;
+ });
+
+ if (Count.Invalid > 0)
+ {
+ return PutResult::Invalid;
+ }
+
+ ZEN_DEBUG("PUT - '{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)",
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ NiceBytes(TransferredSize),
+ Count.New,
+ Count.Valid,
+ Count.Total);
+
+ ZenCacheValue CacheValue;
+ CacheValue.Value = IoBuffer(Record.GetSize());
+ Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
+ CacheValue.Value.SetContentType(ZenContentType::kCbObject);
+ m_CacheStore.Put(Request.Key.Bucket, Request.Key.Hash, CacheValue);
+
+ const bool IsPartialRecord = Count.Valid != Count.Total;
+
+ if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
+ {
+ m_UpstreamCache.EnqueueUpstream(
+ {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)});
+ }
+ return PutResult::Success;
+}
+
+void
HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
{
ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
CbPackage RpcResponse;
- CacheRecordPolicy Policy;
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ CacheRecordPolicy BatchPolicy = CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView());
std::vector<CacheKey> CacheKeys;
std::vector<IoBuffer> CacheValues;
std::vector<size_t> UpstreamRequests;
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
- CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy);
-
- const bool PartialOnError = Policy.HasRecordPolicy(CachePolicy::PartialOnError);
- const bool SkipAttachments = Policy.HasRecordPolicy(CachePolicy::SkipAttachments);
- const bool QueryRemote = Policy.HasRecordPolicy(CachePolicy::QueryRemote);
-
for (CbFieldView KeyView : Params["CacheKeys"sv])
{
CbObjectView KeyObject = KeyView.AsObjectView();
@@ -840,54 +1011,84 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys)
{
ZenCacheValue CacheValue;
- uint32_t MissingCount = 0;
+ uint32_t MissingCount = 0;
+ uint32_t MissingReadFromUpstreamCount = 0;
- if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
+ if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
{
CbObjectView CacheRecord(CacheValue.Value.Data());
- CacheRecord.IterateAttachments([this, SkipAttachments, &MissingCount, &RpcResponse](CbFieldView AttachmentHash) {
- if (SkipAttachments && MissingCount == 0)
- {
- if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash()))
+ CacheRecord.IterateAttachments(
+ [this, &MissingCount, &MissingReadFromUpstreamCount, &RpcResponse, &BatchPolicy](CbFieldView AttachmentHash) {
+ CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy();
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal))
{
- MissingCount++;
+ // A value that is requested without the Query flag (such as None/Disable) does not count as missing, because we
+ // didn't ask for it and thus the record is complete in its absence.
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ MissingReadFromUpstreamCount++;
+ MissingCount++;
+ }
}
- }
- else
- {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
{
- ZEN_ASSERT(Chunk.GetSize() > 0);
- RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash()))
+ {
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ MissingReadFromUpstreamCount++;
+ }
+ MissingCount++;
+ }
}
else
{
- MissingCount++;
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ ZEN_ASSERT(Chunk.GetSize() > 0);
+ RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ }
+ else
+ {
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ MissingReadFromUpstreamCount++;
+ }
+ MissingCount++;
+ }
}
- }
- });
+ });
}
- if (CacheValue.Value && (MissingCount == 0 || PartialOnError))
+ if ((!CacheValue.Value && EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryRemote)) ||
+ MissingReadFromUpstreamCount != 0)
+ {
+ UpstreamRequests.push_back(KeyIndex);
+ }
+ else if (CacheValue.Value && (MissingCount == 0 || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)))
{
ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL) {}",
Key.Bucket,
Key.Hash,
NiceBytes(CacheValue.Value.Size()),
ToString(CacheValue.Value.GetContentType()),
- MissingCount ? "(PARTIAl)" : ""sv);
+ MissingCount ? "(PARTIAL)" : ""sv);
CacheValues[KeyIndex] = std::move(CacheValue.Value);
m_CacheStats.HitCount++;
}
- else if (QueryRemote)
- {
- UpstreamRequests.push_back(KeyIndex);
- }
else
{
- ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAl)"sv : ""sv);
- m_CacheStats.MissCount++;
+ if (!EnumHasAnyFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::Query))
+ {
+ // If they requested no query, do not record this as a miss
+ ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash);
+ }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAL)"sv : ""sv);
+ m_CacheStats.MissCount++;
+ }
}
++KeyIndex;
@@ -895,82 +1096,95 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
if (!UpstreamRequests.empty())
{
- const auto OnCacheRecordGetComplete =
- [this, &CacheValues, &RpcResponse, PartialOnError, SkipAttachments](CacheRecordGetCompleteParams&& Params) {
- ZEN_ASSERT(Params.KeyIndex < CacheValues.size());
+ const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, &BatchPolicy](CacheRecordGetCompleteParams&& Params) {
+ ZEN_ASSERT(Params.KeyIndex < CacheValues.size());
- IoBuffer CacheValue;
- AttachmentCount Count;
+ IoBuffer CacheValue;
+ AttachmentCount Count;
- if (Params.Record)
- {
- Params.Record.IterateAttachments([this, &RpcResponse, SkipAttachments, &Params, &Count](CbFieldView HashView) {
+ if (Params.Record)
+ {
+ Params.Record.IterateAttachments([this, &RpcResponse, &Params, &Count, &BatchPolicy](CbFieldView HashView) {
+ CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy();
+ bool FoundInUpstream = false;
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash()))
{
+ FoundInUpstream = true;
if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
{
- auto InsertResult = m_CidStore.AddChunk(Compressed);
- if (InsertResult.New)
+ FoundInUpstream = true;
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
{
- Count.New++;
+ auto InsertResult = m_CidStore.AddChunk(Compressed);
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
}
Count.Valid++;
- if (!SkipAttachments)
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
{
RpcResponse.AddAttachment(CbAttachment(Compressed));
}
}
else
{
- ZEN_DEBUG("Uncompressed payload '{}' from upstream cache record '{}/{}'",
+ ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'",
HashView.AsHash(),
Params.Key.Bucket,
Params.Key.Hash);
Count.Invalid++;
}
}
- else if (m_CidStore.ContainsChunk(HashView.AsHash()))
- {
- Count.Valid++;
- }
- Count.Total++;
- });
-
- if ((Count.Valid == Count.Total) || PartialOnError)
+ }
+ if (!FoundInUpstream && EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal) &&
+ m_CidStore.ContainsChunk(HashView.AsHash()))
{
- CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer();
+ // We added the attachment for this Value in the local loop before calling m_UpstreamCache
+ Count.Valid++;
}
- }
+ Count.Total++;
+ });
- if (CacheValue)
+ if ((Count.Valid == Count.Total) || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))
{
- ZEN_DEBUG("HIT - '{}/{}' {} '{}' attachments '{}/{}/{}' (new/valid/total) (UPSTREAM)",
- Params.Key.Bucket,
- Params.Key.Hash,
- NiceBytes(CacheValue.GetSize()),
- ToString(HttpContentType::kCbPackage),
- Count.New,
- Count.Valid,
- Count.Total);
-
- CacheValue.SetContentType(ZenContentType::kCbObject);
-
- CacheValues[Params.KeyIndex] = CacheValue;
- m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue});
-
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount++;
+ CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer();
}
- else
+ }
+
+ if (CacheValue)
+ {
+ ZEN_DEBUG("HIT - '{}/{}' {} '{}' attachments '{}/{}/{}' (new/valid/total) (UPSTREAM)",
+ Params.Key.Bucket,
+ Params.Key.Hash,
+ NiceBytes(CacheValue.GetSize()),
+ ToString(HttpContentType::kCbPackage),
+ Count.New,
+ Count.Valid,
+ Count.Total);
+
+ CacheValue.SetContentType(ZenContentType::kCbObject);
+ CacheValues[Params.KeyIndex] = CacheValue;
+ if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::StoreLocal))
{
- const bool IsPartial = Count.Valid != Count.Total;
- ZEN_DEBUG("MISS - '{}/{}' {}", Params.Key.Bucket, Params.Key.Hash, IsPartial ? "(partial)"sv : ""sv);
- m_CacheStats.MissCount++;
+ m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue});
}
- };
- m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete));
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount++;
+ }
+ else
+ {
+ const bool IsPartial = Count.Valid != Count.Total;
+ ZEN_DEBUG("MISS - '{}/{}' {}", Params.Key.Bucket, Params.Key.Hash, IsPartial ? "(partial)"sv : ""sv);
+ m_CacheStats.MissCount++;
+ }
+ };
+
+ m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, BatchPolicy, std::move(OnCacheRecordGetComplete));
}
CbObjectWriter ResponseObject;
@@ -1001,11 +1215,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
}
void
-HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
+HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
{
- ZEN_TRACE_CPU("Z$::RpcGetCachePayloads");
+ ZEN_TRACE_CPU("Z$::RpcGetCacheValues");
- ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCachePayloads"sv);
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
std::vector<CacheChunkRequest> ChunkRequests;
std::vector<size_t> UpstreamRequests;
@@ -1014,19 +1228,20 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
for (CbFieldView RequestView : Params["ChunkRequests"sv])
{
- CbObjectView RequestObject = RequestView.AsObjectView();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
- const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash());
- const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash();
- const Oid PayloadId = RequestObject["PayloadId"sv].AsObjectId();
- const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64();
- const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
- const uint32_t ChunkPolicy = RequestObject["Policy"sv].AsUInt32();
+ CbObjectView RequestObject = RequestView.AsObjectView();
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+ const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash());
+ const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash();
+ const Oid ValueId = RequestObject["ValueId"sv].AsObjectId();
+ const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64();
+ const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
+ std::string_view PolicyText = RequestObject["Policy"sv].AsString();
+ const CachePolicy ChunkPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
// Note we could use emplace_back here but [Apple] LLVM-12's C++ library
// can't infer a constructor like other platforms (or can't handle an
// initializer list like others do).
- ChunkRequests.push_back({Key, ChunkId, PayloadId, RawOffset, RawSize, static_cast<CachePolicy>(ChunkPolicy)});
+ ChunkRequests.push_back({Key, ChunkId, ValueId, RawOffset, RawSize, ChunkPolicy});
}
if (ChunkRequests.empty())
@@ -1036,20 +1251,20 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
Chunks.resize(ChunkRequests.size());
- // Unreal uses a 12 byte ID to address cache record payloads. When the uncompressed hash (ChunkId)
- // is missing, load the cache record and try to find the raw hash from the payload ID.
+ // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
+ // is missing, load the cache record and try to find the raw hash from the ValueId.
{
- const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash {
- if (PayloadId)
+ const auto GetChunkIdFromValueId = [](CbObjectView Record, const Oid& ValueId) -> IoHash {
+ if (ValueId)
{
- // A valid PayloadId indicates that the caller is searching for a Payload in a Record
+ // A valid ValueId indicates that the caller is searching for a Value in a Record
// that was Put with ICacheStore::Put
for (CbFieldView ValueView : Record["Values"sv])
{
CbObjectView ValueObject = ValueView.AsObjectView();
const Oid Id = ValueObject["Id"sv].AsObjectId();
- if (Id == PayloadId)
+ if (Id == ValueId)
{
return ValueObject["RawHash"sv].AsHash();
}
@@ -1059,7 +1274,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
if (CbObjectView ValueObject = Record["Value"sv].AsObjectView())
{
const Oid Id = ValueObject["Id"sv].AsObjectId();
- if (Id == PayloadId)
+ if (Id == ValueId)
{
return ValueObject["RawHash"sv].AsHash();
}
@@ -1070,7 +1285,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
CbObjectView AttachmentObject = AttachmentView.AsObjectView();
const Oid Id = AttachmentObject["Id"sv].AsObjectId();
- if (Id == PayloadId)
+ if (Id == ValueId)
{
return AttachmentObject["RawHash"sv].AsHash();
}
@@ -1079,7 +1294,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
}
else
{
- // An invalid PayloadId indicates that the caller is requesting a Value that
+ // An invalid ValueId indicates that the caller is requesting a Value that
// was Put with ICacheStore::PutValue
return Record["RawHash"sv].AsHash();
}
@@ -1108,15 +1323,15 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
if (CurrentRecordBuffer)
{
- ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId);
+ ChunkRequest.ChunkId = GetChunkIdFromValueId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.ValueId);
}
}
}
for (size_t RequestIndex = 0; const CacheChunkRequest& ChunkRequest : ChunkRequests)
{
- const bool QueryLocal = (ChunkRequest.Policy & CachePolicy::QueryLocal) == CachePolicy::QueryLocal;
- const bool QueryRemote = (ChunkRequest.Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote;
+ const bool QueryLocal = EnumHasAllFlags(ChunkRequest.Policy, CachePolicy::QueryLocal);
+ const bool QueryRemote = EnumHasAllFlags(ChunkRequest.Policy, CachePolicy::QueryRemote);
if (QueryLocal)
{
@@ -1155,8 +1370,8 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
if (!UpstreamRequests.empty())
{
- const auto OnCachePayloadGetComplete = [this, &Chunks](CachePayloadGetCompleteParams&& Params) {
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload)))
+ const auto OnCacheValueGetComplete = [this, &Chunks](CacheValueGetCompleteParams&& Params) {
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)))
{
m_CidStore.AddChunk(Compressed);
@@ -1164,11 +1379,11 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
Params.Request.Key.Bucket,
Params.Request.Key.Hash,
Params.Request.ChunkId,
- NiceBytes(Params.Payload.GetSize()),
+ NiceBytes(Params.Value.GetSize()),
"UPSTREAM");
ZEN_ASSERT(Params.RequestIndex < Chunks.size());
- Chunks[Params.RequestIndex] = std::move(Params.Payload);
+ Chunks[Params.RequestIndex] = std::move(Params.Value);
m_CacheStats.HitCount++;
m_CacheStats.UpstreamHitCount++;
@@ -1180,7 +1395,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
}
};
- m_UpstreamCache.GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete));
+ m_UpstreamCache.GetCacheValues(ChunkRequests, UpstreamRequests, std::move(OnCacheValueGetComplete));
}
CbPackage RpcResponse;
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 1bf3940e7..88bf6cda1 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -19,6 +19,7 @@ namespace zen {
class CasStore;
class CidStore;
class CbObjectView;
+struct PutRequestData;
class ScrubContext;
class UpstreamCache;
class ZenCacheStore;
@@ -41,7 +42,7 @@ enum class CachePolicy : uint32_t;
*
* Additionally, attachments may be addressed as:
*
- * {BucketId}/{KeyHash}/{PayloadHash}
+ * {BucketId}/{KeyHash}/{ValueHash}
*
* Where the two initial components are the same as for the main endpoint
*
@@ -73,7 +74,7 @@ private:
{
std::string BucketSegment;
IoHash HashKey;
- IoHash PayloadId;
+ IoHash ValueContentId;
};
struct CacheStats
@@ -82,20 +83,28 @@ private:
std::atomic_uint64_t UpstreamHitCount{};
std::atomic_uint64_t MissCount{};
};
+ enum class PutResult
+ {
+ Success,
+ Fail,
+ Invalid,
+ };
[[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef);
- void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
- void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
- void HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
- void HandleCachePayloadRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
- void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
- void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
+ void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
+ void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
+ void HandleCacheValueRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
+ void HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
+ void HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
void HandleRpcRequest(zen::HttpServerRequest& Request);
+ void HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest);
void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
- void HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
+ void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;
+ PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 2bba2e8e6..de5bccc3a 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -614,6 +614,10 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
{
EntryFlags |= DiskLocation::kStructured;
}
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ EntryFlags |= DiskLocation::kCompressed;
+ }
RwLock::ExclusiveLockScope _(m_IndexLock);
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index 8b015d0fb..4f162c0ba 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -84,11 +84,12 @@ struct DiskLocation
}
static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
- static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull;
+ static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize)
static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
- static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull;
- static const uint64_t kStructured = 0x4000'0000'0000'0000ull;
- static const uint64_t kTombStone = 0x2000'0000'0000'0000ull;
+ static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file
+ static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary
+ static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value
+ static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format
static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
@@ -103,6 +104,11 @@ struct DiskLocation
{
ContentType = ZenContentType::kCbObject;
}
+
+ if (IsFlagSet(DiskLocation::kCompressed))
+ {
+ ContentType = ZenContentType::kCompressedBinary;
+ }
return ContentType;
}
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index a4439d914..6fd1c3bea 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -91,7 +91,15 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
const char* DefaultHttp = "asio";
#endif
- std::string DataDir;
+ // Note to those adding future options; std::filesystem::path-type options
+ // must be read into a std::string first. As of cxxopts-3.0.0 it uses a >>
+ // stream operator to convert argv value into the options type. std::fs::path
+ // expects paths in streams to be quoted but argv paths are unquoted. By
+ // going into a std::string first, paths with whitespace parse correctly.
+ std::string DataDir;
+ std::string ContentDir;
+ std::string AbsLogFile;
+ std::string ConfigFile;
cxxopts::Options options("zenserver", "Zen Server");
options.add_options()("dedicated",
@@ -102,9 +110,9 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
options.add_options()("t, test", "Enable test mode", cxxopts::value<bool>(ServerOptions.IsTest)->default_value("false"));
options.add_options()("log-id", "Specify id for adding context to log output", cxxopts::value<std::string>(ServerOptions.LogId));
options.add_options()("data-dir", "Specify persistence root", cxxopts::value<std::string>(DataDir));
- options.add_options()("content-dir", "Frontend content directory", cxxopts::value<std::filesystem::path>(ServerOptions.ContentDir));
- options.add_options()("abslog", "Path to log file", cxxopts::value<std::filesystem::path>(ServerOptions.AbsLogFile));
- options.add_options()("config", "Path to Lua config file", cxxopts::value<std::filesystem::path>(ServerOptions.ConfigFile));
+ options.add_options()("content-dir", "Frontend content directory", cxxopts::value<std::string>(ContentDir));
+ options.add_options()("abslog", "Path to log file", cxxopts::value<std::string>(AbsLogFile));
+ options.add_options()("config", "Path to Lua config file", cxxopts::value<std::string>(ConfigFile));
options.add_options()("no-sentry",
"Disable Sentry crash handler",
cxxopts::value<bool>(ServerOptions.NoSentry)->default_value("false"));
@@ -330,6 +338,10 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
exit(0);
}
+ ServerOptions.DataDir = DataDir;
+ ServerOptions.ContentDir = ContentDir;
+ ServerOptions.AbsLogFile = AbsLogFile;
+ ServerOptions.ConfigFile = ConfigFile;
ServerOptions.UpstreamCacheConfig.CachePolicy = ParseUpstreamCachePolicy(UpstreamCachePolicyOptions);
if (!ServerOptions.ConfigFile.empty())
@@ -468,6 +480,9 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio
if (auto JupiterConfig = UpstreamConfig->get<sol::optional<sol::table>>("jupiter"))
{
UpdateStringValueFromConfig(JupiterConfig.value(),
+ std::string_view("name"),
+ ServerOptions.UpstreamCacheConfig.JupiterConfig.Name);
+ UpdateStringValueFromConfig(JupiterConfig.value(),
std::string_view("url"),
ServerOptions.UpstreamCacheConfig.JupiterConfig.Url);
UpdateStringValueFromConfig(JupiterConfig.value(),
@@ -496,6 +511,8 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio
if (auto ZenConfig = UpstreamConfig->get<sol::optional<sol::table>>("zen"))
{
+ ServerOptions.UpstreamCacheConfig.ZenConfig.Name = ZenConfig.value().get_or("name", std::string("Zen"));
+
if (auto Url = ZenConfig.value().get<sol::optional<std::string>>("url"))
{
ServerOptions.UpstreamCacheConfig.ZenConfig.Urls.push_back(Url.value());
diff --git a/zenserver/config.h b/zenserver/config.h
index 2c31e7bd9..e38b5c704 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -21,6 +21,7 @@
struct ZenUpstreamJupiterConfig
{
+ std::string Name;
std::string Url;
std::string OAuthProvider;
std::string OAuthClientId;
@@ -34,6 +35,7 @@ struct ZenUpstreamJupiterConfig
struct ZenUpstreamZenConfig
{
+ std::string Name;
std::vector<std::string> Urls;
std::vector<std::string> Dns;
};
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 68e7edfab..206787bf7 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -272,14 +272,14 @@ namespace detail {
return Result;
}
- virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override
+ virtual GetUpstreamCacheResult GetCacheValue(const CacheKey&, const IoHash& ValueContentId) override
{
- ZEN_TRACE_CPU("Upstream::Horde::GetSingleCachePayload");
+ ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue");
try
{
CloudCacheSession Session(m_Client);
- const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId);
+ const CloudCacheResult Result = Session.GetCompressedBlob(ValueContentId);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -303,11 +303,11 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCachePayloadGetComplete&& OnComplete) override final
+ virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCacheValueGetComplete&& OnComplete) override final
{
- ZEN_TRACE_CPU("Upstream::Horde::GetCachePayloads");
+ ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues");
CloudCacheSession Session(m_Client);
GetUpstreamCacheResult Result;
@@ -327,7 +327,7 @@ namespace detail {
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
}
- OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload});
+ OnComplete({.Request = Request, .RequestIndex = Index, .Value = Payload});
}
return Result;
@@ -335,11 +335,11 @@ namespace detail {
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
- std::span<IoBuffer const> Payloads) override
+ std::span<IoBuffer const> Values) override
{
ZEN_TRACE_CPU("Upstream::Horde::PutCacheRecord");
- ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
+ ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size());
const int32_t MaxAttempts = 3;
try
@@ -373,30 +373,31 @@ namespace detail {
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- const auto PutBlobs = [&](std::span<IoHash> PayloadIds, std::string& OutReason) -> bool {
- for (const IoHash& PayloadId : PayloadIds)
+ const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
+ for (const IoHash& ValueContentId : ValueContentIds)
{
- const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId);
+ const auto It =
+ std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId);
- if (It == std::end(CacheRecord.PayloadIds))
+ if (It == std::end(CacheRecord.ValueContentIds))
{
- OutReason = fmt::format("payload '{}' MISSING from local cache", PayloadId);
+ OutReason = fmt::format("value '{}' MISSING from local cache", ValueContentId);
return false;
}
- const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It);
+ const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It);
CloudCacheResult BlobResult;
for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
{
- BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ BlobResult = Session.PutCompressedBlob(CacheRecord.ValueContentIds[Idx], Values[Idx]);
}
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
if (!BlobResult.Success)
{
- OutReason = fmt::format("upload payload '{}' FAILED, reason '{}'", PayloadId, BlobResult.Reason);
+ OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason);
return false;
}
@@ -475,7 +476,7 @@ namespace detail {
Sb << MissingHash.ToHexString() << ",";
}
- return {.Reason = fmt::format("finalize '{}/{}' FAILED, still needs payload(s) '{}'",
+ return {.Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'",
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
Sb.ToString()),
@@ -538,10 +539,12 @@ namespace detail {
public:
ZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options)
: m_Log(zen::logging::Get("upstream"))
- , m_Info({.Name = std::string(Options.Name)})
, m_ConnectTimeout(Options.ConnectTimeout)
, m_Timeout(Options.Timeout)
{
+ ZEN_ASSERT(!Options.Name.empty());
+ m_Info.Name = Options.Name;
+
for (const auto& Url : Options.Urls)
{
m_Endpoints.push_back({.Url = Url});
@@ -649,9 +652,8 @@ namespace detail {
}
BatchRequest.EndArray();
- BatchRequest.BeginObject("Policy"sv);
- CacheRecordPolicy::Save(Policy, BatchRequest);
- BatchRequest.EndObject();
+ BatchRequest.SetName("Policy"sv);
+ Policy.Save(BatchRequest);
}
BatchRequest.EndObject();
@@ -687,14 +689,14 @@ namespace detail {
return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
}
- virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override
+ virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override
{
- ZEN_TRACE_CPU("Upstream::Zen::GetSingleCachePayload");
+ ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue");
try
{
ZenStructuredCacheSession Session(*m_Client);
- const ZenCacheResult Result = Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId);
+ const ZenCacheResult Result = Session.GetCacheValue(CacheKey.Bucket, CacheKey.Hash, ValueContentId);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -718,18 +720,18 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCachePayloadGetComplete&& OnComplete) override final
+ virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCacheValueGetComplete&& OnComplete) override final
{
- ZEN_TRACE_CPU("Upstream::Zen::GetCachePayloads");
+ ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues");
std::vector<size_t> IndexMap;
IndexMap.reserve(RequestIndex.size());
CbObjectWriter BatchRequest;
BatchRequest << "Method"sv
- << "GetCachePayloads";
+ << "GetCacheValues";
BatchRequest.BeginObject("Params"sv);
{
@@ -746,12 +748,11 @@ namespace detail {
BatchRequest << "Bucket"sv << Request.Key.Bucket;
BatchRequest << "Hash"sv << Request.Key.Hash;
BatchRequest.EndObject();
-
- BatchRequest.AddObjectId("PayloadId"sv, Request.PayloadId);
+ BatchRequest.AddObjectId("ValueId"sv, Request.ValueId);
BatchRequest << "ChunkId"sv << Request.ChunkId;
BatchRequest << "RawOffset"sv << Request.RawOffset;
BatchRequest << "RawSize"sv << Request.RawSize;
- BatchRequest << "Policy"sv << static_cast<uint32_t>(Request.Policy);
+ BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView();
}
BatchRequest.EndObject();
}
@@ -787,7 +788,7 @@ namespace detail {
}
}
- OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)});
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = std::move(Payload)});
}
return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
@@ -796,7 +797,7 @@ namespace detail {
for (size_t Index : RequestIndex)
{
- OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()});
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()});
}
return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
@@ -804,11 +805,11 @@ namespace detail {
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
- std::span<IoBuffer const> Payloads) override
+ std::span<IoBuffer const> Values) override
{
ZEN_TRACE_CPU("Upstream::Zen::PutCacheRecord");
- ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
+ ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size());
const int32_t MaxAttempts = 3;
try
@@ -823,15 +824,15 @@ namespace detail {
CbPackage Package;
Package.SetObject(CbObject(SharedBuffer(RecordValue)));
- for (const IoBuffer& Payload : Payloads)
+ for (const IoBuffer& Value : Values)
{
- if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload)))
+ if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Value)))
{
Package.AddAttachment(CbAttachment(AttachmentBuffer));
}
else
{
- return {.Reason = std::string("invalid payload buffer"), .Success = false};
+ return {.Reason = std::string("invalid value buffer"), .Success = false};
}
}
@@ -851,15 +852,15 @@ namespace detail {
}
else
{
- for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
+ for (size_t Idx = 0, Count = Values.size(); Idx < Count; Idx++)
{
Result.Success = false;
for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- Result = Session.PutCachePayload(CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- CacheRecord.PayloadIds[Idx],
- Payloads[Idx]);
+ Result = Session.PutCacheValue(CacheRecord.Key.Bucket,
+ CacheRecord.Key.Hash,
+ CacheRecord.ValueContentIds[Idx],
+ Values[Idx]);
}
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -869,7 +870,7 @@ namespace detail {
if (!Result.Success)
{
- return {.Reason = "Failed to upload payload",
+ return {.Reason = "Failed to upload value",
.Bytes = TotalBytes,
.ElapsedSeconds = TotalElapsedSeconds,
.Success = false};
@@ -1049,7 +1050,7 @@ public:
virtual void GetCacheRecords(std::span<CacheKey> CacheKeys,
std::span<size_t> KeyIndex,
- const CacheRecordPolicy& Policy,
+ const CacheRecordPolicy& DownstreamPolicy,
OnCacheRecordGetComplete&& OnComplete) override final
{
ZEN_TRACE_CPU("Upstream::GetCacheRecords");
@@ -1060,6 +1061,8 @@ public:
if (m_Options.ReadUpstream)
{
+ CacheRecordPolicy UpstreamPolicy = DownstreamPolicy.ConvertToUpstream();
+
for (auto& Endpoint : m_Endpoints)
{
if (RemainingKeys.empty())
@@ -1078,18 +1081,19 @@ public:
{
metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
- Result = Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) {
- if (Params.Record)
- {
- OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
+ Result =
+ Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, UpstreamPolicy, [&](CacheRecordGetCompleteParams&& Params) {
+ if (Params.Record)
+ {
+ OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
- Stats.CacheHitCount.Increment(1);
- }
- else
- {
- Missing.push_back(Params.KeyIndex);
- }
- });
+ Stats.CacheHitCount.Increment(1);
+ }
+ else
+ {
+ Missing.push_back(Params.KeyIndex);
+ }
+ });
}
Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size()));
@@ -1115,11 +1119,11 @@ public:
}
}
- virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCachePayloadGetComplete&& OnComplete) override final
+ virtual void GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCacheValueGetComplete&& OnComplete) override final
{
- ZEN_TRACE_CPU("Upstream::GetCachePayloads");
+ ZEN_TRACE_CPU("Upstream::GetCacheValues");
std::shared_lock<std::shared_mutex> _(m_EndpointsMutex);
@@ -1145,10 +1149,10 @@ public:
{
metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming);
- Result = Endpoint->GetCachePayloads(CacheChunkRequests, RemainingKeys, [&](CachePayloadGetCompleteParams&& Params) {
- if (Params.Payload)
+ Result = Endpoint->GetCacheValues(CacheChunkRequests, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) {
+ if (Params.Value)
{
- OnComplete(std::forward<CachePayloadGetCompleteParams>(Params));
+ OnComplete(std::forward<CacheValueGetCompleteParams>(Params));
Stats.CacheHitCount.Increment(1);
}
@@ -1166,7 +1170,7 @@ public:
{
Stats.CacheErrorCount.Increment(1);
- ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
+ ZEN_ERROR("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
Endpoint->GetEndpointInfo().Url,
Result.Error.Reason,
Result.Error.ErrorCode);
@@ -1178,13 +1182,13 @@ public:
for (size_t Index : RemainingKeys)
{
- OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()});
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()});
}
}
- virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override
+ virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override
{
- ZEN_TRACE_CPU("Upstream::GetCachePayload");
+ ZEN_TRACE_CPU("Upstream::GetCacheValue");
if (m_Options.ReadUpstream)
{
@@ -1200,7 +1204,7 @@ public:
{
metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
- Result = Endpoint->GetCachePayload(CacheKey, PayloadId);
+ Result = Endpoint->GetCacheValue(CacheKey, ValueContentId);
}
Stats.CacheGetCount.Increment(1);
@@ -1217,7 +1221,7 @@ public:
{
Stats.CacheErrorCount.Increment(1);
- ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'",
+ ZEN_ERROR("get cache value FAILED, endpoint '{}', reason '{}', error code '{}'",
Endpoint->GetEndpointInfo().Url,
Result.Error.Reason,
Result.Error.ErrorCode);
@@ -1302,18 +1306,18 @@ private:
return;
}
- for (const IoHash& PayloadId : CacheRecord.PayloadIds)
+ for (const IoHash& ValueContentId : CacheRecord.ValueContentIds)
{
- if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId))
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(ValueContentId))
{
Payloads.push_back(Payload);
}
else
{
- ZEN_WARN("process upstream FAILED, '{}/{}/{}', payload doesn't exist in CAS",
+ ZEN_WARN("process upstream FAILED, '{}/{}/{}', ValueContentId doesn't exist in CAS",
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
- PayloadId);
+ ValueContentId);
return;
}
}
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 805786cdf..5bc9f58d7 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -28,7 +28,7 @@ struct UpstreamCacheRecord
{
ZenContentType Type = ZenContentType::kBinary;
CacheKey Key;
- std::vector<IoHash> PayloadIds;
+ std::vector<IoHash> ValueContentIds;
};
struct UpstreamCacheOptions
@@ -74,14 +74,14 @@ struct CacheRecordGetCompleteParams
using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>;
-struct CachePayloadGetCompleteParams
+struct CacheValueGetCompleteParams
{
const CacheChunkRequest& Request;
size_t RequestIndex{~size_t(0)};
- IoBuffer Payload;
+ IoBuffer Value;
};
-using OnCachePayloadGetComplete = std::function<void(CachePayloadGetCompleteParams&&)>;
+using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>;
struct UpstreamEndpointStats
{
@@ -157,11 +157,11 @@ public:
const CacheRecordPolicy& Policy,
OnCacheRecordGetComplete&& OnComplete) = 0;
- virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0;
+ virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& PayloadId) = 0;
- virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCachePayloadGetComplete&& OnComplete) = 0;
+ virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCacheValueGetComplete&& OnComplete) = 0;
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
@@ -191,11 +191,11 @@ public:
const CacheRecordPolicy& RecordPolicy,
OnCacheRecordGetComplete&& OnComplete) = 0;
- virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0;
+ virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) = 0;
- virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
- std::span<size_t> RequestIndex,
- OnCachePayloadGetComplete&& OnComplete) = 0;
+ virtual void GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCacheValueGetComplete&& OnComplete) = 0;
virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index cd7f48334..a2666ac02 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -433,10 +433,10 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas
}
ZenCacheResult
-ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId)
+ZenStructuredCacheSession::GetCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId)
{
ExtendableStringBuilder<256> Uri;
- Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString();
+ Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString();
cpr::Session& Session = m_SessionState->GetSession();
@@ -486,10 +486,10 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas
}
ZenCacheResult
-ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload)
+ZenStructuredCacheSession::PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload)
{
ExtendableStringBuilder<256> Uri;
- Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString();
+ Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString();
cpr::Session& Session = m_SessionState->GetSession();
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index c2be2165a..8cc4c121d 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -123,9 +123,9 @@ public:
ZenCacheResult CheckHealth();
ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type);
- ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId);
+ ZenCacheResult GetCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId);
ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type);
- ZenCacheResult PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload);
+ ZenCacheResult PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload);
ZenCacheResult InvokeRpc(const CbObjectView& Request);
private:
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index e96a4ceaa..bd4a3c1cf 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -156,7 +156,7 @@ namespace utils {
class ZenServer : public IHttpStatusProvider
{
public:
- void Initialize(const ZenServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry)
+ int Initialize(const ZenServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry)
{
m_UseSentry = ServerOptions.NoSentry == false;
m_ServerEntry = ServerEntry;
@@ -201,8 +201,8 @@ public:
// Ok so now we're configured, let's kick things off
- m_Http = zen::CreateHttpServer(ServerOptions.HttpServerClass);
- m_Http->Initialize(ServerOptions.BasePort);
+ m_Http = zen::CreateHttpServer(ServerOptions.HttpServerClass);
+ int EffectiveBasePort = m_Http->Initialize(ServerOptions.BasePort);
m_AuthMgr = MakeAuthMgr({.RootDirectory = m_DataRoot / "auth"});
m_AuthService = std::make_unique<zen::HttpAuthService>(*m_AuthMgr);
@@ -267,7 +267,7 @@ public:
#if ZEN_ENABLE_MESH
if (ServerOptions.MeshEnabled)
{
- StartMesh(BasePort);
+ StartMesh(EffectiveBasePort);
}
else
{
@@ -314,6 +314,8 @@ public:
.Enabled = ServerOptions.GcConfig.Enabled,
};
m_GcScheduler.Initialize(GcConfig);
+
+ return EffectiveBasePort;
}
void InitializeState(const ZenServerOptions& ServerOptions);
@@ -684,7 +686,6 @@ void
ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
{
using namespace std::literals;
- auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; };
ZEN_INFO("instantiating structured cache service");
m_CacheStore = std::make_unique<ZenCacheStore>(m_CasGc, m_DataRoot / "cache");
@@ -728,8 +729,10 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
if (!ZenUrls.empty())
{
+ const auto ZenEndpointName = UpstreamConfig.ZenConfig.Name.empty() ? "Zen"sv : UpstreamConfig.ZenConfig.Name;
+
std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint =
- zen::MakeZenUpstreamEndpoint({.Name = "Zen"sv,
+ zen::MakeZenUpstreamEndpoint({.Name = ZenEndpointName,
.Urls = ZenUrls,
.ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
.Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)});
@@ -743,7 +746,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
if (UpstreamConfig.JupiterConfig.UseProductionSettings)
{
Options =
- zen::CloudCacheClientOptions{.Name = "Horde"sv,
+ zen::CloudCacheClientOptions{.Name = "Jupiter-Prod"sv,
.ServiceUrl = "https://jupiter.devtools.epicgames.com"sv,
.DdcNamespace = "ue.ddc"sv,
.BlobStoreNamespace = "ue.ddc"sv,
@@ -757,7 +760,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings)
{
Options =
- zen::CloudCacheClientOptions{.Name = "Horde"sv,
+ zen::CloudCacheClientOptions{.Name = "Jupiter-Dev"sv,
.ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv,
.DdcNamespace = "ue.ddc"sv,
.BlobStoreNamespace = "ue.ddc"sv,
@@ -768,14 +771,23 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
.Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds),
.UseLegacyDdc = false};
}
+ else
+ {
+ const auto JupiterEndpointName =
+ UpstreamConfig.JupiterConfig.Name.empty() ? "Jupiter"sv : UpstreamConfig.JupiterConfig.Name;
- Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl);
- Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace);
- Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace);
- Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider);
- Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId);
- Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret);
- Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc;
+ Options =
+ zen::CloudCacheClientOptions{.Name = JupiterEndpointName,
+ .ServiceUrl = UpstreamConfig.JupiterConfig.Url,
+ .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace,
+ .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace,
+ .OAuthProvider = UpstreamConfig.JupiterConfig.OAuthProvider,
+ .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId,
+ .OAuthSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds),
+ .UseLegacyDdc = false};
+ }
if (!Options.ServiceUrl.empty())
{
@@ -885,6 +897,20 @@ ZenEntryPoint::Run()
Entry->AddSponsorProcess(ServerOptions.OwnerPid);
}
+ ZenServer Server;
+ Server.SetDataRoot(ServerOptions.DataDir);
+ Server.SetContentRoot(ServerOptions.ContentDir);
+ Server.SetTestMode(ServerOptions.IsTest);
+ Server.SetDedicatedMode(ServerOptions.IsDedicated);
+ int EffectiveBasePort = Server.Initialize(ServerOptions, Entry);
+
+ Entry->EffectiveListenPort = uint16_t(EffectiveBasePort);
+ if (EffectiveBasePort != ServerOptions.BasePort)
+ {
+ ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort);
+ ServerOptions.BasePort = EffectiveBasePort;
+ }
+
std::unique_ptr<std::thread> ShutdownThread;
std::unique_ptr<zen::NamedEvent> ShutdownEvent;
@@ -892,13 +918,6 @@ ZenEntryPoint::Run()
ShutdownEventName << "Zen_" << ServerOptions.BasePort << "_Shutdown";
ShutdownEvent.reset(new zen::NamedEvent{ShutdownEventName});
- ZenServer Server;
- Server.SetDataRoot(ServerOptions.DataDir);
- Server.SetContentRoot(ServerOptions.ContentDir);
- Server.SetTestMode(ServerOptions.IsTest);
- Server.SetDedicatedMode(ServerOptions.IsDedicated);
- Server.Initialize(ServerOptions, Entry);
-
// Monitor shutdown signals
ShutdownThread.reset(new std::thread{[&] {
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp
index 0e93f1c3d..7be93b4af 100644
--- a/zenstore/gc.cpp
+++ b/zenstore/gc.cpp
@@ -404,7 +404,10 @@ GcScheduler::Shutdown()
m_Status = static_cast<uint32_t>(GcSchedulerStatus::kStopped);
m_GcSignal.notify_one();
- m_GcThread.join();
+ if (m_GcThread.joinable())
+ {
+ m_GcThread.join();
+ }
}
}
diff --git a/zenutil/cache/cachepolicy.cpp b/zenutil/cache/cachepolicy.cpp
index e1c31d885..3bf7a0c67 100644
--- a/zenutil/cache/cachepolicy.cpp
+++ b/zenutil/cache/cachepolicy.cpp
@@ -4,64 +4,115 @@
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
+#include <zencore/enumflags.h>
#include <zencore/string.h>
+#include <algorithm>
+#include <unordered_map>
+
namespace zen {
using namespace std::literals;
-namespace detail { namespace cacheopt {
- constexpr std::string_view Local = "local"sv;
- constexpr std::string_view Remote = "remote"sv;
- constexpr std::string_view Data = "data"sv;
- constexpr std::string_view Meta = "meta"sv;
- constexpr std::string_view Value = "value"sv;
- constexpr std::string_view Attachments = "attachments"sv;
-}} // namespace detail::cacheopt
+namespace detail::CachePolicyImpl {
+ constexpr char DelimiterChar = ',';
+ constexpr std::string_view None = "None"sv;
+ constexpr std::string_view QueryLocal = "QueryLocal"sv;
+ constexpr std::string_view QueryRemote = "QueryRemote"sv;
+ constexpr std::string_view Query = "Query"sv;
+ constexpr std::string_view StoreLocal = "StoreLocal"sv;
+ constexpr std::string_view StoreRemote = "StoreRemote"sv;
+ constexpr std::string_view Store = "Store"sv;
+ constexpr std::string_view SkipMeta = "SkipMeta"sv;
+ constexpr std::string_view SkipData = "SkipData"sv;
+ constexpr std::string_view PartialRecord = "PartialRecord"sv;
+ constexpr std::string_view KeepAlive = "KeepAlive"sv;
+ constexpr std::string_view Local = "Local"sv;
+ constexpr std::string_view Remote = "Remote"sv;
+ constexpr std::string_view Default = "Default"sv;
+ constexpr std::string_view Disable = "Disable"sv;
-CachePolicy
-ParseQueryCachePolicy(std::string_view QueryPolicy, CachePolicy Default)
-{
- if (QueryPolicy.empty())
- {
- return Default;
- }
+ using TextToPolicyMap = std::unordered_map<std::string_view, CachePolicy>;
+ const TextToPolicyMap TextToPolicy = {{None, CachePolicy::None},
+ {QueryLocal, CachePolicy::QueryLocal},
+ {QueryRemote, CachePolicy::QueryRemote},
+ {Query, CachePolicy::Query},
+ {StoreLocal, CachePolicy::StoreLocal},
+ {StoreRemote, CachePolicy::StoreRemote},
+ {Store, CachePolicy::Store},
+ {SkipMeta, CachePolicy::SkipMeta},
+ {SkipData, CachePolicy::SkipData},
+ {PartialRecord, CachePolicy::PartialRecord},
+ {KeepAlive, CachePolicy::KeepAlive},
+ {Local, CachePolicy::Local},
+ {Remote, CachePolicy::Remote},
+ {Default, CachePolicy::Default},
+ {Disable, CachePolicy::Disable}};
- CachePolicy Result = CachePolicy::None;
+ using PolicyTextPair = std::pair<CachePolicy, std::string_view>;
+ const PolicyTextPair FlagsToString[]{
+ // Order of these Flags is important: we want the aliases before the atomic values,
+ // and the bigger aliases first, to reduce the number of tokens we add
+ {CachePolicy::Default, Default},
+ {CachePolicy::Remote, Remote},
+ {CachePolicy::Local, Local},
+ {CachePolicy::Store, Store},
+ {CachePolicy::Query, Query},
- ForEachStrTok(QueryPolicy, ',', [&Result](const std::string_view& Token) {
- if (Token == detail::cacheopt::Local)
- {
- Result |= CachePolicy::QueryLocal;
- }
- if (Token == detail::cacheopt::Remote)
+ // Order of Atomics doesn't matter, so arbitrarily we list them in enum order
+ {CachePolicy::QueryLocal, QueryLocal},
+ {CachePolicy::QueryRemote, QueryRemote},
+ {CachePolicy::StoreLocal, StoreLocal},
+ {CachePolicy::StoreRemote, StoreRemote},
+ {CachePolicy::SkipMeta, SkipMeta},
+ {CachePolicy::SkipData, SkipData},
+ {CachePolicy::PartialRecord, PartialRecord},
+ {CachePolicy::KeepAlive, KeepAlive},
+
+ // None must come at the end of the array, to write out only if no others exist
+ {CachePolicy::None, None},
+ };
+ constexpr CachePolicy KnownFlags =
+ CachePolicy::Default | CachePolicy::SkipMeta | CachePolicy::SkipData | CachePolicy::KeepAlive | CachePolicy::PartialRecord;
+} // namespace detail::CachePolicyImpl
+
+StringBuilderBase&
+AppendToBuilderImpl(StringBuilderBase& Builder, CachePolicy Policy)
+{
+ // Remove any bits we don't recognize; write None if there are not any bits we recognize
+ Policy = Policy & detail::CachePolicyImpl::KnownFlags;
+ for (const detail::CachePolicyImpl::PolicyTextPair& Pair : detail::CachePolicyImpl::FlagsToString)
+ {
+ if (EnumHasAllFlags(Policy, Pair.first))
{
- Result |= CachePolicy::QueryRemote;
+ EnumRemoveFlags(Policy, Pair.first);
+ Builder << Pair.second << detail::CachePolicyImpl::DelimiterChar;
+ if (Policy == CachePolicy::None)
+ {
+ break;
+ }
}
- return true;
- });
-
- return Result;
+ }
+ Builder.RemoveSuffix(1); // Text will have been added by CachePolicy::None if not by anything else
+ return Builder;
+}
+StringBuilderBase&
+operator<<(StringBuilderBase& Builder, CachePolicy Policy)
+{
+ return AppendToBuilderImpl(Builder, Policy);
}
CachePolicy
-ParseStoreCachePolicy(std::string_view StorePolicy, CachePolicy Default)
+ParseCachePolicy(std::string_view Text)
{
- if (StorePolicy.empty())
- {
- return Default;
- }
+ ZEN_ASSERT(!Text.empty()); // Empty string is not valid input to ParseCachePolicy
CachePolicy Result = CachePolicy::None;
-
- ForEachStrTok(StorePolicy, ',', [&Result](const std::string_view& Token) {
- if (Token == detail::cacheopt::Local)
- {
- Result |= CachePolicy::StoreLocal;
- }
- if (Token == detail::cacheopt::Remote)
+ ForEachStrTok(Text, detail::CachePolicyImpl::DelimiterChar, [&Result](const std::string_view& Token) {
+ auto it = detail::CachePolicyImpl::TextToPolicy.find(Token);
+ if (it != detail::CachePolicyImpl::TextToPolicy.end())
{
- Result |= CachePolicy::StoreRemote;
+ Result |= it->second;
}
return true;
});
@@ -69,101 +120,139 @@ ParseStoreCachePolicy(std::string_view StorePolicy, CachePolicy Default)
return Result;
}
-CachePolicy
-ParseSkipCachePolicy(std::string_view SkipPolicy, CachePolicy Default)
-{
- if (SkipPolicy.empty())
+namespace Private {
+
+ class CacheRecordPolicyShared final : public ICacheRecordPolicyShared
{
- return Default;
- }
+ public:
+ inline std::span<const CacheValuePolicy> GetValuePolicies() const final { return Values; }
- CachePolicy Result = CachePolicy::None;
+ inline void AddValuePolicy(const CacheValuePolicy& Policy) final { Values.push_back(Policy); }
- ForEachStrTok(SkipPolicy, ',', [&Result](const std::string_view& Token) {
- if (Token == detail::cacheopt::Meta)
- {
- Result |= CachePolicy::SkipMeta;
- }
- if (Token == detail::cacheopt::Value)
+ inline void Build() final
{
- Result |= CachePolicy::SkipValue;
+ std::sort(Values.begin(), Values.end(), [](const CacheValuePolicy& A, const CacheValuePolicy& B) { return A.Id < B.Id; });
}
- if (Token == detail::cacheopt::Attachments)
- {
- Result |= CachePolicy::SkipAttachments;
- }
- if (Token == detail::cacheopt::Data)
- {
- Result |= CachePolicy::SkipData;
- }
- return true;
- });
- return Result;
-}
+ private:
+ std::vector<CacheValuePolicy> Values;
+ };
-CacheRecordPolicy::CacheRecordPolicy(const CachePolicy RecordPolicy, const CachePolicy PayloadPolicy)
-: m_RecordPolicy(RecordPolicy)
-, m_DefaultPayloadPolicy(PayloadPolicy)
-{
-}
+} // namespace Private
CachePolicy
-CacheRecordPolicy::GetPayloadPolicy(const Oid& PayloadId) const
+CacheRecordPolicy::GetValuePolicy(const Oid& Id) const
{
- if (const auto It = m_PayloadPolicies.find(PayloadId); It != m_PayloadPolicies.end())
+ if (Shared)
{
- return It->second;
+ if (std::span<const CacheValuePolicy> Values = Shared->GetValuePolicies(); !Values.empty())
+ {
+ auto Iter =
+ std::lower_bound(Values.begin(), Values.end(), Id, [](const CacheValuePolicy& A, const Oid& B) { return A.Id < B; });
+ if (Iter != Values.end() && Iter->Id == Id)
+ {
+ return Iter->Policy;
+ }
+ }
}
+ return DefaultValuePolicy;
+}
- return m_DefaultPayloadPolicy;
+void
+CacheRecordPolicy::Save(CbWriter& Writer) const
+{
+ Writer.BeginObject();
+ {
+ // The RecordPolicy is calculated from the ValuePolicies and does not need to be saved separately.
+ Writer << "DefaultValuePolicy"sv << WriteToString<128>(GetDefaultValuePolicy());
+ if (!IsUniform())
+ {
+ // FCacheRecordPolicyBuilder guarantees IsUniform -> non-empty GetValuePolicies. Small size penalty here if not.
+ Writer.BeginArray("ValuePolicies"sv);
+ {
+ for (const CacheValuePolicy& ValuePolicy : GetValuePolicies())
+ {
+ // FCacheRecordPolicyBuilder is responsible for ensuring that each ValuePolicy != DefaultValuePolicy
+ // If it lets any duplicates through we will incur a small serialization size penalty here
+ Writer.BeginObject();
+ Writer << "Id"sv << ValuePolicy.Id;
+ Writer << "Policy"sv << WriteToString<128>(ValuePolicy.Policy);
+ Writer.EndObject();
+ }
+ }
+ Writer.EndArray();
+ }
+ }
+ Writer.EndObject();
}
-bool
-CacheRecordPolicy::Load(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy)
+CacheRecordPolicy
+CacheRecordPolicy::Load(CbObjectView Object, CachePolicy DefaultPolicy)
{
- using namespace std::literals;
+ std::string_view PolicyText = Object["DefaultValuePolicy"sv].AsString();
+ CachePolicy DefaultValuePolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- const uint32_t RecordPolicy = RecordPolicyObject["RecordPolicy"sv].AsUInt32(static_cast<uint32_t>(CachePolicy::Default));
- const uint32_t DefaultPayloadPolicy =
- RecordPolicyObject["DefaultPayloadPolicy"sv].AsUInt32(static_cast<uint32_t>(CachePolicy::Default));
+ CacheRecordPolicyBuilder Builder(DefaultValuePolicy);
+ for (CbFieldView ValueObjectField : Object["ValuePolicies"sv])
+ {
+ CbObjectView ValueObject = ValueObjectField.AsObjectView();
+ const Oid ValueId = ValueObject["Id"sv].AsObjectId();
+ PolicyText = ValueObject["Policy"sv].AsString();
+ CachePolicy ValuePolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultValuePolicy;
+ // FCacheRecordPolicyBuilder should guarantee that FValueId(ValueId).IsValid and ValuePolicy != DefaultValuePolicy
+ // If it lets any through we will have unused data in the record we create.
+ Builder.AddValuePolicy(ValueId, ValuePolicy);
+ }
- OutRecordPolicy.m_RecordPolicy = static_cast<CachePolicy>(RecordPolicy);
- OutRecordPolicy.m_DefaultPayloadPolicy = static_cast<CachePolicy>(DefaultPayloadPolicy);
+ return Builder.Build();
+}
- for (CbFieldView PayloadPolicyView : RecordPolicyObject["PayloadPolicies"sv])
+CacheRecordPolicy
+CacheRecordPolicy::ConvertToUpstream() const
+{
+ auto DownstreamToUpstream = [](CachePolicy P) {
+ // Remote|Local -> Set Remote
+ // Delete Skip Flags
+ // Maintain Remaining Flags
+ return (EnumHasAllFlags(P, CachePolicy::QueryRemote) ? CachePolicy::QueryLocal : CachePolicy::None) |
+ (EnumHasAllFlags(P, CachePolicy::StoreRemote) ? CachePolicy::StoreLocal : CachePolicy::None) |
+ (P & ~(CachePolicy::SkipData | CachePolicy::SkipMeta));
+ };
+ CacheRecordPolicyBuilder Builder(DownstreamToUpstream(GetDefaultValuePolicy()));
+ for (const CacheValuePolicy& ValuePolicy : GetValuePolicies())
{
- CbObjectView PayloadPolicyObject = PayloadPolicyView.AsObjectView();
- const Oid PayloadId = PayloadPolicyObject["Id"sv].AsObjectId();
- const uint32_t PayloadPolicy = PayloadPolicyObject["Policy"sv].AsUInt32();
-
- if (PayloadId != Oid::Zero && PayloadPolicy != 0)
- {
- OutRecordPolicy.m_PayloadPolicies.emplace(PayloadId, static_cast<CachePolicy>(PayloadPolicy));
- }
+ Builder.AddValuePolicy(ValuePolicy.Id, DownstreamToUpstream(ValuePolicy.Policy));
}
-
- return true;
+ return Builder.Build();
}
void
-CacheRecordPolicy::Save(const CacheRecordPolicy& Policy, CbWriter& Writer)
+CacheRecordPolicyBuilder::AddValuePolicy(const CacheValuePolicy& Policy)
{
- Writer << "RecordPolicy"sv << static_cast<uint32_t>(Policy.GetRecordPolicy());
- Writer << "DefaultPayloadPolicy"sv << static_cast<uint32_t>(Policy.GetDefaultPayloadPolicy());
+ if (!Shared)
+ {
+ Shared = new Private::CacheRecordPolicyShared;
+ }
+ Shared->AddValuePolicy(Policy);
+}
- if (!Policy.m_PayloadPolicies.empty())
+CacheRecordPolicy
+CacheRecordPolicyBuilder::Build()
+{
+ CacheRecordPolicy Policy(BasePolicy);
+ if (Shared)
{
- Writer.BeginArray("PayloadPolicies"sv);
- for (const auto& Kv : Policy.m_PayloadPolicies)
+ Shared->Build();
+ const auto PolicyOr = [](CachePolicy A, CachePolicy B) { return A | (B & ~CachePolicy::SkipData); };
+ const std::span<const CacheValuePolicy> Values = Shared->GetValuePolicies();
+ Policy.RecordPolicy = BasePolicy;
+ for (const CacheValuePolicy& ValuePolicy : Values)
{
- Writer.BeginObject();
- Writer.AddObjectId("Id"sv, Kv.first);
- Writer << "Policy"sv << static_cast<uint32_t>(Kv.second);
- Writer.EndObject();
+ Policy.RecordPolicy = PolicyOr(Policy.RecordPolicy, ValuePolicy.Policy);
}
- Writer.EndArray();
+ Policy.Shared = std::move(Shared);
}
+ return Policy;
}
} // namespace zen
diff --git a/zenutil/include/zenutil/cache/cachekey.h b/zenutil/include/zenutil/cache/cachekey.h
index fb36c7759..a0a83a883 100644
--- a/zenutil/include/zenutil/cache/cachekey.h
+++ b/zenutil/include/zenutil/cache/cachekey.h
@@ -44,7 +44,7 @@ struct CacheChunkRequest
{
CacheKey Key;
IoHash ChunkId;
- Oid PayloadId;
+ Oid ValueId;
uint64_t RawOffset = 0ull;
uint64_t RawSize = ~uint64_t(0);
CachePolicy Policy = CachePolicy::Default;
@@ -69,11 +69,11 @@ operator<(const CacheChunkRequest& A, const CacheChunkRequest& B)
{
return false;
}
- if (A.PayloadId < B.PayloadId)
+ if (A.ValueId < B.ValueId)
{
return true;
}
- if (B.PayloadId < A.PayloadId)
+ if (B.ValueId < A.ValueId)
{
return false;
}
diff --git a/zenutil/include/zenutil/cache/cachepolicy.h b/zenutil/include/zenutil/cache/cachepolicy.h
index 5675ccf4d..b3602edbd 100644
--- a/zenutil/include/zenutil/cache/cachepolicy.h
+++ b/zenutil/include/zenutil/cache/cachepolicy.h
@@ -2,10 +2,14 @@
#pragma once
+#include <zencore/compactbinary.h>
+#include <zencore/enumflags.h>
+#include <zencore/refcount.h>
#include <zencore/string.h>
#include <zencore/uid.h>
#include <gsl/gsl-lite.hpp>
+#include <span>
#include <unordered_map>
namespace zen {
@@ -34,16 +38,21 @@ enum class CachePolicy : uint32_t
/** Skip fetching the metadata for record requests. */
SkipMeta = 1 << 4,
- /** Skip fetching the value for record, chunk, or value requests. */
- SkipValue = 1 << 5,
- /** Skip fetching the attachments for record requests. */
- SkipAttachments = 1 << 6,
+ /** Skip fetching the data for values. */
+ SkipData = 1 << 5,
+
/**
- * Skip fetching the data for any requests.
+ * Partial output will be provided with the error status when a required value is missing.
+ *
+ * This is meant for cases when the missing values can be individually recovered, or rebuilt,
+ * without rebuilding the whole record. The cache automatically adds this flag when there are
+ * other cache stores that it may be able to recover missing values from.
+ *
+ * Missing values will be returned in the records or chunks, but with only the hash and size.
*
- * Put requests with skip flags may assume that record existence implies payload existence.
+ * Applying this flag for a put of a record allows a partial record to be stored.
*/
- SkipData = SkipMeta | SkipValue | SkipAttachments,
+ PartialRecord = 1 << 6,
/**
* Keep records in the cache for at least the duration of the session.
@@ -53,18 +62,6 @@ enum class CachePolicy : uint32_t
*/
KeepAlive = 1 << 7,
- /**
- * Partial output will be provided with the error status when a required payload is missing.
- *
- * This is meant for cases when the missing payloads can be individually recovered or rebuilt
- * without rebuilding the whole record. The cache automatically adds this flag when there are
- * other cache stores that it may be able to recover missing payloads from.
- *
- * Requests for records would return records where the missing payloads have a hash and size,
- * but no data. Requests for chunks or values would return the hash and size, but no data.
- */
- PartialOnError = 1 << 8,
-
/** Allow cache requests to query and store records and values in local caches. */
Local = QueryLocal | StoreLocal,
/** Allow cache requests to query and store records and values in remote caches. */
@@ -78,35 +75,107 @@ enum class CachePolicy : uint32_t
};
gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy);
+/** Serialize Policy to text and append to Builder. Appended text will not be empty. */
+StringBuilderBase& operator<<(StringBuilderBase& Builder, CachePolicy Policy);
+/** Parse text written by operator<< back into an ECachePolicy. Text must not be empty. */
+CachePolicy ParseCachePolicy(std::string_view Text);
-CachePolicy ParseQueryCachePolicy(std::string_view QueryPolicy, CachePolicy Default = CachePolicy::Query);
-
-CachePolicy ParseStoreCachePolicy(std::string_view StorePolicy, CachePolicy Default = CachePolicy::Store);
-
-CachePolicy ParseSkipCachePolicy(std::string_view SkipPolicy, CachePolicy Default = CachePolicy::None);
+/** A value ID and the cache policy to use for that value. */
+struct CacheValuePolicy
+{
+ Oid Id;
+ CachePolicy Policy = CachePolicy::Default;
+};
+namespace Private {
+ /** Interface for the private implementation of the cache record policy. */
+ class ICacheRecordPolicyShared : public RefCounted
+ {
+ public:
+ virtual ~ICacheRecordPolicyShared() = default;
+ virtual std::span<const CacheValuePolicy> GetValuePolicies() const = 0;
+ virtual void AddValuePolicy(const CacheValuePolicy& Policy) = 0;
+ virtual void Build() = 0;
+ };
+} // namespace Private
+
+/**
+ * Flags to control the behavior of cache record requests, with optional overrides by value.
+ *
+ * Examples:
+ * - A base policy of Disable, with value policy overrides of Default, will fetch those values if
+ * they exist in the record, and skip data for any other values.
+ * - A base policy of Default, with value policy overrides of (Query | SkipData), will skip those
+ * values, but still check if they exist, and will load any other values.
+ */
class CacheRecordPolicy
{
public:
+ /** Construct a cache record policy that uses the default policy. */
CacheRecordPolicy() = default;
- CacheRecordPolicy(const CachePolicy RecordPolicy, const CachePolicy DefaultPayloadPolicy = CachePolicy::Default);
- CachePolicy GetRecordPolicy() const { return m_RecordPolicy; }
- CachePolicy GetPayloadPolicy(const Oid& PayloadId) const;
- CachePolicy GetDefaultPayloadPolicy() const { return m_DefaultPayloadPolicy; }
+ /** Construct a cache record policy with a uniform policy for the record and every value. */
+ inline CacheRecordPolicy(CachePolicy Policy) : RecordPolicy(Policy), DefaultValuePolicy(Policy) {}
+
+ /** Returns true if the record and every value use the same cache policy. */
+ inline bool IsUniform() const { return !Shared && RecordPolicy == DefaultValuePolicy; }
+
+ /** Returns the cache policy to use for the record. */
+ inline CachePolicy GetRecordPolicy() const { return RecordPolicy; }
- bool HasRecordPolicy(const CachePolicy Policy) const { return (m_RecordPolicy & Policy) == Policy; }
- bool HasPayloadPolicy(const Oid& PayloadId, const CachePolicy Policy) const { return (GetPayloadPolicy(PayloadId) & Policy) == Policy; }
+ /** Returns the cache policy to use for the value. */
+ CachePolicy GetValuePolicy(const Oid& Id) const;
- static bool Load(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy);
- static void Save(const CacheRecordPolicy& Policy, CbWriter& Writer);
+ /** Returns the cache policy to use for values with no override. */
+ inline CachePolicy GetDefaultValuePolicy() const { return DefaultValuePolicy; }
+
+ /** Returns the array of cache policy overrides for values, sorted by ID. */
+ inline std::span<const CacheValuePolicy> GetValuePolicies() const
+ {
+ return Shared ? Shared->GetValuePolicies() : std::span<const CacheValuePolicy>();
+ }
+
+ /** Save the values from *this into the given writer. */
+ void Save(CbWriter& Writer) const;
+
+ /**
+ * Returns a policy loaded from values on Object.
+ * Invalid data will result in a uniform CacheRecordPolicy with defaultValuePolicy == DefaultPolicy.
+ */
+ static CacheRecordPolicy Load(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default);
+
+ /** Return *this converted into the equivalent policy that the upstream should use when forwarding a put or get to an upstream server.
+ */
+ CacheRecordPolicy ConvertToUpstream() const;
private:
- using PayloadPolicyMap = std::unordered_map<Oid, CachePolicy, Oid::Hasher>;
+ friend class CacheRecordPolicyBuilder;
+
+ CachePolicy RecordPolicy = CachePolicy::Default;
+ CachePolicy DefaultValuePolicy = CachePolicy::Default;
+ RefPtr<const Private::ICacheRecordPolicyShared> Shared;
+};
+
+/** A cache record policy builder is used to construct a cache record policy. */
+class CacheRecordPolicyBuilder
+{
+public:
+ /** Construct a policy builder that uses the default policy as its base policy. */
+ CacheRecordPolicyBuilder() = default;
- CachePolicy m_RecordPolicy = CachePolicy::Default;
- CachePolicy m_DefaultPayloadPolicy = CachePolicy::Default;
- PayloadPolicyMap m_PayloadPolicies;
+ /** Construct a policy builder that uses the provided policy for the record and values with no override. */
+ inline explicit CacheRecordPolicyBuilder(CachePolicy Policy) : BasePolicy(Policy) {}
+
+ /** Adds a cache policy override for a value. */
+ void AddValuePolicy(const CacheValuePolicy& Policy);
+ inline void AddValuePolicy(const Oid& Id, CachePolicy Policy) { AddValuePolicy({Id, Policy}); }
+
+ /** Build a cache record policy, which makes this builder subsequently unusable. */
+ CacheRecordPolicy Build();
+
+private:
+ CachePolicy BasePolicy = CachePolicy::Default;
+ RefPtr<Private::ICacheRecordPolicyShared> Shared;
};
} // namespace zen
diff --git a/zenutil/include/zenutil/zenserverprocess.h b/zenutil/include/zenutil/zenserverprocess.h
index 55b9a50cd..2a3146e2d 100644
--- a/zenutil/include/zenutil/zenserverprocess.h
+++ b/zenutil/include/zenutil/zenserverprocess.h
@@ -100,11 +100,12 @@ public:
// additional state. For example, you can use the session ID
// to introduce additional named objects
std::atomic<uint32_t> Pid;
- std::atomic<uint16_t> ListenPort;
+ std::atomic<uint16_t> DesiredListenPort;
std::atomic<uint16_t> Flags;
uint8_t SessionId[12];
std::atomic<uint32_t> SponsorPids[8];
- uint8_t Padding[12];
+ std::atomic<uint16_t> EffectiveListenPort;
+ uint8_t Padding[10];
enum class FlagsEnum : uint16_t
{
@@ -125,8 +126,8 @@ public:
void Initialize();
[[nodiscard]] bool InitializeReadOnly();
- [[nodiscard]] ZenServerEntry* Lookup(int ListenPort);
- ZenServerEntry* Register(int ListenPort);
+ [[nodiscard]] ZenServerEntry* Lookup(int DesiredListenPort);
+ ZenServerEntry* Register(int DesiredListenPort);
void Sweep();
void Snapshot(std::function<void(const ZenServerEntry&)>&& Callback);
inline bool IsReadOnly() const { return m_IsReadOnly; }
diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp
index fe6236d18..5bddc72bc 100644
--- a/zenutil/zenserverprocess.cpp
+++ b/zenutil/zenserverprocess.cpp
@@ -230,11 +230,11 @@ ZenServerState::InitializeReadOnly()
}
ZenServerState::ZenServerEntry*
-ZenServerState::Lookup(int ListenPort)
+ZenServerState::Lookup(int DesiredListenPort)
{
for (int i = 0; i < m_MaxEntryCount; ++i)
{
- if (m_Data[i].ListenPort == ListenPort)
+ if (m_Data[i].DesiredListenPort == DesiredListenPort)
{
return &m_Data[i];
}
@@ -244,7 +244,7 @@ ZenServerState::Lookup(int ListenPort)
}
ZenServerState::ZenServerEntry*
-ZenServerState::Register(int ListenPort)
+ZenServerState::Register(int DesiredListenPort)
{
if (m_Data == nullptr)
{
@@ -259,17 +259,18 @@ ZenServerState::Register(int ListenPort)
{
ZenServerEntry& Entry = m_Data[i];
- if (Entry.ListenPort.load(std::memory_order_relaxed) == 0)
+ if (Entry.DesiredListenPort.load(std::memory_order_relaxed) == 0)
{
uint16_t Expected = 0;
- if (Entry.ListenPort.compare_exchange_strong(Expected, uint16_t(ListenPort)))
+ if (Entry.DesiredListenPort.compare_exchange_strong(Expected, uint16_t(DesiredListenPort)))
{
// Successfully allocated entry
m_OurEntry = &Entry;
- Entry.Pid = Pid;
- Entry.Flags = 0;
+ Entry.Pid = Pid;
+ Entry.EffectiveListenPort = 0;
+ Entry.Flags = 0;
const Oid SesId = GetSessionId();
memcpy(Entry.SessionId, &SesId, sizeof SesId);
@@ -296,11 +297,11 @@ ZenServerState::Sweep()
{
ZenServerEntry& Entry = m_Data[i];
- if (Entry.ListenPort)
+ if (Entry.DesiredListenPort)
{
if (IsProcessRunning(Entry.Pid) == false)
{
- ZEN_DEBUG("Sweep - pid {} not running, reclaiming entry (port {})", Entry.Pid, Entry.ListenPort);
+ ZEN_DEBUG("Sweep - pid {} not running, reclaiming entry (port {})", Entry.Pid, Entry.DesiredListenPort);
Entry.Reset();
}
@@ -320,7 +321,7 @@ ZenServerState::Snapshot(std::function<void(const ZenServerEntry&)>&& Callback)
{
ZenServerEntry& Entry = m_Data[i];
- if (Entry.ListenPort)
+ if (Entry.DesiredListenPort)
{
Callback(Entry);
}
@@ -330,9 +331,10 @@ ZenServerState::Snapshot(std::function<void(const ZenServerEntry&)>&& Callback)
void
ZenServerState::ZenServerEntry::Reset()
{
- Pid = 0;
- ListenPort = 0;
- Flags = 0;
+ Pid = 0;
+ DesiredListenPort = 0;
+ Flags = 0;
+ EffectiveListenPort = 0;
}
void