diff options
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 430 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 1 | ||||
| -rw-r--r-- | zenserver/compute/apply.cpp | 54 | ||||
| -rw-r--r-- | zenserver/compute/apply.h | 1 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 6 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 148 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 30 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 6 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 2 |
9 files changed, 508 insertions, 170 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 9600c5f8a..cf7deaa93 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -1,10 +1,12 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/stream.h> #include <zencore/timer.h> #include <zenhttp/httpserver.h> @@ -15,6 +17,8 @@ #include "upstream/zen.h" #include "zenstore/cidstore.h" +#include <zencore/compactbinarypackage.h> + #include <algorithm> #include <atomic> #include <filesystem> @@ -37,7 +41,6 @@ HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InC , m_CidStore(InCidStore) , m_UpstreamCache(std::move(UpstreamCache)) { - // m_Log.set_level(spdlog::level::debug); } HttpStructuredCacheService::~HttpStructuredCacheService() @@ -127,14 +130,17 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req case kHead: case kGet: { + const ZenContentType AcceptType = Request.AcceptContentType(); + ZenCacheValue Value; bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); bool InUpstreamCache = false; if (!Success && m_UpstreamCache) { - const ZenContentType CacheRecordType = - Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : ZenContentType::kCbObject; + const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary + : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage + : ZenContentType::kCbObject; if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); UpstreamResult.Success) @@ -143,43 +149,85 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Success = true; InUpstreamCache = true; - if (CacheRecordType == ZenContentType::kCbObject) + if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject) { - const zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(MemoryView(UpstreamResult.Value.Data(), UpstreamResult.Value.Size()), - zen::CbValidateMode::All); - - if (ValidationResult == CbValidateError::None) + if (CacheRecordType == ZenContentType::kCbObject) { - zen::CbObjectView Cbo(UpstreamResult.Value.Data()); + const zen::CbValidateError ValidationResult = + zen::ValidateCompactBinary(UpstreamResult.Value, zen::CbValidateMode::All); - std::vector<IoHash> References; - Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + if (ValidationResult == CbValidateError::None) + { + zen::CbObjectView CacheRecord(UpstreamResult.Value.Data()); - if (!References.empty()) + zen::CbObjectWriter IndexData; + IndexData.BeginArray("references"); + CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); }); + IndexData.EndArray(); + + Value.IndexData = IndexData.Save(); + } + else { - zen::CbObjectWriter Idx; - Idx.BeginArray("references"); - for (const IoHash& Hash : References) - { - Idx.AddHash(Hash); - } - Idx.EndArray(); - - Value.IndexData = Idx.Save(); + Success = false; + ZEN_WARN("Get - cache record '{}/{}' FAILED, invalid compact binary object from upstream", + Ref.BucketSegment, + Ref.HashKey); } } - else + + if (Success) { - Value.Value = IoBuffer(); - Success = false; - ZEN_WARN("Upstream cache record '{}/{}' failed validation", Ref.BucketSegment, Ref.HashKey); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); } } - - if (Success) + else { - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); + ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage); + + CbPackage Package; + if (Package.TryLoad(UpstreamResult.Value)) + { + uint32_t AttachmentCount = 0; + uint32_t FoundCount = 0; + CbObject CacheRecord = Package.GetObject(); + + CacheRecord.IterateAttachments( + [this, &Package, &Ref, &AttachmentCount, &FoundCount](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) + { + if (CompressedBuffer Chunk = Attachment->AsCompressedBinary()) + { + m_CidStore.AddChunk(Chunk); + FoundCount++; + } + else + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, upstream attachment not compressed", + Ref.BucketSegment, + Ref.HashKey); + } + } + AttachmentCount++; + }); + + if (FoundCount == AttachmentCount) + { + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); + } + else + { + Success = false; + ZEN_WARN("GET - cache record '{}/{}' FAILED, attachments missing in upstream package", + Ref.BucketSegment, + Ref.HashKey); + } + } + else + { + Success = false; + ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey); + } } } } @@ -196,14 +244,74 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Request.SetSuppressResponseBody(); } - ZEN_DEBUG("HIT - '{}/{}' ({} bytes {}) ({})", - Ref.BucketSegment, - Ref.HashKey, - Value.Value.Size(), - Value.Value.GetContentType(), - InUpstreamCache ? "upstream" : "local"); + if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache) + { + CbObjectView CacheRecord(Value.Value.Data()); + + const zen::CbValidateError ValidationResult = zen::ValidateCompactBinary(Value.Value, zen::CbValidateMode::All); + + if (ValidationResult != CbValidateError::None) + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey); + + return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); + } + + uint32_t AttachmentCount = 0; + uint32_t FoundCount = 0; + uint64_t AttachmentBytes = 0ull; + + CbPackage Package; + + CacheRecord.IterateAttachments( + [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + AttachmentBytes += Chunk.Size(); + FoundCount++; + } + AttachmentCount++; + }); + + if (FoundCount != AttachmentCount) + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments", + Ref.BucketSegment, + Ref.HashKey, + FoundCount, + AttachmentCount); + + return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + } + + Package.SetObject(LoadCompactBinaryObject(Value.Value)); + + ZEN_DEBUG("HIT - '{}/{}' {}, {} attachments ({})", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(AttachmentBytes + Value.Value.Size()), + AttachmentCount, + InUpstreamCache ? "UPSTREAM" : "LOCAL"); + + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + Package.Save(Writer); + + IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + return Request.WriteResponse(zen::HttpResponseCode::OK, HttpContentType::kCbPackage, Response); + } + else + { + ZEN_DEBUG("HIT - '{}/{}' {} ({})", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Value.Value.Size()), + InUpstreamCache ? "UPSTREAM" : "LOCAL"); + + return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + } } break; @@ -218,28 +326,11 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req const HttpContentType ContentType = Request.RequestContentType(); - bool IsCompactBinary = false; - - switch (ContentType) - { - case HttpContentType::kUnknownContentType: - case HttpContentType::kBinary: - IsCompactBinary = false; - break; - - case HttpContentType::kCbObject: - IsCompactBinary = true; - break; - - default: - return Request.WriteResponse(zen::HttpResponseCode::BadRequest); - } - - if (!IsCompactBinary) + if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType) { // TODO: create a cache record and put value in CAS? m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); - ZEN_DEBUG("PUT (binary) - '{}/{}' ({} bytes, {})", Ref.BucketSegment, Ref.HashKey, Body.Size(), Body.GetContentType()); + ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size())); if (m_UpstreamCache) { @@ -249,86 +340,193 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req return Request.WriteResponse(zen::HttpResponseCode::Created); } - - // Validate payload before accessing it - const zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); - - if (ValidationResult != CbValidateError::None) + else if (ContentType == HttpContentType::kCbObject) { - ZEN_WARN("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size()); + // Validate payload before accessing it + const zen::CbValidateError ValidationResult = + zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); - // TODO: add details in response, kText || kCbObject? - return Request.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Compact binary validation failed"sv); - } + if (ValidationResult != CbValidateError::None) + { + ZEN_WARN("PUT - cache record '{}/{}' ({} bytes) FAILED, invalid compact binary", + Ref.BucketSegment, + Ref.HashKey, + Body.Size()); - // Extract referenced payload hashes - zen::CbObjectView Cbo(Body.Data()); + // TODO: add details in response, kText || kCbObject? + return Request.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Compact binary validation failed"sv); + } - std::vector<IoHash> References; - std::vector<IoHash> MissingRefs; - Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + // Extract referenced payload hashes + zen::CbObjectView Cbo(Body.Data()); - ZenCacheValue CacheValue; - CacheValue.Value = Body; + std::vector<IoHash> References; + std::vector<IoHash> MissingRefs; + Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); - if (!References.empty()) - { - zen::CbObjectWriter Idx; - Idx.BeginArray("references"); + ZenCacheValue CacheValue; + CacheValue.Value = Body; - for (const IoHash& Hash : References) + if (!References.empty()) { - Idx.AddHash(Hash); - if (!m_CidStore.ContainsChunk(Hash)) + zen::CbObjectWriter Idx; + Idx.BeginArray("references"); + + for (const IoHash& Hash : References) { - MissingRefs.push_back(Hash); + Idx.AddHash(Hash); + if (!m_CidStore.ContainsChunk(Hash)) + { + MissingRefs.push_back(Hash); + } } + + Idx.EndArray(); + + CacheValue.IndexData = Idx.Save(); } - Idx.EndArray(); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); - CacheValue.IndexData = Idx.Save(); - } + ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} attachments missing", + Ref.BucketSegment, + Ref.HashKey, + zen::NiceBytes(CacheValue.Value.Size()), + MissingRefs.size(), + References.size()); - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); + if (MissingRefs.empty()) + { + // Only enqueue valid cache records, i.e. all referenced payloads exists + if (m_UpstreamCache) + { + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(References)}); + } - ZEN_DEBUG("PUT (cache record) - '{}/{}' ({} bytes, {}, ({}/{} refs/missing))", - Ref.BucketSegment, - Ref.HashKey, - CacheValue.Value.Size(), - CacheValue.Value.GetContentType(), - References.size(), - MissingRefs.size()); + return Request.WriteResponse(zen::HttpResponseCode::Created); + } + else + { + // TODO: Binary attachments? + zen::CbObjectWriter Response; + Response.BeginArray("needs"); + for (const IoHash& MissingRef : MissingRefs) + { + Response.AddHash(MissingRef); + ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef); + } + Response.EndArray(); - if (MissingRefs.empty()) + // Return Created | BadRequest? + return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save()); + } + } + else if (ContentType == HttpContentType::kCbPackage) { - // Only enqueue valid cache records, i.e. all referenced payloads exists + CbPackage Package; + + if (!Package.TryLoad(Body)) + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey); + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"); + } + + CbObject CacheRecord = Package.GetObject(); + + int32_t AttachmentCount = 0; + int32_t NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + bool AttachmentsOk = true; + + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + + std::vector<IoHash> PayloadIds; + PayloadIds.reserve(Attachments.size()); + + CacheRecord.IterateAttachments([this, + &Ref, + &Package, + &AttachmentsOk, + &AttachmentCount, + &TotalAttachmentBytes, + &TotalNewBytes, + &NewAttachmentCount, + &PayloadIds](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) + { + if (Attachment->IsCompressedBinary()) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + const uint64_t ChunkSize = Chunk.GetCompressed().GetSize(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + + PayloadIds.emplace_back(InsertResult.DecompressedId); + + if (InsertResult.New) + { + TotalNewBytes += ChunkSize; + ++NewAttachmentCount; + } + + TotalAttachmentBytes += ChunkSize; + AttachmentCount++; + } + else + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, attachment '{}' is not compressed", + Ref.BucketSegment, + Ref.HashKey, + AttachmentHash.AsHash()); + AttachmentsOk = false; + } + } + else + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, missing attachment '{}'", + Ref.BucketSegment, + Ref.HashKey, + AttachmentHash.AsHash()); + AttachmentsOk = false; + } + }); + + if (!AttachmentsOk) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"); + } + + IoBuffer CacheRecordChunk = CacheRecord.GetBuffer().AsIoBuffer(); + const uint64_t TotalPackageBytes = TotalAttachmentBytes + CacheRecordChunk.Size(); + + ZenCacheValue CacheValue{.Value = CacheRecordChunk}; + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); + if (m_UpstreamCache) { - auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, .CacheKey = {Ref.BucketSegment, Ref.HashKey}, - .PayloadIds = std::move(References)}); + .PayloadIds = std::move(PayloadIds)}); } + ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} ({}/{}) new attachments", + Ref.BucketSegment, + Ref.HashKey, + zen::NiceBytes(TotalPackageBytes), + NewAttachmentCount, + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + zen::NiceBytes(TotalAttachmentBytes)); + return Request.WriteResponse(zen::HttpResponseCode::Created); } else { - // TODO: Binary attachments? - zen::CbObjectWriter Response; - Response.BeginArray("needs"); - for (const IoHash& MissingRef : MissingRefs) - { - Response.AddHash(MissingRef); - ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef); - } - Response.EndArray(); - - // Return Created | BadRequest? - return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save()); + return Request.WriteResponse(zen::HttpResponseCode::BadRequest); } } break; @@ -387,13 +585,13 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re return Request.WriteResponse(zen::HttpResponseCode::NotFound); } - ZEN_DEBUG("HIT - '{}/{}/{}' ({} bytes, {}) ({})", + ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, - Payload.Size(), + NiceBytes(Payload.Size()), Payload.GetContentType(), - InUpstreamCache ? "upstream" : "local"); + InUpstreamCache ? "UPSTREAM" : "LOCAL"); if (Verb == kHead) { @@ -438,13 +636,13 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); - ZEN_DEBUG("PUT ({}) - '{}/{}/{}' ({} bytes, {})", - Result.New ? "NEW" : "OLD", + ZEN_DEBUG("PUT - payload '{}/{}/{}' {} (type: {}) {}", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, - Body.Size(), - Body.GetContentType()); + NiceBytes(Body.Size()), + Body.GetContentType(), + Result.New ? "NEW" : "OLD"); if (Result.New) { diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index c8c959569..8289fd700 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -74,6 +74,7 @@ private: void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); + spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; ZenCacheStore& m_CacheStore; zen::CasStore& m_CasStore; diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index 7b76bb80b..3197eaee4 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -588,7 +588,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, TotalAttachmentBytes += CompressedSize; ++AttachmentCount; - const CasStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView); + const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView); if (InsertResult.New) { @@ -659,16 +659,21 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) { CbObjectView ExecEntry = It.AsObjectView(); - std::string_view Name = ExecEntry["name"sv].AsString(); - const IoHash Hash = ExecEntry["hash"sv].AsHash(); - const uint64_t Size = ExecEntry["size"sv].AsUInt64(); + std::string_view Name = ExecEntry["name"sv].AsString(); + const IoHash ChunkHash = ExecEntry["hash"sv].AsHash(); + const uint64_t Size = ExecEntry["size"sv].AsUInt64(); std::filesystem::path FilePath{SandboxPath / Name}; - IoBuffer DataBuffer = m_CasStore.FindChunk(Hash); + IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkHash); if (!DataBuffer) { - throw std::runtime_error("worker CAS chunk '{}' missing"_format(Hash)); + throw std::runtime_error("worker CAS chunk '{}' missing"_format(ChunkHash)); + } + + if (DataBuffer.Size() != Size) + { + throw std::runtime_error("worker CAS chunk '{}' size: {}, action spec expected {}"_format(ChunkHash, DataBuffer.Size(), Size)); } zen::WriteFile(FilePath, DataBuffer); @@ -685,16 +690,21 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) { CbObjectView FileEntry = It.AsObjectView(); - std::string_view Name = FileEntry["name"sv].AsString(); - const IoHash Hash = FileEntry["hash"sv].AsHash(); - const uint64_t Size = FileEntry["size"sv].AsUInt64(); + std::string_view Name = FileEntry["name"sv].AsString(); + const IoHash ChunkHash = FileEntry["hash"sv].AsHash(); + const uint64_t Size = FileEntry["size"sv].AsUInt64(); std::filesystem::path FilePath{SandboxPath / Name}; - IoBuffer DataBuffer = m_CasStore.FindChunk(Hash); + IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkHash); if (!DataBuffer) { - throw std::runtime_error("worker CAS chunk '{}' missing"_format(Hash)); + throw std::runtime_error("worker CAS chunk '{}' missing"_format(ChunkHash)); + } + + if (DataBuffer.Size() != Size) + { + throw std::runtime_error("worker CAS chunk '{}' size: {}, action spec expected {}"_format(ChunkHash, DataBuffer.Size(), Size)); } zen::WriteFile(FilePath, DataBuffer); @@ -796,11 +806,14 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) CbPackage OutputPackage; CbObject Output = zen::LoadCompactBinaryObject(OutputData.Data[0]); + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalRawAttachmentBytes = 0; + Output.IterateAttachments([&](CbFieldView Field) { IoHash Hash = Field.AsHash(); std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()}; - FileContents ChunkData = zen::ReadFile(SandboxPath / "build.output"); + FileContents ChunkData = zen::ReadFile(OutputPath); if (ChunkData.ErrorCode) { @@ -809,12 +822,27 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) ZEN_ASSERT(OutputData.Data.size() == 1); - CbAttachment Attachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Data[0]))); + CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Data[0])); + + if (!AttachmentBuffer) + { + throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)"); + } + + TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); + TotalRawAttachmentBytes += AttachmentBuffer.GetCompressedSize(); + + CbAttachment Attachment(AttachmentBuffer); OutputPackage.AddAttachment(Attachment); }); OutputPackage.SetObject(Output); + ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)", + OutputPackage.GetAttachments().size(), + NiceBytes(TotalAttachmentBytes), + NiceBytes(TotalRawAttachmentBytes)); + return OutputPackage; } diff --git a/zenserver/compute/apply.h b/zenserver/compute/apply.h index 474156a5e..86b262213 100644 --- a/zenserver/compute/apply.h +++ b/zenserver/compute/apply.h @@ -28,6 +28,7 @@ public: virtual void HandleRequest(HttpServerRequest& Request) override; private: + spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; HttpRequestRouter m_Router; CasStore& m_CasStore; diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 0af92da6d..4a5467648 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -163,7 +163,7 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; } CloudCacheResult @@ -194,7 +194,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; } CloudCacheResult @@ -215,7 +215,7 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; } std::vector<IoHash> diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 97b222a68..38d30a795 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -4,8 +4,14 @@ #include "jupiter.h" #include "zen.h" +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalidation.h> #include <zencore/fmtutils.h> +#include <zencore/stream.h> #include <zencore/timer.h> + #include <zenstore/cas.h> #include <zenstore/cidstore.h> @@ -121,7 +127,45 @@ namespace detail { } else { - Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, Type); + const ZenContentType AcceptType = Type == ZenContentType::kCbPackage ? ZenContentType::kCbObject : Type; + Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, AcceptType); + + if (Result.Success && Type == ZenContentType::kCbPackage) + { + CbPackage Package; + + const CbValidateError ValidationResult = zen::ValidateCompactBinary(Result.Response, CbValidateMode::All); + if (Result.Success = ValidationResult == CbValidateError::None; Result.Success) + { + CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); + + CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { + CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; + + if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) + { + Package.AddAttachment(CbAttachment(Chunk)); + } + else + { + Result.Success = false; + } + }); + + Package.SetObject(CacheRecord); + } + + if (Result.Success) + { + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + Package.Save(Writer); + + Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + Result.Bytes = MemStream.Size(); + } + } } return {.Value = Result.Response, @@ -305,37 +349,74 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) + if (CacheRecord.Type == ZenContentType::kCbPackage) { - Result.Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + zen::CbPackage Package; + Package.SetObject(CbObject(SharedBuffer(RecordValue))); + + for (const IoBuffer& Payload : Payloads) { - Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - CacheRecord.PayloadIds[Idx], - Payloads[Idx]); + if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload))) + { + Package.AddAttachment(CbAttachment(AttachmentBuffer)); + } + else + { + return {.Reason = std::string("invalid payload buffer"), .Success = false}; + } } - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + Package.Save(Writer); + IoBuffer PackagePayload(IoBuffer::Wrap, MemStream.Data(), MemStream.Size()); - if (!Result.Success) + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - return {.Reason = "Failed to upload payload", - .Bytes = TotalBytes, - .ElapsedSeconds = TotalElapsedSeconds, - .Success = false}; + Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + PackagePayload, + CacheRecord.Type); } - } - Result.Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); + TotalBytes = Result.Bytes; + TotalElapsedSeconds = Result.ElapsedSeconds; } + else + { + for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) + { + Result.Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + CacheRecord.PayloadIds[Idx], + Payloads[Idx]); + } - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + + if (!Result.Success) + { + return {.Reason = "Failed to upload payload", + .Bytes = TotalBytes, + .ElapsedSeconds = TotalElapsedSeconds, + .Success = false}; + } + } + + Result.Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = + Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); + } + + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + } return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; } @@ -448,7 +529,6 @@ public: , m_CacheStore(CacheStore) , m_CidStore(CidStore) { - ZEN_ASSERT(m_Options.ThreadCount > 0); } virtual ~DefaultUpstreamCache() { Shutdown(); } @@ -509,7 +589,15 @@ public: { if (m_IsRunning.load()) { - m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + if (!m_UpstreamThreads.empty()) + { + m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + } + else + { + ProcessCacheRecord(std::move(CacheRecord)); + } + return {.Success = true}; } @@ -548,11 +636,19 @@ private: for (auto& Endpoint : m_Endpoints) { - if (PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); - Result.Success) + const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + if (Result.Success) { m_Stats.Add(*Endpoint, Result); } + else + { + ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Endpoint->DisplayName(), + Result.Reason); + } } } diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index eef92bab4..55ddd310f 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -5,6 +5,7 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/fmtutils.h> +#include <zencore/session.h> #include <zencore/stream.h> #include "cache/structuredcachestore.h" @@ -72,7 +73,7 @@ namespace detail { // Note that currently this just implements an UDP echo service for testing purposes -Mesh::Mesh(asio::io_context& IoContext) : m_IoContext(IoContext) +Mesh::Mesh(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(zen::GetSessionId()) { } @@ -257,10 +258,10 @@ Mesh::IssueReceive() if (SessionId != Oid::Zero && SessionId != m_SessionId) { - const uint16_t Port = (++It)->AsUInt16(m_SenderEndpoint.port()); - const uint32_t Lsn = (++It)->AsUInt32(); + // const uint16_t Port = (++It)->AsUInt16(m_SenderEndpoint.port()); + // const uint32_t Lsn = (++It)->AsUInt32(); - ZEN_INFO("received hey from {} ({})", SenderIp, SessionId); + ZEN_TRACE("received hey from {} ({})", SenderIp, SessionId); RwLock::ExclusiveLockScope _(m_SessionsLock); @@ -279,7 +280,7 @@ Mesh::IssueReceive() { Oid SessionId = Field.AsObjectId(); - ZEN_INFO("received bye from {} ({})", SenderIp, SessionId); + ZEN_DEBUG("received bye from {} ({})", SenderIp, SessionId); // We could verify that it's sent from a known IP before erasing the // session, if we want to be paranoid @@ -389,7 +390,10 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetHeader(cpr::Header{{"Accept", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}}); + Session.SetHeader(cpr::Header{{"Accept", + Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg" + : Type == ZenContentType::kCbObject ? "application/x-ue-cb" + : "application/octet-stream"}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -429,14 +433,18 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetHeader( - cpr::Header{{"Content-Type", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}}); + Session.SetHeader(cpr::Header{{"Content-Type", + Type == ZenContentType::kCbPackage ? "application/x-ue-cbpkg" + : Type == ZenContentType::kCbObject ? "application/x-ue-cb" + : "application/octet-stream"}}); Session.SetBody(cpr::Body{static_cast<const char*>(Value.Data()), Value.Size()}); cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; } ZenCacheResult @@ -454,7 +462,9 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; } } // namespace zen diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 541495818..ff4a551bf 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -4,6 +4,7 @@ #include <zencore/iobuffer.h> #include <zencore/iohash.h> +#include <zencore/logging.h> #include <zencore/memory.h> #include <zencore/thread.h> #include <zencore/uid.h> @@ -59,6 +60,9 @@ private: static const int kMaxMessageSize = 2048; static const int kMaxUpdateSize = 1400; // We'll try not to send messages larger than this + spdlog::logger& Log() { return m_Log; } + + spdlog::logger& m_Log; std::atomic<State> m_State = kInitializing; asio::io_context& m_IoContext; std::unique_ptr<asio::ip::udp::socket> m_UdpSocket; @@ -68,7 +72,7 @@ private: uint16_t m_Port = 0; uint8_t m_MessageBuffer[kMaxMessageSize]; asio::high_resolution_timer m_Timer{m_IoContext}; - Oid m_SessionId{Oid::NewOid()}; + Oid m_SessionId; struct PeerInfo { diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 3b56d8683..53dc41a24 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -157,7 +157,7 @@ public: zen::UpstreamCacheOptions UpstreamOptions; - if (UpstreamConfig.UpstreamThreadCount > 0 && UpstreamConfig.UpstreamThreadCount < 32) + if (UpstreamConfig.UpstreamThreadCount < 32) { UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); } |