// Copyright Epic Games, Inc. All Rights Reserved. #include "zenremoteprojectstore.h" #include #include #include #include #include #include #include namespace zen { using namespace std::literals; class ZenRemoteStore : public RemoteProjectStore { public: ZenRemoteStore(std::string_view HostAddress, std::string_view Project, std::string_view Oplog, const std::filesystem::path& TempFilePath) : m_HostAddress(HostAddress) , m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress)) , m_Project(Project) , m_Oplog(Oplog) , m_TempFilePath(TempFilePath) , m_Client(m_ProjectStoreUrl, {.LogCategory = "ZenRemoteStore", .RetryCount = 2}) { } virtual RemoteStoreInfo GetInfo() const override { return {.CreateBlocks = false, .UseTempBlockFiles = false, .AllowChunking = false, .ContainerName = fmt::format("{}/{}", m_Project, m_Oplog), .BaseContainerName = "", .Description = fmt::format("[zen] {}"sv, m_HostAddress)}; } virtual Stats GetStats() const override { return {.m_SentBytes = m_SentBytes.load(), .m_ReceivedBytes = m_ReceivedBytes.load(), .m_RequestTimeNS = m_RequestTimeNS.load(), .m_RequestCount = m_RequestCount.load(), .m_PeakSentBytes = m_PeakSentBytes.load(), .m_PeakReceivedBytes = m_PeakReceivedBytes.load(), .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; } virtual SaveResult SaveContainer(const IoBuffer& Payload) override { std::string SaveRequest = fmt::format("/{}/oplog/{}/save"sv, m_Project, m_Oplog); HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCbObject); AddStats(Response); SaveResult Result = SaveResult{ConvertResult(Response)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}. Reason: '{}'", m_ProjectStoreUrl, m_Project, m_Oplog, Result.Reason); return Result; } CbObject ResponseObject = Response.AsObject(); if (!ResponseObject) { Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, m_ProjectStoreUrl, m_Project, m_Oplog); Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); return Result; } CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView(); for (CbFieldView FieldView : NeedsArray) { IoHash ChunkHash = FieldView.AsHash(); Result.Needs.insert(ChunkHash); } Result.RawHash = IoHash::HashBuffer(Payload); return Result; } virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override { std::string SaveRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCompressedBinary); AddStats(Response); SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'", m_ProjectStoreUrl, m_Project, m_Oplog, RawHash, Result.Reason); } return Result; } virtual SaveAttachmentsResult SaveAttachments(const std::vector& Chunks) override { CbPackage RequestPackage; { CbObjectWriter RequestWriter; RequestWriter.AddString("method"sv, "putchunks"sv); RequestWriter.BeginArray("chunks"sv); { for (const SharedBuffer& Chunk : Chunks) { IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, RawHash, RawSize); RequestWriter.AddHash(RawHash); RequestPackage.AddAttachment(CbAttachment(Compressed, RawHash)); } } RequestWriter.EndArray(); // "chunks" RequestPackage.SetObject(RequestWriter.Save()); } std::string SaveRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog); HttpClient::Response Response = m_Client.Post(SaveRequest, RequestPackage); AddStats(Response); SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving {} oplog attachments to {}/{}/{}. Reason: '{}'", Chunks.size(), m_ProjectStoreUrl, m_Project, m_Oplog, Result.Reason); } return Result; } virtual LoadAttachmentsResult LoadAttachments(const std::vector& RawHashes) override { std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog); CbObject Request; { CbObjectWriter RequestWriter; RequestWriter.AddString("method"sv, "getchunks"sv); RequestWriter.BeginArray("chunks"sv); { for (const IoHash& RawHash : RawHashes) { RequestWriter.AddHash(RawHash); } } RequestWriter.EndArray(); // "chunks" Request = RequestWriter.Save(); } HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage)); AddStats(Response); LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'", RawHashes.size(), m_ProjectStoreUrl, m_Project, m_Oplog, Result.Reason); } else { CbPackage Package = Response.AsPackage(); std::span Attachments = Package.GetAttachments(); Result.Chunks.reserve(Attachments.size()); for (const CbAttachment& Attachment : Attachments) { Result.Chunks.emplace_back( std::pair{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()}); } } return Result; }; virtual FinalizeResult FinalizeContainer(const IoHash&) override { return FinalizeResult{Result{}}; } virtual LoadContainerResult LoadContainer() override { std::string LoadRequest = fmt::format("/{}/oplog/{}/load"sv, m_Project, m_Oplog); HttpClient::Response Response = m_Client.Get(LoadRequest, HttpClient::Accept(ZenContentType::kCbObject)); AddStats(Response); LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}. Reason: '{}'", m_ProjectStoreUrl, m_Project, m_Oplog, Result.Reason); } else { Result.ContainerObject = Response.AsObject(); if (!Result.ContainerObject) { Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, m_ProjectStoreUrl, m_Project, m_Oplog); Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); } } return Result; } virtual LoadContainerResult LoadBaseContainer() override { return LoadContainerResult{{.ErrorCode = static_cast(HttpResponseCode::NoContent)}}; } virtual HasAttachmentsResult HasAttachments(const std::span) override { // For zen as remote store we never store blocks so we should never get here ZEN_ASSERT(false); return HasAttachmentsResult{}; } virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); HttpClient::Response Response = m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary)); AddStats(Response); LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)}; if (!Result.ErrorCode) { Result.Bytes = Response.ResponsePayload; Result.Bytes.MakeOwned(); } if (!Result.ErrorCode) { Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'", m_ProjectStoreUrl, m_Project, m_Oplog, RawHash, Result.Reason); } return Result; } private: void AddStats(const HttpClient::Response& Result) { m_SentBytes.fetch_add(gsl::narrow(Result.UploadedBytes)); m_ReceivedBytes.fetch_add(gsl::narrow(Result.DownloadedBytes)); m_RequestTimeNS.fetch_add(static_cast(Result.ElapsedSeconds * 1000000000)); SetAtomicMax(m_PeakSentBytes, Result.UploadedBytes); SetAtomicMax(m_PeakReceivedBytes, Result.DownloadedBytes); if (Result.ElapsedSeconds > 0.0) { uint64_t BytesPerSec = static_cast((Result.UploadedBytes + Result.DownloadedBytes) / Result.ElapsedSeconds); SetAtomicMax(m_PeakBytesPerSec, BytesPerSec); } m_RequestCount.fetch_add(1); } static Result ConvertResult(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) { if (Response.Error) { return {.ErrorCode = Response.Error.value().ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.ErrorMessage(""), .Text = Response.ToText()}; } if (!Response.IsSuccess()) { return {.ErrorCode = static_cast(Response.StatusCode), .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.ErrorMessage(ErrorPrefix), .Text = Response.ToText()}; } return {.ErrorCode = 0, .ElapsedSeconds = Response.ElapsedSeconds}; } const std::string m_HostAddress; const std::string m_ProjectStoreUrl; const std::string m_Project; const std::string m_Oplog; const std::filesystem::path m_TempFilePath; HttpClient m_Client; std::atomic_uint64_t m_SentBytes; std::atomic_uint64_t m_ReceivedBytes; std::atomic_uint64_t m_RequestTimeNS; std::atomic_uint64_t m_RequestCount; std::atomic_uint64_t m_PeakSentBytes; std::atomic_uint64_t m_PeakReceivedBytes; std::atomic_uint64_t m_PeakBytesPerSec; }; std::shared_ptr CreateZenRemoteStore(const ZenRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath) { std::string Url = Options.Url; if (Url.find("://"sv) == std::string::npos) { // Assume http URL Url = fmt::format("http://{}"sv, Url); } std::shared_ptr RemoteStore = std::make_shared(Url, Options.ProjectId, Options.OplogId, TempFilePath); return RemoteStore; } } // namespace zen