diff options
Diffstat (limited to 'src/zenserver/projectstore/zenremoteprojectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/zenremoteprojectstore.cpp | 259 |
1 files changed, 70 insertions, 189 deletions
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp index cfb558040..600338843 100644 --- a/src/zenserver/projectstore/zenremoteprojectstore.cpp +++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp @@ -6,15 +6,10 @@ #include <zencore/compactbinarypackage.h> #include <zencore/compositebuffer.h> #include <zencore/fmtutils.h> -#include <zencore/scopeguard.h> #include <zencore/stream.h> -#include <zencore/timer.h> +#include <zenhttp/httpclient.h> #include <zenutil/packageformat.h> -ZEN_THIRD_PARTY_INCLUDES_START -#include <cpr/cpr.h> -ZEN_THIRD_PARTY_INCLUDES_END - namespace zen { using namespace std::literals; @@ -22,17 +17,16 @@ using namespace std::literals; class ZenRemoteStore : public RemoteProjectStore { public: - ZenRemoteStore(std::string_view HostAddress, - std::string_view Project, - std::string_view Oplog, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize) + ZenRemoteStore(std::string_view HostAddress, + std::string_view Project, + std::string_view Oplog, + const std::filesystem::path& TempFilePath) : m_HostAddress(HostAddress) , m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress)) , m_Project(Project) , m_Oplog(Oplog) - , m_MaxBlockSize(MaxBlockSize) - , m_MaxChunkEmbedSize(MaxChunkEmbedSize) + , m_TempFilePath(TempFilePath) + , m_Client(m_ProjectStoreUrl, {.LogCategory = "ZenRemoteStore", .RetryCount = 2}) { } @@ -47,39 +41,27 @@ public: virtual SaveResult SaveContainer(const IoBuffer& Payload) override { - Stopwatch Timer; - - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - - std::string SaveRequest = fmt::format("{}/{}/oplog/{}/save"sv, m_ProjectStoreUrl, m_Project, m_Oplog); - Session->SetUrl({SaveRequest}); - Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}}); - MemoryView Data(Payload.GetView()); - Session->SetBody({reinterpret_cast<const char*>(Data.GetData()), Data.GetSize()}); - cpr::Response Response = Session->Post(); - SaveResult Result = SaveResult{ConvertResult(Response)}; + std::string SaveRequest = fmt::format("/{}/oplog/{}/save"sv, m_Project, m_Oplog); + HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCbObject); + SaveResult Result = SaveResult{ConvertResult(Response)}; if (Result.ErrorCode) { - Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}. Reason: '{}'", m_ProjectStoreUrl, m_Project, m_Oplog, Result.Reason); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } - IoBuffer ResponsePayload(IoBuffer::Wrap, Response.text.data(), Response.text.size()); - CbObject ResponseObject = LoadCompactBinaryObject(ResponsePayload); + CbObject ResponseObject = Response.AsObject(); if (!ResponseObject) { - Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, - m_ProjectStoreUrl, - m_Project, - m_Oplog); - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; + Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, + m_ProjectStoreUrl, + m_Project, + m_Oplog); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); return Result; } CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView(); @@ -89,33 +71,15 @@ public: Result.Needs.insert(ChunkHash); } - Result.RawHash = IoHash::HashBuffer(Payload); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; + Result.RawHash = IoHash::HashBuffer(Payload); return Result; } virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override { - Stopwatch Timer; - - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - - std::string SaveRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash); - Session->SetUrl({SaveRequest}); - Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}}); - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); - cpr::Response Response = Session->Post(); - SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)}; + std::string SaveRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); + HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCompressedBinary); + SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'", @@ -125,14 +89,11 @@ public: RawHash, Result.Reason); } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override { - Stopwatch Timer; - CbPackage RequestPackage; { CbObjectWriter RequestWriter; @@ -151,26 +112,10 @@ public: RequestWriter.EndArray(); // "chunks" RequestPackage.SetObject(RequestWriter.Save()); } - CompositeBuffer Payload = FormatPackageMessageBuffer(RequestPackage, FormatFlags::kDefault); - - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog); - Session->SetUrl({SaveRequest}); - Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}}); + std::string SaveRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog); + HttpClient::Response Response = m_Client.Post(SaveRequest, RequestPackage); - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); - cpr::Response Response = Session->Post(); - SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)}; + SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving {} oplog attachments to {}/{}/{}. Reason: '{}'", @@ -180,17 +125,12 @@ public: m_Oplog, Result.Reason); } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override { - Stopwatch Timer; - - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog); + std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog); CbObject Request; { @@ -206,17 +146,22 @@ public: RequestWriter.EndArray(); // "chunks" Request = RequestWriter.Save(); } - IoBuffer Payload = Request.GetBuffer().AsIoBuffer(); - Session->SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()}); - Session->SetUrl(SaveRequest); - Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}, - {"Accept", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}}); - cpr::Response Response = Session->Post(); - LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)}; - if (!Result.ErrorCode) + HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage)); + + LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)}; + if (Result.ErrorCode) { - CbPackage Package = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size())); + Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'", + RawHashes.size(), + m_ProjectStoreUrl, + m_Project, + m_Oplog, + Result.Reason); + } + else + { + CbPackage Package = Response.AsPackage(); std::span<const CbAttachment> Attachments = Package.GetAttachments(); Result.Chunks.reserve(Attachments.size()); for (const CbAttachment& Attachment : Attachments) @@ -225,42 +170,17 @@ public: std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()}); } } - else - { - Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'", - RawHashes.size(), - m_ProjectStoreUrl, - m_Project, - m_Oplog, - Result.Reason); - } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; }; - virtual FinalizeResult FinalizeContainer(const IoHash&) override - { - Stopwatch Timer; - - RwLock::ExclusiveLockScope _(SessionsLock); - Sessions.clear(); - return FinalizeResult{Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}}; - } + virtual FinalizeResult FinalizeContainer(const IoHash&) override { return FinalizeResult{Result{}}; } virtual LoadContainerResult LoadContainer() override { - Stopwatch Timer; + std::string LoadRequest = fmt::format("/{}/oplog/{}/load"sv, m_Project, m_Oplog); - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - std::string SaveRequest = fmt::format("{}/{}/oplog/{}/load"sv, m_ProjectStoreUrl, m_Project, m_Oplog); - Session->SetUrl(SaveRequest); - Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCbObject))}}); - Session->SetParameters( - {{"maxblocksize", fmt::format("{}", m_MaxBlockSize)}, {"maxchunkembedsize", fmt::format("{}", m_MaxChunkEmbedSize)}}); - cpr::Response Response = Session->Get(); - - LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)}; + HttpClient::Response Response = m_Client.Get(LoadRequest, HttpClient::Accept(ZenContentType::kCbObject)); + LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}. Reason: '{}'", @@ -271,7 +191,7 @@ public: } else { - Result.ContainerObject = LoadCompactBinaryObject(IoBuffer(IoBuffer::Clone, Response.text.data(), Response.text.size())); + Result.ContainerObject = Response.AsObject(); if (!Result.ContainerObject) { Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, @@ -281,7 +201,6 @@ public: Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); } } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } @@ -299,19 +218,14 @@ public: virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { - Stopwatch Timer; - - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - - std::string LoadRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash); - Session->SetUrl({LoadRequest}); - Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}}); - cpr::Response Response = Session->Get(); - LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)}; + std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); + HttpClient::Response Response = + m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary)); + LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)}; if (!Result.ErrorCode) { - Result.Bytes = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); + Result.Bytes = Response.ResponsePayload; + Result.Bytes.MakeOwned(); } if (!Result.ErrorCode) { @@ -322,73 +236,40 @@ public: RawHash, Result.Reason); } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } private: - std::unique_ptr<cpr::Session> AllocateSession() - { - RwLock::ExclusiveLockScope _(SessionsLock); - if (Sessions.empty()) - { - Sessions.emplace_back(std::make_unique<cpr::Session>()); - } - std::unique_ptr<cpr::Session> Session = std::move(Sessions.back()); - Sessions.pop_back(); - return Session; - } - - void ReleaseSession(std::unique_ptr<cpr::Session>&& Session) + static Result ConvertResult(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) { - RwLock::ExclusiveLockScope _(SessionsLock); - Sessions.emplace_back(std::move(Session)); - } - - static Result ConvertResult(const cpr::Response& Response) - { - std::string Text; - std::string Reason = Response.reason; - int32_t ErrorCode = 0; - if (Response.error.code != cpr::ErrorCode::OK) + if (Response.Error) { - ErrorCode = static_cast<int32_t>(Response.error.code); - if (!Response.error.message.empty()) - { - Reason = Response.error.message; - } + return {.ErrorCode = Response.Error.value().ErrorCode, + .ElapsedSeconds = Response.ElapsedSeconds, + .Reason = Response.ErrorMessage(""), + .Text = Response.ToText()}; } - else if (!IsHttpSuccessCode(Response.status_code)) + if (!Response.IsSuccess()) { - ErrorCode = static_cast<int32_t>(Response.status_code); - - if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) - { - zen::HttpContentType ContentType = zen::ParseContentType(It->second); - if (ContentType == zen::HttpContentType::kText) - { - Text = Response.text; - } - } - - Reason = fmt::format("{}"sv, Response.status_code); + return {.ErrorCode = static_cast<int32_t>(Response.StatusCode), + .ElapsedSeconds = Response.ElapsedSeconds, + .Reason = Response.ErrorMessage(ErrorPrefix), + .Text = Response.ToText()}; } - return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.elapsed, .Reason = Reason, .Text = Text}; + return {.ErrorCode = 0, .ElapsedSeconds = Response.ElapsedSeconds}; } - RwLock SessionsLock; - std::vector<std::unique_ptr<cpr::Session>> Sessions; + const std::string m_HostAddress; + const std::string m_ProjectStoreUrl; + const std::string m_Project; + const std::string m_Oplog; + const std::filesystem::path m_TempFilePath; - const std::string m_HostAddress; - const std::string m_ProjectStoreUrl; - const std::string m_Project; - const std::string m_Oplog; - const size_t m_MaxBlockSize; - const size_t m_MaxChunkEmbedSize; + HttpClient m_Client; }; std::shared_ptr<RemoteProjectStore> -CreateZenRemoteStore(const ZenRemoteStoreOptions& Options) +CreateZenRemoteStore(const ZenRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath) { std::string Url = Options.Url; if (Url.find("://"sv) == std::string::npos) @@ -397,7 +278,7 @@ CreateZenRemoteStore(const ZenRemoteStoreOptions& Options) Url = fmt::format("http://{}"sv, Url); } std::shared_ptr<RemoteProjectStore> RemoteStore = - std::make_shared<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize); + std::make_shared<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, TempFilePath); return RemoteStore; } |