// Copyright Epic Games, Inc. All Rights Reserved. #include "zenremoteprojectstore.h" #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END namespace zen { using namespace std::literals; class ZenRemoteStore : public RemoteProjectStore { public: ZenRemoteStore(std::string_view HostAddress, std::string_view Project, std::string_view Oplog, size_t MaxBlockSize, size_t MaxChunkEmbedSize) : m_HostAddress(HostAddress) , m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress)) , m_Project(Project) , m_Oplog(Oplog) , m_MaxBlockSize(MaxBlockSize) , m_MaxChunkEmbedSize(MaxChunkEmbedSize) { } virtual RemoteStoreInfo GetInfo() const override { return {.CreateBlocks = false, .UseTempBlockFiles = false, .Description = fmt::format("[zen] {}"sv, m_HostAddress)}; } virtual SaveResult SaveContainer(const IoBuffer& Payload) override { Stopwatch Timer; std::unique_ptr Session(AllocateSession()); auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); std::string SaveRequest = fmt::format("{}/{}/oplog/{}/save"sv, m_ProjectStoreUrl, m_Project, m_Oplog); Session->SetUrl({SaveRequest}); Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}}); MemoryView Data(Payload.GetView()); Session->SetBody({reinterpret_cast(Data.GetData()), Data.GetSize()}); cpr::Response Response = Session->Post(); SaveResult Result = SaveResult{ConvertResult(Response)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}. Reason: '{}'", m_ProjectStoreUrl, m_Project, m_Oplog, Result.Reason); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; return Result; } IoBuffer ResponsePayload(IoBuffer::Wrap, Response.text.data(), Response.text.size()); CbObject ResponseObject = LoadCompactBinaryObject(ResponsePayload); if (!ResponseObject) { Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, m_ProjectStoreUrl, m_Project, m_Oplog); Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; return Result; } CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView(); for (CbFieldView FieldView : NeedsArray) { IoHash ChunkHash = FieldView.AsHash(); Result.Needs.insert(ChunkHash); } Result.RawHash = IoHash::HashBuffer(Payload); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; return Result; } virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override { Stopwatch Timer; std::unique_ptr Session(AllocateSession()); auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); std::string SaveRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash); Session->SetUrl({SaveRequest}); Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}}); uint64_t SizeLeft = Payload.GetSize(); CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { size = Min(size, SizeLeft); MutableMemoryView Data(buffer, size); Payload.CopyTo(Data, BufferIt); SizeLeft -= size; return true; }; Session->SetReadCallback(cpr::ReadCallback(gsl::narrow(SizeLeft), ReadCallback)); cpr::Response Response = Session->Post(); SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'", m_ProjectStoreUrl, m_Project, m_Oplog, RawHash, Result.Reason); } Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; return Result; } virtual SaveAttachmentsResult SaveAttachments(const std::vector& Chunks) override { Stopwatch Timer; CbPackage RequestPackage; { CbObjectWriter RequestWriter; RequestWriter.AddString("method"sv, "putchunks"sv); RequestWriter.BeginArray("chunks"sv); { for (const SharedBuffer& Chunk : Chunks) { IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, RawHash, RawSize); RequestWriter.AddHash(RawHash); RequestPackage.AddAttachment(CbAttachment(Compressed, RawHash)); } } RequestWriter.EndArray(); // "chunks" RequestPackage.SetObject(RequestWriter.Save()); } CompositeBuffer Payload = FormatPackageMessageBuffer(RequestPackage, FormatFlags::kDefault); std::unique_ptr Session(AllocateSession()); auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog); Session->SetUrl({SaveRequest}); Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}}); uint64_t SizeLeft = Payload.GetSize(); CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { size = Min(size, SizeLeft); MutableMemoryView Data(buffer, size); Payload.CopyTo(Data, BufferIt); SizeLeft -= size; return true; }; Session->SetReadCallback(cpr::ReadCallback(gsl::narrow(SizeLeft), ReadCallback)); cpr::Response Response = Session->Post(); SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving {} oplog attachments to {}/{}/{}. Reason: '{}'", Chunks.size(), m_ProjectStoreUrl, m_Project, m_Oplog, Result.Reason); } Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; return Result; } virtual LoadAttachmentsResult LoadAttachments(const std::vector& RawHashes) override { Stopwatch Timer; std::unique_ptr Session(AllocateSession()); auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog); CbObject Request; { CbObjectWriter RequestWriter; RequestWriter.AddString("method"sv, "getchunks"sv); RequestWriter.BeginArray("chunks"sv); { for (const IoHash& RawHash : RawHashes) { RequestWriter.AddHash(RawHash); } } RequestWriter.EndArray(); // "chunks" Request = RequestWriter.Save(); } IoBuffer Payload = Request.GetBuffer().AsIoBuffer(); Session->SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()}); Session->SetUrl(SaveRequest); Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}, {"Accept", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}}); cpr::Response Response = Session->Post(); LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)}; if (!Result.ErrorCode) { CbPackage Package = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size())); std::span Attachments = Package.GetAttachments(); Result.Chunks.reserve(Attachments.size()); for (const CbAttachment& Attachment : Attachments) { Result.Chunks.emplace_back( std::pair{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()}); } } else { Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'", RawHashes.size(), m_ProjectStoreUrl, m_Project, m_Oplog, Result.Reason); } Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; return Result; }; virtual FinalizeResult FinalizeContainer(const IoHash&) override { Stopwatch Timer; RwLock::ExclusiveLockScope _(SessionsLock); Sessions.clear(); return FinalizeResult{Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}}; } virtual LoadContainerResult LoadContainer() override { Stopwatch Timer; std::unique_ptr Session(AllocateSession()); auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); std::string SaveRequest = fmt::format("{}/{}/oplog/{}/load"sv, m_ProjectStoreUrl, m_Project, m_Oplog); Session->SetUrl(SaveRequest); Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCbObject))}}); Session->SetParameters( {{"maxblocksize", fmt::format("{}", m_MaxBlockSize)}, {"maxchunkembedsize", fmt::format("{}", m_MaxChunkEmbedSize)}}); cpr::Response Response = Session->Get(); LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}. Reason: '{}'", m_ProjectStoreUrl, m_Project, m_Oplog, Result.Reason); } else { Result.ContainerObject = LoadCompactBinaryObject(IoBuffer(IoBuffer::Clone, Response.text.data(), Response.text.size())); if (!Result.ContainerObject) { Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, m_ProjectStoreUrl, m_Project, m_Oplog); Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); } } Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; return Result; } virtual LoadContainerResult LoadBaseContainer() override { return LoadContainerResult{{.ErrorCode = static_cast(HttpResponseCode::NoContent)}}; } virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { Stopwatch Timer; std::unique_ptr Session(AllocateSession()); auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); std::string LoadRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash); Session->SetUrl({LoadRequest}); Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}}); cpr::Response Response = Session->Get(); LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)}; if (!Result.ErrorCode) { Result.Bytes = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); } if (!Result.ErrorCode) { Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'", m_ProjectStoreUrl, m_Project, m_Oplog, RawHash, Result.Reason); } Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; return Result; } private: std::unique_ptr AllocateSession() { RwLock::ExclusiveLockScope _(SessionsLock); if (Sessions.empty()) { Sessions.emplace_back(std::make_unique()); } std::unique_ptr Session = std::move(Sessions.back()); Sessions.pop_back(); return Session; } void ReleaseSession(std::unique_ptr&& Session) { RwLock::ExclusiveLockScope _(SessionsLock); Sessions.emplace_back(std::move(Session)); } static Result ConvertResult(const cpr::Response& Response) { std::string Text; std::string Reason = Response.reason; int32_t ErrorCode = 0; if (Response.error.code != cpr::ErrorCode::OK) { ErrorCode = static_cast(Response.error.code); if (!Response.error.message.empty()) { Reason = Response.error.message; } } else if (!IsHttpSuccessCode(Response.status_code)) { ErrorCode = static_cast(Response.status_code); if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) { zen::HttpContentType ContentType = zen::ParseContentType(It->second); if (ContentType == zen::HttpContentType::kText) { Text = Response.text; } } Reason = fmt::format("{}"sv, Response.status_code); } return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.elapsed, .Reason = Reason, .Text = Text}; } RwLock SessionsLock; std::vector> Sessions; const std::string m_HostAddress; const std::string m_ProjectStoreUrl; const std::string m_Project; const std::string m_Oplog; const size_t m_MaxBlockSize; const size_t m_MaxChunkEmbedSize; }; std::shared_ptr CreateZenRemoteStore(const ZenRemoteStoreOptions& Options) { std::string Url = Options.Url; if (Url.find("://"sv) == std::string::npos) { // Assume http URL Url = fmt::format("http://{}"sv, Url); } std::shared_ptr RemoteStore = std::make_shared(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize); return RemoteStore; } } // namespace zen