aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--scripts/remote_build.py88
-rw-r--r--zen/internalfile.cpp6
-rw-r--r--zencore/filesystem.cpp6
-rw-r--r--zencore/thread.cpp24
-rw-r--r--zenhttp/httpasio.cpp20
-rw-r--r--zenserver-test/zenserver-test.cpp631
-rw-r--r--zenserver/cache/structuredcache.cpp827
-rw-r--r--zenserver/cache/structuredcache.h62
-rw-r--r--zenserver/upstream/upstreamcache.cpp337
-rw-r--r--zenserver/upstream/zen.cpp37
-rw-r--r--zenserver/upstream/zen.h1
-rw-r--r--zenstore/basicfile.cpp9
-rw-r--r--zenstore/filecas.cpp9
-rw-r--r--zenutil/cache/cachepolicy.cpp12
-rw-r--r--zenutil/include/zenutil/cache/cachepolicy.h8
-rw-r--r--zenutil/zenserverprocess.cpp5
16 files changed, 1387 insertions, 695 deletions
diff --git a/scripts/remote_build.py b/scripts/remote_build.py
index c5787f635..08e44c9ad 100644
--- a/scripts/remote_build.py
+++ b/scripts/remote_build.py
@@ -53,14 +53,11 @@ def _find_binary(name):
raise EnvironmentError(f"Unable to find '{name}' in the path")
#-------------------------------------------------------------------------------
-class _AutoKill(object):
- def __init__(self, proc):
- self._proc = proc
-
+class _DaemonAutoKill(object):
def __del__(self):
- self._proc.kill()
- self._proc.wait()
- pass
+ # Roundabout way to avoid orphaned git-daemon processes
+ if os.name == "nt":
+ os.system("taskkill /f /im git-daemon.exe")
@@ -73,7 +70,10 @@ def _local(args):
parser = argparse.ArgumentParser(description=desc)
parser.add_argument("remotehost", help="")
parser.add_argument("action", default="build", nargs="?", help="")
+ parser.add_argument("--commit", default=None, help="Commit to act on")
parser.add_argument("--keyfile", default=None, help="SSH key file")
+ parser.add_argument("--gitport", default=None, help="Use an exist git daemon at the given port")
+ parser.add_argument("--outdir", default=None, help="Put .zip bundles here")
args = parser.parse_args(args)
# Find the binaries we'll need
@@ -135,28 +135,38 @@ def _local(args):
print(f"Using host '{host}'")
# Start a git daemon to use as a transfer mechanism
- _header("Starting a git daemon")
- print("Port: 4493")
- print("Base-path: ", zen_dir)
- print("Host: ", _get_ip())
- daemon = subprocess.Popen(
- ( git_bin,
- "daemon",
- "--port=4493",
- "--export-all",
- "--reuseaddr",
- "--verbose",
- "--informative-errors",
- "--base-path=" + str(zen_dir) ),
- #stdout = daemon_log,
- stderr = subprocess.STDOUT
- )
- daemon_killer = _AutoKill(daemon)
+ _header("Git daemon")
+ daemon_port = args.gitport
+ if not daemon_port:
+ daemon_port = 4493
+
+ print("Starting out own one up")
+ print("Port:", daemon_port)
+ print("Base-path: ", zen_dir)
+ print("Host: ", _get_ip())
+ daemon = subprocess.Popen(
+ ( git_bin,
+ "daemon",
+ "--port=" + str(daemon_port),
+ "--export-all",
+ "--reuseaddr",
+ "--verbose",
+ "--informative-errors",
+ "--base-path=" + str(zen_dir) ),
+ #stdout = daemon_log,
+ stderr = subprocess.STDOUT
+ )
+ daemon_killer = _DaemonAutoKill()
+ else:
+ print("Using existing instance")
+ print("Port:", daemon_port)
# Run this script on the remote machine
_header("Running SSH")
- remote_zen_dir = "%s_%s" % (os.getlogin(), _get_ip())
+ commit = args.commit if args.commit else "origin/main"
+
+ remote_zen_dir = "%s_%s_%s" % (os.getlogin(), _get_ip(), str(daemon_port))
print(f"Using zen '~/{remote_zen_dir}'")
print(f"Running {__file__} remotely")
@@ -166,12 +176,12 @@ def _local(args):
*identity,
"-tA",
host,
- f"python3 -u - !remote {_get_ip()} '{remote_zen_dir}' main '{args.action}'",
+ f"python3 -u - !remote {_get_ip()}:{daemon_port} '{remote_zen_dir}' {commit} '{args.action}'",
stdin=self_file)
# If we're bundling, collect zip files from the remote machine
if args.action == "bundle":
- build_dir = zen_dir / "build"
+ build_dir = Path(args.outdir) or (zen_dir / "build")
build_dir.mkdir(exist_ok=True)
scp_args = (*identity, host + f":zen/{remote_zen_dir}/build/*.zip", build_dir)
_run_checked("scp", *scp_args)
@@ -185,9 +195,9 @@ def _remote(args):
# Parse arguments
desc = "Build Zen on a remote host"
parser = argparse.ArgumentParser(description=desc)
- parser.add_argument("ip", help="Host's IP address")
+ parser.add_argument("gitaddr", help="Host's Git daemon address")
parser.add_argument("reponame", help="Repository name clone into and work in")
- parser.add_argument("branch", help="Zen branch to operate on")
+ parser.add_argument("commit", help="Zen commit to operate on")
parser.add_argument("action", help="The action to do")
args = parser.parse_args(args)
@@ -203,14 +213,15 @@ def _remote(args):
"""
# Check for a clone, create it, chdir to it
- _header("REMOTE:", f"Clone/pull from {args.ip}")
+ _header("REMOTE:", f"Clone/pull from {args.gitaddr}")
clone_dir = zen_dir / args.reponame
if not clone_dir.is_dir():
- _run_checked("git", "clone", f"git://{args.ip}:4493/", clone_dir)
+ _run_checked("git", "clone", f"git://{args.gitaddr}/", clone_dir)
os.chdir(clone_dir)
- _run_checked("git", "checkout", args.branch)
- _run_checked("git", "pull", "-r")
+ _run_checked("git", "clean", "-fd")
+ _run_checked("git", "fetch", "origin")
+ _run_checked("git", "checkout", args.commit)
_header("REMOTE:", f"Performing action '{args.action}'")
@@ -257,14 +268,7 @@ if __name__ == "__main__":
ret = _remote(sys.argv[2:])
raise SystemExit(ret)
- try:
- ret = _local(sys.argv[1:])
- raise SystemExit(ret)
- except:
- raise
- finally:
- # Roundabout way to avoid orphaned git-daemon processes
- if os.name == "nt":
- os.system("taskkill /f /im git-daemon.exe")
+ ret = _local(sys.argv[1:])
+ raise SystemExit(ret)
# vim: expandtab foldlevel=1 foldmethod=marker
diff --git a/zen/internalfile.cpp b/zen/internalfile.cpp
index 804375ce2..b3b587a41 100644
--- a/zen/internalfile.cpp
+++ b/zen/internalfile.cpp
@@ -179,12 +179,16 @@ InternalFile::OpenWrite(std::filesystem::path FileName, bool IsCreate)
HRESULT hRes = m_File.Create(FileName.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, dwCreationDisposition);
Success = SUCCEEDED(hRes);
#else
- int OpenFlags = O_RDWR;
+ int OpenFlags = O_RDWR | O_CLOEXEC;
OpenFlags |= IsCreate ? O_CREAT | O_TRUNC : 0;
int Fd = open(FileName.c_str(), OpenFlags, 0666);
if (Fd >= 0)
{
+ if (IsCreate)
+ {
+ fchmod(Fd, 0666);
+ }
Success = true;
m_File = (void*)(intptr_t(Fd));
}
diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp
index ab606301c..79563190c 100644
--- a/zencore/filesystem.cpp
+++ b/zencore/filesystem.cpp
@@ -441,6 +441,7 @@ CloneFile(std::filesystem::path FromPath, std::filesystem::path ToPath)
{
return false;
}
+ fchmod(ToFd, 0666);
ScopedFd $To = { FromFd };
ioctl(ToFd, FICLONE, FromFd);
@@ -502,11 +503,12 @@ CopyFile(std::filesystem::path FromPath, std::filesystem::path ToPath, const Cop
ScopedFd $From = {FromFd};
// To file
- int ToFd = open(ToPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0644);
+ int ToFd = open(ToPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666);
if (ToFd < 0)
{
ThrowLastError(fmt::format("failed to create file {}", ToPath));
}
+ fchmod(ToFd, 0666);
ScopedFd $To = {ToFd};
// Copy impl
@@ -569,6 +571,8 @@ WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t Buffer
{
ThrowLastError(fmt::format("File open failed for '{}'", Path));
}
+
+ fchmod(Fd, 0666);
#endif
// TODO: this should be block-enlightened
diff --git a/zencore/thread.cpp b/zencore/thread.cpp
index a123eec82..527938cf3 100644
--- a/zencore/thread.cpp
+++ b/zencore/thread.cpp
@@ -28,6 +28,7 @@
# include <signal.h>
# include <sys/file.h>
# include <sys/sem.h>
+# include <sys/stat.h>
# include <sys/wait.h>
# include <time.h>
# include <unistd.h>
@@ -73,6 +74,19 @@ SetNameInternal(DWORD thread_id, const char* name)
}
#endif
+#if ZEN_PLATFORM_LINUX
+const bool bNoZombieChildren = [] () {
+ // When a child process exits it is put into a zombie state until the parent
+ // collects its result. This doesn't fit the Windows-like model that Zen uses
+ // where there is a less strict familial model and no zombification. Ignoring
+ // SIGCHLD siganals removes the need to call wait() on zombies. Another option
+ // would be for the child to call setsid() but that would detatch the child
+ // from the terminal.
+ sigignore(SIGCHLD);
+ return true;
+} ();
+#endif
+
void
SetCurrentThreadName([[maybe_unused]] std::string_view ThreadName)
{
@@ -257,11 +271,12 @@ NamedEvent::NamedEvent(std::string_view EventName)
ExtendableStringBuilder<64> EventPath;
EventPath << "/tmp/" << EventName;
- int Fd = open(EventPath.c_str(), O_RDWR | O_CREAT, 0644);
+ int Fd = open(EventPath.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, 0666);
if (Fd < 0)
{
ThrowLastError(fmt::format("Failed to create '{}' for named event", EventPath));
}
+ fchmod(Fd, 0666);
// Use the file path to generate an IPC key
key_t IpcKey = ftok(EventPath.c_str(), 1);
@@ -433,11 +448,12 @@ NamedMutex::Create(std::string_view MutexName)
ExtendableStringBuilder<64> Name;
Name << "/tmp/" << MutexName;
- int Inner = open(Name.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, 0644);
+ int Inner = open(Name.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, 0666);
if (Inner < 0)
{
return false;
}
+ fchmod(Inner, 0666);
if (flock(Inner, LOCK_EX) != 0)
{
@@ -476,7 +492,7 @@ NamedMutex::Exists(std::string_view MutexName)
Name << "/tmp/" << MutexName;
bool bExists = false;
- int Fd = open(Name.c_str(), O_RDWR, 0644);
+ int Fd = open(Name.c_str(), O_RDWR | O_CLOEXEC);
if (Fd >= 0)
{
if (flock(Fd, LOCK_EX | LOCK_NB) == 0)
@@ -628,8 +644,10 @@ ProcessHandle::Wait(int TimeoutMs)
timespec SleepTime = {0, SleepMs * 1000 * 1000};
for (int i = 0;; i += SleepMs)
{
+#if ZEN_PLATFORM_MAC
int WaitState = 0;
waitpid(m_Pid, &WaitState, WNOHANG | WCONTINUED | WUNTRACED);
+#endif
if (kill(m_Pid, 0) < 0)
{
diff --git a/zenhttp/httpasio.cpp b/zenhttp/httpasio.cpp
index 318f47eff..45994bb67 100644
--- a/zenhttp/httpasio.cpp
+++ b/zenhttp/httpasio.cpp
@@ -949,10 +949,18 @@ struct HttpAcceptor
// This must be used by both the client and server side, and is only effective in the absence of
// Windows Filtering Platform (WFP) callouts which can be installed by security software.
// https://docs.microsoft.com/en-us/windows/win32/winsock/sio-loopback-fast-path
- SOCKET NativeSocket = m_Acceptor.native_handle();
- int LoopbackOptionValue = 1;
- DWORD OptionNumberOfBytesReturned = 0;
- WSAIoctl(NativeSocket, SIO_LOOPBACK_FAST_PATH, &LoopbackOptionValue, sizeof(LoopbackOptionValue), NULL, 0, &OptionNumberOfBytesReturned, 0, 0);
+ SOCKET NativeSocket = m_Acceptor.native_handle();
+ int LoopbackOptionValue = 1;
+ DWORD OptionNumberOfBytesReturned = 0;
+ WSAIoctl(NativeSocket,
+ SIO_LOOPBACK_FAST_PATH,
+ &LoopbackOptionValue,
+ sizeof(LoopbackOptionValue),
+ NULL,
+ 0,
+ &OptionNumberOfBytesReturned,
+ 0,
+ 0);
#endif
m_Acceptor.listen();
}
@@ -983,8 +991,8 @@ struct HttpAcceptor
// reference to the callbacks.
Socket->set_option(asio::ip::tcp::no_delay(true));
- Socket->set_option(asio::socket_base::receive_buffer_size(128*1024));
- Socket->set_option(asio::socket_base::send_buffer_size(256*1024));
+ Socket->set_option(asio::socket_base::receive_buffer_size(128 * 1024));
+ Socket->set_option(asio::socket_base::send_buffer_size(256 * 1024));
auto Conn = std::make_shared<HttpServerConnection>(m_Server, std::move(Socket));
Conn->HandleNewRequest();
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index aac43f43a..6a1b54b79 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -798,7 +798,8 @@ TEST_CASE("zcache.basic")
{
zen::IoHash Key = zen::IoHash::HashBuffer(&i, sizeof i);
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)});
+ cpr::Response Result =
+ cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
CHECK(Result.status_code == 200);
}
@@ -836,7 +837,8 @@ TEST_CASE("zcache.basic")
{
zen::IoHash Key = HashKey(i);
- cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)});
+ cpr::Response Result =
+ cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
CHECK(Result.status_code == 200);
}
@@ -1349,7 +1351,7 @@ TEST_CASE("zcache.policy")
// Get record
{
cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)},
- cpr::Header{{"Accept", "application/x-ue-cbobject"}});
+ cpr::Header{{"Accept", "application/x-ue-cb"}});
CHECK(IsHttpSuccessCode(Result.status_code));
IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size());
CbObject ResponseObject = zen::LoadCompactBinaryObject(Buffer);
@@ -1713,6 +1715,629 @@ TEST_CASE("zcache.rpc")
}
}
+TEST_CASE("zcache.rpc.allpolicies")
+{
+ using namespace std::literals;
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenServerInstance UpstreamServer(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenServerInstance LocalServer(TestEnv);
+ const uint16_t LocalPortNumber = 13337;
+ const auto BaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
+
+ SpawnServer(UpstreamServer, UpstreamCfg);
+ SpawnServer(LocalServer, LocalCfg);
+
+ std::string_view TestVersion = "F72150A02AE34B57A9EC91D36BA1CE08"sv;
+ std::string_view TestBucket = "allpoliciestest"sv;
+
+ // NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local)
+ // *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type)
+ constexpr int NumKeys = 256;
+ constexpr int NumValues = 4;
+ Oid ValueIds[NumValues];
+ IoHash Hash;
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ ExtendableStringBuilder<16> ValueName;
+ ValueName << "ValueId_"sv << ValueIndex;
+ static_assert(sizeof(IoHash) >= sizeof(Oid));
+ ValueIds[ValueIndex] = Oid::FromMemory(IoHash::HashBuffer(ValueName.Data(), ValueName.Size() * sizeof(ValueName.Data()[0])).Hash);
+ }
+
+ struct KeyData;
+ struct UserData
+ {
+ UserData& Set(KeyData* InKeyData, int InValueIndex)
+ {
+ Data = InKeyData;
+ ValueIndex = InValueIndex;
+ return *this;
+ }
+ KeyData* Data = nullptr;
+ int ValueIndex = 0;
+ };
+ struct KeyData
+ {
+ CompressedBuffer BufferValues[NumValues];
+ uint64_t IntValues[NumValues];
+ UserData ValueUserData[NumValues];
+ bool ReceivedChunk[NumValues];
+ CacheKey Key;
+ UserData KeyUserData;
+ uint32_t KeyIndex = 0;
+ bool GetRequestsData = true;
+ bool UseValueAPI = false;
+ bool UseValuePolicy = false;
+ bool ForceMiss = false;
+ bool UseLocal = true;
+ bool UseRemote = true;
+ bool ShouldBeHit = true;
+ bool ReceivedPut = false;
+ bool ReceivedGet = false;
+ bool ReceivedPutValue = false;
+ bool ReceivedGetValue = false;
+ };
+ struct CachePutRequest
+ {
+ CacheKey Key;
+ CbObject Record;
+ CacheRecordPolicy Policy;
+ KeyData* Values;
+ UserData* Data;
+ };
+ struct CachePutValueRequest
+ {
+ CacheKey Key;
+ CompressedBuffer Value;
+ CachePolicy Policy;
+ UserData* Data;
+ };
+ struct CacheGetRequest
+ {
+ CacheKey Key;
+ CacheRecordPolicy Policy;
+ UserData* Data;
+ };
+ struct CacheGetValueRequest
+ {
+ CacheKey Key;
+ CachePolicy Policy;
+ UserData* Data;
+ };
+ struct CacheGetChunkRequest
+ {
+ CacheKey Key;
+ Oid ValueId;
+ uint64_t RawOffset;
+ uint64_t RawSize;
+ IoHash RawHash;
+ CachePolicy Policy;
+ UserData* Data;
+ };
+
+ KeyData KeyDatas[NumKeys];
+ std::vector<CachePutRequest> PutRequests;
+ std::vector<CachePutValueRequest> PutValueRequests;
+ std::vector<CacheGetRequest> GetRequests;
+ std::vector<CacheGetValueRequest> GetValueRequests;
+ std::vector<CacheGetChunkRequest> ChunkRequests;
+
+ for (uint32_t KeyIndex = 0; KeyIndex < NumKeys; ++KeyIndex)
+ {
+ IoHashStream KeyWriter;
+ KeyWriter.Append(TestVersion.data(), TestVersion.length() * sizeof(TestVersion.data()[0]));
+ KeyWriter.Append(&KeyIndex, sizeof(KeyIndex));
+ IoHash KeyHash = KeyWriter.GetHash();
+ KeyData& KeyData = KeyDatas[KeyIndex];
+
+ KeyData.Key = CacheKey::Create(TestBucket, KeyHash);
+ KeyData.KeyIndex = KeyIndex;
+ KeyData.GetRequestsData = (KeyIndex & (1 << 1)) == 0;
+ KeyData.UseValueAPI = (KeyIndex & (1 << 2)) != 0;
+ KeyData.UseValuePolicy = (KeyIndex & (1 << 3)) != 0;
+ KeyData.ForceMiss = (KeyIndex & (1 << 4)) == 0;
+ KeyData.UseLocal = (KeyIndex & (1 << 5)) == 0;
+ KeyData.UseRemote = (KeyIndex & (1 << 6)) == 0;
+ KeyData.ShouldBeHit = !KeyData.ForceMiss && (KeyData.UseLocal || KeyData.UseRemote);
+ CachePolicy SharedPolicy = KeyData.UseLocal ? CachePolicy::Local : CachePolicy::None;
+ SharedPolicy |= KeyData.UseRemote ? CachePolicy::Remote : CachePolicy::None;
+ CachePolicy PutPolicy = SharedPolicy;
+ CachePolicy GetPolicy = SharedPolicy;
+ GetPolicy |= !KeyData.GetRequestsData ? CachePolicy::SkipData : CachePolicy::None;
+ CacheKey& Key = KeyData.Key;
+
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ KeyData.IntValues[ValueIndex] = static_cast<uint64_t>(KeyIndex) | (static_cast<uint64_t>(ValueIndex) << 32);
+ KeyData.BufferValues[ValueIndex] =
+ CompressedBuffer::Compress(SharedBuffer::MakeView(&KeyData.IntValues[ValueIndex], sizeof(KeyData.IntValues[ValueIndex])));
+ KeyData.ReceivedChunk[ValueIndex] = false;
+ }
+
+ UserData& KeyUserData = KeyData.KeyUserData.Set(&KeyData, -1);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ KeyData.ValueUserData[ValueIndex].Set(&KeyData, ValueIndex);
+ }
+ if (!KeyData.UseValueAPI)
+ {
+ CbObjectWriter Builder;
+ Builder.BeginObject("key"sv);
+ Builder << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash;
+ Builder.EndObject();
+ Builder.BeginArray("Values"sv);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ Builder.BeginObject();
+ Builder.AddObjectId("Id"sv, ValueIds[ValueIndex]);
+ Builder.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(KeyData.BufferValues[ValueIndex].GetRawHash()));
+ Builder.AddInteger("RawSize"sv, KeyData.BufferValues[ValueIndex].GetRawSize());
+ Builder.EndObject();
+ }
+ Builder.EndArray();
+
+ CacheRecordPolicy PutRecordPolicy;
+ CacheRecordPolicy GetRecordPolicy;
+ if (!KeyData.UseValuePolicy)
+ {
+ PutRecordPolicy = CacheRecordPolicy(PutPolicy);
+ GetRecordPolicy = CacheRecordPolicy(GetPolicy);
+ }
+ else
+ {
+ // Switch the SkipData field in the Record policy so that if the CacheStore ignores the ValuePolicies
+ // it will use the wrong value for SkipData and fail our tests.
+ CacheRecordPolicyBuilder PutBuilder(PutPolicy ^ CachePolicy::SkipData);
+ CacheRecordPolicyBuilder GetBuilder(GetPolicy ^ CachePolicy::SkipData);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ PutBuilder.AddValuePolicy(ValueIds[ValueIndex], PutPolicy);
+ GetBuilder.AddValuePolicy(ValueIds[ValueIndex], GetPolicy);
+ }
+ PutRecordPolicy = PutBuilder.Build();
+ GetRecordPolicy = GetBuilder.Build();
+ }
+ if (!KeyData.ForceMiss)
+ {
+ PutRequests.push_back({Key, Builder.Save(), PutRecordPolicy, &KeyData, &KeyUserData});
+ }
+ GetRequests.push_back({Key, GetRecordPolicy, &KeyUserData});
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ UserData& ValueUserData = KeyData.ValueUserData[ValueIndex];
+ ChunkRequests.push_back({Key, ValueIds[ValueIndex], 0, UINT64_MAX, IoHash(), GetPolicy, &ValueUserData});
+ }
+ }
+ else
+ {
+ if (!KeyData.ForceMiss)
+ {
+ PutValueRequests.push_back({Key, KeyData.BufferValues[0], PutPolicy, &KeyUserData});
+ }
+ GetValueRequests.push_back({Key, GetPolicy, &KeyUserData});
+ ChunkRequests.push_back({Key, Oid(), 0, UINT64_MAX, IoHash(), GetPolicy, &KeyUserData});
+ }
+ }
+
+ // PutCacheRecords
+ {
+ CbPackage Package;
+ CbObjectWriter Writer;
+ Writer << "Method"sv
+ << "PutCacheRecords"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("Requests"sv);
+ for (CachePutRequest& Request : PutRequests)
+ {
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Record"sv);
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash;
+ }
+ Writer.EndObject();
+ Writer.BeginArray("Values"sv);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ Writer.BeginObject();
+ {
+ CompressedBuffer Buffer = Request.Values->BufferValues[ValueIndex];
+ Writer.AddObjectId("Id"sv, ValueIds[ValueIndex]);
+ Writer.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Buffer.GetRawHash()));
+ Package.AddAttachment(CbAttachment(Buffer));
+ Writer.AddInteger("RawSize"sv, Buffer.GetRawSize());
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Writer.SetName("Policy"sv);
+ Request.Policy.Save(Writer);
+ }
+ Writer.EndObject();
+ Request.Data->Data->ReceivedPut = true;
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save());
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ }
+
+ // PutCacheValues
+ {
+ CbPackage Package;
+ CbObjectWriter Writer;
+ Writer << "Method"sv
+ << "PutCacheValues"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("Requests"sv);
+ for (CachePutValueRequest& Request : PutValueRequests)
+ {
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash;
+ }
+ Writer.EndObject();
+ CompressedBuffer Buffer = Request.Value;
+ Writer.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Buffer.GetRawHash()));
+ Package.AddAttachment(CbAttachment(Buffer));
+ Writer.AddString("Policy"sv, WriteToString<128>(Request.Policy));
+ }
+ Writer.EndObject();
+ Request.Data->Data->ReceivedPutValue = true;
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save());
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ }
+
+ for (KeyData& KeyData : KeyDatas)
+ {
+ if (!KeyData.ForceMiss)
+ {
+ if (!KeyData.UseValueAPI)
+ {
+ CHECK(KeyData.ReceivedPut);
+ }
+ else
+ {
+ CHECK(KeyData.ReceivedPutValue);
+ }
+ }
+ }
+
+ // GetCacheRecords
+ {
+ CbPackage Package;
+ CbObjectWriter Writer;
+ Writer << "Method"sv
+ << "GetCacheRecords"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("Requests"sv);
+ for (CacheGetRequest& Request : GetRequests)
+ {
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash;
+ }
+ Writer.EndObject();
+ Writer.SetName("Policy"sv);
+ Request.Policy.Save(Writer);
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save());
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ CbPackage Response;
+ bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(Loaded);
+ CbObjectView ResponseObject = Response.GetObject();
+ CbArrayView Responses = ResponseObject["Result"sv].AsArrayView();
+ CHECK(Responses.Num() == GetRequests.size());
+ int Index = 0;
+ for (CbFieldView ResponseField : Responses)
+ {
+ CbObjectView RecordView = ResponseField.AsObjectView();
+ bool Succeeded = !ResponseField.HasError();
+
+ CacheGetRequest& Request = GetRequests[Index++];
+ KeyData* KeyData = Request.Data->Data;
+ KeyData->ReceivedGet = true;
+
+ if (KeyData->ShouldBeHit)
+ {
+ CHECK(Succeeded);
+ }
+ else if (KeyData->ForceMiss)
+ {
+ CHECK(!Succeeded);
+ }
+ if (!KeyData->ForceMiss && Succeeded)
+ {
+ CbArrayView ValuesArray = RecordView["Values"sv].AsArrayView();
+ CHECK(ValuesArray.Num() == NumValues);
+
+ for (CbFieldView ValueField : ValuesArray)
+ {
+ CbObjectView ValueObject = ValueField.AsObjectView();
+ Oid ActualValueId = ValueObject["Id"sv].AsObjectId();
+ int ExpectedValueIndex = 0;
+ for (; ExpectedValueIndex < NumValues; ++ExpectedValueIndex)
+ {
+ if (ValueIds[ExpectedValueIndex] == ActualValueId)
+ {
+ break;
+ }
+ }
+ CHECK(ExpectedValueIndex < NumValues);
+ IoHash ActualRawHash = ValueObject["RawHash"sv].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(ActualRawHash);
+ CompressedBuffer ActualBuffer = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer();
+ uint64_t ActualRawSize = UINT64_MAX;
+ if (ActualBuffer)
+ {
+ ActualRawSize = ActualBuffer.GetRawSize();
+ }
+ else
+ {
+ ActualRawSize = ValueObject["RawSize"sv].AsUInt64(UINT64_MAX);
+ }
+ CompressedBuffer ExpectedValue = KeyData->BufferValues[ExpectedValueIndex];
+ CHECK(ActualRawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash()));
+ CHECK(ActualRawSize == ExpectedValue.GetRawSize());
+
+ if (KeyData->GetRequestsData)
+ {
+ SharedBuffer Buffer = ActualBuffer.Decompress();
+ CHECK(Buffer.GetSize() == ActualRawSize);
+ uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
+ uint64_t ExpectedIntValue = KeyData->IntValues[ExpectedValueIndex];
+ CHECK(ActualIntValue == ExpectedIntValue);
+ }
+ }
+ }
+ }
+ }
+
+ // GetCacheValues
+ {
+ CbPackage Package;
+ CbObjectWriter Writer;
+ Writer << "Method"sv
+ << "GetCacheValues"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("Requests"sv);
+ for (CacheGetValueRequest& Request : GetValueRequests)
+ {
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash;
+ }
+ Writer.EndObject();
+ Writer.AddString("Policy"sv, WriteToString<128>(Request.Policy));
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save());
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ CbPackage Response;
+ bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(Loaded);
+ CbObjectView ResponseObject = Response.GetObject();
+ CbArrayView Responses = ResponseObject["Result"sv].AsArrayView();
+ CHECK(Responses.Num() == GetValueRequests.size());
+ int Index = 0;
+ for (CbFieldView RequestResultView : Responses)
+ {
+ CbObjectView RequestResultObject = RequestResultView.AsObjectView();
+ CbFieldView RawHashField = RequestResultObject["RawHash"sv];
+ IoHash ActualRawHash = RawHashField.AsHash();
+ bool Succeeded = !RawHashField.HasError();
+
+ CacheGetValueRequest& Request = GetValueRequests[Index++];
+ KeyData* KeyData = Request.Data->Data;
+ KeyData->ReceivedGetValue = true;
+
+ if (KeyData->ShouldBeHit)
+ {
+ CHECK(Succeeded);
+ }
+ else if (KeyData->ForceMiss)
+ {
+ CHECK(!Succeeded);
+ }
+ if (!KeyData->ForceMiss && Succeeded)
+ {
+ const CbAttachment* Attachment = Response.FindAttachment(ActualRawHash);
+ CompressedBuffer ActualBuffer = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer();
+ uint64_t ActualRawSize = UINT64_MAX;
+ if (ActualBuffer)
+ {
+ ActualRawSize = ActualBuffer.GetRawSize();
+ }
+ else
+ {
+ ActualRawSize = RequestResultObject["RawSize"sv].AsUInt64(UINT64_MAX);
+ }
+ CompressedBuffer ExpectedValue = KeyData->BufferValues[0];
+ CHECK(ActualRawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash()));
+ CHECK(ActualRawSize == ExpectedValue.GetRawSize());
+
+ if (KeyData->GetRequestsData)
+ {
+ SharedBuffer Buffer = ActualBuffer.Decompress();
+ CHECK(Buffer.GetSize() == ActualRawSize);
+ uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
+ uint64_t ExpectedIntValue = KeyData->IntValues[0];
+ CHECK(ActualIntValue == ExpectedIntValue);
+ }
+ }
+ }
+ }
+
+ // GetCacheChunks
+ {
+ CbPackage Package;
+ CbObjectWriter Writer;
+ std::sort(ChunkRequests.begin(), ChunkRequests.end(), [](CacheGetChunkRequest& A, CacheGetChunkRequest& B) {
+ return A.Key.Hash < B.Key.Hash;
+ });
+ Writer << "Method"sv
+ << "GetCacheChunks"sv;
+ Writer.BeginObject("Params"sv);
+ {
+ CachePolicy BatchDefaultPolicy = CachePolicy::Default;
+ Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy);
+ Writer.BeginArray("ChunkRequests"sv);
+ for (CacheGetChunkRequest& Request : ChunkRequests)
+ {
+ Writer.BeginObject();
+ {
+ Writer.BeginObject("Key"sv);
+ {
+ Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash;
+ }
+ Writer.EndObject();
+ Writer.AddObjectId("ValueId"sv, ValueIds[Request.Data->ValueIndex]);
+ Writer.AddInteger("RawOffset"sv, Request.RawOffset);
+ Writer.AddInteger("RawSize"sv, Request.RawSize);
+ Writer.AddHash("ChunkId"sv, IoHash());
+ Writer.AddString("Policy"sv, WriteToString<128>(Request.Policy));
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ Writer.EndObject();
+ Package.SetObject(Writer.Save());
+ IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
+ cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+ CHECK(Result.status_code == 200);
+ CbPackage Response;
+ bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
+ CHECK(Loaded);
+ CbObjectView ResponseObject = Response.GetObject();
+ CbArrayView Responses = ResponseObject["Result"sv].AsArrayView();
+ CHECK(Responses.Num() == ChunkRequests.size());
+ int Index = 0;
+ for (CbFieldView RequestResultView : Responses)
+ {
+ CbObjectView RequestResultObject = RequestResultView.AsObjectView();
+ CbFieldView RawHashField = RequestResultObject["RawHash"sv];
+ IoHash ActualRawHash = RawHashField.AsHash();
+ bool Succeeded = !RawHashField.HasError();
+
+ CacheGetChunkRequest& Request = ChunkRequests[Index++];
+ KeyData* KeyData = Request.Data->Data;
+ int ValueIndex = Request.Data->ValueIndex >= 0 ? Request.Data->ValueIndex : 0;
+ KeyData->ReceivedChunk[ValueIndex] = true;
+
+ if (KeyData->ShouldBeHit)
+ {
+ CHECK(Succeeded);
+ }
+ else if (KeyData->ForceMiss)
+ {
+ CHECK(!Succeeded);
+ }
+ if (KeyData->ShouldBeHit && Succeeded)
+ {
+ const CbAttachment* Attachment = Response.FindAttachment(ActualRawHash);
+ CompressedBuffer ActualBuffer = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer();
+ uint64_t ActualRawSize = UINT64_MAX;
+ if (ActualBuffer)
+ {
+ ActualRawSize = ActualBuffer.GetRawSize();
+ }
+ else
+ {
+ ActualRawSize = RequestResultObject["RawSize"sv].AsUInt64(UINT64_MAX);
+ }
+ CompressedBuffer ExpectedValue = KeyData->BufferValues[ValueIndex];
+ CHECK(ActualRawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash()));
+ CHECK(ActualRawSize == ExpectedValue.GetRawSize());
+
+ if (KeyData->GetRequestsData)
+ {
+ SharedBuffer Buffer = ActualBuffer.Decompress();
+ CHECK(Buffer.GetSize() == ActualRawSize);
+ uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0];
+ uint64_t ExpectedIntValue = KeyData->IntValues[ValueIndex];
+ CHECK(ActualIntValue == ExpectedIntValue);
+ }
+ }
+ }
+ }
+
+ for (KeyData& KeyData : KeyDatas)
+ {
+ if (!KeyData.UseValueAPI)
+ {
+ CHECK(KeyData.ReceivedGet);
+ for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
+ {
+ CHECK(KeyData.ReceivedChunk[ValueIndex]);
+ }
+ }
+ else
+ {
+ CHECK(KeyData.ReceivedGetValue);
+ CHECK(KeyData.ReceivedChunk[0]);
+ }
+ }
+}
+
# if ZEN_USE_EXEC
struct RemoteExecutionRequest
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 49e5896d1..8ae531720 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -227,45 +227,61 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
if (EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal) && m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, ClientResultValue))
{
- Success = true;
+ Success = true;
+ ZenContentType ContentType = ClientResultValue.Value.GetContentType();
if (AcceptType == ZenContentType::kCbPackage)
{
- CbPackage Package;
- uint32_t MissingCount = 0;
+ if (ContentType == ZenContentType::kCbObject)
+ {
+ CbPackage Package;
+ uint32_t MissingCount = 0;
- CbObjectView CacheRecord(ClientResultValue.Value.Data());
- CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) {
- if (SkipData)
- {
- MissingCount += m_CidStore.ContainsChunk(AttachmentHash.AsHash()) ? 0 : 1;
- }
- else
- {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ CbObjectView CacheRecord(ClientResultValue.Value.Data());
+ CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) {
+ if (SkipData)
{
- Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash()))
+ {
+ MissingCount++;
+ }
}
else
{
- MissingCount++;
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ }
+ else
+ {
+ MissingCount++;
+ }
}
- }
- });
+ });
- Success = MissingCount == 0 || PartialRecord;
+ Success = MissingCount == 0 || PartialRecord;
- if (Success)
- {
- Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value));
+ if (Success)
+ {
+ Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value));
- BinaryWriter MemStream;
- Package.Save(MemStream);
+ BinaryWriter MemStream;
+ Package.Save(MemStream);
- ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage);
+ ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage);
+ }
+ }
+ else
+ {
+ Success = false;
}
}
+ else if (AcceptType != ClientResultValue.Value.GetContentType() && AcceptType != ZenContentType::kUnknownContentType &&
+ AcceptType != ZenContentType::kBinary)
+ {
+ Success = false;
+ }
}
if (Success)
@@ -277,13 +293,13 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
ToString(ClientResultValue.Value.GetContentType()));
m_CacheStats.HitCount++;
- if (SkipData && AcceptType == ZenContentType::kBinary)
+ if (SkipData && AcceptType != ZenContentType::kCbPackage && AcceptType != ZenContentType::kCbObject)
{
return Request.WriteResponse(HttpResponseCode::OK);
}
else
{
- // Other types handled SkipData when constructing the ClientResultValue
+ // kCbPackage handled SkipData when constructing the ClientResultValue, kcbObject ignores SkipData
return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value);
}
}
@@ -996,155 +1012,9 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
return PutResult::Success;
}
-#if BACKWARDS_COMPATABILITY_JAN2022
-void
-HttpStructuredCacheService::HandleRpcGetCacheRecordsLegacy(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
-{
- ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
-
- CbPackage RpcResponse;
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- CacheRecordPolicy BatchPolicy = LoadCacheRecordPolicy(Params["Policy"sv].AsObjectView());
- std::vector<CacheKey> CacheKeys;
- std::vector<IoBuffer> CacheValues;
- std::vector<size_t> UpstreamRequests;
-
- ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
-
- for (CbFieldView KeyView : Params["CacheKeys"sv])
- {
- CbObjectView KeyObject = KeyView.AsObjectView();
- CacheKeys.push_back(CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()));
- }
-
- if (CacheKeys.empty())
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest);
- }
-
- CacheValues.resize(CacheKeys.size());
-
- for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys)
- {
- ZenCacheValue CacheValue;
- uint32_t MissingCount = 0;
- uint32_t MissingReadFromUpstreamCount = 0;
-
- if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue) &&
- CacheValue.Value.GetContentType() == ZenContentType::kCbObject)
- {
- CbObjectView CacheRecord(CacheValue.Value.Data());
- CacheRecord.IterateAttachments(
- [this, &MissingCount, &MissingReadFromUpstreamCount, &RpcResponse, &BatchPolicy](CbFieldView AttachmentHash) {
- CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy();
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal))
- {
- // A value that is requested without the Query flag (such as None/Disable) does not count as missing, because we
- // didn't ask for it and thus the record is complete in its absence.
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- MissingReadFromUpstreamCount++;
- MissingCount++;
- }
- }
- else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
- {
- if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash()))
- {
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- MissingReadFromUpstreamCount++;
- }
- MissingCount++;
- }
- }
- else
- {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
- {
- ZEN_ASSERT(Chunk.GetSize() > 0);
- RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
- }
- else
- {
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- MissingReadFromUpstreamCount++;
- }
- MissingCount++;
- }
- }
- });
- }
-
- // Searching upstream is not implemented in this legacy support function
- if (CacheValue.Value && (MissingCount == 0 || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)))
- {
- ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL) {}",
- Key.Bucket,
- Key.Hash,
- NiceBytes(CacheValue.Value.Size()),
- ToString(CacheValue.Value.GetContentType()),
- MissingCount ? "(PARTIAL)" : ""sv);
-
- CacheValues[KeyIndex] = std::move(CacheValue.Value);
- m_CacheStats.HitCount++;
- }
- else
- {
- if (!EnumHasAnyFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::Query))
- {
- // If they requested no query, do not record this as a miss
- ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash);
- }
- else
- {
- ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAL)"sv : ""sv);
- m_CacheStats.MissCount++;
- }
- }
-
- ++KeyIndex;
- }
-
- CbObjectWriter ResponseObject;
-
- ResponseObject.BeginArray("Result"sv);
- for (const IoBuffer& Value : CacheValues)
- {
- if (Value)
- {
- CbObjectView Record(Value.Data());
- ResponseObject << Record;
- }
- else
- {
- ResponseObject.AddNull();
- }
- }
- ResponseObject.EndArray();
-
- RpcResponse.SetObject(ResponseObject.Save());
-
- BinaryWriter MemStream;
- RpcResponse.Save(MemStream);
-
- Request.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
-}
-#endif
-
void
HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest)
{
-#if BACKWARDS_COMPATABILITY_JAN2022
- // Backwards compatability;
- if (RpcRequest["Params"sv].AsObjectView()["CacheKeys"sv])
- {
- return HandleRpcGetCacheRecordsLegacy(HttpRequest, RpcRequest);
- }
-#endif
ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
@@ -1527,7 +1397,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
{
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kBinary, .Key = Key});
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Key = Key});
}
Results.push_back(Succeeded);
ZEN_DEBUG("PUTCACHEVALUES - '{}/{}' {}, '{}'", Key.Bucket, Key.Hash, NiceBytes(TransferredSize), Succeeded ? "Added"sv : "Invalid");
@@ -1559,13 +1429,6 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
void
HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest)
{
-#if BACKWARDS_COMPATABILITY_JAN2022
- if (RpcRequest["Params"sv].AsObjectView()["ChunkRequests"])
- {
- return HandleRpcGetCacheChunks(HttpRequest, RpcRequest);
- }
-#endif
-
ZEN_TRACE_CPU("Z$::RpcGetCacheValues");
CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
@@ -1615,7 +1478,8 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
}
if (!Result && EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
{
- GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kBinary);
+ GetUpstreamCacheResult UpstreamResult =
+ m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kCompressedBinary);
if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType()))
{
Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value));
@@ -1690,466 +1554,469 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
}
-namespace GetCacheChunks::detail {
+namespace cache::detail {
- struct ValueData
+ struct RecordValue
{
Oid ValueId;
IoHash ContentId;
uint64_t RawSize;
};
- struct KeyRequestData
- {
- CacheKeyRequest Upstream;
- IoBuffer CacheValue;
- std::vector<ValueData> Values;
- CachePolicy DownstreamRecordPolicy;
- CachePolicy DownstreamPolicy;
- std::string_view Source;
- bool Exists = false;
- bool HasRequest = false;
- bool HasRecordRequest = false;
- bool HasValueRequest = false;
- bool ValuesRead = false;
+ struct RecordBody
+ {
+ IoBuffer CacheValue;
+ std::vector<RecordValue> Values;
+ std::string_view Source;
+ CachePolicy DownstreamPolicy;
+ bool Exists = false;
+ bool HasRequest = false;
+ bool ValuesRead = false;
};
- struct ChunkRequestData
- {
- CacheChunkRequest Upstream;
- KeyRequestData* KeyRequest;
- size_t KeyRequestIndex;
- CachePolicy DownstreamPolicy;
- CompressedBuffer Value;
- std::string_view Source;
- uint64_t TotalSize = 0;
- bool Exists = false;
- bool IsRecordRequest = false;
- bool TotalSizeKnown = false;
+ struct ChunkRequest
+ {
+ CacheChunkRequest* Key = nullptr;
+ RecordBody* Record = nullptr;
+ CompressedBuffer Value;
+ std::string_view Source;
+ uint64_t TotalSize = 0;
+ uint64_t RequestedSize = 0;
+ uint64_t RequestedOffset = 0;
+ CachePolicy DownstreamPolicy;
+ bool Exists = false;
+ bool TotalSizeKnown = false;
+ bool IsRecordRequest = false;
};
-} // namespace GetCacheChunks::detail
+} // namespace cache::detail
void
HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest)
{
- using namespace GetCacheChunks::detail;
+ using namespace cache::detail;
ZEN_TRACE_CPU("Z$::RpcGetCacheChunks");
- std::vector<KeyRequestData> KeyRequests;
- std::vector<ChunkRequestData> Chunks;
- BACKWARDS_COMPATABILITY_JAN2022_CODE(bool SendValueOnly = false;)
- if (!TryGetCacheChunks_Parse(KeyRequests, Chunks BACKWARDS_COMPATABILITY_JAN2022_CODE(, SendValueOnly), RpcRequest))
+ std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream
+ std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests
+ std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream
+ std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest
+ std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key
+ std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key
+ std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream
+
+ // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests
+ if (!ParseGetCacheChunksRequest(RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
{
return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
}
- GetCacheChunks_LoadKeys(KeyRequests);
- GetCacheChunks_LoadChunks(Chunks);
- GetCacheChunks_SendResults(Chunks, HttpRequest BACKWARDS_COMPATABILITY_JAN2022_CODE(, SendValueOnly));
+
+ // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we
+ // have it locally, and otherwise append a request for the payload to UpstreamChunks
+ GetLocalCacheRecords(RecordKeys, Records, RecordRequests, UpstreamChunks);
+
+ // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks
+ GetLocalCacheValues(ValueRequests, UpstreamChunks);
+
+ // Call GetCacheChunks on the upstream for any payloads we do not have locally
+ GetUpstreamCacheChunks(UpstreamChunks, RequestKeys, Requests);
+
+ // Send the payload and descriptive data about each chunk to the client
+ WriteGetCacheChunksResponse(Requests, HttpRequest);
}
bool
-HttpStructuredCacheService::TryGetCacheChunks_Parse(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests,
- std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks,
- BACKWARDS_COMPATABILITY_JAN2022_CODE(bool& SendValueOnly, ) CbObjectView RpcRequest)
+HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::RecordBody>& Records,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ CbObjectView RpcRequest)
{
- using namespace GetCacheChunks::detail;
+ using namespace cache::detail;
-#if BACKWARDS_COMPATABILITY_JAN2022
- SendValueOnly = RpcRequest["MethodVersion"sv].AsInt32() < 1;
-#else
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv);
-#endif
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
+ CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
+ size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num());
+
+ // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed,
+ // we will need to change the pointers to indexes to handle reallocations.
+ RecordKeys.reserve(NumRequests);
+ Records.reserve(NumRequests);
+ RequestKeys.reserve(NumRequests);
+ Requests.reserve(NumRequests);
+ RecordRequests.reserve(NumRequests);
+ ValueRequests.reserve(NumRequests);
+
+ CacheKeyRequest* PreviousRecordKey = nullptr;
+ RecordBody* PreviousRecord = nullptr;
- KeyRequestData* PreviousKeyRequest = nullptr;
- CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
- Chunks.reserve(ChunkRequestsArray.Num());
for (CbFieldView RequestView : ChunkRequestsArray)
{
- ChunkRequestData& Chunk = Chunks.emplace_back();
- CbObjectView RequestObject = RequestView.AsObjectView();
+ CbObjectView RequestObject = RequestView.AsObjectView();
+ CacheChunkRequest& RequestKey = RequestKeys.emplace_back();
+ ChunkRequest& Request = Requests.emplace_back();
+ Request.Key = &RequestKey;
CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
CbFieldView HashField = KeyObject["Hash"sv];
- Chunk.Upstream.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash());
- if (Chunk.Upstream.Key.Bucket.empty() || HashField.HasError())
+ RequestKey.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash());
+ if (RequestKey.Key.Bucket.empty() || HashField.HasError())
{
ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest.");
return false;
}
- KeyRequestData* KeyRequest = nullptr;
- if (!PreviousKeyRequest || PreviousKeyRequest->Upstream.Key < Chunk.Upstream.Key)
- {
- KeyRequest = &KeyRequests.emplace_back();
- KeyRequest->Upstream.Key = Chunk.Upstream.Key;
- PreviousKeyRequest = KeyRequest;
- }
- else if (!(Chunk.Upstream.Key < PreviousKeyRequest->Upstream.Key))
+ RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash();
+ RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId();
+ RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64();
+ RequestKey.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX);
+ Request.RequestedSize = RequestKey.RawSize;
+ Request.RequestedOffset = RequestKey.RawOffset;
+ std::string_view PolicyText = RequestObject["Policy"sv].AsString();
+ Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
+ Request.IsRecordRequest = (bool)RequestKey.ValueId;
+
+ if (!Request.IsRecordRequest)
{
- KeyRequest = PreviousKeyRequest;
+ ValueRequests.push_back(&Request);
}
else
{
- ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.",
- Chunk.Upstream.Key.Bucket,
- Chunk.Upstream.Key.Hash,
- PreviousKeyRequest->Upstream.Key.Bucket,
- PreviousKeyRequest->Upstream.Key.Hash);
- return false;
- }
- Chunk.KeyRequestIndex = std::distance(KeyRequests.data(), KeyRequest);
-
- Chunk.Upstream.ChunkId = RequestObject["ChunkId"sv].AsHash();
- Chunk.Upstream.ValueId = RequestObject["ValueId"sv].AsObjectId();
- Chunk.Upstream.RawOffset = RequestObject["RawOffset"sv].AsUInt64();
- Chunk.Upstream.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX);
- std::string_view PolicyText = RequestObject["Policy"sv].AsString();
- Chunk.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
-#if BACKWARDS_COMPATABILITY_JAN2022
- if (SendValueOnly)
- {
- Chunk.DownstreamPolicy = Chunk.DownstreamPolicy & (~CachePolicy::SkipData);
- }
-#endif
- Chunk.IsRecordRequest = (bool)Chunk.Upstream.ValueId;
+ RecordRequests.push_back(&Request);
+ CacheKeyRequest* RecordKey = nullptr;
+ RecordBody* Record = nullptr;
- if (!Chunk.IsRecordRequest || Chunk.Upstream.ChunkId == IoHash::Zero)
- {
- KeyRequest->DownstreamPolicy =
- KeyRequest->HasRequest ? Union(KeyRequest->DownstreamPolicy, Chunk.DownstreamPolicy) : Chunk.DownstreamPolicy;
- KeyRequest->HasRequest = true;
- (Chunk.IsRecordRequest ? KeyRequest->HasRecordRequest : KeyRequest->HasValueRequest) = true;
+ if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key)
+ {
+ RecordKey = &RecordKeys.emplace_back();
+ PreviousRecordKey = RecordKey;
+ Record = &Records.emplace_back();
+ PreviousRecord = Record;
+ RecordKey->Key = RequestKey.Key;
+ }
+ else if (RequestKey.Key == PreviousRecordKey->Key)
+ {
+ RecordKey = PreviousRecordKey;
+ Record = PreviousRecord;
+ }
+ else
+ {
+ ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.",
+ RequestKey.Key.Bucket,
+ RequestKey.Key.Hash,
+ PreviousRecordKey->Key.Bucket,
+ PreviousRecordKey->Key.Hash);
+ return false;
+ }
+ Request.Record = Record;
+ if (RequestKey.ChunkId == RequestKey.ChunkId.Zero)
+ {
+ Record->DownstreamPolicy =
+ Record->HasRequest ? Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy;
+ Record->HasRequest = true;
+ }
}
}
- if (Chunks.empty())
+ if (Requests.empty())
{
return false;
}
- for (ChunkRequestData& Chunk : Chunks)
- {
- Chunk.KeyRequest = &KeyRequests[Chunk.KeyRequestIndex];
- }
return true;
}
void
-HttpStructuredCacheService::GetCacheChunks_LoadKeys(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests)
+HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::RecordBody>& Records,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks)
{
- using namespace GetCacheChunks::detail;
+ using namespace cache::detail;
std::vector<CacheKeyRequest*> UpstreamRecordRequests;
- std::vector<KeyRequestData*> UpstreamValueRequests;
- for (KeyRequestData& KeyRequest : KeyRequests)
+ for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
{
- if (KeyRequest.HasRequest)
+ CacheKeyRequest& RecordKey = RecordKeys[RecordIndex];
+ RecordBody& Record = Records[RecordIndex];
+ if (Record.HasRequest)
{
- if (KeyRequest.HasRecordRequest)
- {
- KeyRequest.DownstreamRecordPolicy = KeyRequest.DownstreamPolicy | CachePolicy::SkipData | CachePolicy::SkipMeta;
- }
+ Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta;
- if (!KeyRequest.Exists && EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::QueryLocal))
+ if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal))
{
- // There's currently no interface for checking only whether a CacheValue exists without loading it,
- // so we load it here even if SkipData is true and its a CacheValue request.
ZenCacheValue CacheValue;
- if (m_CacheStore.Get(KeyRequest.Upstream.Key.Bucket, KeyRequest.Upstream.Key.Hash, CacheValue))
+ if (m_CacheStore.Get(RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue))
{
- KeyRequest.Exists = true;
- KeyRequest.CacheValue = std::move(CacheValue.Value);
- KeyRequest.Source = "LOCAL"sv;
+ Record.Exists = true;
+ Record.CacheValue = std::move(CacheValue.Value);
+ Record.Source = "LOCAL"sv;
}
}
- if (!KeyRequest.Exists)
+ if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote))
{
- // At most one of RecordRequest or ValueRequest will succeed for the upstream request of the key a given key, but we don't
- // know which,
- // and if the requests (from arbitrary Unreal Class code) includes both types of request for a key, we want to ask for both
- // kinds and pass the request that uses the one that succeeds.
- if (KeyRequest.HasRecordRequest && EnumHasAllFlags(KeyRequest.DownstreamRecordPolicy, CachePolicy::QueryRemote))
- {
- KeyRequest.Upstream.Policy = CacheRecordPolicy(ConvertToUpstream(KeyRequest.DownstreamRecordPolicy));
- UpstreamRecordRequests.push_back(&KeyRequest.Upstream);
- }
- if (KeyRequest.HasValueRequest && EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::QueryRemote))
- {
- UpstreamValueRequests.push_back(&KeyRequest);
- }
+ RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy));
+ UpstreamRecordRequests.push_back(&RecordKey);
}
}
}
if (!UpstreamRecordRequests.empty())
{
- const auto OnCacheRecordGetComplete = [this](CacheRecordGetCompleteParams&& Params) {
+ const auto OnCacheRecordGetComplete = [this, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) {
if (!Params.Record)
{
return;
}
+ CacheKeyRequest& RecordKey = Params.Request;
+ size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
+ RecordBody& Record = Records[RecordIndex];
- KeyRequestData& KeyRequest =
- *reinterpret_cast<KeyRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(KeyRequestData, Upstream));
- const CacheKey& Key = KeyRequest.Upstream.Key;
- KeyRequest.Exists = true;
+ const CacheKey& Key = RecordKey.Key;
+ Record.Exists = true;
CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- KeyRequest.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- KeyRequest.CacheValue.SetContentType(ZenContentType::kCbObject);
- KeyRequest.Source = "UPSTREAM"sv;
+ Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
+ Record.CacheValue.SetContentType(ZenContentType::kCbObject);
+ Record.Source = "UPSTREAM"sv;
- if (EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::StoreLocal))
+ if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal))
{
- m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = KeyRequest.CacheValue});
+ m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = Record.CacheValue});
}
};
m_UpstreamCache.GetCacheRecords(UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
}
- if (!UpstreamValueRequests.empty())
- {
- for (KeyRequestData* KeyRequestPtr : UpstreamValueRequests)
- {
- KeyRequestData& KeyRequest = *KeyRequestPtr;
- CacheKey& Key = KeyRequest.Upstream.Key;
- GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kBinary);
- if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType()))
- {
- CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value));
- if (Result)
- {
- KeyRequest.CacheValue = std::move(UpstreamResult.Value);
- KeyRequest.CacheValue.SetContentType(ZenContentType::kCompressedBinary);
- KeyRequest.Exists = true;
- KeyRequest.Source = "UPSTREAM"sv;
- // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement
- // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive
- // for us to keep the data only on the upstream server.
- // if (EnumHasAllFlags(KeyRequest->DownstreamValuePolicy, CachePolicy::StoreLocal))
- {
- m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = KeyRequest.CacheValue});
- }
- }
- }
- }
- }
-}
-
-void
-HttpStructuredCacheService::GetCacheChunks_LoadChunks(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks)
-{
- using namespace GetCacheChunks::detail;
-
std::vector<CacheChunkRequest*> UpstreamPayloadRequests;
- for (ChunkRequestData& Chunk : Chunks)
+ for (ChunkRequest* Request : RecordRequests)
{
- if (Chunk.IsRecordRequest)
+ if (Request->Key->ChunkId == IoHash::Zero)
{
- if (Chunk.Upstream.ChunkId == IoHash::Zero)
+ // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
+ // is missing, parse the cache record and try to find the raw hash from the ValueId.
+ RecordBody& Record = *Request->Record;
+ if (!Record.ValuesRead)
{
- // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
- // is missing, parse the cache record and try to find the raw hash from the ValueId.
- KeyRequestData& KeyRequest = *Chunk.KeyRequest;
- if (!KeyRequest.ValuesRead)
+ Record.ValuesRead = true;
+ if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject)
{
- KeyRequest.ValuesRead = true;
- if (KeyRequest.CacheValue && KeyRequest.CacheValue.GetContentType() == ZenContentType::kCbObject)
+ CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData());
+ CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
+ Record.Values.reserve(ValuesArray.Num());
+ for (CbFieldView ValueField : ValuesArray)
{
- CbObjectView RecordObject = CbObjectView(KeyRequest.CacheValue.GetData());
- CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
- KeyRequest.Values.reserve(ValuesArray.Num());
- for (CbFieldView ValueField : ValuesArray)
+ CbObjectView ValueObject = ValueField.AsObjectView();
+ Oid ValueId = ValueObject["Id"sv].AsObjectId();
+ CbFieldView RawHashField = ValueObject["RawHash"sv];
+ IoHash RawHash = RawHashField.AsBinaryAttachment();
+ if (ValueId && !RawHashField.HasError())
{
- CbObjectView ValueObject = ValueField.AsObjectView();
- Oid ValueId = ValueObject["Id"sv].AsObjectId();
- CbFieldView RawHashField = ValueObject["RawHash"sv];
- IoHash RawHash = RawHashField.AsBinaryAttachment();
- if (ValueId && !RawHashField.HasError())
- {
- KeyRequest.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()});
- }
+ Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()});
}
}
}
+ }
- for (const ValueData& Value : KeyRequest.Values)
+ for (const RecordValue& Value : Record.Values)
+ {
+ if (Value.ValueId == Request->Key->ValueId)
{
- if (Value.ValueId == Chunk.Upstream.ValueId)
- {
- Chunk.Upstream.ChunkId = Value.ContentId;
- Chunk.TotalSize = Value.RawSize;
- Chunk.TotalSizeKnown = true;
- break;
- }
+ Request->Key->ChunkId = Value.ContentId;
+ Request->TotalSize = Value.RawSize;
+ Request->TotalSizeKnown = true;
+ break;
}
}
+ }
- // Now load the ContentId from the local ContentIdStore or from the upstream
- if (Chunk.Upstream.ChunkId != IoHash::Zero)
+ // Now load the ContentId from the local ContentIdStore or from the upstream
+ if (Request->Key->ChunkId != IoHash::Zero)
+ {
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
{
- if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::QueryLocal))
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->TotalSizeKnown)
{
- if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData) && Chunk.TotalSizeKnown)
+ if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
{
- if (m_CidStore.ContainsChunk(Chunk.Upstream.ChunkId))
- {
- Chunk.Exists = true;
- Chunk.Source = "LOCAL"sv;
- }
+ Request->Exists = true;
+ Request->Source = "LOCAL"sv;
}
- else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Chunk.Upstream.ChunkId))
+ }
+ else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ if (Compressed)
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
- if (Compressed)
+ if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
- if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
- {
- Chunk.Value = Compressed;
- }
- Chunk.Exists = true;
- Chunk.TotalSize = Compressed.GetRawSize();
- Chunk.TotalSizeKnown = true;
- Chunk.Source = "LOCAL"sv;
+ Request->Value = Compressed;
}
+ Request->Exists = true;
+ Request->TotalSize = Compressed.GetRawSize();
+ Request->TotalSizeKnown = true;
+ Request->Source = "LOCAL"sv;
}
}
- if (!Chunk.Exists && EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::QueryRemote))
- {
- Chunk.Upstream.Policy = ConvertToUpstream(Chunk.DownstreamPolicy);
- UpstreamPayloadRequests.push_back(&Chunk.Upstream);
- }
+ }
+ if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
+ {
+ Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy);
+ OutUpstreamChunks.push_back(Request->Key);
}
}
- else
+ }
+}
+
+void
+HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks)
+{
+ using namespace cache::detail;
+
+ for (ChunkRequest* Request : ValueRequests)
+ {
+ if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
{
- if (Chunk.KeyRequest->Exists)
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
{
- if (Chunk.KeyRequest->CacheValue && IsCompressedBinary(Chunk.KeyRequest->CacheValue.GetContentType()))
+ if (IsCompressedBinary(CacheValue.Value.GetContentType()))
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk.KeyRequest->CacheValue));
- if (Compressed)
+ CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value));
+ if (Result)
{
- if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
+ if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
{
- Chunk.Value = Compressed;
+ Request->Value = Result;
}
- Chunk.Exists = true;
- Chunk.TotalSize = Compressed.GetRawSize();
- Chunk.TotalSizeKnown = true;
- Chunk.Source = Chunk.KeyRequest->Source;
- Chunk.Upstream.ChunkId = IoHash::FromBLAKE3(Compressed.GetRawHash());
+ Request->Key->ChunkId = IoHash::FromBLAKE3(Result.GetRawHash());
+ Request->Exists = true;
+ Request->TotalSize = Result.GetRawSize();
+ Request->TotalSizeKnown = true;
+ Request->Source = "LOCAL"sv;
}
}
}
}
+ if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
+ {
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal))
+ {
+ // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it locally
+ Request->Key->RawOffset = 0;
+ Request->Key->RawSize = UINT64_MAX;
+ }
+ OutUpstreamChunks.push_back(Request->Key);
+ }
}
+}
- if (!UpstreamPayloadRequests.empty())
+void
+HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests)
+{
+ using namespace cache::detail;
+
+ if (!UpstreamChunks.empty())
{
- const auto OnCacheValueGetComplete = [this](CacheValueGetCompleteParams&& Params) {
+ const auto OnCacheValueGetComplete = [this, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) {
if (Params.RawHash == Params.RawHash.Zero)
{
return;
}
- ChunkRequestData& Chunk =
- *reinterpret_cast<ChunkRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(ChunkRequestData, Upstream));
- if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::StoreLocal) ||
- !EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
+ CacheChunkRequest& Key = Params.Request;
+ size_t RequestIndex = std::distance(RequestKeys.data(), &Key);
+ ChunkRequest& Request = Requests[RequestIndex];
+ if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) ||
+ !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
{
CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value));
- if (!Compressed || Compressed.GetRawSize() != Params.RawSize)
+ if (!Compressed || Compressed.GetRawSize() != Params.RawSize ||
+ IoHash::FromBLAKE3(Compressed.GetRawHash()) != Params.RawHash)
{
return;
}
- if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::StoreLocal))
+ if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal))
{
- m_CidStore.AddChunk(Compressed);
+ if (Request.IsRecordRequest)
+ {
+ m_CidStore.AddChunk(Compressed);
+ }
+ else
+ {
+ m_CacheStore.Put(Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value});
+ }
}
- if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
+ if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
{
- Chunk.Value = std::move(Compressed);
+ Request.Value = std::move(Compressed);
}
}
- Chunk.Exists = true;
- Chunk.TotalSize = Params.RawSize;
- Chunk.TotalSizeKnown = true;
- Chunk.Source = "UPSTREAM"sv;
+ Key.ChunkId = Params.RawHash;
+ Request.Exists = true;
+ Request.TotalSize = Params.RawSize;
+ Request.TotalSizeKnown = true;
+ Request.Source = "UPSTREAM"sv;
m_CacheStats.UpstreamHitCount++;
};
- m_UpstreamCache.GetCacheValues(UpstreamPayloadRequests, std::move(OnCacheValueGetComplete));
+ m_UpstreamCache.GetCacheValues(UpstreamChunks, std::move(OnCacheValueGetComplete));
}
}
void
-HttpStructuredCacheService::GetCacheChunks_SendResults(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks,
- zen::HttpServerRequest& HttpRequest
- BACKWARDS_COMPATABILITY_JAN2022_CODE(, bool SendValueOnly))
+HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests,
+ zen::HttpServerRequest& HttpRequest)
{
- using namespace GetCacheChunks::detail;
+ using namespace cache::detail;
CbPackage RpcResponse;
CbObjectWriter Writer;
Writer.BeginArray("Result"sv);
- for (ChunkRequestData& Chunk : Chunks)
+ for (ChunkRequest& Request : Requests)
{
-#if BACKWARDS_COMPATABILITY_JAN2022
- if (SendValueOnly)
- {
- if (Chunk.Value)
- {
- Writer << Chunk.Upstream.ChunkId;
- RpcResponse.AddAttachment(CbAttachment(Chunk.Value));
- }
- else
- {
- Writer << IoHash::Zero;
- }
- }
- else
-#endif
+ Writer.BeginObject();
{
- Writer.BeginObject();
+ if (Request.Exists)
{
- if (Chunk.Exists)
+ Writer.AddHash("RawHash"sv, Request.Key->ChunkId);
+ if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
{
- Writer.AddHash("RawHash"sv, Chunk.Upstream.ChunkId);
- if (Chunk.Value && !EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
- {
- RpcResponse.AddAttachment(CbAttachment(Chunk.Value));
- }
- else
- {
- Writer.AddInteger("RawSize"sv, Chunk.TotalSize);
- }
-
- ZEN_DEBUG("CHUNKHIT - '{}/{}/{}' {} '{}' ({})",
- Chunk.Upstream.Key.Bucket,
- Chunk.Upstream.Key.Hash,
- Chunk.Upstream.ValueId,
- NiceBytes(Chunk.TotalSize),
- Chunk.IsRecordRequest ? "Record"sv : "Value"sv,
- Chunk.Source);
- m_CacheStats.HitCount++;
- }
- else if (!EnumHasAnyFlags(Chunk.DownstreamPolicy, CachePolicy::Query))
- {
- ZEN_DEBUG("CHUNKSKIP - '{}/{}/{}'", Chunk.Upstream.Key.Bucket, Chunk.Upstream.Key.Hash, Chunk.Upstream.ValueId);
+ RpcResponse.AddAttachment(CbAttachment(Request.Value));
}
else
{
- ZEN_DEBUG("MISS - '{}/{}/{}'", Chunk.Upstream.Key.Bucket, Chunk.Upstream.Key.Hash, Chunk.Upstream.ValueId);
- m_CacheStats.MissCount++;
+ Writer.AddInteger("RawSize"sv, Request.TotalSize);
}
+
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
+ Request.Key->Key.Bucket,
+ Request.Key->Key.Hash,
+ Request.Key->ValueId,
+ NiceBytes(Request.TotalSize),
+ Request.IsRecordRequest ? "Record"sv : "Value"sv,
+ Request.Source);
+ m_CacheStats.HitCount++;
+ }
+ else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query))
+ {
+ ZEN_DEBUG("SKIP - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId);
+ }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId);
+ m_CacheStats.MissCount++;
}
- Writer.EndObject();
}
+ Writer.EndObject();
}
Writer.EndArray();
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 14b001e48..00c4260aa 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -11,15 +11,14 @@
#include <memory>
#include <vector>
-// Include the define for BACKWARDS_COMPATABILITY_JAN2022
-#include <zenutil/cache/cachepolicy.h>
-
namespace spdlog {
class logger;
}
namespace zen {
+struct CacheChunkRequest;
+struct CacheKeyRequest;
class CasStore;
class CidStore;
class CbObjectView;
@@ -29,10 +28,10 @@ class UpstreamCache;
class ZenCacheStore;
enum class CachePolicy : uint32_t;
-namespace GetCacheChunks::detail {
- struct KeyRequestData;
- struct ChunkRequestData;
-} // namespace GetCacheChunks::detail
+namespace cache::detail {
+ struct RecordBody;
+ struct ChunkRequest;
+} // namespace cache::detail
/**
* Structured cache service. Imposes constraints on keys, supports blobs and
@@ -108,25 +107,36 @@ private:
void HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
void HandleRpcRequest(zen::HttpServerRequest& Request);
void HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest);
-#if BACKWARDS_COMPATABILITY_JAN2022
- void HandleRpcGetCacheRecordsLegacy(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
-#endif
- void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
- void HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest);
- void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
- void HandleRpcGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
- void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
- virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
- virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;
- PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
-
- bool TryGetCacheChunks_Parse(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests,
- std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks,
- BACKWARDS_COMPATABILITY_JAN2022_CODE(bool& SendValueOnly, ) CbObjectView RpcRequest);
- void GetCacheChunks_LoadKeys(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests);
- void GetCacheChunks_LoadChunks(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks);
- void GetCacheChunks_SendResults(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks,
- zen::HttpServerRequest& HttpRequest BACKWARDS_COMPATABILITY_JAN2022_CODE(, bool SendValueOnly));
+ void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
+ void HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest);
+ void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
+ void HandleRpcGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
+ void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
+ virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
+ virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;
+ PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
+
+ /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
+ bool ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::RecordBody>& Records,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ CbObjectView RpcRequest);
+ /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */
+ void GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::RecordBody>& Records,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks);
+ /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */
+ void GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks);
+ /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */
+ void GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests);
+ /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
+ void WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests, zen::HttpServerRequest& HttpRequest);
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 5990536a9..da0743f0a 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -177,6 +177,43 @@ namespace detail {
{
Result = Session.GetDerivedData(CacheKey.Bucket, CacheKey.Hash);
}
+ else if (Type == ZenContentType::kCompressedBinary)
+ {
+ Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
+
+ if (Result.Success)
+ {
+ const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All);
+ if (Result.Success = ValidationResult == CbValidateError::None; Result.Success)
+ {
+ CbObject CacheRecord = LoadCompactBinaryObject(Result.Response);
+ IoBuffer ContentBuffer;
+ int NumAttachments = 0;
+
+ CacheRecord.IterateAttachments(
+ [&Session, &Result, &ContentBuffer, &NumAttachments](CbFieldView AttachmentHash) {
+ CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
+ Result.Bytes += AttachmentResult.Bytes;
+ Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
+ Result.ErrorCode = AttachmentResult.ErrorCode;
+
+ if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
+ {
+ Result.Response = AttachmentResult.Response;
+ ++NumAttachments;
+ }
+ else
+ {
+ Result.Success = false;
+ }
+ });
+ if (NumAttachments != 1)
+ {
+ Result.Success = false;
+ }
+ }
+ }
+ }
else
{
const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type;
@@ -402,14 +439,45 @@ namespace detail {
.ElapsedSeconds = Result.ElapsedSeconds,
.Success = Result.Success};
}
- else
+ else if (CacheRecord.Type == ZenContentType::kCompressedBinary)
{
- int64_t TotalBytes = 0ull;
- double TotalElapsedSeconds = 0.0;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue));
+ if (!Compressed)
+ {
+ return {.Reason = std::string("Invalid compressed value buffer"), .Success = false};
+ }
- const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
- for (const IoHash& ValueContentId : ValueContentIds)
- {
+ IoHash RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
+
+ CbObjectWriter ReferencingObject;
+ ReferencingObject.AddBinaryAttachment("RawHash", RawHash);
+ ReferencingObject.AddInteger("RawSize", Compressed.GetRawSize());
+
+ return PerformStructuredPut(
+ Session,
+ CacheRecord.Key,
+ ReferencingObject.Save().GetBuffer().AsIoBuffer(),
+ MaxAttempts,
+ [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) {
+ if (ValueContentId != RawHash)
+ {
+ OutReason =
+ fmt::format("Value '{}' MISMATCHED from compressed buffer raw hash {}", ValueContentId, RawHash);
+ return false;
+ }
+
+ OutBuffer = RecordValue;
+ return true;
+ });
+ }
+ else
+ {
+ return PerformStructuredPut(
+ Session,
+ CacheRecord.Key,
+ RecordValue,
+ MaxAttempts,
+ [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) {
const auto It =
std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId);
@@ -421,131 +489,145 @@ namespace detail {
const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It);
- CloudCacheResult BlobResult;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
- {
- BlobResult = Session.PutCompressedBlob(CacheRecord.ValueContentIds[Idx], Values[Idx]);
- }
+ OutBuffer = Values[Idx];
+ return true;
+ });
+ }
+ }
+ catch (std::exception& Err)
+ {
+ m_Status.Set(UpstreamEndpointState::kError, Err.what());
- m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
+ return {.Reason = std::string(Err.what()), .Success = false};
+ }
+ }
- if (!BlobResult.Success)
- {
- OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason);
- return false;
- }
+ virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
- TotalBytes += BlobResult.Bytes;
- TotalElapsedSeconds += BlobResult.ElapsedSeconds;
- }
+ private:
+ static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out)
+ {
+ Out.Success &= Result.Success;
+ Out.Bytes += Result.Bytes;
+ Out.ElapsedSeconds += Result.ElapsedSeconds;
- return true;
- };
+ if (Result.ErrorCode)
+ {
+ Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)};
+ }
+ };
+
+ PutUpstreamCacheResult PerformStructuredPut(
+ CloudCacheSession& Session,
+ const CacheKey& Key,
+ IoBuffer ObjectBuffer,
+ const int32_t MaxAttempts,
+ std::function<bool(const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason)>&& BlobFetchFn)
+ {
+ int64_t TotalBytes = 0ull;
+ double TotalElapsedSeconds = 0.0;
- PutRefResult RefResult;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++)
+ const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool {
+ for (const IoHash& ValueContentId : ValueContentIds)
+ {
+ IoBuffer BlobBuffer;
+ if (!BlobFetchFn(ValueContentId, BlobBuffer, OutReason))
{
- RefResult = Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kCbObject);
+ return false;
}
- m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason);
-
- if (!RefResult.Success)
+ CloudCacheResult BlobResult;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
{
- return {.Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- RefResult.Reason),
- .Success = false};
+ BlobResult = Session.PutCompressedBlob(ValueContentId, BlobBuffer);
}
- TotalBytes += RefResult.Bytes;
- TotalElapsedSeconds += RefResult.ElapsedSeconds;
+ m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
- std::string Reason;
- if (!PutBlobs(RefResult.Needs, Reason))
+ if (!BlobResult.Success)
{
- return {.Reason = std::move(Reason), .Success = false};
+ OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason);
+ return false;
}
- const IoHash RefHash = IoHash::HashBuffer(RecordValue);
- FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash);
-
- m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
+ TotalBytes += BlobResult.Bytes;
+ TotalElapsedSeconds += BlobResult.ElapsedSeconds;
+ }
- if (!FinalizeResult.Success)
- {
- return {.Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- FinalizeResult.Reason),
- .Success = false};
- }
+ return true;
+ };
- if (!FinalizeResult.Needs.empty())
- {
- if (!PutBlobs(FinalizeResult.Needs, Reason))
- {
- return {.Reason = std::move(Reason), .Success = false};
- }
+ PutRefResult RefResult;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++)
+ {
+ RefResult = Session.PutRef(Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject);
+ }
- FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash);
+ m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason);
- m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
+ if (!RefResult.Success)
+ {
+ return {.Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, RefResult.Reason),
+ .Success = false};
+ }
- if (!FinalizeResult.Success)
- {
- return {.Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- FinalizeResult.Reason),
- .Success = false};
- }
+ TotalBytes += RefResult.Bytes;
+ TotalElapsedSeconds += RefResult.ElapsedSeconds;
- if (!FinalizeResult.Needs.empty())
- {
- ExtendableStringBuilder<256> Sb;
- for (const IoHash& MissingHash : FinalizeResult.Needs)
- {
- Sb << MissingHash.ToHexString() << ",";
- }
+ std::string Reason;
+ if (!PutBlobs(RefResult.Needs, Reason))
+ {
+ return {.Reason = std::move(Reason), .Success = false};
+ }
- return {.Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'",
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
- Sb.ToString()),
- .Success = false};
- }
- }
+ const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer);
+ FinalizeRefResult FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash);
- TotalBytes += FinalizeResult.Bytes;
- TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+ m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
- return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true};
- }
+ if (!FinalizeResult.Success)
+ {
+ return {
+ .Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, FinalizeResult.Reason),
+ .Success = false};
}
- catch (std::exception& Err)
+
+ if (!FinalizeResult.Needs.empty())
{
- m_Status.Set(UpstreamEndpointState::kError, Err.what());
+ if (!PutBlobs(FinalizeResult.Needs, Reason))
+ {
+ return {.Reason = std::move(Reason), .Success = false};
+ }
- return {.Reason = std::string(Err.what()), .Success = false};
- }
- }
+ FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash);
- virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
+ m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason);
- private:
- static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out)
- {
- Out.Success &= Result.Success;
- Out.Bytes += Result.Bytes;
- Out.ElapsedSeconds += Result.ElapsedSeconds;
+ if (!FinalizeResult.Success)
+ {
+ return {.Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, FinalizeResult.Reason),
+ .Success = false};
+ }
- if (Result.ErrorCode)
- {
- Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)};
+ if (!FinalizeResult.Needs.empty())
+ {
+ ExtendableStringBuilder<256> Sb;
+ for (const IoHash& MissingHash : FinalizeResult.Needs)
+ {
+ Sb << MissingHash.ToHexString() << ",";
+ }
+
+ return {
+ .Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'", Key.Bucket, Key.Hash, Sb.ToString()),
+ .Success = false};
+ }
}
- };
+
+ TotalBytes += FinalizeResult.Bytes;
+ TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+
+ return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true};
+ }
spdlog::logger& Log() { return m_Log; }
@@ -776,9 +858,6 @@ namespace detail {
BatchRequest << "Method"sv
<< "GetCacheChunks";
-#if BACKWARDS_COMPATABILITY_JAN2022
- BatchRequest.AddInteger("MethodVersion"sv, 1);
-#endif
BatchRequest.BeginObject("Params"sv);
{
CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy;
@@ -921,7 +1000,7 @@ namespace detail {
}
else
{
- return {.Reason = std::string("invalid value buffer"), .Success = false};
+ return {.Reason = std::string("Invalid value buffer"), .Success = false};
}
}
@@ -939,6 +1018,56 @@ namespace detail {
TotalBytes = Result.Bytes;
TotalElapsedSeconds = Result.ElapsedSeconds;
}
+ else if (CacheRecord.Type == ZenContentType::kCompressedBinary)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue));
+ if (!Compressed)
+ {
+ return {.Reason = std::string("Invalid value compressed buffer"), .Success = false};
+ }
+
+ CbPackage BatchPackage;
+ CbObjectWriter BatchWriter;
+ BatchWriter << "Method"sv
+ << "PutCacheValues";
+
+ BatchWriter.BeginObject("Params"sv);
+ {
+ // DefaultPolicy unspecified and expected to be Default
+
+ BatchWriter.BeginArray("Requests"sv);
+ {
+ BatchWriter.BeginObject();
+ {
+ const CacheKey& Key = CacheRecord.Key;
+ BatchWriter.BeginObject("Key"sv);
+ {
+ BatchWriter << "Bucket"sv << Key.Bucket;
+ BatchWriter << "Hash"sv << Key.Hash;
+ }
+ BatchWriter.EndObject();
+ // Policy unspecified and expected to be Default
+ BatchWriter.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Compressed.GetRawHash()));
+ BatchPackage.AddAttachment(CbAttachment(Compressed));
+ }
+ BatchWriter.EndObject();
+ }
+ BatchWriter.EndArray();
+ }
+ BatchWriter.EndObject();
+ BatchPackage.SetObject(BatchWriter.Save());
+
+ Result.Success = false;
+ for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.InvokeRpc(BatchPackage);
+ }
+
+ m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
+
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+ }
else
{
for (size_t Idx = 0, Count = Values.size(); Idx < Count; Idx++)
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index 0570dd316..1ac4afe5c 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -3,10 +3,13 @@
#include "zen.h"
#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
#include <zencore/session.h>
#include <zencore/stream.h>
+#include <zenhttp/httpcommon.h>
+#include <zenhttp/httpshared.h>
#include "cache/structuredcachestore.h"
#include "diag/formatters.h"
@@ -413,11 +416,7 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas
cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetHeader(cpr::Header{{"Accept",
- Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg"
- : Type == ZenContentType::kCbObject ? "application/x-ue-cb"
- : "application/octet-stream"}});
-
+ Session.SetHeader(cpr::Header{{"Accept", std::string{MapContentTypeToString(Type)}}});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -539,4 +538,32 @@ ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request)
return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
}
+ZenCacheResult
+ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request)
+{
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client.ServiceUrl() << "/z$/$rpc";
+
+ SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten();
+
+ cpr::Session& Session = m_SessionState->GetSession();
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}});
+ Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+
+ return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+}
+
} // namespace zen
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index bc8fd3c56..f70d9d06f 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -133,6 +133,7 @@ public:
ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type);
ZenCacheResult PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload);
ZenCacheResult InvokeRpc(const CbObjectView& Request);
+ ZenCacheResult InvokeRpc(const CbPackage& Package);
private:
inline spdlog::logger& Log() { return m_Log; }
diff --git a/zenstore/basicfile.cpp b/zenstore/basicfile.cpp
index dcd9a8575..895db6cee 100644
--- a/zenstore/basicfile.cpp
+++ b/zenstore/basicfile.cpp
@@ -72,7 +72,7 @@ BasicFile::Open(std::filesystem::path FileName, bool IsCreate, std::error_code&
return;
}
#else
- int OpenFlags = O_RDWR;
+ int OpenFlags = O_RDWR | O_CLOEXEC;
OpenFlags |= IsCreate ? O_CREAT | O_TRUNC : 0;
int Fd = open(FileName.c_str(), OpenFlags, 0666);
@@ -81,6 +81,10 @@ BasicFile::Open(std::filesystem::path FileName, bool IsCreate, std::error_code&
Ec = zen::MakeErrorCodeFromLastError();
return;
}
+ if (IsCreate)
+ {
+ fchmod(Fd, 0666);
+ }
void* FileHandle = (void*)(uintptr_t(Fd));
#endif
@@ -366,12 +370,13 @@ LockFile::Create(std::filesystem::path FileName, CbObject Payload, std::error_co
return;
}
#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- int Fd = open(FileName.c_str(), O_RDWR | O_CREAT, 0666);
+ int Fd = open(FileName.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, 0666);
if (Fd < 0)
{
Ec = zen::MakeErrorCodeFromLastError();
return;
}
+ fchmod(Fd, 0666);
int LockRet = flock(Fd, LOCK_EX | LOCK_NB);
if (LockRet < 0)
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index 6c137e128..758c0665b 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -388,7 +388,14 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
}
#else
// Attempt to exclusively create the file.
- auto InternalCreateFile = [&] { return open(Name.ShardedPath.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0666); };
+ auto InternalCreateFile = [&] {
+ int Fd = open(Name.ShardedPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666);
+ if (Fd >= 0)
+ {
+ fchmod(Fd, 0666);
+ }
+ return Fd;
+ };
int Fd = InternalCreateFile();
if (Fd < 0)
{
diff --git a/zenutil/cache/cachepolicy.cpp b/zenutil/cache/cachepolicy.cpp
index 8c10ea674..3bca363bb 100644
--- a/zenutil/cache/cachepolicy.cpp
+++ b/zenutil/cache/cachepolicy.cpp
@@ -122,7 +122,7 @@ ConvertToUpstream(CachePolicy Policy)
// Use the downstream value for all other flags.
CachePolicy UpstreamPolicy = CachePolicy::None;
-
+
if (EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
{
UpstreamPolicy |= CachePolicy::QueryLocal;
@@ -206,12 +206,6 @@ OptionalCacheRecordPolicy
CacheRecordPolicy::Load(const CbObjectView Object)
{
std::string_view BasePolicyText = Object["BasePolicy"sv].AsString();
-#if BACKWARDS_COMPATABILITY_JAN2022
- if (BasePolicyText.empty())
- {
- BasePolicyText = Object["DefaultValuePolicy"sv].AsString();
- }
-#endif
if (BasePolicyText.empty())
{
return {};
@@ -228,14 +222,10 @@ CacheRecordPolicy::Load(const CbObjectView Object)
return {};
}
CachePolicy Policy = ParseCachePolicy(PolicyText);
-#if BACKWARDS_COMPATABILITY_JAN2022
- Policy = Policy & CacheValuePolicy::PolicyMask;
-#else
if (EnumHasAnyFlags(Policy, ~CacheValuePolicy::PolicyMask))
{
return {};
}
-#endif
Builder.AddValuePolicy(Id, Policy);
}
diff --git a/zenutil/include/zenutil/cache/cachepolicy.h b/zenutil/include/zenutil/cache/cachepolicy.h
index efcc4fb49..9a745e42c 100644
--- a/zenutil/include/zenutil/cache/cachepolicy.h
+++ b/zenutil/include/zenutil/cache/cachepolicy.h
@@ -10,14 +10,6 @@
#include <gsl/gsl-lite.hpp>
#include <span>
-
-#define BACKWARDS_COMPATABILITY_JAN2022 1
-#if BACKWARDS_COMPATABILITY_JAN2022
-# define BACKWARDS_COMPATABILITY_JAN2022_CODE(...) __VA_ARGS__
-#else
-# define BACKWARDS_COMPATABILITY_JAN2022_CODE(...)
-#endif
-
namespace zen::Private {
class ICacheRecordPolicyShared;
}
diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp
index 5bddc72bc..f49d5f6d8 100644
--- a/zenutil/zenserverprocess.cpp
+++ b/zenutil/zenserverprocess.cpp
@@ -159,11 +159,12 @@ ZenServerState::Initialize()
ThrowLastError("Could not map view of Zen server state");
}
#else
- int Fd = shm_open("UnrealEngineZen", O_RDWR | O_CREAT, 0666);
+ int Fd = shm_open("/UnrealEngineZen", O_RDWR | O_CREAT | O_CLOEXEC, 0666);
if (Fd < 0)
{
ThrowLastError("Could not open a shared memory object");
}
+ fchmod(Fd, 0666);
void* hMap = (void*)intptr_t(Fd);
int Result = ftruncate(Fd, MapSize);
@@ -209,7 +210,7 @@ ZenServerState::InitializeReadOnly()
ThrowLastError("Could not map view of Zen server state");
}
#else
- int Fd = shm_open("UnrealEngineZen", O_RDONLY, 0666);
+ int Fd = shm_open("/UnrealEngineZen", O_RDONLY | O_CLOEXEC, 0666);
if (Fd < 0)
{
return false;