diff options
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 220 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 430 | ||||
| -rw-r--r-- | zenserver/compute/apply.cpp | 2 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 6 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 148 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 19 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 2 | ||||
| -rw-r--r-- | zenstore/cidstore.cpp | 13 | ||||
| -rw-r--r-- | zenstore/include/zenstore/cidstore.h | 17 | ||||
| -rw-r--r-- | zenutil/include/zenserverprocess.h | 2 | ||||
| -rw-r--r-- | zenutil/zenserverprocess.cpp | 9 |
11 files changed, 701 insertions, 167 deletions
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 973ef874a..8a634107d 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -11,6 +11,8 @@ #include <zencore/fmtutils.h> #include <zencore/iohash.h> #include <zencore/logging.h> +#include <zencore/memory.h> +#include <zencore/stream.h> #include <zencore/string.h> #include <zencore/thread.h> #include <zencore/timer.h> @@ -34,6 +36,7 @@ #include <filesystem> #include <map> #include <random> +#include <span> #include <atlbase.h> #include <process.h> @@ -1091,7 +1094,7 @@ TEST_CASE("project.pipe") } # endif -TEST_CASE("z$.basic") +TEST_CASE("zcache.basic") { using namespace std::literals; @@ -1182,6 +1185,221 @@ TEST_CASE("z$.basic") } } +TEST_CASE("zcache.cbpackage") +{ + using namespace std::literals; + + auto CreateTestPackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage { + auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9})); + auto CompressedData = zen::CompressedBuffer::Compress(Data); + + OutAttachmentKey = zen::IoHash::FromBLAKE3(CompressedData.GetRawHash()); + + zen::CbWriter Obj; + Obj.BeginObject("obj"sv); + Obj.AddBinaryAttachment("data", OutAttachmentKey); + Obj.EndObject(); + + zen::CbPackage Package; + Package.SetObject(Obj.Save().AsObject()); + Package.AddAttachment(zen::CbAttachment(CompressedData)); + + return Package; + }; + + auto SerializeToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { + zen::MemoryOutStream MemStream; + zen::BinaryWriter Writer(MemStream); + + Package.Save(Writer); + + return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + }; + + auto IsEqual = [](zen::CbPackage Lhs, zen::CbPackage Rhs) -> bool { + std::span<const zen::CbAttachment> LhsAttachments = Lhs.GetAttachments(); + std::span<const zen::CbAttachment> RhsAttachments = Rhs.GetAttachments(); + + if (LhsAttachments.size() != LhsAttachments.size()) + { + return false; + } + + for (const zen::CbAttachment& LhsAttachment : LhsAttachments) + { + const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash()); + CHECK(RhsAttachment); + + zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress(); + CHECK(!LhsBuffer.IsNull()); + + zen::SharedBuffer RhsBuffer = RhsAttachment->AsCompressedBinary().Decompress(); + CHECK(!RhsBuffer.IsNull()); + + if (!LhsBuffer.GetView().EqualBytes(RhsBuffer.GetView())) + { + return false; + } + } + + return true; + }; + + SUBCASE("PUT/GET returns correct package") + { + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const uint16_t PortNumber = 13337; + const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber); + + ZenServerInstance Instance1(TestEnv); + Instance1.SetTestDir(TestDir); + Instance1.SpawnServer(PortNumber); + Instance1.WaitUntilReady(); + + const std::string_view Bucket = "mosdef"sv; + zen::IoHash Key; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + + // PUT + { + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Body.Data(), Body.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + // GET + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Response); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + } + + SUBCASE("PUT propagates upstream") + { + // Setup local and remote server + std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); + std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); + const uint16_t LocalPortNumber = 13337; + const uint16_t RemotePortNumber = 13338; + + const auto LocalBaseUri = "http://localhost:{}/z$"_format(LocalPortNumber); + const auto RemoteBaseUri = "http://localhost:{}/z$"_format(RemotePortNumber); + + ZenServerInstance RemoteInstance(TestEnv); + RemoteInstance.SetTestDir(RemoteDataDir); + RemoteInstance.SpawnServer(RemotePortNumber); + + ZenServerInstance LocalInstance(TestEnv); + LocalInstance.SetTestDir(LocalDataDir); + LocalInstance.SpawnServer(LocalPortNumber, + "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(RemotePortNumber)); + + LocalInstance.WaitUntilReady(); + RemoteInstance.WaitUntilReady(); + + const std::string_view Bucket = "mosdef"sv; + zen::IoHash Key; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + + // Store the cache record package in the local instance + { + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)}, + cpr::Body{(const char*)Body.Data(), Body.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + + CHECK(Result.status_code == 201); + } + + // The cache record can be retrieved as a package from the local instance + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Body); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + + // The cache record can be retrieved as a package from the remote instance + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(RemoteBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Body); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + } + + SUBCASE("GET finds upstream when missing in local") + { + // Setup local and remote server + std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); + std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); + const uint16_t LocalPortNumber = 13337; + const uint16_t RemotePortNumber = 13338; + + const auto LocalBaseUri = "http://localhost:{}/z$"_format(LocalPortNumber); + const auto RemoteBaseUri = "http://localhost:{}/z$"_format(RemotePortNumber); + + ZenServerInstance RemoteInstance(TestEnv); + RemoteInstance.SetTestDir(RemoteDataDir); + RemoteInstance.SpawnServer(RemotePortNumber); + + ZenServerInstance LocalInstance(TestEnv); + LocalInstance.SetTestDir(LocalDataDir); + LocalInstance.SpawnServer(LocalPortNumber, + "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(RemotePortNumber)); + + LocalInstance.WaitUntilReady(); + RemoteInstance.WaitUntilReady(); + + const std::string_view Bucket = "mosdef"sv; + zen::IoHash Key; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + + // Store the cache record package in upstream cache + { + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(RemoteBaseUri, Bucket, Key)}, + cpr::Body{(const char*)Body.Data(), Body.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + + CHECK(Result.status_code == 201); + } + + // The cache record can be retrieved as a package from the local cache + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Body); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); + } + } +} + struct RemoteExecutionRequest { RemoteExecutionRequest(std::string_view Host, int Port, std::filesystem::path& TreePath) diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 9600c5f8a..cf7deaa93 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -1,10 +1,12 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/stream.h> #include <zencore/timer.h> #include <zenhttp/httpserver.h> @@ -15,6 +17,8 @@ #include "upstream/zen.h" #include "zenstore/cidstore.h" +#include <zencore/compactbinarypackage.h> + #include <algorithm> #include <atomic> #include <filesystem> @@ -37,7 +41,6 @@ HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InC , m_CidStore(InCidStore) , m_UpstreamCache(std::move(UpstreamCache)) { - // m_Log.set_level(spdlog::level::debug); } HttpStructuredCacheService::~HttpStructuredCacheService() @@ -127,14 +130,17 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req case kHead: case kGet: { + const ZenContentType AcceptType = Request.AcceptContentType(); + ZenCacheValue Value; bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); bool InUpstreamCache = false; if (!Success && m_UpstreamCache) { - const ZenContentType CacheRecordType = - Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : ZenContentType::kCbObject; + const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary + : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage + : ZenContentType::kCbObject; if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); UpstreamResult.Success) @@ -143,43 +149,85 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Success = true; InUpstreamCache = true; - if (CacheRecordType == ZenContentType::kCbObject) + if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject) { - const zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(MemoryView(UpstreamResult.Value.Data(), UpstreamResult.Value.Size()), - zen::CbValidateMode::All); - - if (ValidationResult == CbValidateError::None) + if (CacheRecordType == ZenContentType::kCbObject) { - zen::CbObjectView Cbo(UpstreamResult.Value.Data()); + const zen::CbValidateError ValidationResult = + zen::ValidateCompactBinary(UpstreamResult.Value, zen::CbValidateMode::All); - std::vector<IoHash> References; - Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + if (ValidationResult == CbValidateError::None) + { + zen::CbObjectView CacheRecord(UpstreamResult.Value.Data()); - if (!References.empty()) + zen::CbObjectWriter IndexData; + IndexData.BeginArray("references"); + CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); }); + IndexData.EndArray(); + + Value.IndexData = IndexData.Save(); + } + else { - zen::CbObjectWriter Idx; - Idx.BeginArray("references"); - for (const IoHash& Hash : References) - { - Idx.AddHash(Hash); - } - Idx.EndArray(); - - Value.IndexData = Idx.Save(); + Success = false; + ZEN_WARN("Get - cache record '{}/{}' FAILED, invalid compact binary object from upstream", + Ref.BucketSegment, + Ref.HashKey); } } - else + + if (Success) { - Value.Value = IoBuffer(); - Success = false; - ZEN_WARN("Upstream cache record '{}/{}' failed validation", Ref.BucketSegment, Ref.HashKey); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); } } - - if (Success) + else { - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); + ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage); + + CbPackage Package; + if (Package.TryLoad(UpstreamResult.Value)) + { + uint32_t AttachmentCount = 0; + uint32_t FoundCount = 0; + CbObject CacheRecord = Package.GetObject(); + + CacheRecord.IterateAttachments( + [this, &Package, &Ref, &AttachmentCount, &FoundCount](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) + { + if (CompressedBuffer Chunk = Attachment->AsCompressedBinary()) + { + m_CidStore.AddChunk(Chunk); + FoundCount++; + } + else + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, upstream attachment not compressed", + Ref.BucketSegment, + Ref.HashKey); + } + } + AttachmentCount++; + }); + + if (FoundCount == AttachmentCount) + { + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); + } + else + { + Success = false; + ZEN_WARN("GET - cache record '{}/{}' FAILED, attachments missing in upstream package", + Ref.BucketSegment, + Ref.HashKey); + } + } + else + { + Success = false; + ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey); + } } } } @@ -196,14 +244,74 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Request.SetSuppressResponseBody(); } - ZEN_DEBUG("HIT - '{}/{}' ({} bytes {}) ({})", - Ref.BucketSegment, - Ref.HashKey, - Value.Value.Size(), - Value.Value.GetContentType(), - InUpstreamCache ? "upstream" : "local"); + if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache) + { + CbObjectView CacheRecord(Value.Value.Data()); + + const zen::CbValidateError ValidationResult = zen::ValidateCompactBinary(Value.Value, zen::CbValidateMode::All); + + if (ValidationResult != CbValidateError::None) + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey); + + return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); + } + + uint32_t AttachmentCount = 0; + uint32_t FoundCount = 0; + uint64_t AttachmentBytes = 0ull; + + CbPackage Package; + + CacheRecord.IterateAttachments( + [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + AttachmentBytes += Chunk.Size(); + FoundCount++; + } + AttachmentCount++; + }); + + if (FoundCount != AttachmentCount) + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments", + Ref.BucketSegment, + Ref.HashKey, + FoundCount, + AttachmentCount); + + return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + } + + Package.SetObject(LoadCompactBinaryObject(Value.Value)); + + ZEN_DEBUG("HIT - '{}/{}' {}, {} attachments ({})", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(AttachmentBytes + Value.Value.Size()), + AttachmentCount, + InUpstreamCache ? "UPSTREAM" : "LOCAL"); + + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + Package.Save(Writer); + + IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + return Request.WriteResponse(zen::HttpResponseCode::OK, HttpContentType::kCbPackage, Response); + } + else + { + ZEN_DEBUG("HIT - '{}/{}' {} ({})", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Value.Value.Size()), + InUpstreamCache ? "UPSTREAM" : "LOCAL"); + + return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + } } break; @@ -218,28 +326,11 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req const HttpContentType ContentType = Request.RequestContentType(); - bool IsCompactBinary = false; - - switch (ContentType) - { - case HttpContentType::kUnknownContentType: - case HttpContentType::kBinary: - IsCompactBinary = false; - break; - - case HttpContentType::kCbObject: - IsCompactBinary = true; - break; - - default: - return Request.WriteResponse(zen::HttpResponseCode::BadRequest); - } - - if (!IsCompactBinary) + if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType) { // TODO: create a cache record and put value in CAS? m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); - ZEN_DEBUG("PUT (binary) - '{}/{}' ({} bytes, {})", Ref.BucketSegment, Ref.HashKey, Body.Size(), Body.GetContentType()); + ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size())); if (m_UpstreamCache) { @@ -249,86 +340,193 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req return Request.WriteResponse(zen::HttpResponseCode::Created); } - - // Validate payload before accessing it - const zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); - - if (ValidationResult != CbValidateError::None) + else if (ContentType == HttpContentType::kCbObject) { - ZEN_WARN("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size()); + // Validate payload before accessing it + const zen::CbValidateError ValidationResult = + zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); - // TODO: add details in response, kText || kCbObject? - return Request.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Compact binary validation failed"sv); - } + if (ValidationResult != CbValidateError::None) + { + ZEN_WARN("PUT - cache record '{}/{}' ({} bytes) FAILED, invalid compact binary", + Ref.BucketSegment, + Ref.HashKey, + Body.Size()); - // Extract referenced payload hashes - zen::CbObjectView Cbo(Body.Data()); + // TODO: add details in response, kText || kCbObject? + return Request.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Compact binary validation failed"sv); + } - std::vector<IoHash> References; - std::vector<IoHash> MissingRefs; - Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + // Extract referenced payload hashes + zen::CbObjectView Cbo(Body.Data()); - ZenCacheValue CacheValue; - CacheValue.Value = Body; + std::vector<IoHash> References; + std::vector<IoHash> MissingRefs; + Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); - if (!References.empty()) - { - zen::CbObjectWriter Idx; - Idx.BeginArray("references"); + ZenCacheValue CacheValue; + CacheValue.Value = Body; - for (const IoHash& Hash : References) + if (!References.empty()) { - Idx.AddHash(Hash); - if (!m_CidStore.ContainsChunk(Hash)) + zen::CbObjectWriter Idx; + Idx.BeginArray("references"); + + for (const IoHash& Hash : References) { - MissingRefs.push_back(Hash); + Idx.AddHash(Hash); + if (!m_CidStore.ContainsChunk(Hash)) + { + MissingRefs.push_back(Hash); + } } + + Idx.EndArray(); + + CacheValue.IndexData = Idx.Save(); } - Idx.EndArray(); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); - CacheValue.IndexData = Idx.Save(); - } + ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} attachments missing", + Ref.BucketSegment, + Ref.HashKey, + zen::NiceBytes(CacheValue.Value.Size()), + MissingRefs.size(), + References.size()); - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); + if (MissingRefs.empty()) + { + // Only enqueue valid cache records, i.e. all referenced payloads exists + if (m_UpstreamCache) + { + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(References)}); + } - ZEN_DEBUG("PUT (cache record) - '{}/{}' ({} bytes, {}, ({}/{} refs/missing))", - Ref.BucketSegment, - Ref.HashKey, - CacheValue.Value.Size(), - CacheValue.Value.GetContentType(), - References.size(), - MissingRefs.size()); + return Request.WriteResponse(zen::HttpResponseCode::Created); + } + else + { + // TODO: Binary attachments? + zen::CbObjectWriter Response; + Response.BeginArray("needs"); + for (const IoHash& MissingRef : MissingRefs) + { + Response.AddHash(MissingRef); + ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef); + } + Response.EndArray(); - if (MissingRefs.empty()) + // Return Created | BadRequest? + return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save()); + } + } + else if (ContentType == HttpContentType::kCbPackage) { - // Only enqueue valid cache records, i.e. all referenced payloads exists + CbPackage Package; + + if (!Package.TryLoad(Body)) + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey); + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"); + } + + CbObject CacheRecord = Package.GetObject(); + + int32_t AttachmentCount = 0; + int32_t NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + bool AttachmentsOk = true; + + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + + std::vector<IoHash> PayloadIds; + PayloadIds.reserve(Attachments.size()); + + CacheRecord.IterateAttachments([this, + &Ref, + &Package, + &AttachmentsOk, + &AttachmentCount, + &TotalAttachmentBytes, + &TotalNewBytes, + &NewAttachmentCount, + &PayloadIds](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) + { + if (Attachment->IsCompressedBinary()) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + const uint64_t ChunkSize = Chunk.GetCompressed().GetSize(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + + PayloadIds.emplace_back(InsertResult.DecompressedId); + + if (InsertResult.New) + { + TotalNewBytes += ChunkSize; + ++NewAttachmentCount; + } + + TotalAttachmentBytes += ChunkSize; + AttachmentCount++; + } + else + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, attachment '{}' is not compressed", + Ref.BucketSegment, + Ref.HashKey, + AttachmentHash.AsHash()); + AttachmentsOk = false; + } + } + else + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, missing attachment '{}'", + Ref.BucketSegment, + Ref.HashKey, + AttachmentHash.AsHash()); + AttachmentsOk = false; + } + }); + + if (!AttachmentsOk) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"); + } + + IoBuffer CacheRecordChunk = CacheRecord.GetBuffer().AsIoBuffer(); + const uint64_t TotalPackageBytes = TotalAttachmentBytes + CacheRecordChunk.Size(); + + ZenCacheValue CacheValue{.Value = CacheRecordChunk}; + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); + if (m_UpstreamCache) { - auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, .CacheKey = {Ref.BucketSegment, Ref.HashKey}, - .PayloadIds = std::move(References)}); + .PayloadIds = std::move(PayloadIds)}); } + ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} ({}/{}) new attachments", + Ref.BucketSegment, + Ref.HashKey, + zen::NiceBytes(TotalPackageBytes), + NewAttachmentCount, + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + zen::NiceBytes(TotalAttachmentBytes)); + return Request.WriteResponse(zen::HttpResponseCode::Created); } else { - // TODO: Binary attachments? - zen::CbObjectWriter Response; - Response.BeginArray("needs"); - for (const IoHash& MissingRef : MissingRefs) - { - Response.AddHash(MissingRef); - ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef); - } - Response.EndArray(); - - // Return Created | BadRequest? - return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save()); + return Request.WriteResponse(zen::HttpResponseCode::BadRequest); } } break; @@ -387,13 +585,13 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re return Request.WriteResponse(zen::HttpResponseCode::NotFound); } - ZEN_DEBUG("HIT - '{}/{}/{}' ({} bytes, {}) ({})", + ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, - Payload.Size(), + NiceBytes(Payload.Size()), Payload.GetContentType(), - InUpstreamCache ? "upstream" : "local"); + InUpstreamCache ? "UPSTREAM" : "LOCAL"); if (Verb == kHead) { @@ -438,13 +636,13 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); - ZEN_DEBUG("PUT ({}) - '{}/{}/{}' ({} bytes, {})", - Result.New ? "NEW" : "OLD", + ZEN_DEBUG("PUT - payload '{}/{}/{}' {} (type: {}) {}", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, - Body.Size(), - Body.GetContentType()); + NiceBytes(Body.Size()), + Body.GetContentType(), + Result.New ? "NEW" : "OLD"); if (Result.New) { diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index e40c6918d..3197eaee4 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -588,7 +588,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, TotalAttachmentBytes += CompressedSize; ++AttachmentCount; - const CasStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView); + const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView); if (InsertResult.New) { diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 0af92da6d..4a5467648 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -163,7 +163,7 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; } CloudCacheResult @@ -194,7 +194,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; } CloudCacheResult @@ -215,7 +215,7 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; } std::vector<IoHash> diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 97b222a68..38d30a795 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -4,8 +4,14 @@ #include "jupiter.h" #include "zen.h" +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalidation.h> #include <zencore/fmtutils.h> +#include <zencore/stream.h> #include <zencore/timer.h> + #include <zenstore/cas.h> #include <zenstore/cidstore.h> @@ -121,7 +127,45 @@ namespace detail { } else { - Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, Type); + const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type; + Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, AcceptType); + + if (Result.Success && Type == ZenContentType::kCbPackage) + { + CbPackage Package; + + const CbValidateError ValidationResult = zen::ValidateCompactBinary(Result.Response, CbValidateMode::All); + if (Result.Success = ValidationResult == CbValidateError::None; Result.Success) + { + CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); + + CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { + CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; + + if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) + { + Package.AddAttachment(CbAttachment(Chunk)); + } + else + { + Result.Success = false; + } + }); + + Package.SetObject(CacheRecord); + } + + if (Result.Success) + { + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + Package.Save(Writer); + + Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + Result.Bytes = MemStream.Size(); + } + } } return {.Value = Result.Response, @@ -305,37 +349,74 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) + if (CacheRecord.Type == ZenContentType::kCbPackage) { - Result.Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + zen::CbPackage Package; + Package.SetObject(CbObject(SharedBuffer(RecordValue))); + + for (const IoBuffer& Payload : Payloads) { - Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - CacheRecord.PayloadIds[Idx], - Payloads[Idx]); + if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload))) + { + Package.AddAttachment(CbAttachment(AttachmentBuffer)); + } + else + { + return {.Reason = std::string("invalid payload buffer"), .Success = false}; + } } - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + Package.Save(Writer); + IoBuffer PackagePayload(IoBuffer::Wrap, MemStream.Data(), MemStream.Size()); - if (!Result.Success) + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - return {.Reason = "Failed to upload payload", - .Bytes = TotalBytes, - .ElapsedSeconds = TotalElapsedSeconds, - .Success = false}; + Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + PackagePayload, + CacheRecord.Type); } - } - Result.Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); + TotalBytes = Result.Bytes; + TotalElapsedSeconds = Result.ElapsedSeconds; } + else + { + for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) + { + Result.Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + CacheRecord.PayloadIds[Idx], + Payloads[Idx]); + } - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + + if (!Result.Success) + { + return {.Reason = "Failed to upload payload", + .Bytes = TotalBytes, + .ElapsedSeconds = TotalElapsedSeconds, + .Success = false}; + } + } + + Result.Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = + Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); + } + + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + } return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; } @@ -448,7 +529,6 @@ public: , m_CacheStore(CacheStore) , m_CidStore(CidStore) { - ZEN_ASSERT(m_Options.ThreadCount > 0); } virtual ~DefaultUpstreamCache() { Shutdown(); } @@ -509,7 +589,15 @@ public: { if (m_IsRunning.load()) { - m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + if (!m_UpstreamThreads.empty()) + { + m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + } + else + { + ProcessCacheRecord(std::move(CacheRecord)); + } + return {.Success = true}; } @@ -548,11 +636,19 @@ private: for (auto& Endpoint : m_Endpoints) { - if (PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); - Result.Success) + const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + if (Result.Success) { m_Stats.Add(*Endpoint, Result); } + else + { + ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Endpoint->DisplayName(), + Result.Reason); + } } } diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index b553558e7..55ddd310f 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -390,7 +390,10 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetHeader(cpr::Header{{"Accept", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}}); + Session.SetHeader(cpr::Header{{"Accept", + Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg" + : Type == ZenContentType::kCbObject ? "application/x-ue-cb" + : "application/octet-stream"}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -430,14 +433,18 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetHeader( - cpr::Header{{"Content-Type", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}}); + Session.SetHeader(cpr::Header{{"Content-Type", + Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg" + : Type == ZenContentType::kCbObject ? "application/x-ue-cb" + : "application/octet-stream"}}); Session.SetBody(cpr::Body{static_cast<const char*>(Value.Data()), Value.Size()}); cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; } ZenCacheResult @@ -455,7 +462,9 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; } } // namespace zen diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index aa4a42fd7..f1960ab36 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -166,7 +166,7 @@ public: zen::UpstreamCacheOptions UpstreamOptions; - if (UpstreamConfig.UpstreamThreadCount > 0 && UpstreamConfig.UpstreamThreadCount < 32) + if (UpstreamConfig.UpstreamThreadCount < 32) { UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); } diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index e6c7f98ee..100054a0e 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -28,15 +28,16 @@ struct CidStore::CidState RwLock m_Lock; tsl::robin_map<IoHash, IoHash> m_CidMap; - CasStore::InsertResult AddChunk(CompressedBuffer& ChunkData) + CidStore::InsertResult AddChunk(CompressedBuffer& ChunkData) { - IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer(); - IoHash CompressedHash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); + const IoHash DecompressedId = IoHash::FromBLAKE3(ChunkData.GetRawHash()); + IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer(); + IoHash CompressedHash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, CompressedHash); - AddCompressedCid(IoHash::FromBLAKE3(ChunkData.GetRawHash()), CompressedHash); + AddCompressedCid(DecompressedId, CompressedHash); - return Result; + return {.DecompressedId = DecompressedId, .CompressedHash = CompressedHash, .New = Result.New}; } void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed) @@ -103,7 +104,7 @@ CidStore::~CidStore() { } -CasStore::InsertResult +CidStore::InsertResult CidStore::AddChunk(CompressedBuffer& ChunkData) { return m_Impl->AddChunk(ChunkData); diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h index 62d642ad1..76a33c915 100644 --- a/zenstore/include/zenstore/cidstore.h +++ b/zenstore/include/zenstore/cidstore.h @@ -31,11 +31,18 @@ public: CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir); ~CidStore(); - CasStore::InsertResult AddChunk(CompressedBuffer& ChunkData); - void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed); - IoBuffer FindChunkByCid(const IoHash& DecompressedId); - bool ContainsChunk(const IoHash& DecompressedId); - void Flush(); + struct InsertResult + { + IoHash DecompressedId; + IoHash CompressedHash; + bool New = false; + }; + + InsertResult AddChunk(CompressedBuffer& ChunkData); + void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed); + IoBuffer FindChunkByCid(const IoHash& DecompressedId); + bool ContainsChunk(const IoHash& DecompressedId); + void Flush(); // TODO: add batch filter support diff --git a/zenutil/include/zenserverprocess.h b/zenutil/include/zenserverprocess.h index 7fcacf788..b659f6e58 100644 --- a/zenutil/include/zenserverprocess.h +++ b/zenutil/include/zenserverprocess.h @@ -55,7 +55,7 @@ struct ZenServerInstance m_TestDir = TestDir; } - void SpawnServer(int BasePort = 0); + void SpawnServer(int BasePort = 0, std::string_view AdditionalServerArgs = std::string_view()); void AttachToRunningServer(int BasePort = 0); diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp index 5142c6a54..2f2b3bd33 100644 --- a/zenutil/zenserverprocess.cpp +++ b/zenutil/zenserverprocess.cpp @@ -395,7 +395,7 @@ ZenServerInstance::Shutdown() } void -ZenServerInstance::SpawnServer(int BasePort) +ZenServerInstance::SpawnServer(int BasePort, std::string_view AdditionalServerArgs) { ZEN_ASSERT(!m_Process.IsValid()); // Only spawn once @@ -414,7 +414,7 @@ ZenServerInstance::SpawnServer(int BasePort) zen::ExtendableStringBuilder<32> LogId; LogId << "Zen" << ChildId; - zen::ExtendableWideStringBuilder<128> CommandLine; + zen::ExtendableWideStringBuilder<512> CommandLine; CommandLine << "\""; CommandLine.Append(Executable.c_str()); CommandLine << "\""; @@ -455,6 +455,11 @@ ZenServerInstance::SpawnServer(int BasePort) CommandLine << " --mesh"; } + if (!AdditionalServerArgs.empty()) + { + CommandLine << " " << AdditionalServerArgs; + } + std::filesystem::path CurrentDirectory = std::filesystem::current_path(); ZEN_DEBUG("Spawning server '{}'", LogId); |