diff options
Diffstat (limited to 'src/zenserver')
8 files changed, 841 insertions, 20 deletions
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp index 5c7a5536a..0673030a1 100644 --- a/src/zenserver/projectstore/fileremoteprojectstore.cpp +++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp @@ -57,15 +57,13 @@ public: .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; } - virtual SaveResult SaveContainer(const IoBuffer& Payload) override + virtual SaveResult SaveContainer(const CbObject& Container) override { Stopwatch Timer; SaveResult Result; { - CbObject ContainerObject = LoadCompactBinaryObject(Payload); - - ContainerObject.IterateAttachments([&](CbFieldView FieldView) { + Container.IterateAttachments([&](CbFieldView FieldView) { IoHash AttachmentHash = FieldView.AsBinaryAttachment(); std::filesystem::path AttachmentPath = GetAttachmentPath(AttachmentHash); if (!std::filesystem::exists(AttachmentPath)) @@ -84,19 +82,19 @@ public: BasicFile ContainerFile; ContainerFile.Open(ContainerPath, BasicFile::Mode::kTruncate); std::error_code Ec; - ContainerFile.WriteAll(Payload, Ec); + ContainerFile.WriteAll(Container.GetBuffer().AsIoBuffer(), Ec); if (Ec) { throw std::system_error(Ec, Ec.message()); } - Result.RawHash = IoHash::HashBuffer(Payload); + Result.RawHash = IoHash::HashBuffer(Container.GetBuffer().AsIoBuffer()); } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed saving oplog container to '{}'. Reason: {}", ContainerPath, Ex.what()); } - AddStats(Payload.GetSize(), 0, Timer.GetElapsedTimeUs() * 1000); + AddStats(Container.GetBuffer().GetSize(), 0, Timer.GetElapsedTimeUs() * 1000); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp index bede3abb4..51224aca1 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -66,11 +66,11 @@ public: .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; } - virtual SaveResult SaveContainer(const IoBuffer& Payload) override + virtual SaveResult SaveContainer(const CbObject& Container) override { CloudCacheSession Session(m_CloudClient.Get()); - PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); - AddStats(PutResult); + PutRefResult PutResult = + Session.PutRef(m_Namespace, m_Bucket, m_Key, Container.GetBuffer().AsIoBuffer(), ZenContentType::kCbObject); SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash}; if (Result.ErrorCode) diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 65d2730d8..aba6fbe7e 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -25,6 +25,7 @@ #include "fileremoteprojectstore.h" #include "jupiterremoteprojectstore.h" #include "remoteprojectstore.h" +#include "zencacheremoteprojectstore.h" #include "zenremoteprojectstore.h" ZEN_THIRD_PARTY_INCLUDES_START @@ -196,11 +197,11 @@ namespace { { std::string_view Url = Zen["url"sv].AsString(); std::string_view Project = Zen["project"sv].AsString(); + std::string_view Oplog = Zen["oplog"sv].AsString(); if (Project.empty()) { return {nullptr, "Missing project"}; } - std::string_view Oplog = Zen["oplog"sv].AsString(); if (Oplog.empty()) { return {nullptr, "Missing oplog"}; @@ -212,6 +213,43 @@ namespace { RemoteStore = CreateZenRemoteStore(Options, TempFilePath); } + if (CbObjectView Zen = Params["zencache"sv].AsObjectView(); Zen) + { + std::string_view Url = Zen["url"sv].AsString(); + std::string_view Namespace = Zen["namespace"sv].AsString(); + std::string_view Bucket = Zen["bucket"sv].AsString(); + std::string_view KeyParam = Zen["key"sv].AsString(); + bool ForceBlocks = Zen["forceblocks"sv].AsBool(false); + if (Namespace.empty()) + { + return {nullptr, "Missing namespace"}; + } + if (Bucket.empty()) + { + return {nullptr, "Missing bucket"}; + } + if (KeyParam.empty()) + { + return {nullptr, "Missing bucket"}; + } + if (KeyParam.length() != IoHash::StringLength) + { + return {nullptr, "Invalid key"}; + } + IoHash Key = IoHash::FromHexString(KeyParam); + if (Key == IoHash::Zero) + { + return {nullptr, "Invalid key string"}; + } + ZenCacheRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + std::string(Url), + std::string(Namespace), + std::string(Bucket), + Key, + ForceBlocks}; + RemoteStore = CreateZenCacheRemoteStore(Options, TempFilePath); + } + if (!RemoteStore) { return {nullptr, "Unknown remote store type"}; diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index cc9385f5e..61c04f50a 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -1951,7 +1951,7 @@ SaveOplog(CidStore& ChunkStore, ChunkCount, BlockCount)); Stopwatch SaveContainerTimer; - RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer()); + RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject); TransferWallTimeMS += SaveContainerTimer.GetElapsedTimeMs(); if (ContainerSaveResult.ErrorCode) { diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index 6b83e526c..96b8e7526 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -90,7 +90,7 @@ public: virtual RemoteStoreInfo GetInfo() const = 0; virtual Stats GetStats() const = 0; - virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0; + virtual SaveResult SaveContainer(const CbObject& Container) = 0; virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) = 0; virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0; virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0; diff --git a/src/zenserver/projectstore/zencacheremoteprojectstore.cpp b/src/zenserver/projectstore/zencacheremoteprojectstore.cpp new file mode 100644 index 000000000..7c87aeda0 --- /dev/null +++ b/src/zenserver/projectstore/zencacheremoteprojectstore.cpp @@ -0,0 +1,735 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zencacheremoteprojectstore.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/compositebuffer.h> +#include <zencore/fmtutils.h> +#include <zencore/stream.h> +#include <zenhttp/httpclient.h> +#include <zenutil/cache/cachepolicy.h> +#include <zenutil/packageformat.h> + +namespace zen { + +using namespace std::literals; + +class ZenCacheRemoteStore : public RemoteProjectStore +{ +public: + ZenCacheRemoteStore(std::string_view HostAddress, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& Key, + bool ForceBlocks, + const std::filesystem::path& TempFilePath) + : m_HostAddress(HostAddress) + , m_ProjectStoreUrl(fmt::format("{}"sv, m_HostAddress)) + , m_Namespace(Namespace) + , m_Bucket(Bucket) + , m_Key(Key) + , m_ForceBlocks(ForceBlocks) + , m_TempFilePath(TempFilePath) + , m_Client(m_ProjectStoreUrl, {.LogCategory = "ZenCacheRemoteStore", .RetryCount = 2}) + { + } + + virtual RemoteStoreInfo GetInfo() const override + { + return {.CreateBlocks = m_ForceBlocks, + .UseTempBlockFiles = false, + .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key), + .BaseContainerName = "", + .Description = fmt::format("[zen] {}"sv, m_HostAddress)}; + } + + virtual Stats GetStats() const override + { + return {.m_SentBytes = m_SentBytes.load(), + .m_ReceivedBytes = m_ReceivedBytes.load(), + .m_RequestTimeNS = m_RequestTimeNS.load(), + .m_RequestCount = m_RequestCount.load(), + .m_PeakSentBytes = m_PeakSentBytes.load(), + .m_PeakReceivedBytes = m_PeakReceivedBytes.load(), + .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; + } + + virtual SaveResult SaveContainer(const CbObject& Container) override + { + std::string SaveRequest = fmt::format("/z$/$rpc"); + + CbPackage Package; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("Method"sv, "PutCacheRecords"sv); + RequestWriter.AddInteger("Accept"sv, kCbPkgMagic); + RequestWriter.BeginObject("Params"sv); + { + RequestWriter.AddString("Namespace"sv, m_Namespace); + RequestWriter.BeginArray("Requests"sv); + { + RequestWriter.BeginObject(); + { + RequestWriter.BeginObject("Key"sv); + { + RequestWriter.AddString("Bucket"sv, m_Bucket); + RequestWriter.AddHash("Hash"sv, m_Key); + } + RequestWriter.EndObject(); // Key + RequestWriter.AddObject("Record"sv, Container); + } + RequestWriter.EndObject(); + } + RequestWriter.EndArray(); // Requests + } + RequestWriter.EndObject(); // Params + Package.SetObject(RequestWriter.Save()); + } + HttpClient::Response Response = m_Client.Post(SaveRequest, Package, HttpClient::Accept(ZenContentType::kCbPackage)); + AddStats(Response); + SaveResult Result = SaveResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed saving oplog container to {}/z$/{}/{}/{}. Reason: '{}'", + m_ProjectStoreUrl, + m_Namespace, + m_Bucket, + m_Key, + Result.Reason); + return Result; + } + CbPackage ResponsePackage = Response.AsPackage(); + if (!ResponsePackage) + { + Result.Reason = fmt::format("The response for {}/z$/{}/{}/{} is not formatted as a package"sv, + m_ProjectStoreUrl, + m_Namespace, + m_Bucket, + m_Key); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + return Result; + } + CbObject ResponseObject = ResponsePackage.GetObject(); + for (CbFieldView Reponse : ResponseObject["Result"sv]) + { + bool Success = Reponse.AsBool(); + if (!Success) + { + Result.Reason = "Remote server failed to save oplog container (return Success == false)"; + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + return Result; + } + } + CbArrayView NeedsArray = ResponseObject["needs"sv].AsArrayView(); + for (CbFieldView FieldView : NeedsArray) + { + IoHash ChunkHash = FieldView.AsHash(); + Result.Needs.insert(ChunkHash); + } + + Result.RawHash = IoHash::HashBuffer(Container.GetBuffer().AsIoBuffer()); + return Result; + } + + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override + { + std::string SaveRequest = fmt::format("/z$/$rpc"); + + CbPackage Package; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("Method"sv, "PutRecordAttachments"sv); + RequestWriter.AddInteger("Accept"sv, kCbPkgMagic); + RequestWriter.BeginObject("Params"sv); + { + RequestWriter.AddString("Namespace"sv, m_Namespace); + RequestWriter.BeginArray("Requests"sv); + { + RequestWriter.BeginObject(); + { + RequestWriter.BeginObject("Key"sv); + { + RequestWriter.AddString("Bucket"sv, m_Bucket); + RequestWriter.AddHash("Hash"sv, m_Key); + } + RequestWriter.EndObject(); // Key + RequestWriter.BeginArray("Chunks"sv); + { + IoHash VerifyRawHash; + uint64_t _; + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Payload, VerifyRawHash, _)) + { + ZEN_ASSERT(VerifyRawHash == RawHash); + Package.AddAttachment(CbAttachment(Compressed, RawHash)); + RequestWriter.AddHash(RawHash); + } + } + RequestWriter.EndArray(); // Chunks + } + RequestWriter.EndObject(); + } + RequestWriter.EndArray(); // Requests + } + RequestWriter.EndObject(); // Params + Package.SetObject(RequestWriter.Save()); + } + HttpClient::Response Response = m_Client.Post(SaveRequest, Package, HttpClient::Accept(ZenContentType::kCbPackage)); + AddStats(Response); + SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed saving oplog attachments to {}/z$/{}/{}/{}. Reason: '{}'", + m_ProjectStoreUrl, + m_Namespace, + m_Bucket, + m_Key, + Result.Reason); + return Result; + } + // TODO: Parse attachment results? + return Result; + } + + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override + { + std::string SaveRequest = fmt::format("/z$/$rpc"); + + CbPackage Package; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("Method"sv, "PutRecordAttachments"sv); + RequestWriter.AddInteger("Accept"sv, kCbPkgMagic); + RequestWriter.BeginObject("Params"sv); + { + RequestWriter.AddString("Namespace"sv, m_Namespace); + RequestWriter.BeginArray("Requests"sv); + { + RequestWriter.BeginObject(); + { + RequestWriter.BeginObject("Key"sv); + { + RequestWriter.AddString("Bucket"sv, m_Bucket); + RequestWriter.AddHash("Hash"sv, m_Key); + } + RequestWriter.EndObject(); // Key + RequestWriter.BeginArray("Chunks"sv); + { + for (const SharedBuffer& Chunk : Chunks) + { + IoHash RawHash; + uint64_t _; + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, RawHash, _)) + { + Package.AddAttachment(CbAttachment(Compressed, RawHash)); + RequestWriter.AddHash(RawHash); + } + } + } + RequestWriter.EndArray(); // Chunks + } + RequestWriter.EndObject(); + } + RequestWriter.EndArray(); // Requests + } + RequestWriter.EndObject(); // Params + Package.SetObject(RequestWriter.Save()); + } + HttpClient::Response Response = m_Client.Post(SaveRequest, Package, HttpClient::Accept(ZenContentType::kCbPackage)); + AddStats(Response); + SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed saving oplog attachments to {}/z$/{}/{}/{}. Reason: '{}'", + m_ProjectStoreUrl, + m_Namespace, + m_Bucket, + m_Key, + Result.Reason); + return Result; + } + // TODO: Parse attachment results? + return Result; + } + + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + { + std::string SaveRequest = fmt::format("/z$/$rpc"); + + CbPackage Package; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("Method"sv, "GetRecordAttachments"sv); + RequestWriter.AddInteger("Accept"sv, kCbPkgMagic); + RequestWriter.BeginObject("Params"sv); + { + RequestWriter.AddString("Namespace"sv, m_Namespace); + RequestWriter.BeginArray("Requests"sv); + { + RequestWriter.BeginObject(); + { + RequestWriter.BeginObject("Key"sv); + { + RequestWriter.AddString("Bucket"sv, m_Bucket); + RequestWriter.AddHash("Hash"sv, m_Key); + } + RequestWriter.EndObject(); // Key + RequestWriter.BeginArray("Chunks"sv); + { + for (const IoHash& RawHash : RawHashes) + { + RequestWriter.AddHash(RawHash); + } + } + RequestWriter.EndArray(); // Chunks + } + RequestWriter.EndObject(); + } + RequestWriter.EndArray(); // Requests + } + RequestWriter.EndObject(); // Params + Package.SetObject(RequestWriter.Save()); + } + HttpClient::Response Response = m_Client.Post(SaveRequest, Package, HttpClient::Accept(ZenContentType::kCbPackage)); + AddStats(Response); + LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed loading oplog attachments to {}/z$/{}/{}/{}. Reason: '{}'", + m_ProjectStoreUrl, + m_Namespace, + m_Bucket, + m_Key, + Result.Reason); + return Result; + } + + CbPackage ResponsePackage = Response.AsPackage(); + if (!ResponsePackage) + { + Result.Reason = fmt::format("The response for {}/z$/{}/{}/{} is not formatted as a package"sv, + m_ProjectStoreUrl, + m_Namespace, + m_Bucket, + m_Key); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + return Result; + } + CbObject ResponseObject = ResponsePackage.GetObject(); + for (CbFieldView Reponse : ResponseObject["Result"sv]) + { + CbObjectView ResultView = Reponse.AsObjectView(); + CbArrayView ChunksArrayView = ResultView["Chunks"sv].AsArrayView(); + for (CbFieldView ChunkView : ChunksArrayView) + { + IoHash ChunkHash = ChunkView.AsHash(); + if (ChunkHash != IoHash::Zero) + { + const CbAttachment* Attachment = ResponsePackage.FindAttachment(ChunkHash); + if (Attachment && Attachment->IsCompressedBinary()) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + Result.Chunks.emplace_back(std::make_pair(ChunkHash, Chunk)); + } + } + } + } + return Result; + }; + + virtual FinalizeResult FinalizeContainer(const IoHash&) override + { + std::string SaveRequest = fmt::format("/z$/$rpc"); + + CbPackage Package; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("Method"sv, "PutCacheRecords"sv); + RequestWriter.AddInteger("Accept"sv, kCbPkgMagic); + RequestWriter.BeginObject("Params"sv); + { + RequestWriter.AddString("Namespace"sv, m_Namespace); + RequestWriter.BeginArray("Requests"sv); + { + RequestWriter.BeginObject(); + { + RequestWriter.BeginObject("Key"sv); + { + RequestWriter.AddString("Bucket"sv, m_Bucket); + RequestWriter.AddHash("Hash"sv, m_Key); + } + RequestWriter.EndObject(); // Key + // RequestWriter.AddObject("Record"sv, Container); + } + RequestWriter.EndObject(); + } + RequestWriter.EndArray(); // Requests + } + RequestWriter.EndObject(); // Params + Package.SetObject(RequestWriter.Save()); + } + HttpClient::Response Response = m_Client.Post(SaveRequest, Package, HttpClient::Accept(ZenContentType::kCbPackage)); + AddStats(Response); + FinalizeResult Result = FinalizeResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed saving oplog container to {}/z$/{}/{}/{}. Reason: '{}'", + m_ProjectStoreUrl, + m_Namespace, + m_Bucket, + m_Key, + Result.Reason); + return Result; + } + CbPackage ResponsePackage = Response.AsPackage(); + if (!ResponsePackage) + { + Result.Reason = fmt::format("The response for {}/z$/{}/{}/{} is not formatted as a package"sv, + m_ProjectStoreUrl, + m_Namespace, + m_Bucket, + m_Key); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + return Result; + } + CbObject ResponseObject = ResponsePackage.GetObject(); + for (CbFieldView Reponse : ResponseObject["Result"sv]) + { + bool Success = Reponse.AsBool(); + if (!Success) + { + Result.Reason = "Remote server failed to save oplog container (return Success == false)"; + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + return Result; + } + } + CbArrayView NeedsArray = ResponseObject["needs"sv].AsArrayView(); + for (CbFieldView FieldView : NeedsArray) + { + IoHash ChunkHash = FieldView.AsHash(); + Result.Needs.insert(ChunkHash); + } + + return Result; + } + + virtual LoadContainerResult LoadContainer() override + { + std::string LoadRequest = fmt::format("/z$/$rpc"sv); + CbPackage Package; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("Method"sv, "GetCacheRecords"sv); + RequestWriter.AddInteger("Accept"sv, kCbPkgMagic); + RequestWriter.BeginObject("Params"sv); + { + RequestWriter.AddString("DefaultPolicy"sv, WriteToString<128>(CachePolicy::SkipData | CachePolicy::QueryLocal)); + RequestWriter.AddString("Namespace"sv, m_Namespace); + RequestWriter.BeginArray("Requests"sv); + { + RequestWriter.BeginObject(); + { + RequestWriter.BeginObject("Key"sv); + { + RequestWriter.AddString("Bucket"sv, m_Bucket); + RequestWriter.AddHash("Hash"sv, m_Key); + } + RequestWriter.EndObject(); // Key + } + RequestWriter.EndObject(); + } + RequestWriter.EndArray(); // Requests + } + RequestWriter.EndObject(); // Params + Package.SetObject(RequestWriter.Save()); + } + HttpClient::Response Response = m_Client.Post(LoadRequest, Package, HttpClient::Accept(ZenContentType::kCbPackage)); + AddStats(Response); + LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed loading oplog container from {}/z$/{}/{}/{}. Reason: '{}'", + m_ProjectStoreUrl, + m_Namespace, + m_Bucket, + m_Key, + Result.Reason); + return Result; + } + + CbPackage ResponsePackage = Response.AsPackage(); + if (!ResponsePackage) + { + Result.Reason = fmt::format("The response for {}/z$/{}/{}/{} is not formatted as a package"sv, + m_ProjectStoreUrl, + m_Namespace, + m_Bucket, + m_Key); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + return Result; + } + + CbObject ResponseObject = ResponsePackage.GetObject(); + for (CbFieldView Reponse : ResponseObject["Result"]) + { + CbObjectView ResultView = Reponse.AsObjectView(); + MemoryView ResultBuffer = ResultView.GetView(); + if (ValidateCompactBinary(ResultBuffer, CbValidateMode::Default, CbFieldType::Object) == CbValidateError::None) + { + Result.ContainerObject = + CbObject(SharedBuffer(IoBuffer(IoBuffer::Wrap, ResultBuffer.GetData(), ResultBuffer.GetSize())), CbFieldType::Object); + Result.ContainerObject.MakeOwned(); + } + } + return Result; + } + + virtual LoadContainerResult LoadBaseContainer() override + { + return LoadContainerResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}}; + } + + virtual HasAttachmentsResult HasAttachments(const std::span<IoHash> RawHashes) override + { + std::string SaveRequest = fmt::format("/z$/$rpc"); + + CbPackage Package; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("Method"sv, "GetRecordAttachments"sv); + RequestWriter.AddInteger("Accept"sv, kCbPkgMagic); + RequestWriter.BeginObject("Params"sv); + { + RequestWriter.AddString("DefaultPolicy"sv, WriteToString<128>(CachePolicy::SkipData | CachePolicy::QueryLocal)); + RequestWriter.AddString("Namespace"sv, m_Namespace); + RequestWriter.BeginArray("Requests"sv); + { + RequestWriter.BeginObject(); + { + RequestWriter.BeginObject("Key"sv); + { + RequestWriter.AddString("Bucket"sv, m_Bucket); + RequestWriter.AddHash("Hash"sv, m_Key); + } + RequestWriter.EndObject(); // Key + RequestWriter.BeginArray("Chunks"sv); + { + for (const IoHash& RawHash : RawHashes) + { + RequestWriter.AddHash(RawHash); + } + } + RequestWriter.EndArray(); // Chunks + } + RequestWriter.EndObject(); + } + RequestWriter.EndArray(); // Requests + } + RequestWriter.EndObject(); // Params + Package.SetObject(RequestWriter.Save()); + } + HttpClient::Response Response = m_Client.Post(SaveRequest, Package, HttpClient::Accept(ZenContentType::kCbPackage)); + AddStats(Response); + HasAttachmentsResult Result = HasAttachmentsResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed loading oplog attachments to {}/z$/{}/{}/{}. Reason: '{}'", + m_ProjectStoreUrl, + m_Namespace, + m_Bucket, + m_Key, + Result.Reason); + return Result; + } + + CbPackage ResponsePackage = Response.AsPackage(); + if (!ResponsePackage) + { + Result.Reason = fmt::format("The response for {}/z$/{}/{}/{} is not formatted as a package"sv, + m_ProjectStoreUrl, + m_Namespace, + m_Bucket, + m_Key); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + return Result; + } + CbObject ResponseObject = ResponsePackage.GetObject(); + for (CbFieldView Reponse : ResponseObject["Result"sv]) + { + CbObjectView ResultView = Reponse.AsObjectView(); + CbArrayView ChunksArrayView = ResultView["Chunks"sv].AsArrayView(); + size_t ChunkIndex = 0; + for (CbFieldView ChunkView : ChunksArrayView) + { + IoHash ChunkHash = ChunkView.AsHash(); + if (ChunkHash == IoHash::Zero) + { + Result.Needs.insert(RawHashes[ChunkIndex]); + } + ChunkIndex++; + } + } + return Result; + } + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + { + std::string SaveRequest = fmt::format("/z$/$rpc"); + + CbPackage Package; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("Method"sv, "GetRecordAttachments"sv); + RequestWriter.AddInteger("Accept"sv, kCbPkgMagic); + RequestWriter.BeginObject("Params"sv); + { + RequestWriter.AddString("Namespace"sv, m_Namespace); + RequestWriter.BeginArray("Requests"sv); + { + RequestWriter.BeginObject(); + { + RequestWriter.BeginObject("Key"sv); + { + RequestWriter.AddString("Bucket"sv, m_Bucket); + RequestWriter.AddHash("Hash"sv, m_Key); + } + RequestWriter.EndObject(); // Key + RequestWriter.BeginArray("Chunks"sv); + { + RequestWriter.AddHash(RawHash); + } + RequestWriter.EndArray(); // Chunks + } + RequestWriter.EndObject(); + } + RequestWriter.EndArray(); // Requests + } + RequestWriter.EndObject(); // Params + Package.SetObject(RequestWriter.Save()); + } + HttpClient::Response Response = m_Client.Post(SaveRequest, Package, HttpClient::Accept(ZenContentType::kCbPackage)); + AddStats(Response); + LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed loading oplog attachments to {}/z$/{}/{}/{}. Reason: '{}'", + m_ProjectStoreUrl, + m_Namespace, + m_Bucket, + m_Key, + Result.Reason); + return Result; + } + + CbPackage ResponsePackage = Response.AsPackage(); + if (!ResponsePackage) + { + Result.Reason = fmt::format("The response for {}/z$/{}/{}/{} is not formatted as a package"sv, + m_ProjectStoreUrl, + m_Namespace, + m_Bucket, + m_Key); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + return Result; + } + CbObject ResponseObject = ResponsePackage.GetObject(); + for (CbFieldView Reponse : ResponseObject["Result"sv]) + { + CbObjectView ResultView = Reponse.AsObjectView(); + CbArrayView ChunksArrayView = ResultView["Chunks"sv].AsArrayView(); + for (CbFieldView ChunkView : ChunksArrayView) + { + IoHash ChunkHash = ChunkView.AsHash(); + if (ChunkHash == IoHash::Zero) + { + Result.Reason = fmt::format("The oplog {}/z$/{}/{}/{} is missing attachment {}"sv, + m_ProjectStoreUrl, + m_Namespace, + m_Bucket, + m_Key, + RawHash); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::NotFound); + return Result; + } + const CbAttachment* Attachment = ResponsePackage.FindAttachment(ChunkHash); + if (Attachment && Attachment->IsCompressedBinary()) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + Result.Bytes = Chunk.GetCompressed().Flatten().AsIoBuffer(); + } + } + } + return Result; + } + +private: + void AddStats(const HttpClient::Response& Result) + { + m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.UploadedBytes)); + m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.DownloadedBytes)); + m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000)); + SetAtomicMax(m_PeakSentBytes, Result.UploadedBytes); + SetAtomicMax(m_PeakReceivedBytes, Result.DownloadedBytes); + if (Result.ElapsedSeconds > 0.0) + { + uint64_t BytesPerSec = static_cast<uint64_t>((Result.UploadedBytes + Result.DownloadedBytes) / Result.ElapsedSeconds); + SetAtomicMax(m_PeakBytesPerSec, BytesPerSec); + } + + m_RequestCount.fetch_add(1); + } + + static Result ConvertResult(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) + { + if (Response.Error) + { + return {.ErrorCode = Response.Error.value().ErrorCode, + .ElapsedSeconds = Response.ElapsedSeconds, + .Reason = Response.ErrorMessage(""), + .Text = Response.ToText()}; + } + if (!Response.IsSuccess()) + { + return {.ErrorCode = static_cast<int32_t>(Response.StatusCode), + .ElapsedSeconds = Response.ElapsedSeconds, + .Reason = Response.ErrorMessage(ErrorPrefix), + .Text = Response.ToText()}; + } + return {.ErrorCode = 0, .ElapsedSeconds = Response.ElapsedSeconds}; + } + + const std::string m_HostAddress; + const std::string m_ProjectStoreUrl; + const std::string m_Namespace; + const std::string m_Bucket; + const IoHash m_Key; + const bool m_ForceBlocks; + const std::filesystem::path m_TempFilePath; + + HttpClient m_Client; + + std::atomic_uint64_t m_SentBytes; + std::atomic_uint64_t m_ReceivedBytes; + std::atomic_uint64_t m_RequestTimeNS; + std::atomic_uint64_t m_RequestCount; + std::atomic_uint64_t m_PeakSentBytes; + std::atomic_uint64_t m_PeakReceivedBytes; + std::atomic_uint64_t m_PeakBytesPerSec; +}; + +std::shared_ptr<RemoteProjectStore> +CreateZenCacheRemoteStore(const ZenCacheRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath) +{ + std::string Url = Options.Url; + if (Url.find("://"sv) == std::string::npos) + { + // Assume http URL + Url = fmt::format("http://{}"sv, Url); + } + std::shared_ptr<RemoteProjectStore> RemoteStore = + std::make_shared<ZenCacheRemoteStore>(Url, Options.Namespace, Options.Bucket, Options.Key, Options.ForceBlocks, TempFilePath); + return RemoteStore; +} + +} // namespace zen diff --git a/src/zenserver/projectstore/zencacheremoteprojectstore.h b/src/zenserver/projectstore/zencacheremoteprojectstore.h new file mode 100644 index 000000000..ce12f3461 --- /dev/null +++ b/src/zenserver/projectstore/zencacheremoteprojectstore.h @@ -0,0 +1,21 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "remoteprojectstore.h" + +namespace zen { + +struct ZenCacheRemoteStoreOptions : RemoteStoreOptions +{ + std::string Url; + std::string Namespace; + std::string Bucket; + IoHash Key; + bool ForceBlocks = false; +}; + +std::shared_ptr<RemoteProjectStore> CreateZenCacheRemoteStore(const ZenCacheRemoteStoreOptions& Options, + const std::filesystem::path& TempFilePath); + +} // namespace zen diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp index cec68111f..bd7ce921b 100644 --- a/src/zenserver/projectstore/zenremoteprojectstore.cpp +++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp @@ -51,10 +51,10 @@ public: .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; } - virtual SaveResult SaveContainer(const IoBuffer& Payload) override + virtual SaveResult SaveContainer(const CbObject& Container) override { std::string SaveRequest = fmt::format("/{}/oplog/{}/save"sv, m_Project, m_Oplog); - HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCbObject); + HttpClient::Response Response = m_Client.Post(SaveRequest, Container); AddStats(Response); SaveResult Result = SaveResult{ConvertResult(Response)}; @@ -84,7 +84,7 @@ public: Result.Needs.insert(ChunkHash); } - Result.RawHash = IoHash::HashBuffer(Payload); + Result.RawHash = IoHash::HashBuffer(Container.GetBuffer().AsIoBuffer()); return Result; } @@ -227,11 +227,40 @@ public: return LoadContainerResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}}; } - virtual HasAttachmentsResult HasAttachments(const std::span<IoHash>) override + virtual HasAttachmentsResult HasAttachments(const std::span<IoHash> RawHashes) override { - // For zen as remote store we never store blocks so we should never get here - ZEN_ASSERT(false); - return HasAttachmentsResult{}; + std::string PrepRequest = fmt::format("/{}/oplog/{}/prep"sv, m_Project, m_Oplog); + CbObject Request; + { + CbObjectWriter RequestWriter; + RequestWriter.BeginArray("have"sv); + for (const IoHash& RawHash : RawHashes) + { + RequestWriter.AddHash(RawHash); + } + RequestWriter.EndArray(); // "have" + Request = RequestWriter.Save(); + } + + HttpClient::Response Response = m_Client.Post(PrepRequest, Request, HttpClient::Accept(ZenContentType::kCbObject)); + HasAttachmentsResult Result = {ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed checking chunk existance from {}/{}/{}. Reason: '{}'", + m_ProjectStoreUrl, + m_Project, + m_Oplog, + Result.Reason); + } + else + { + CbArrayView NeedsArray = Response.AsObject()["need"].AsArrayView(); + for (CbFieldView RawHashField : NeedsArray) + { + Result.Needs.insert(RawHashField.AsHash()); + } + } + return Result; } virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override |