diff options
| -rw-r--r-- | scripts/remote_build.py | 88 | ||||
| -rw-r--r-- | zen/internalfile.cpp | 6 | ||||
| -rw-r--r-- | zencore/filesystem.cpp | 6 | ||||
| -rw-r--r-- | zencore/thread.cpp | 24 | ||||
| -rw-r--r-- | zenhttp/httpasio.cpp | 20 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 631 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 827 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 62 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 337 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 37 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 1 | ||||
| -rw-r--r-- | zenstore/basicfile.cpp | 9 | ||||
| -rw-r--r-- | zenstore/filecas.cpp | 9 | ||||
| -rw-r--r-- | zenutil/cache/cachepolicy.cpp | 12 | ||||
| -rw-r--r-- | zenutil/include/zenutil/cache/cachepolicy.h | 8 | ||||
| -rw-r--r-- | zenutil/zenserverprocess.cpp | 5 |
16 files changed, 1387 insertions, 695 deletions
diff --git a/scripts/remote_build.py b/scripts/remote_build.py index c5787f635..08e44c9ad 100644 --- a/scripts/remote_build.py +++ b/scripts/remote_build.py @@ -53,14 +53,11 @@ def _find_binary(name): raise EnvironmentError(f"Unable to find '{name}' in the path") #------------------------------------------------------------------------------- -class _AutoKill(object): - def __init__(self, proc): - self._proc = proc - +class _DaemonAutoKill(object): def __del__(self): - self._proc.kill() - self._proc.wait() - pass + # Roundabout way to avoid orphaned git-daemon processes + if os.name == "nt": + os.system("taskkill /f /im git-daemon.exe") @@ -73,7 +70,10 @@ def _local(args): parser = argparse.ArgumentParser(description=desc) parser.add_argument("remotehost", help="") parser.add_argument("action", default="build", nargs="?", help="") + parser.add_argument("--commit", default=None, help="Commit to act on") parser.add_argument("--keyfile", default=None, help="SSH key file") + parser.add_argument("--gitport", default=None, help="Use an exist git daemon at the given port") + parser.add_argument("--outdir", default=None, help="Put .zip bundles here") args = parser.parse_args(args) # Find the binaries we'll need @@ -135,28 +135,38 @@ def _local(args): print(f"Using host '{host}'") # Start a git daemon to use as a transfer mechanism - _header("Starting a git daemon") - print("Port: 4493") - print("Base-path: ", zen_dir) - print("Host: ", _get_ip()) - daemon = subprocess.Popen( - ( git_bin, - "daemon", - "--port=4493", - "--export-all", - "--reuseaddr", - "--verbose", - "--informative-errors", - "--base-path=" + str(zen_dir) ), - #stdout = daemon_log, - stderr = subprocess.STDOUT - ) - daemon_killer = _AutoKill(daemon) + _header("Git daemon") + daemon_port = args.gitport + if not daemon_port: + daemon_port = 4493 + + print("Starting out own one up") + print("Port:", daemon_port) + print("Base-path: ", zen_dir) + print("Host: ", _get_ip()) + daemon = subprocess.Popen( + ( git_bin, + "daemon", + "--port=" + str(daemon_port), + "--export-all", + "--reuseaddr", + "--verbose", + "--informative-errors", + "--base-path=" + str(zen_dir) ), + #stdout = daemon_log, + stderr = subprocess.STDOUT + ) + daemon_killer = _DaemonAutoKill() + else: + print("Using existing instance") + print("Port:", daemon_port) # Run this script on the remote machine _header("Running SSH") - remote_zen_dir = "%s_%s" % (os.getlogin(), _get_ip()) + commit = args.commit if args.commit else "origin/main" + + remote_zen_dir = "%s_%s_%s" % (os.getlogin(), _get_ip(), str(daemon_port)) print(f"Using zen '~/{remote_zen_dir}'") print(f"Running {__file__} remotely") @@ -166,12 +176,12 @@ def _local(args): *identity, "-tA", host, - f"python3 -u - !remote {_get_ip()} '{remote_zen_dir}' main '{args.action}'", + f"python3 -u - !remote {_get_ip()}:{daemon_port} '{remote_zen_dir}' {commit} '{args.action}'", stdin=self_file) # If we're bundling, collect zip files from the remote machine if args.action == "bundle": - build_dir = zen_dir / "build" + build_dir = Path(args.outdir) or (zen_dir / "build") build_dir.mkdir(exist_ok=True) scp_args = (*identity, host + f":zen/{remote_zen_dir}/build/*.zip", build_dir) _run_checked("scp", *scp_args) @@ -185,9 +195,9 @@ def _remote(args): # Parse arguments desc = "Build Zen on a remote host" parser = argparse.ArgumentParser(description=desc) - parser.add_argument("ip", help="Host's IP address") + parser.add_argument("gitaddr", help="Host's Git daemon address") parser.add_argument("reponame", help="Repository name clone into and work in") - parser.add_argument("branch", help="Zen branch to operate on") + parser.add_argument("commit", help="Zen commit to operate on") parser.add_argument("action", help="The action to do") args = parser.parse_args(args) @@ -203,14 +213,15 @@ def _remote(args): """ # Check for a clone, create it, chdir to it - _header("REMOTE:", f"Clone/pull from {args.ip}") + _header("REMOTE:", f"Clone/pull from {args.gitaddr}") clone_dir = zen_dir / args.reponame if not clone_dir.is_dir(): - _run_checked("git", "clone", f"git://{args.ip}:4493/", clone_dir) + _run_checked("git", "clone", f"git://{args.gitaddr}/", clone_dir) os.chdir(clone_dir) - _run_checked("git", "checkout", args.branch) - _run_checked("git", "pull", "-r") + _run_checked("git", "clean", "-fd") + _run_checked("git", "fetch", "origin") + _run_checked("git", "checkout", args.commit) _header("REMOTE:", f"Performing action '{args.action}'") @@ -257,14 +268,7 @@ if __name__ == "__main__": ret = _remote(sys.argv[2:]) raise SystemExit(ret) - try: - ret = _local(sys.argv[1:]) - raise SystemExit(ret) - except: - raise - finally: - # Roundabout way to avoid orphaned git-daemon processes - if os.name == "nt": - os.system("taskkill /f /im git-daemon.exe") + ret = _local(sys.argv[1:]) + raise SystemExit(ret) # vim: expandtab foldlevel=1 foldmethod=marker diff --git a/zen/internalfile.cpp b/zen/internalfile.cpp index 804375ce2..b3b587a41 100644 --- a/zen/internalfile.cpp +++ b/zen/internalfile.cpp @@ -179,12 +179,16 @@ InternalFile::OpenWrite(std::filesystem::path FileName, bool IsCreate) HRESULT hRes = m_File.Create(FileName.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, dwCreationDisposition); Success = SUCCEEDED(hRes); #else - int OpenFlags = O_RDWR; + int OpenFlags = O_RDWR | O_CLOEXEC; OpenFlags |= IsCreate ? O_CREAT | O_TRUNC : 0; int Fd = open(FileName.c_str(), OpenFlags, 0666); if (Fd >= 0) { + if (IsCreate) + { + fchmod(Fd, 0666); + } Success = true; m_File = (void*)(intptr_t(Fd)); } diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp index ab606301c..79563190c 100644 --- a/zencore/filesystem.cpp +++ b/zencore/filesystem.cpp @@ -441,6 +441,7 @@ CloneFile(std::filesystem::path FromPath, std::filesystem::path ToPath) { return false; } + fchmod(ToFd, 0666); ScopedFd $To = { FromFd }; ioctl(ToFd, FICLONE, FromFd); @@ -502,11 +503,12 @@ CopyFile(std::filesystem::path FromPath, std::filesystem::path ToPath, const Cop ScopedFd $From = {FromFd}; // To file - int ToFd = open(ToPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0644); + int ToFd = open(ToPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666); if (ToFd < 0) { ThrowLastError(fmt::format("failed to create file {}", ToPath)); } + fchmod(ToFd, 0666); ScopedFd $To = {ToFd}; // Copy impl @@ -569,6 +571,8 @@ WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t Buffer { ThrowLastError(fmt::format("File open failed for '{}'", Path)); } + + fchmod(Fd, 0666); #endif // TODO: this should be block-enlightened diff --git a/zencore/thread.cpp b/zencore/thread.cpp index a123eec82..527938cf3 100644 --- a/zencore/thread.cpp +++ b/zencore/thread.cpp @@ -28,6 +28,7 @@ # include <signal.h> # include <sys/file.h> # include <sys/sem.h> +# include <sys/stat.h> # include <sys/wait.h> # include <time.h> # include <unistd.h> @@ -73,6 +74,19 @@ SetNameInternal(DWORD thread_id, const char* name) } #endif +#if ZEN_PLATFORM_LINUX +const bool bNoZombieChildren = [] () { + // When a child process exits it is put into a zombie state until the parent + // collects its result. This doesn't fit the Windows-like model that Zen uses + // where there is a less strict familial model and no zombification. Ignoring + // SIGCHLD siganals removes the need to call wait() on zombies. Another option + // would be for the child to call setsid() but that would detatch the child + // from the terminal. + sigignore(SIGCHLD); + return true; +} (); +#endif + void SetCurrentThreadName([[maybe_unused]] std::string_view ThreadName) { @@ -257,11 +271,12 @@ NamedEvent::NamedEvent(std::string_view EventName) ExtendableStringBuilder<64> EventPath; EventPath << "/tmp/" << EventName; - int Fd = open(EventPath.c_str(), O_RDWR | O_CREAT, 0644); + int Fd = open(EventPath.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, 0666); if (Fd < 0) { ThrowLastError(fmt::format("Failed to create '{}' for named event", EventPath)); } + fchmod(Fd, 0666); // Use the file path to generate an IPC key key_t IpcKey = ftok(EventPath.c_str(), 1); @@ -433,11 +448,12 @@ NamedMutex::Create(std::string_view MutexName) ExtendableStringBuilder<64> Name; Name << "/tmp/" << MutexName; - int Inner = open(Name.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, 0644); + int Inner = open(Name.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, 0666); if (Inner < 0) { return false; } + fchmod(Inner, 0666); if (flock(Inner, LOCK_EX) != 0) { @@ -476,7 +492,7 @@ NamedMutex::Exists(std::string_view MutexName) Name << "/tmp/" << MutexName; bool bExists = false; - int Fd = open(Name.c_str(), O_RDWR, 0644); + int Fd = open(Name.c_str(), O_RDWR | O_CLOEXEC); if (Fd >= 0) { if (flock(Fd, LOCK_EX | LOCK_NB) == 0) @@ -628,8 +644,10 @@ ProcessHandle::Wait(int TimeoutMs) timespec SleepTime = {0, SleepMs * 1000 * 1000}; for (int i = 0;; i += SleepMs) { +#if ZEN_PLATFORM_MAC int WaitState = 0; waitpid(m_Pid, &WaitState, WNOHANG | WCONTINUED | WUNTRACED); +#endif if (kill(m_Pid, 0) < 0) { diff --git a/zenhttp/httpasio.cpp b/zenhttp/httpasio.cpp index 318f47eff..45994bb67 100644 --- a/zenhttp/httpasio.cpp +++ b/zenhttp/httpasio.cpp @@ -949,10 +949,18 @@ struct HttpAcceptor // This must be used by both the client and server side, and is only effective in the absence of // Windows Filtering Platform (WFP) callouts which can be installed by security software. // https://docs.microsoft.com/en-us/windows/win32/winsock/sio-loopback-fast-path - SOCKET NativeSocket = m_Acceptor.native_handle(); - int LoopbackOptionValue = 1; - DWORD OptionNumberOfBytesReturned = 0; - WSAIoctl(NativeSocket, SIO_LOOPBACK_FAST_PATH, &LoopbackOptionValue, sizeof(LoopbackOptionValue), NULL, 0, &OptionNumberOfBytesReturned, 0, 0); + SOCKET NativeSocket = m_Acceptor.native_handle(); + int LoopbackOptionValue = 1; + DWORD OptionNumberOfBytesReturned = 0; + WSAIoctl(NativeSocket, + SIO_LOOPBACK_FAST_PATH, + &LoopbackOptionValue, + sizeof(LoopbackOptionValue), + NULL, + 0, + &OptionNumberOfBytesReturned, + 0, + 0); #endif m_Acceptor.listen(); } @@ -983,8 +991,8 @@ struct HttpAcceptor // reference to the callbacks. Socket->set_option(asio::ip::tcp::no_delay(true)); - Socket->set_option(asio::socket_base::receive_buffer_size(128*1024)); - Socket->set_option(asio::socket_base::send_buffer_size(256*1024)); + Socket->set_option(asio::socket_base::receive_buffer_size(128 * 1024)); + Socket->set_option(asio::socket_base::send_buffer_size(256 * 1024)); auto Conn = std::make_shared<HttpServerConnection>(m_Server, std::move(Socket)); Conn->HandleNewRequest(); diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index aac43f43a..6a1b54b79 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -798,7 +798,8 @@ TEST_CASE("zcache.basic") { zen::IoHash Key = zen::IoHash::HashBuffer(&i, sizeof i); - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}); + cpr::Response Result = + cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); } @@ -836,7 +837,8 @@ TEST_CASE("zcache.basic") { zen::IoHash Key = HashKey(i); - cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}); + cpr::Response Result = + cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result.status_code == 200); } @@ -1349,7 +1351,7 @@ TEST_CASE("zcache.policy") // Get record { cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)}, - cpr::Header{{"Accept", "application/x-ue-cbobject"}}); + cpr::Header{{"Accept", "application/x-ue-cb"}}); CHECK(IsHttpSuccessCode(Result.status_code)); IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size()); CbObject ResponseObject = zen::LoadCompactBinaryObject(Buffer); @@ -1713,6 +1715,629 @@ TEST_CASE("zcache.rpc") } } +TEST_CASE("zcache.rpc.allpolicies") +{ + using namespace std::literals; + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamServer(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalServer(TestEnv); + const uint16_t LocalPortNumber = 13337; + const auto BaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); + + SpawnServer(UpstreamServer, UpstreamCfg); + SpawnServer(LocalServer, LocalCfg); + + std::string_view TestVersion = "F72150A02AE34B57A9EC91D36BA1CE08"sv; + std::string_view TestBucket = "allpoliciestest"sv; + + // NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local) + // *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type) + constexpr int NumKeys = 256; + constexpr int NumValues = 4; + Oid ValueIds[NumValues]; + IoHash Hash; + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + ExtendableStringBuilder<16> ValueName; + ValueName << "ValueId_"sv << ValueIndex; + static_assert(sizeof(IoHash) >= sizeof(Oid)); + ValueIds[ValueIndex] = Oid::FromMemory(IoHash::HashBuffer(ValueName.Data(), ValueName.Size() * sizeof(ValueName.Data()[0])).Hash); + } + + struct KeyData; + struct UserData + { + UserData& Set(KeyData* InKeyData, int InValueIndex) + { + Data = InKeyData; + ValueIndex = InValueIndex; + return *this; + } + KeyData* Data = nullptr; + int ValueIndex = 0; + }; + struct KeyData + { + CompressedBuffer BufferValues[NumValues]; + uint64_t IntValues[NumValues]; + UserData ValueUserData[NumValues]; + bool ReceivedChunk[NumValues]; + CacheKey Key; + UserData KeyUserData; + uint32_t KeyIndex = 0; + bool GetRequestsData = true; + bool UseValueAPI = false; + bool UseValuePolicy = false; + bool ForceMiss = false; + bool UseLocal = true; + bool UseRemote = true; + bool ShouldBeHit = true; + bool ReceivedPut = false; + bool ReceivedGet = false; + bool ReceivedPutValue = false; + bool ReceivedGetValue = false; + }; + struct CachePutRequest + { + CacheKey Key; + CbObject Record; + CacheRecordPolicy Policy; + KeyData* Values; + UserData* Data; + }; + struct CachePutValueRequest + { + CacheKey Key; + CompressedBuffer Value; + CachePolicy Policy; + UserData* Data; + }; + struct CacheGetRequest + { + CacheKey Key; + CacheRecordPolicy Policy; + UserData* Data; + }; + struct CacheGetValueRequest + { + CacheKey Key; + CachePolicy Policy; + UserData* Data; + }; + struct CacheGetChunkRequest + { + CacheKey Key; + Oid ValueId; + uint64_t RawOffset; + uint64_t RawSize; + IoHash RawHash; + CachePolicy Policy; + UserData* Data; + }; + + KeyData KeyDatas[NumKeys]; + std::vector<CachePutRequest> PutRequests; + std::vector<CachePutValueRequest> PutValueRequests; + std::vector<CacheGetRequest> GetRequests; + std::vector<CacheGetValueRequest> GetValueRequests; + std::vector<CacheGetChunkRequest> ChunkRequests; + + for (uint32_t KeyIndex = 0; KeyIndex < NumKeys; ++KeyIndex) + { + IoHashStream KeyWriter; + KeyWriter.Append(TestVersion.data(), TestVersion.length() * sizeof(TestVersion.data()[0])); + KeyWriter.Append(&KeyIndex, sizeof(KeyIndex)); + IoHash KeyHash = KeyWriter.GetHash(); + KeyData& KeyData = KeyDatas[KeyIndex]; + + KeyData.Key = CacheKey::Create(TestBucket, KeyHash); + KeyData.KeyIndex = KeyIndex; + KeyData.GetRequestsData = (KeyIndex & (1 << 1)) == 0; + KeyData.UseValueAPI = (KeyIndex & (1 << 2)) != 0; + KeyData.UseValuePolicy = (KeyIndex & (1 << 3)) != 0; + KeyData.ForceMiss = (KeyIndex & (1 << 4)) == 0; + KeyData.UseLocal = (KeyIndex & (1 << 5)) == 0; + KeyData.UseRemote = (KeyIndex & (1 << 6)) == 0; + KeyData.ShouldBeHit = !KeyData.ForceMiss && (KeyData.UseLocal || KeyData.UseRemote); + CachePolicy SharedPolicy = KeyData.UseLocal ? CachePolicy::Local : CachePolicy::None; + SharedPolicy |= KeyData.UseRemote ? CachePolicy::Remote : CachePolicy::None; + CachePolicy PutPolicy = SharedPolicy; + CachePolicy GetPolicy = SharedPolicy; + GetPolicy |= !KeyData.GetRequestsData ? CachePolicy::SkipData : CachePolicy::None; + CacheKey& Key = KeyData.Key; + + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + KeyData.IntValues[ValueIndex] = static_cast<uint64_t>(KeyIndex) | (static_cast<uint64_t>(ValueIndex) << 32); + KeyData.BufferValues[ValueIndex] = + CompressedBuffer::Compress(SharedBuffer::MakeView(&KeyData.IntValues[ValueIndex], sizeof(KeyData.IntValues[ValueIndex]))); + KeyData.ReceivedChunk[ValueIndex] = false; + } + + UserData& KeyUserData = KeyData.KeyUserData.Set(&KeyData, -1); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + KeyData.ValueUserData[ValueIndex].Set(&KeyData, ValueIndex); + } + if (!KeyData.UseValueAPI) + { + CbObjectWriter Builder; + Builder.BeginObject("key"sv); + Builder << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash; + Builder.EndObject(); + Builder.BeginArray("Values"sv); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + Builder.BeginObject(); + Builder.AddObjectId("Id"sv, ValueIds[ValueIndex]); + Builder.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(KeyData.BufferValues[ValueIndex].GetRawHash())); + Builder.AddInteger("RawSize"sv, KeyData.BufferValues[ValueIndex].GetRawSize()); + Builder.EndObject(); + } + Builder.EndArray(); + + CacheRecordPolicy PutRecordPolicy; + CacheRecordPolicy GetRecordPolicy; + if (!KeyData.UseValuePolicy) + { + PutRecordPolicy = CacheRecordPolicy(PutPolicy); + GetRecordPolicy = CacheRecordPolicy(GetPolicy); + } + else + { + // Switch the SkipData field in the Record policy so that if the CacheStore ignores the ValuePolicies + // it will use the wrong value for SkipData and fail our tests. + CacheRecordPolicyBuilder PutBuilder(PutPolicy ^ CachePolicy::SkipData); + CacheRecordPolicyBuilder GetBuilder(GetPolicy ^ CachePolicy::SkipData); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + PutBuilder.AddValuePolicy(ValueIds[ValueIndex], PutPolicy); + GetBuilder.AddValuePolicy(ValueIds[ValueIndex], GetPolicy); + } + PutRecordPolicy = PutBuilder.Build(); + GetRecordPolicy = GetBuilder.Build(); + } + if (!KeyData.ForceMiss) + { + PutRequests.push_back({Key, Builder.Save(), PutRecordPolicy, &KeyData, &KeyUserData}); + } + GetRequests.push_back({Key, GetRecordPolicy, &KeyUserData}); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + UserData& ValueUserData = KeyData.ValueUserData[ValueIndex]; + ChunkRequests.push_back({Key, ValueIds[ValueIndex], 0, UINT64_MAX, IoHash(), GetPolicy, &ValueUserData}); + } + } + else + { + if (!KeyData.ForceMiss) + { + PutValueRequests.push_back({Key, KeyData.BufferValues[0], PutPolicy, &KeyUserData}); + } + GetValueRequests.push_back({Key, GetPolicy, &KeyUserData}); + ChunkRequests.push_back({Key, Oid(), 0, UINT64_MAX, IoHash(), GetPolicy, &KeyUserData}); + } + } + + // PutCacheRecords + { + CbPackage Package; + CbObjectWriter Writer; + Writer << "Method"sv + << "PutCacheRecords"sv; + Writer.BeginObject("Params"sv); + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer.BeginArray("Requests"sv); + for (CachePutRequest& Request : PutRequests) + { + Writer.BeginObject(); + { + Writer.BeginObject("Record"sv); + { + Writer.BeginObject("Key"sv); + { + Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash; + } + Writer.EndObject(); + Writer.BeginArray("Values"sv); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + Writer.BeginObject(); + { + CompressedBuffer Buffer = Request.Values->BufferValues[ValueIndex]; + Writer.AddObjectId("Id"sv, ValueIds[ValueIndex]); + Writer.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Buffer.GetRawHash())); + Package.AddAttachment(CbAttachment(Buffer)); + Writer.AddInteger("RawSize"sv, Buffer.GetRawSize()); + } + Writer.EndObject(); + } + Writer.EndArray(); + } + Writer.EndObject(); + Writer.SetName("Policy"sv); + Request.Policy.Save(Writer); + } + Writer.EndObject(); + Request.Data->Data->ReceivedPut = true; + } + Writer.EndArray(); + } + Writer.EndObject(); + Package.SetObject(Writer.Save()); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK(Result.status_code == 200); + } + + // PutCacheValues + { + CbPackage Package; + CbObjectWriter Writer; + Writer << "Method"sv + << "PutCacheValues"sv; + Writer.BeginObject("Params"sv); + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer.BeginArray("Requests"sv); + for (CachePutValueRequest& Request : PutValueRequests) + { + Writer.BeginObject(); + { + Writer.BeginObject("Key"sv); + { + Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash; + } + Writer.EndObject(); + CompressedBuffer Buffer = Request.Value; + Writer.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Buffer.GetRawHash())); + Package.AddAttachment(CbAttachment(Buffer)); + Writer.AddString("Policy"sv, WriteToString<128>(Request.Policy)); + } + Writer.EndObject(); + Request.Data->Data->ReceivedPutValue = true; + } + Writer.EndArray(); + } + Writer.EndObject(); + Package.SetObject(Writer.Save()); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK(Result.status_code == 200); + } + + for (KeyData& KeyData : KeyDatas) + { + if (!KeyData.ForceMiss) + { + if (!KeyData.UseValueAPI) + { + CHECK(KeyData.ReceivedPut); + } + else + { + CHECK(KeyData.ReceivedPutValue); + } + } + } + + // GetCacheRecords + { + CbPackage Package; + CbObjectWriter Writer; + Writer << "Method"sv + << "GetCacheRecords"sv; + Writer.BeginObject("Params"sv); + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer.BeginArray("Requests"sv); + for (CacheGetRequest& Request : GetRequests) + { + Writer.BeginObject(); + { + Writer.BeginObject("Key"sv); + { + Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash; + } + Writer.EndObject(); + Writer.SetName("Policy"sv); + Request.Policy.Save(Writer); + } + Writer.EndObject(); + } + Writer.EndArray(); + } + Writer.EndObject(); + Package.SetObject(Writer.Save()); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK(Result.status_code == 200); + CbPackage Response; + bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(Loaded); + CbObjectView ResponseObject = Response.GetObject(); + CbArrayView Responses = ResponseObject["Result"sv].AsArrayView(); + CHECK(Responses.Num() == GetRequests.size()); + int Index = 0; + for (CbFieldView ResponseField : Responses) + { + CbObjectView RecordView = ResponseField.AsObjectView(); + bool Succeeded = !ResponseField.HasError(); + + CacheGetRequest& Request = GetRequests[Index++]; + KeyData* KeyData = Request.Data->Data; + KeyData->ReceivedGet = true; + + if (KeyData->ShouldBeHit) + { + CHECK(Succeeded); + } + else if (KeyData->ForceMiss) + { + CHECK(!Succeeded); + } + if (!KeyData->ForceMiss && Succeeded) + { + CbArrayView ValuesArray = RecordView["Values"sv].AsArrayView(); + CHECK(ValuesArray.Num() == NumValues); + + for (CbFieldView ValueField : ValuesArray) + { + CbObjectView ValueObject = ValueField.AsObjectView(); + Oid ActualValueId = ValueObject["Id"sv].AsObjectId(); + int ExpectedValueIndex = 0; + for (; ExpectedValueIndex < NumValues; ++ExpectedValueIndex) + { + if (ValueIds[ExpectedValueIndex] == ActualValueId) + { + break; + } + } + CHECK(ExpectedValueIndex < NumValues); + IoHash ActualRawHash = ValueObject["RawHash"sv].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(ActualRawHash); + CompressedBuffer ActualBuffer = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer(); + uint64_t ActualRawSize = UINT64_MAX; + if (ActualBuffer) + { + ActualRawSize = ActualBuffer.GetRawSize(); + } + else + { + ActualRawSize = ValueObject["RawSize"sv].AsUInt64(UINT64_MAX); + } + CompressedBuffer ExpectedValue = KeyData->BufferValues[ExpectedValueIndex]; + CHECK(ActualRawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash())); + CHECK(ActualRawSize == ExpectedValue.GetRawSize()); + + if (KeyData->GetRequestsData) + { + SharedBuffer Buffer = ActualBuffer.Decompress(); + CHECK(Buffer.GetSize() == ActualRawSize); + uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; + uint64_t ExpectedIntValue = KeyData->IntValues[ExpectedValueIndex]; + CHECK(ActualIntValue == ExpectedIntValue); + } + } + } + } + } + + // GetCacheValues + { + CbPackage Package; + CbObjectWriter Writer; + Writer << "Method"sv + << "GetCacheValues"sv; + Writer.BeginObject("Params"sv); + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer.BeginArray("Requests"sv); + for (CacheGetValueRequest& Request : GetValueRequests) + { + Writer.BeginObject(); + { + Writer.BeginObject("Key"sv); + { + Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash; + } + Writer.EndObject(); + Writer.AddString("Policy"sv, WriteToString<128>(Request.Policy)); + } + Writer.EndObject(); + } + Writer.EndArray(); + } + Writer.EndObject(); + Package.SetObject(Writer.Save()); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK(Result.status_code == 200); + CbPackage Response; + bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(Loaded); + CbObjectView ResponseObject = Response.GetObject(); + CbArrayView Responses = ResponseObject["Result"sv].AsArrayView(); + CHECK(Responses.Num() == GetValueRequests.size()); + int Index = 0; + for (CbFieldView RequestResultView : Responses) + { + CbObjectView RequestResultObject = RequestResultView.AsObjectView(); + CbFieldView RawHashField = RequestResultObject["RawHash"sv]; + IoHash ActualRawHash = RawHashField.AsHash(); + bool Succeeded = !RawHashField.HasError(); + + CacheGetValueRequest& Request = GetValueRequests[Index++]; + KeyData* KeyData = Request.Data->Data; + KeyData->ReceivedGetValue = true; + + if (KeyData->ShouldBeHit) + { + CHECK(Succeeded); + } + else if (KeyData->ForceMiss) + { + CHECK(!Succeeded); + } + if (!KeyData->ForceMiss && Succeeded) + { + const CbAttachment* Attachment = Response.FindAttachment(ActualRawHash); + CompressedBuffer ActualBuffer = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer(); + uint64_t ActualRawSize = UINT64_MAX; + if (ActualBuffer) + { + ActualRawSize = ActualBuffer.GetRawSize(); + } + else + { + ActualRawSize = RequestResultObject["RawSize"sv].AsUInt64(UINT64_MAX); + } + CompressedBuffer ExpectedValue = KeyData->BufferValues[0]; + CHECK(ActualRawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash())); + CHECK(ActualRawSize == ExpectedValue.GetRawSize()); + + if (KeyData->GetRequestsData) + { + SharedBuffer Buffer = ActualBuffer.Decompress(); + CHECK(Buffer.GetSize() == ActualRawSize); + uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; + uint64_t ExpectedIntValue = KeyData->IntValues[0]; + CHECK(ActualIntValue == ExpectedIntValue); + } + } + } + } + + // GetCacheChunks + { + CbPackage Package; + CbObjectWriter Writer; + std::sort(ChunkRequests.begin(), ChunkRequests.end(), [](CacheGetChunkRequest& A, CacheGetChunkRequest& B) { + return A.Key.Hash < B.Key.Hash; + }); + Writer << "Method"sv + << "GetCacheChunks"sv; + Writer.BeginObject("Params"sv); + { + CachePolicy BatchDefaultPolicy = CachePolicy::Default; + Writer << "DefaultPolicy"sv << WriteToString<128>(BatchDefaultPolicy); + Writer.BeginArray("ChunkRequests"sv); + for (CacheGetChunkRequest& Request : ChunkRequests) + { + Writer.BeginObject(); + { + Writer.BeginObject("Key"sv); + { + Writer << "Bucket"sv << Request.Key.Bucket << "Hash"sv << Request.Key.Hash; + } + Writer.EndObject(); + Writer.AddObjectId("ValueId"sv, ValueIds[Request.Data->ValueIndex]); + Writer.AddInteger("RawOffset"sv, Request.RawOffset); + Writer.AddInteger("RawSize"sv, Request.RawSize); + Writer.AddHash("ChunkId"sv, IoHash()); + Writer.AddString("Policy"sv, WriteToString<128>(Request.Policy)); + } + Writer.EndObject(); + } + Writer.EndArray(); + } + Writer.EndObject(); + Package.SetObject(Writer.Save()); + IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); + cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + CHECK(Result.status_code == 200); + CbPackage Response; + bool Loaded = Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); + CHECK(Loaded); + CbObjectView ResponseObject = Response.GetObject(); + CbArrayView Responses = ResponseObject["Result"sv].AsArrayView(); + CHECK(Responses.Num() == ChunkRequests.size()); + int Index = 0; + for (CbFieldView RequestResultView : Responses) + { + CbObjectView RequestResultObject = RequestResultView.AsObjectView(); + CbFieldView RawHashField = RequestResultObject["RawHash"sv]; + IoHash ActualRawHash = RawHashField.AsHash(); + bool Succeeded = !RawHashField.HasError(); + + CacheGetChunkRequest& Request = ChunkRequests[Index++]; + KeyData* KeyData = Request.Data->Data; + int ValueIndex = Request.Data->ValueIndex >= 0 ? Request.Data->ValueIndex : 0; + KeyData->ReceivedChunk[ValueIndex] = true; + + if (KeyData->ShouldBeHit) + { + CHECK(Succeeded); + } + else if (KeyData->ForceMiss) + { + CHECK(!Succeeded); + } + if (KeyData->ShouldBeHit && Succeeded) + { + const CbAttachment* Attachment = Response.FindAttachment(ActualRawHash); + CompressedBuffer ActualBuffer = Attachment ? Attachment->AsCompressedBinary() : CompressedBuffer(); + uint64_t ActualRawSize = UINT64_MAX; + if (ActualBuffer) + { + ActualRawSize = ActualBuffer.GetRawSize(); + } + else + { + ActualRawSize = RequestResultObject["RawSize"sv].AsUInt64(UINT64_MAX); + } + CompressedBuffer ExpectedValue = KeyData->BufferValues[ValueIndex]; + CHECK(ActualRawHash == IoHash::FromBLAKE3(ExpectedValue.GetRawHash())); + CHECK(ActualRawSize == ExpectedValue.GetRawSize()); + + if (KeyData->GetRequestsData) + { + SharedBuffer Buffer = ActualBuffer.Decompress(); + CHECK(Buffer.GetSize() == ActualRawSize); + uint64_t ActualIntValue = ((const uint64_t*)Buffer.GetData())[0]; + uint64_t ExpectedIntValue = KeyData->IntValues[ValueIndex]; + CHECK(ActualIntValue == ExpectedIntValue); + } + } + } + } + + for (KeyData& KeyData : KeyDatas) + { + if (!KeyData.UseValueAPI) + { + CHECK(KeyData.ReceivedGet); + for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex) + { + CHECK(KeyData.ReceivedChunk[ValueIndex]); + } + } + else + { + CHECK(KeyData.ReceivedGetValue); + CHECK(KeyData.ReceivedChunk[0]); + } + } +} + # if ZEN_USE_EXEC struct RemoteExecutionRequest diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 49e5896d1..8ae531720 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -227,45 +227,61 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal) && m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, ClientResultValue)) { - Success = true; + Success = true; + ZenContentType ContentType = ClientResultValue.Value.GetContentType(); if (AcceptType == ZenContentType::kCbPackage) { - CbPackage Package; - uint32_t MissingCount = 0; + if (ContentType == ZenContentType::kCbObject) + { + CbPackage Package; + uint32_t MissingCount = 0; - CbObjectView CacheRecord(ClientResultValue.Value.Data()); - CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { - if (SkipData) - { - MissingCount += m_CidStore.ContainsChunk(AttachmentHash.AsHash()) ? 0 : 1; - } - else - { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + CbObjectView CacheRecord(ClientResultValue.Value.Data()); + CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { + if (SkipData) { - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) + { + MissingCount++; + } } else { - MissingCount++; + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + } + else + { + MissingCount++; + } } - } - }); + }); - Success = MissingCount == 0 || PartialRecord; + Success = MissingCount == 0 || PartialRecord; - if (Success) - { - Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value)); + if (Success) + { + Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value)); - BinaryWriter MemStream; - Package.Save(MemStream); + BinaryWriter MemStream; + Package.Save(MemStream); - ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage); + ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage); + } + } + else + { + Success = false; } } + else if (AcceptType != ClientResultValue.Value.GetContentType() && AcceptType != ZenContentType::kUnknownContentType && + AcceptType != ZenContentType::kBinary) + { + Success = false; + } } if (Success) @@ -277,13 +293,13 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request ToString(ClientResultValue.Value.GetContentType())); m_CacheStats.HitCount++; - if (SkipData && AcceptType == ZenContentType::kBinary) + if (SkipData && AcceptType != ZenContentType::kCbPackage && AcceptType != ZenContentType::kCbObject) { return Request.WriteResponse(HttpResponseCode::OK); } else { - // Other types handled SkipData when constructing the ClientResultValue + // kCbPackage handled SkipData when constructing the ClientResultValue, kcbObject ignores SkipData return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); } } @@ -996,155 +1012,9 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack return PutResult::Success; } -#if BACKWARDS_COMPATABILITY_JAN2022 -void -HttpStructuredCacheService::HandleRpcGetCacheRecordsLegacy(zen::HttpServerRequest& Request, CbObjectView RpcRequest) -{ - ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); - - CbPackage RpcResponse; - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - CacheRecordPolicy BatchPolicy = LoadCacheRecordPolicy(Params["Policy"sv].AsObjectView()); - std::vector<CacheKey> CacheKeys; - std::vector<IoBuffer> CacheValues; - std::vector<size_t> UpstreamRequests; - - ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); - - for (CbFieldView KeyView : Params["CacheKeys"sv]) - { - CbObjectView KeyObject = KeyView.AsObjectView(); - CacheKeys.push_back(CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash())); - } - - if (CacheKeys.empty()) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); - } - - CacheValues.resize(CacheKeys.size()); - - for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys) - { - ZenCacheValue CacheValue; - uint32_t MissingCount = 0; - uint32_t MissingReadFromUpstreamCount = 0; - - if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue) && - CacheValue.Value.GetContentType() == ZenContentType::kCbObject) - { - CbObjectView CacheRecord(CacheValue.Value.Data()); - CacheRecord.IterateAttachments( - [this, &MissingCount, &MissingReadFromUpstreamCount, &RpcResponse, &BatchPolicy](CbFieldView AttachmentHash) { - CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy(); - if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal)) - { - // A value that is requested without the Query flag (such as None/Disable) does not count as missing, because we - // didn't ask for it and thus the record is complete in its absence. - if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) - { - MissingReadFromUpstreamCount++; - MissingCount++; - } - } - else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) - { - if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) - { - if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) - { - MissingReadFromUpstreamCount++; - } - MissingCount++; - } - } - else - { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) - { - ZEN_ASSERT(Chunk.GetSize() > 0); - RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); - } - else - { - if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) - { - MissingReadFromUpstreamCount++; - } - MissingCount++; - } - } - }); - } - - // Searching upstream is not implemented in this legacy support function - if (CacheValue.Value && (MissingCount == 0 || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) - { - ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL) {}", - Key.Bucket, - Key.Hash, - NiceBytes(CacheValue.Value.Size()), - ToString(CacheValue.Value.GetContentType()), - MissingCount ? "(PARTIAL)" : ""sv); - - CacheValues[KeyIndex] = std::move(CacheValue.Value); - m_CacheStats.HitCount++; - } - else - { - if (!EnumHasAnyFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::Query)) - { - // If they requested no query, do not record this as a miss - ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash); - } - else - { - ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAL)"sv : ""sv); - m_CacheStats.MissCount++; - } - } - - ++KeyIndex; - } - - CbObjectWriter ResponseObject; - - ResponseObject.BeginArray("Result"sv); - for (const IoBuffer& Value : CacheValues) - { - if (Value) - { - CbObjectView Record(Value.Data()); - ResponseObject << Record; - } - else - { - ResponseObject.AddNull(); - } - } - ResponseObject.EndArray(); - - RpcResponse.SetObject(ResponseObject.Save()); - - BinaryWriter MemStream; - RpcResponse.Save(MemStream); - - Request.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); -} -#endif - void HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) { -#if BACKWARDS_COMPATABILITY_JAN2022 - // Backwards compatability; - if (RpcRequest["Params"sv].AsObjectView()["CacheKeys"sv]) - { - return HandleRpcGetCacheRecordsLegacy(HttpRequest, RpcRequest); - } -#endif ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); @@ -1527,7 +1397,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kBinary, .Key = Key}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Key = Key}); } Results.push_back(Succeeded); ZEN_DEBUG("PUTCACHEVALUES - '{}/{}' {}, '{}'", Key.Bucket, Key.Hash, NiceBytes(TransferredSize), Succeeded ? "Added"sv : "Invalid"); @@ -1559,13 +1429,6 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ void HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) { -#if BACKWARDS_COMPATABILITY_JAN2022 - if (RpcRequest["Params"sv].AsObjectView()["ChunkRequests"]) - { - return HandleRpcGetCacheChunks(HttpRequest, RpcRequest); - } -#endif - ZEN_TRACE_CPU("Z$::RpcGetCacheValues"); CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); @@ -1615,7 +1478,8 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http } if (!Result && EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) { - GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kBinary); + GetUpstreamCacheResult UpstreamResult = + m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kCompressedBinary); if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType())) { Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)); @@ -1690,466 +1554,469 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } -namespace GetCacheChunks::detail { +namespace cache::detail { - struct ValueData + struct RecordValue { Oid ValueId; IoHash ContentId; uint64_t RawSize; }; - struct KeyRequestData - { - CacheKeyRequest Upstream; - IoBuffer CacheValue; - std::vector<ValueData> Values; - CachePolicy DownstreamRecordPolicy; - CachePolicy DownstreamPolicy; - std::string_view Source; - bool Exists = false; - bool HasRequest = false; - bool HasRecordRequest = false; - bool HasValueRequest = false; - bool ValuesRead = false; + struct RecordBody + { + IoBuffer CacheValue; + std::vector<RecordValue> Values; + std::string_view Source; + CachePolicy DownstreamPolicy; + bool Exists = false; + bool HasRequest = false; + bool ValuesRead = false; }; - struct ChunkRequestData - { - CacheChunkRequest Upstream; - KeyRequestData* KeyRequest; - size_t KeyRequestIndex; - CachePolicy DownstreamPolicy; - CompressedBuffer Value; - std::string_view Source; - uint64_t TotalSize = 0; - bool Exists = false; - bool IsRecordRequest = false; - bool TotalSizeKnown = false; + struct ChunkRequest + { + CacheChunkRequest* Key = nullptr; + RecordBody* Record = nullptr; + CompressedBuffer Value; + std::string_view Source; + uint64_t TotalSize = 0; + uint64_t RequestedSize = 0; + uint64_t RequestedOffset = 0; + CachePolicy DownstreamPolicy; + bool Exists = false; + bool TotalSizeKnown = false; + bool IsRecordRequest = false; }; -} // namespace GetCacheChunks::detail +} // namespace cache::detail void HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) { - using namespace GetCacheChunks::detail; + using namespace cache::detail; ZEN_TRACE_CPU("Z$::RpcGetCacheChunks"); - std::vector<KeyRequestData> KeyRequests; - std::vector<ChunkRequestData> Chunks; - BACKWARDS_COMPATABILITY_JAN2022_CODE(bool SendValueOnly = false;) - if (!TryGetCacheChunks_Parse(KeyRequests, Chunks BACKWARDS_COMPATABILITY_JAN2022_CODE(, SendValueOnly), RpcRequest)) + std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream + std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests + std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream + std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest + std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key + std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key + std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream + + // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests + if (!ParseGetCacheChunksRequest(RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } - GetCacheChunks_LoadKeys(KeyRequests); - GetCacheChunks_LoadChunks(Chunks); - GetCacheChunks_SendResults(Chunks, HttpRequest BACKWARDS_COMPATABILITY_JAN2022_CODE(, SendValueOnly)); + + // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we + // have it locally, and otherwise append a request for the payload to UpstreamChunks + GetLocalCacheRecords(RecordKeys, Records, RecordRequests, UpstreamChunks); + + // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks + GetLocalCacheValues(ValueRequests, UpstreamChunks); + + // Call GetCacheChunks on the upstream for any payloads we do not have locally + GetUpstreamCacheChunks(UpstreamChunks, RequestKeys, Requests); + + // Send the payload and descriptive data about each chunk to the client + WriteGetCacheChunksResponse(Requests, HttpRequest); } bool -HttpStructuredCacheService::TryGetCacheChunks_Parse(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests, - std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks, - BACKWARDS_COMPATABILITY_JAN2022_CODE(bool& SendValueOnly, ) CbObjectView RpcRequest) +HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::RecordBody>& Records, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, + CbObjectView RpcRequest) { - using namespace GetCacheChunks::detail; + using namespace cache::detail; -#if BACKWARDS_COMPATABILITY_JAN2022 - SendValueOnly = RpcRequest["MethodVersion"sv].AsInt32() < 1; -#else ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv); -#endif - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; + CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); + size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num()); + + // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed, + // we will need to change the pointers to indexes to handle reallocations. + RecordKeys.reserve(NumRequests); + Records.reserve(NumRequests); + RequestKeys.reserve(NumRequests); + Requests.reserve(NumRequests); + RecordRequests.reserve(NumRequests); + ValueRequests.reserve(NumRequests); + + CacheKeyRequest* PreviousRecordKey = nullptr; + RecordBody* PreviousRecord = nullptr; - KeyRequestData* PreviousKeyRequest = nullptr; - CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); - Chunks.reserve(ChunkRequestsArray.Num()); for (CbFieldView RequestView : ChunkRequestsArray) { - ChunkRequestData& Chunk = Chunks.emplace_back(); - CbObjectView RequestObject = RequestView.AsObjectView(); + CbObjectView RequestObject = RequestView.AsObjectView(); + CacheChunkRequest& RequestKey = RequestKeys.emplace_back(); + ChunkRequest& Request = Requests.emplace_back(); + Request.Key = &RequestKey; CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); CbFieldView HashField = KeyObject["Hash"sv]; - Chunk.Upstream.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash()); - if (Chunk.Upstream.Key.Bucket.empty() || HashField.HasError()) + RequestKey.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash()); + if (RequestKey.Key.Bucket.empty() || HashField.HasError()) { ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest."); return false; } - KeyRequestData* KeyRequest = nullptr; - if (!PreviousKeyRequest || PreviousKeyRequest->Upstream.Key < Chunk.Upstream.Key) - { - KeyRequest = &KeyRequests.emplace_back(); - KeyRequest->Upstream.Key = Chunk.Upstream.Key; - PreviousKeyRequest = KeyRequest; - } - else if (!(Chunk.Upstream.Key < PreviousKeyRequest->Upstream.Key)) + RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash(); + RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId(); + RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64(); + RequestKey.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX); + Request.RequestedSize = RequestKey.RawSize; + Request.RequestedOffset = RequestKey.RawOffset; + std::string_view PolicyText = RequestObject["Policy"sv].AsString(); + Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; + Request.IsRecordRequest = (bool)RequestKey.ValueId; + + if (!Request.IsRecordRequest) { - KeyRequest = PreviousKeyRequest; + ValueRequests.push_back(&Request); } else { - ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.", - Chunk.Upstream.Key.Bucket, - Chunk.Upstream.Key.Hash, - PreviousKeyRequest->Upstream.Key.Bucket, - PreviousKeyRequest->Upstream.Key.Hash); - return false; - } - Chunk.KeyRequestIndex = std::distance(KeyRequests.data(), KeyRequest); - - Chunk.Upstream.ChunkId = RequestObject["ChunkId"sv].AsHash(); - Chunk.Upstream.ValueId = RequestObject["ValueId"sv].AsObjectId(); - Chunk.Upstream.RawOffset = RequestObject["RawOffset"sv].AsUInt64(); - Chunk.Upstream.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX); - std::string_view PolicyText = RequestObject["Policy"sv].AsString(); - Chunk.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; -#if BACKWARDS_COMPATABILITY_JAN2022 - if (SendValueOnly) - { - Chunk.DownstreamPolicy = Chunk.DownstreamPolicy & (~CachePolicy::SkipData); - } -#endif - Chunk.IsRecordRequest = (bool)Chunk.Upstream.ValueId; + RecordRequests.push_back(&Request); + CacheKeyRequest* RecordKey = nullptr; + RecordBody* Record = nullptr; - if (!Chunk.IsRecordRequest || Chunk.Upstream.ChunkId == IoHash::Zero) - { - KeyRequest->DownstreamPolicy = - KeyRequest->HasRequest ? Union(KeyRequest->DownstreamPolicy, Chunk.DownstreamPolicy) : Chunk.DownstreamPolicy; - KeyRequest->HasRequest = true; - (Chunk.IsRecordRequest ? KeyRequest->HasRecordRequest : KeyRequest->HasValueRequest) = true; + if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key) + { + RecordKey = &RecordKeys.emplace_back(); + PreviousRecordKey = RecordKey; + Record = &Records.emplace_back(); + PreviousRecord = Record; + RecordKey->Key = RequestKey.Key; + } + else if (RequestKey.Key == PreviousRecordKey->Key) + { + RecordKey = PreviousRecordKey; + Record = PreviousRecord; + } + else + { + ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.", + RequestKey.Key.Bucket, + RequestKey.Key.Hash, + PreviousRecordKey->Key.Bucket, + PreviousRecordKey->Key.Hash); + return false; + } + Request.Record = Record; + if (RequestKey.ChunkId == RequestKey.ChunkId.Zero) + { + Record->DownstreamPolicy = + Record->HasRequest ? Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy; + Record->HasRequest = true; + } } } - if (Chunks.empty()) + if (Requests.empty()) { return false; } - for (ChunkRequestData& Chunk : Chunks) - { - Chunk.KeyRequest = &KeyRequests[Chunk.KeyRequestIndex]; - } return true; } void -HttpStructuredCacheService::GetCacheChunks_LoadKeys(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests) +HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::RecordBody>& Records, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks) { - using namespace GetCacheChunks::detail; + using namespace cache::detail; std::vector<CacheKeyRequest*> UpstreamRecordRequests; - std::vector<KeyRequestData*> UpstreamValueRequests; - for (KeyRequestData& KeyRequest : KeyRequests) + for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) { - if (KeyRequest.HasRequest) + CacheKeyRequest& RecordKey = RecordKeys[RecordIndex]; + RecordBody& Record = Records[RecordIndex]; + if (Record.HasRequest) { - if (KeyRequest.HasRecordRequest) - { - KeyRequest.DownstreamRecordPolicy = KeyRequest.DownstreamPolicy | CachePolicy::SkipData | CachePolicy::SkipMeta; - } + Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta; - if (!KeyRequest.Exists && EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::QueryLocal)) + if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal)) { - // There's currently no interface for checking only whether a CacheValue exists without loading it, - // so we load it here even if SkipData is true and its a CacheValue request. ZenCacheValue CacheValue; - if (m_CacheStore.Get(KeyRequest.Upstream.Key.Bucket, KeyRequest.Upstream.Key.Hash, CacheValue)) + if (m_CacheStore.Get(RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) { - KeyRequest.Exists = true; - KeyRequest.CacheValue = std::move(CacheValue.Value); - KeyRequest.Source = "LOCAL"sv; + Record.Exists = true; + Record.CacheValue = std::move(CacheValue.Value); + Record.Source = "LOCAL"sv; } } - if (!KeyRequest.Exists) + if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote)) { - // At most one of RecordRequest or ValueRequest will succeed for the upstream request of the key a given key, but we don't - // know which, - // and if the requests (from arbitrary Unreal Class code) includes both types of request for a key, we want to ask for both - // kinds and pass the request that uses the one that succeeds. - if (KeyRequest.HasRecordRequest && EnumHasAllFlags(KeyRequest.DownstreamRecordPolicy, CachePolicy::QueryRemote)) - { - KeyRequest.Upstream.Policy = CacheRecordPolicy(ConvertToUpstream(KeyRequest.DownstreamRecordPolicy)); - UpstreamRecordRequests.push_back(&KeyRequest.Upstream); - } - if (KeyRequest.HasValueRequest && EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::QueryRemote)) - { - UpstreamValueRequests.push_back(&KeyRequest); - } + RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy)); + UpstreamRecordRequests.push_back(&RecordKey); } } } if (!UpstreamRecordRequests.empty()) { - const auto OnCacheRecordGetComplete = [this](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; } + CacheKeyRequest& RecordKey = Params.Request; + size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); + RecordBody& Record = Records[RecordIndex]; - KeyRequestData& KeyRequest = - *reinterpret_cast<KeyRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(KeyRequestData, Upstream)); - const CacheKey& Key = KeyRequest.Upstream.Key; - KeyRequest.Exists = true; + const CacheKey& Key = RecordKey.Key; + Record.Exists = true; CbObject ObjectBuffer = CbObject::Clone(Params.Record); - KeyRequest.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); - KeyRequest.CacheValue.SetContentType(ZenContentType::kCbObject); - KeyRequest.Source = "UPSTREAM"sv; + Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); + Record.CacheValue.SetContentType(ZenContentType::kCbObject); + Record.Source = "UPSTREAM"sv; - if (EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::StoreLocal)) + if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal)) { - m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = KeyRequest.CacheValue}); + m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); } }; m_UpstreamCache.GetCacheRecords(UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } - if (!UpstreamValueRequests.empty()) - { - for (KeyRequestData* KeyRequestPtr : UpstreamValueRequests) - { - KeyRequestData& KeyRequest = *KeyRequestPtr; - CacheKey& Key = KeyRequest.Upstream.Key; - GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kBinary); - if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType())) - { - CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)); - if (Result) - { - KeyRequest.CacheValue = std::move(UpstreamResult.Value); - KeyRequest.CacheValue.SetContentType(ZenContentType::kCompressedBinary); - KeyRequest.Exists = true; - KeyRequest.Source = "UPSTREAM"sv; - // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement - // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive - // for us to keep the data only on the upstream server. - // if (EnumHasAllFlags(KeyRequest->DownstreamValuePolicy, CachePolicy::StoreLocal)) - { - m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = KeyRequest.CacheValue}); - } - } - } - } - } -} - -void -HttpStructuredCacheService::GetCacheChunks_LoadChunks(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks) -{ - using namespace GetCacheChunks::detail; - std::vector<CacheChunkRequest*> UpstreamPayloadRequests; - for (ChunkRequestData& Chunk : Chunks) + for (ChunkRequest* Request : RecordRequests) { - if (Chunk.IsRecordRequest) + if (Request->Key->ChunkId == IoHash::Zero) { - if (Chunk.Upstream.ChunkId == IoHash::Zero) + // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) + // is missing, parse the cache record and try to find the raw hash from the ValueId. + RecordBody& Record = *Request->Record; + if (!Record.ValuesRead) { - // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) - // is missing, parse the cache record and try to find the raw hash from the ValueId. - KeyRequestData& KeyRequest = *Chunk.KeyRequest; - if (!KeyRequest.ValuesRead) + Record.ValuesRead = true; + if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject) { - KeyRequest.ValuesRead = true; - if (KeyRequest.CacheValue && KeyRequest.CacheValue.GetContentType() == ZenContentType::kCbObject) + CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData()); + CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); + Record.Values.reserve(ValuesArray.Num()); + for (CbFieldView ValueField : ValuesArray) { - CbObjectView RecordObject = CbObjectView(KeyRequest.CacheValue.GetData()); - CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); - KeyRequest.Values.reserve(ValuesArray.Num()); - for (CbFieldView ValueField : ValuesArray) + CbObjectView ValueObject = ValueField.AsObjectView(); + Oid ValueId = ValueObject["Id"sv].AsObjectId(); + CbFieldView RawHashField = ValueObject["RawHash"sv]; + IoHash RawHash = RawHashField.AsBinaryAttachment(); + if (ValueId && !RawHashField.HasError()) { - CbObjectView ValueObject = ValueField.AsObjectView(); - Oid ValueId = ValueObject["Id"sv].AsObjectId(); - CbFieldView RawHashField = ValueObject["RawHash"sv]; - IoHash RawHash = RawHashField.AsBinaryAttachment(); - if (ValueId && !RawHashField.HasError()) - { - KeyRequest.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); - } + Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); } } } + } - for (const ValueData& Value : KeyRequest.Values) + for (const RecordValue& Value : Record.Values) + { + if (Value.ValueId == Request->Key->ValueId) { - if (Value.ValueId == Chunk.Upstream.ValueId) - { - Chunk.Upstream.ChunkId = Value.ContentId; - Chunk.TotalSize = Value.RawSize; - Chunk.TotalSizeKnown = true; - break; - } + Request->Key->ChunkId = Value.ContentId; + Request->TotalSize = Value.RawSize; + Request->TotalSizeKnown = true; + break; } } + } - // Now load the ContentId from the local ContentIdStore or from the upstream - if (Chunk.Upstream.ChunkId != IoHash::Zero) + // Now load the ContentId from the local ContentIdStore or from the upstream + if (Request->Key->ChunkId != IoHash::Zero) + { + if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { - if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::QueryLocal)) + if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->TotalSizeKnown) { - if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData) && Chunk.TotalSizeKnown) + if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) { - if (m_CidStore.ContainsChunk(Chunk.Upstream.ChunkId)) - { - Chunk.Exists = true; - Chunk.Source = "LOCAL"sv; - } + Request->Exists = true; + Request->Source = "LOCAL"sv; } - else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Chunk.Upstream.ChunkId)) + } + else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + if (Compressed) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); - if (Compressed) + if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { - if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) - { - Chunk.Value = Compressed; - } - Chunk.Exists = true; - Chunk.TotalSize = Compressed.GetRawSize(); - Chunk.TotalSizeKnown = true; - Chunk.Source = "LOCAL"sv; + Request->Value = Compressed; } + Request->Exists = true; + Request->TotalSize = Compressed.GetRawSize(); + Request->TotalSizeKnown = true; + Request->Source = "LOCAL"sv; } } - if (!Chunk.Exists && EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::QueryRemote)) - { - Chunk.Upstream.Policy = ConvertToUpstream(Chunk.DownstreamPolicy); - UpstreamPayloadRequests.push_back(&Chunk.Upstream); - } + } + if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) + { + Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy); + OutUpstreamChunks.push_back(Request->Key); } } - else + } +} + +void +HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks) +{ + using namespace cache::detail; + + for (ChunkRequest* Request : ValueRequests) + { + if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { - if (Chunk.KeyRequest->Exists) + ZenCacheValue CacheValue; + if (m_CacheStore.Get(Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) { - if (Chunk.KeyRequest->CacheValue && IsCompressedBinary(Chunk.KeyRequest->CacheValue.GetContentType())) + if (IsCompressedBinary(CacheValue.Value.GetContentType())) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk.KeyRequest->CacheValue)); - if (Compressed) + CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); + if (Result) { - if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) + if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { - Chunk.Value = Compressed; + Request->Value = Result; } - Chunk.Exists = true; - Chunk.TotalSize = Compressed.GetRawSize(); - Chunk.TotalSizeKnown = true; - Chunk.Source = Chunk.KeyRequest->Source; - Chunk.Upstream.ChunkId = IoHash::FromBLAKE3(Compressed.GetRawHash()); + Request->Key->ChunkId = IoHash::FromBLAKE3(Result.GetRawHash()); + Request->Exists = true; + Request->TotalSize = Result.GetRawSize(); + Request->TotalSizeKnown = true; + Request->Source = "LOCAL"sv; } } } } + if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) + { + if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal)) + { + // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it locally + Request->Key->RawOffset = 0; + Request->Key->RawSize = UINT64_MAX; + } + OutUpstreamChunks.push_back(Request->Key); + } } +} - if (!UpstreamPayloadRequests.empty()) +void +HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests) +{ + using namespace cache::detail; + + if (!UpstreamChunks.empty()) { - const auto OnCacheValueGetComplete = [this](CacheValueGetCompleteParams&& Params) { + const auto OnCacheValueGetComplete = [this, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) { if (Params.RawHash == Params.RawHash.Zero) { return; } - ChunkRequestData& Chunk = - *reinterpret_cast<ChunkRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(ChunkRequestData, Upstream)); - if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::StoreLocal) || - !EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) + CacheChunkRequest& Key = Params.Request; + size_t RequestIndex = std::distance(RequestKeys.data(), &Key); + ChunkRequest& Request = Requests[RequestIndex]; + if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) || + !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); - if (!Compressed || Compressed.GetRawSize() != Params.RawSize) + if (!Compressed || Compressed.GetRawSize() != Params.RawSize || + IoHash::FromBLAKE3(Compressed.GetRawHash()) != Params.RawHash) { return; } - if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::StoreLocal)) + if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal)) { - m_CidStore.AddChunk(Compressed); + if (Request.IsRecordRequest) + { + m_CidStore.AddChunk(Compressed); + } + else + { + m_CacheStore.Put(Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value}); + } } - if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) + if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { - Chunk.Value = std::move(Compressed); + Request.Value = std::move(Compressed); } } - Chunk.Exists = true; - Chunk.TotalSize = Params.RawSize; - Chunk.TotalSizeKnown = true; - Chunk.Source = "UPSTREAM"sv; + Key.ChunkId = Params.RawHash; + Request.Exists = true; + Request.TotalSize = Params.RawSize; + Request.TotalSizeKnown = true; + Request.Source = "UPSTREAM"sv; m_CacheStats.UpstreamHitCount++; }; - m_UpstreamCache.GetCacheValues(UpstreamPayloadRequests, std::move(OnCacheValueGetComplete)); + m_UpstreamCache.GetCacheValues(UpstreamChunks, std::move(OnCacheValueGetComplete)); } } void -HttpStructuredCacheService::GetCacheChunks_SendResults(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks, - zen::HttpServerRequest& HttpRequest - BACKWARDS_COMPATABILITY_JAN2022_CODE(, bool SendValueOnly)) +HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests, + zen::HttpServerRequest& HttpRequest) { - using namespace GetCacheChunks::detail; + using namespace cache::detail; CbPackage RpcResponse; CbObjectWriter Writer; Writer.BeginArray("Result"sv); - for (ChunkRequestData& Chunk : Chunks) + for (ChunkRequest& Request : Requests) { -#if BACKWARDS_COMPATABILITY_JAN2022 - if (SendValueOnly) - { - if (Chunk.Value) - { - Writer << Chunk.Upstream.ChunkId; - RpcResponse.AddAttachment(CbAttachment(Chunk.Value)); - } - else - { - Writer << IoHash::Zero; - } - } - else -#endif + Writer.BeginObject(); { - Writer.BeginObject(); + if (Request.Exists) { - if (Chunk.Exists) + Writer.AddHash("RawHash"sv, Request.Key->ChunkId); + if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { - Writer.AddHash("RawHash"sv, Chunk.Upstream.ChunkId); - if (Chunk.Value && !EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) - { - RpcResponse.AddAttachment(CbAttachment(Chunk.Value)); - } - else - { - Writer.AddInteger("RawSize"sv, Chunk.TotalSize); - } - - ZEN_DEBUG("CHUNKHIT - '{}/{}/{}' {} '{}' ({})", - Chunk.Upstream.Key.Bucket, - Chunk.Upstream.Key.Hash, - Chunk.Upstream.ValueId, - NiceBytes(Chunk.TotalSize), - Chunk.IsRecordRequest ? "Record"sv : "Value"sv, - Chunk.Source); - m_CacheStats.HitCount++; - } - else if (!EnumHasAnyFlags(Chunk.DownstreamPolicy, CachePolicy::Query)) - { - ZEN_DEBUG("CHUNKSKIP - '{}/{}/{}'", Chunk.Upstream.Key.Bucket, Chunk.Upstream.Key.Hash, Chunk.Upstream.ValueId); + RpcResponse.AddAttachment(CbAttachment(Request.Value)); } else { - ZEN_DEBUG("MISS - '{}/{}/{}'", Chunk.Upstream.Key.Bucket, Chunk.Upstream.Key.Hash, Chunk.Upstream.ValueId); - m_CacheStats.MissCount++; + Writer.AddInteger("RawSize"sv, Request.TotalSize); } + + ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", + Request.Key->Key.Bucket, + Request.Key->Key.Hash, + Request.Key->ValueId, + NiceBytes(Request.TotalSize), + Request.IsRecordRequest ? "Record"sv : "Value"sv, + Request.Source); + m_CacheStats.HitCount++; + } + else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query)) + { + ZEN_DEBUG("SKIP - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); + } + else + { + ZEN_DEBUG("MISS - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); + m_CacheStats.MissCount++; } - Writer.EndObject(); } + Writer.EndObject(); } Writer.EndArray(); diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 14b001e48..00c4260aa 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -11,15 +11,14 @@ #include <memory> #include <vector> -// Include the define for BACKWARDS_COMPATABILITY_JAN2022 -#include <zenutil/cache/cachepolicy.h> - namespace spdlog { class logger; } namespace zen { +struct CacheChunkRequest; +struct CacheKeyRequest; class CasStore; class CidStore; class CbObjectView; @@ -29,10 +28,10 @@ class UpstreamCache; class ZenCacheStore; enum class CachePolicy : uint32_t; -namespace GetCacheChunks::detail { - struct KeyRequestData; - struct ChunkRequestData; -} // namespace GetCacheChunks::detail +namespace cache::detail { + struct RecordBody; + struct ChunkRequest; +} // namespace cache::detail /** * Structured cache service. Imposes constraints on keys, supports blobs and @@ -108,25 +107,36 @@ private: void HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); void HandleRpcRequest(zen::HttpServerRequest& Request); void HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest); -#if BACKWARDS_COMPATABILITY_JAN2022 - void HandleRpcGetCacheRecordsLegacy(zen::HttpServerRequest& Request, CbObjectView BatchRequest); -#endif - void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest); - void HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest); - void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest); - void HandleRpcGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest); - void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); - virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; - virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; - PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); - - bool TryGetCacheChunks_Parse(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests, - std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks, - BACKWARDS_COMPATABILITY_JAN2022_CODE(bool& SendValueOnly, ) CbObjectView RpcRequest); - void GetCacheChunks_LoadKeys(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests); - void GetCacheChunks_LoadChunks(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks); - void GetCacheChunks_SendResults(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks, - zen::HttpServerRequest& HttpRequest BACKWARDS_COMPATABILITY_JAN2022_CODE(, bool SendValueOnly)); + void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest); + void HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest); + void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest); + void HandleRpcGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest); + void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); + virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; + virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; + PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); + + /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ + bool ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::RecordBody>& Records, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, + CbObjectView RpcRequest); + /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */ + void GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::RecordBody>& Records, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks); + /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */ + void GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks); + /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */ + void GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests); + /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ + void WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests, zen::HttpServerRequest& HttpRequest); spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 5990536a9..da0743f0a 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -177,6 +177,43 @@ namespace detail { { Result = Session.GetDerivedData(CacheKey.Bucket, CacheKey.Hash); } + else if (Type == ZenContentType::kCompressedBinary) + { + Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + + if (Result.Success) + { + const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All); + if (Result.Success = ValidationResult == CbValidateError::None; Result.Success) + { + CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); + IoBuffer ContentBuffer; + int NumAttachments = 0; + + CacheRecord.IterateAttachments( + [&Session, &Result, &ContentBuffer, &NumAttachments](CbFieldView AttachmentHash) { + CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + Result.Bytes += AttachmentResult.Bytes; + Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; + Result.ErrorCode = AttachmentResult.ErrorCode; + + if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) + { + Result.Response = AttachmentResult.Response; + ++NumAttachments; + } + else + { + Result.Success = false; + } + }); + if (NumAttachments != 1) + { + Result.Success = false; + } + } + } + } else { const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type; @@ -402,14 +439,45 @@ namespace detail { .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } - else + else if (CacheRecord.Type == ZenContentType::kCompressedBinary) { - int64_t TotalBytes = 0ull; - double TotalElapsedSeconds = 0.0; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue)); + if (!Compressed) + { + return {.Reason = std::string("Invalid compressed value buffer"), .Success = false}; + } - const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { - for (const IoHash& ValueContentId : ValueContentIds) - { + IoHash RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); + + CbObjectWriter ReferencingObject; + ReferencingObject.AddBinaryAttachment("RawHash", RawHash); + ReferencingObject.AddInteger("RawSize", Compressed.GetRawSize()); + + return PerformStructuredPut( + Session, + CacheRecord.Key, + ReferencingObject.Save().GetBuffer().AsIoBuffer(), + MaxAttempts, + [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) { + if (ValueContentId != RawHash) + { + OutReason = + fmt::format("Value '{}' MISMATCHED from compressed buffer raw hash {}", ValueContentId, RawHash); + return false; + } + + OutBuffer = RecordValue; + return true; + }); + } + else + { + return PerformStructuredPut( + Session, + CacheRecord.Key, + RecordValue, + MaxAttempts, + [&](const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason) { const auto It = std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId); @@ -421,131 +489,145 @@ namespace detail { const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It); - CloudCacheResult BlobResult; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) - { - BlobResult = Session.PutCompressedBlob(CacheRecord.ValueContentIds[Idx], Values[Idx]); - } + OutBuffer = Values[Idx]; + return true; + }); + } + } + catch (std::exception& Err) + { + m_Status.Set(UpstreamEndpointState::kError, Err.what()); - m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); + return {.Reason = std::string(Err.what()), .Success = false}; + } + } - if (!BlobResult.Success) - { - OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason); - return false; - } + virtual UpstreamEndpointStats& Stats() override { return m_Stats; } - TotalBytes += BlobResult.Bytes; - TotalElapsedSeconds += BlobResult.ElapsedSeconds; - } + private: + static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out) + { + Out.Success &= Result.Success; + Out.Bytes += Result.Bytes; + Out.ElapsedSeconds += Result.ElapsedSeconds; - return true; - }; + if (Result.ErrorCode) + { + Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}; + } + }; + + PutUpstreamCacheResult PerformStructuredPut( + CloudCacheSession& Session, + const CacheKey& Key, + IoBuffer ObjectBuffer, + const int32_t MaxAttempts, + std::function<bool(const IoHash& ValueContentId, IoBuffer& OutBuffer, std::string& OutReason)>&& BlobFetchFn) + { + int64_t TotalBytes = 0ull; + double TotalElapsedSeconds = 0.0; - PutRefResult RefResult; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) + const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { + for (const IoHash& ValueContentId : ValueContentIds) + { + IoBuffer BlobBuffer; + if (!BlobFetchFn(ValueContentId, BlobBuffer, OutReason)) { - RefResult = Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kCbObject); + return false; } - m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); - - if (!RefResult.Success) + CloudCacheResult BlobResult; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { - return {.Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'", - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - RefResult.Reason), - .Success = false}; + BlobResult = Session.PutCompressedBlob(ValueContentId, BlobBuffer); } - TotalBytes += RefResult.Bytes; - TotalElapsedSeconds += RefResult.ElapsedSeconds; + m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); - std::string Reason; - if (!PutBlobs(RefResult.Needs, Reason)) + if (!BlobResult.Success) { - return {.Reason = std::move(Reason), .Success = false}; + OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason); + return false; } - const IoHash RefHash = IoHash::HashBuffer(RecordValue); - FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash); - - m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); + TotalBytes += BlobResult.Bytes; + TotalElapsedSeconds += BlobResult.ElapsedSeconds; + } - if (!FinalizeResult.Success) - { - return {.Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'", - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - FinalizeResult.Reason), - .Success = false}; - } + return true; + }; - if (!FinalizeResult.Needs.empty()) - { - if (!PutBlobs(FinalizeResult.Needs, Reason)) - { - return {.Reason = std::move(Reason), .Success = false}; - } + PutRefResult RefResult; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) + { + RefResult = Session.PutRef(Key.Bucket, Key.Hash, ObjectBuffer, ZenContentType::kCbObject); + } - FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash); + m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); - m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); + if (!RefResult.Success) + { + return {.Reason = fmt::format("upload cache record '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, RefResult.Reason), + .Success = false}; + } - if (!FinalizeResult.Success) - { - return {.Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'", - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - FinalizeResult.Reason), - .Success = false}; - } + TotalBytes += RefResult.Bytes; + TotalElapsedSeconds += RefResult.ElapsedSeconds; - if (!FinalizeResult.Needs.empty()) - { - ExtendableStringBuilder<256> Sb; - for (const IoHash& MissingHash : FinalizeResult.Needs) - { - Sb << MissingHash.ToHexString() << ","; - } + std::string Reason; + if (!PutBlobs(RefResult.Needs, Reason)) + { + return {.Reason = std::move(Reason), .Success = false}; + } - return {.Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'", - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - Sb.ToString()), - .Success = false}; - } - } + const IoHash RefHash = IoHash::HashBuffer(ObjectBuffer); + FinalizeRefResult FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash); - TotalBytes += FinalizeResult.Bytes; - TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); - return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true}; - } + if (!FinalizeResult.Success) + { + return { + .Reason = fmt::format("finalize cache record '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, FinalizeResult.Reason), + .Success = false}; } - catch (std::exception& Err) + + if (!FinalizeResult.Needs.empty()) { - m_Status.Set(UpstreamEndpointState::kError, Err.what()); + if (!PutBlobs(FinalizeResult.Needs, Reason)) + { + return {.Reason = std::move(Reason), .Success = false}; + } - return {.Reason = std::string(Err.what()), .Success = false}; - } - } + FinalizeResult = Session.FinalizeRef(Key.Bucket, Key.Hash, RefHash); - virtual UpstreamEndpointStats& Stats() override { return m_Stats; } + m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); - private: - static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out) - { - Out.Success &= Result.Success; - Out.Bytes += Result.Bytes; - Out.ElapsedSeconds += Result.ElapsedSeconds; + if (!FinalizeResult.Success) + { + return {.Reason = fmt::format("finalize '{}/{}' FAILED, reason '{}'", Key.Bucket, Key.Hash, FinalizeResult.Reason), + .Success = false}; + } - if (Result.ErrorCode) - { - Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}; + if (!FinalizeResult.Needs.empty()) + { + ExtendableStringBuilder<256> Sb; + for (const IoHash& MissingHash : FinalizeResult.Needs) + { + Sb << MissingHash.ToHexString() << ","; + } + + return { + .Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'", Key.Bucket, Key.Hash, Sb.ToString()), + .Success = false}; + } } - }; + + TotalBytes += FinalizeResult.Bytes; + TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + + return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true}; + } spdlog::logger& Log() { return m_Log; } @@ -776,9 +858,6 @@ namespace detail { BatchRequest << "Method"sv << "GetCacheChunks"; -#if BACKWARDS_COMPATABILITY_JAN2022 - BatchRequest.AddInteger("MethodVersion"sv, 1); -#endif BatchRequest.BeginObject("Params"sv); { CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy; @@ -921,7 +1000,7 @@ namespace detail { } else { - return {.Reason = std::string("invalid value buffer"), .Success = false}; + return {.Reason = std::string("Invalid value buffer"), .Success = false}; } } @@ -939,6 +1018,56 @@ namespace detail { TotalBytes = Result.Bytes; TotalElapsedSeconds = Result.ElapsedSeconds; } + else if (CacheRecord.Type == ZenContentType::kCompressedBinary) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordValue)); + if (!Compressed) + { + return {.Reason = std::string("Invalid value compressed buffer"), .Success = false}; + } + + CbPackage BatchPackage; + CbObjectWriter BatchWriter; + BatchWriter << "Method"sv + << "PutCacheValues"; + + BatchWriter.BeginObject("Params"sv); + { + // DefaultPolicy unspecified and expected to be Default + + BatchWriter.BeginArray("Requests"sv); + { + BatchWriter.BeginObject(); + { + const CacheKey& Key = CacheRecord.Key; + BatchWriter.BeginObject("Key"sv); + { + BatchWriter << "Bucket"sv << Key.Bucket; + BatchWriter << "Hash"sv << Key.Hash; + } + BatchWriter.EndObject(); + // Policy unspecified and expected to be Default + BatchWriter.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Compressed.GetRawHash())); + BatchPackage.AddAttachment(CbAttachment(Compressed)); + } + BatchWriter.EndObject(); + } + BatchWriter.EndArray(); + } + BatchWriter.EndObject(); + BatchPackage.SetObject(BatchWriter.Save()); + + Result.Success = false; + for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.InvokeRpc(BatchPackage); + } + + m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); + + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + } else { for (size_t Idx = 0, Count = Values.size(); Idx < Count; Idx++) diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 0570dd316..1ac4afe5c 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -3,10 +3,13 @@ #include "zen.h" #include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/fmtutils.h> #include <zencore/session.h> #include <zencore/stream.h> +#include <zenhttp/httpcommon.h> +#include <zenhttp/httpshared.h> #include "cache/structuredcachestore.h" #include "diag/formatters.h" @@ -413,11 +416,7 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetHeader(cpr::Header{{"Accept", - Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg" - : Type == ZenContentType::kCbObject ? "application/x-ue-cb" - : "application/octet-stream"}}); - + Session.SetHeader(cpr::Header{{"Accept", std::string{MapContentTypeToString(Type)}}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -539,4 +538,32 @@ ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request) return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } +ZenCacheResult +ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request) +{ + ExtendableStringBuilder<256> Uri; + Uri << m_Client.ServiceUrl() << "/z$/$rpc"; + + SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten(); + + cpr::Session& Session = m_SessionState->GetSession(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}}); + Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + } // namespace zen diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index bc8fd3c56..f70d9d06f 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -133,6 +133,7 @@ public: ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type); ZenCacheResult PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload); ZenCacheResult InvokeRpc(const CbObjectView& Request); + ZenCacheResult InvokeRpc(const CbPackage& Package); private: inline spdlog::logger& Log() { return m_Log; } diff --git a/zenstore/basicfile.cpp b/zenstore/basicfile.cpp index dcd9a8575..895db6cee 100644 --- a/zenstore/basicfile.cpp +++ b/zenstore/basicfile.cpp @@ -72,7 +72,7 @@ BasicFile::Open(std::filesystem::path FileName, bool IsCreate, std::error_code& return; } #else - int OpenFlags = O_RDWR; + int OpenFlags = O_RDWR | O_CLOEXEC; OpenFlags |= IsCreate ? O_CREAT | O_TRUNC : 0; int Fd = open(FileName.c_str(), OpenFlags, 0666); @@ -81,6 +81,10 @@ BasicFile::Open(std::filesystem::path FileName, bool IsCreate, std::error_code& Ec = zen::MakeErrorCodeFromLastError(); return; } + if (IsCreate) + { + fchmod(Fd, 0666); + } void* FileHandle = (void*)(uintptr_t(Fd)); #endif @@ -366,12 +370,13 @@ LockFile::Create(std::filesystem::path FileName, CbObject Payload, std::error_co return; } #elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - int Fd = open(FileName.c_str(), O_RDWR | O_CREAT, 0666); + int Fd = open(FileName.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, 0666); if (Fd < 0) { Ec = zen::MakeErrorCodeFromLastError(); return; } + fchmod(Fd, 0666); int LockRet = flock(Fd, LOCK_EX | LOCK_NB); if (LockRet < 0) diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index 6c137e128..758c0665b 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -388,7 +388,14 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize } #else // Attempt to exclusively create the file. - auto InternalCreateFile = [&] { return open(Name.ShardedPath.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0666); }; + auto InternalCreateFile = [&] { + int Fd = open(Name.ShardedPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666); + if (Fd >= 0) + { + fchmod(Fd, 0666); + } + return Fd; + }; int Fd = InternalCreateFile(); if (Fd < 0) { diff --git a/zenutil/cache/cachepolicy.cpp b/zenutil/cache/cachepolicy.cpp index 8c10ea674..3bca363bb 100644 --- a/zenutil/cache/cachepolicy.cpp +++ b/zenutil/cache/cachepolicy.cpp @@ -122,7 +122,7 @@ ConvertToUpstream(CachePolicy Policy) // Use the downstream value for all other flags. CachePolicy UpstreamPolicy = CachePolicy::None; - + if (EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) { UpstreamPolicy |= CachePolicy::QueryLocal; @@ -206,12 +206,6 @@ OptionalCacheRecordPolicy CacheRecordPolicy::Load(const CbObjectView Object) { std::string_view BasePolicyText = Object["BasePolicy"sv].AsString(); -#if BACKWARDS_COMPATABILITY_JAN2022 - if (BasePolicyText.empty()) - { - BasePolicyText = Object["DefaultValuePolicy"sv].AsString(); - } -#endif if (BasePolicyText.empty()) { return {}; @@ -228,14 +222,10 @@ CacheRecordPolicy::Load(const CbObjectView Object) return {}; } CachePolicy Policy = ParseCachePolicy(PolicyText); -#if BACKWARDS_COMPATABILITY_JAN2022 - Policy = Policy & CacheValuePolicy::PolicyMask; -#else if (EnumHasAnyFlags(Policy, ~CacheValuePolicy::PolicyMask)) { return {}; } -#endif Builder.AddValuePolicy(Id, Policy); } diff --git a/zenutil/include/zenutil/cache/cachepolicy.h b/zenutil/include/zenutil/cache/cachepolicy.h index efcc4fb49..9a745e42c 100644 --- a/zenutil/include/zenutil/cache/cachepolicy.h +++ b/zenutil/include/zenutil/cache/cachepolicy.h @@ -10,14 +10,6 @@ #include <gsl/gsl-lite.hpp> #include <span> - -#define BACKWARDS_COMPATABILITY_JAN2022 1 -#if BACKWARDS_COMPATABILITY_JAN2022 -# define BACKWARDS_COMPATABILITY_JAN2022_CODE(...) __VA_ARGS__ -#else -# define BACKWARDS_COMPATABILITY_JAN2022_CODE(...) -#endif - namespace zen::Private { class ICacheRecordPolicyShared; } diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp index 5bddc72bc..f49d5f6d8 100644 --- a/zenutil/zenserverprocess.cpp +++ b/zenutil/zenserverprocess.cpp @@ -159,11 +159,12 @@ ZenServerState::Initialize() ThrowLastError("Could not map view of Zen server state"); } #else - int Fd = shm_open("UnrealEngineZen", O_RDWR | O_CREAT, 0666); + int Fd = shm_open("/UnrealEngineZen", O_RDWR | O_CREAT | O_CLOEXEC, 0666); if (Fd < 0) { ThrowLastError("Could not open a shared memory object"); } + fchmod(Fd, 0666); void* hMap = (void*)intptr_t(Fd); int Result = ftruncate(Fd, MapSize); @@ -209,7 +210,7 @@ ZenServerState::InitializeReadOnly() ThrowLastError("Could not map view of Zen server state"); } #else - int Fd = shm_open("UnrealEngineZen", O_RDONLY, 0666); + int Fd = shm_open("/UnrealEngineZen", O_RDONLY | O_CLOEXEC, 0666); if (Fd < 0) { return false; |