aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-03 12:32:34 +0200
committerGitHub Enterprise <[email protected]>2024-04-03 12:32:34 +0200
commit034459cb66580d0aa680ae96a18b5a884808386c (patch)
treedaef3bdb41666db9491cefc224f3da8c8b822631 /src
parentcompressed header reading opt (#33) (diff)
downloadzen-034459cb66580d0aa680ae96a18b5a884808386c.tar.xz
zen-034459cb66580d0aa680ae96a18b5a884808386c.zip
zenremoteprojectstore with httpclient (#35)
- Bugfix: Fix log of Success/Failure for oplog import - Improvement: Use HttpClient when doing oplog export/import with a zenserver as a remote target. Includes retry logic - Improvement: Increase the retry count to 4 (5 attempts in total) when talking to Jupiter for oplog export/import
Diffstat (limited to 'src')
-rw-r--r--src/zenhttp/httpclient.cpp50
-rw-r--r--src/zenhttp/include/zenhttp/httpclient.h8
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.cpp2
-rw-r--r--src/zenserver/projectstore/projectstore.cpp2
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp2
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.cpp259
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.h2
7 files changed, 114 insertions, 211 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index 9811e5814..277b93a0f 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -51,16 +51,6 @@ AsCprBody(const IoBuffer& Obj)
return cpr::Body((const char*)Obj.GetData(), Obj.GetSize());
}
-static cpr::Body
-AsCprBody(const CompositeBuffer& Buffers)
-{
- SharedBuffer Buffer = Buffers.Flatten();
-
- // This is super inefficient, should be fixed
- std::string String{(const char*)Buffer.GetData(), Buffer.GetSize()};
- return cpr::Body{std::move(String)};
-}
-
//////////////////////////////////////////////////////////////////////////
static HttpClient::Response
@@ -221,10 +211,15 @@ struct HttpClient::Impl : public RefCounted
CprSession->SetReadCallback({});
return Result;
}
- inline cpr::Response Post()
+ inline cpr::Response Post(std::optional<cpr::ReadCallback>&& Read = {})
{
+ if (Read)
+ {
+ CprSession->SetReadCallback(std::move(Read.value()));
+ }
cpr::Response Result = CprSession->Post();
ZEN_TRACE("POST {}", Result);
+ CprSession->SetReadCallback({});
return Result;
}
inline cpr::Response Delete()
@@ -724,6 +719,12 @@ HttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, cons
HttpClient::Response
HttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader)
{
+ return Post(Url, Payload, Payload.GetContentType(), AdditionalHeader);
+}
+
+HttpClient::Response
+HttpClient::Post(std::string_view Url, const IoBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader)
+{
ZEN_TRACE_CPU("HttpClient::PostWithPayload");
return CommonResponse(DoWithRetry(
@@ -732,7 +733,7 @@ HttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMa
m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
Sess->SetBody(AsCprBody(Payload));
- Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())});
+ Sess->UpdateHeader({HeaderContentType(ContentType)});
return Sess.Post();
},
m_ConnectionSettings.RetryCount));
@@ -758,17 +759,30 @@ HttpClient::Post(std::string_view Url, CbObject Payload, const KeyValueMap& Addi
HttpClient::Response
HttpClient::Post(std::string_view Url, CbPackage Pkg, const KeyValueMap& AdditionalHeader)
{
- ZEN_TRACE_CPU("HttpClient::PostPackage");
+ return Post(Url, zen::FormatPackageMessageBuffer(Pkg), ZenContentType::kCbPackage, AdditionalHeader);
+}
+
+HttpClient::Response
+HttpClient::Post(std::string_view Url, const CompositeBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader)
+{
+ ZEN_TRACE_CPU("HttpClient::Post");
return CommonResponse(DoWithRetry(
[&]() {
- CompositeBuffer Message = zen::FormatPackageMessageBuffer(Pkg);
-
+ uint64_t SizeLeft = Payload.GetSize();
+ CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
+ auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, SizeLeft);
+ MutableMemoryView Data(buffer, size);
+ Payload.CopyTo(Data, BufferIt);
+ SizeLeft -= size;
+ return true;
+ };
Impl::Session Sess =
m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
- Sess->SetBody(AsCprBody(Message));
- Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbPackage)});
- return Sess.Post();
+ Sess->UpdateHeader({HeaderContentType(ContentType)});
+
+ return Sess.Post(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
},
m_ConnectionSettings.RetryCount));
}
diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h
index f3559f214..8318e3679 100644
--- a/src/zenhttp/include/zenhttp/httpclient.h
+++ b/src/zenhttp/include/zenhttp/httpclient.h
@@ -149,8 +149,16 @@ public:
[[nodiscard]] Response Delete(std::string_view Url, const KeyValueMap& AdditionalHeader = {});
[[nodiscard]] Response Post(std::string_view Url, const KeyValueMap& AdditionalHeader = {}, const KeyValueMap& Parameters = {});
[[nodiscard]] Response Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader = {});
+ [[nodiscard]] Response Post(std::string_view Url,
+ const IoBuffer& Payload,
+ ZenContentType ContentType,
+ const KeyValueMap& AdditionalHeader = {});
[[nodiscard]] Response Post(std::string_view Url, CbObject Payload, const KeyValueMap& AdditionalHeader = {});
[[nodiscard]] Response Post(std::string_view Url, CbPackage Payload, const KeyValueMap& AdditionalHeader = {});
+ [[nodiscard]] Response Post(std::string_view Url,
+ const CompositeBuffer& Payload,
+ ZenContentType ContentType,
+ const KeyValueMap& AdditionalHeader = {});
[[nodiscard]] Response Upload(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader = {});
[[nodiscard]] Response Upload(std::string_view Url,
const CompositeBuffer& Payload,
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
index 1508dbc3f..6b1f591f0 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
@@ -276,7 +276,7 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi
.Timeout = std::chrono::milliseconds(1800000),
.AssumeHttp2 = Options.AssumeHttp2,
.AllowResume = true,
- .RetryCount = 2};
+ .RetryCount = 4};
// 1) Access token as parameter in request
// 2) Environment variable (different win vs linux/mac)
// 3) openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index dd390d08c..0109533f6 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -209,7 +209,7 @@ namespace {
std::string(Url),
std::string(Project),
std::string(Oplog)};
- RemoteStore = CreateZenRemoteStore(Options);
+ RemoteStore = CreateZenRemoteStore(Options, TempFilePath);
}
if (!RemoteStore)
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index ae4777278..e11541534 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -2658,7 +2658,7 @@ LoadOplog(CidStore& ChunkStore,
ReportMessage(OptionalContext,
fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {}",
RemoteStoreInfo.ContainerName,
- RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
+ Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE",
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
NiceBytes(Info.OplogSizeBytes),
Info.AttachmentBlocksDownloaded.load(),
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp
index cfb558040..600338843 100644
--- a/src/zenserver/projectstore/zenremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp
@@ -6,15 +6,10 @@
#include <zencore/compactbinarypackage.h>
#include <zencore/compositebuffer.h>
#include <zencore/fmtutils.h>
-#include <zencore/scopeguard.h>
#include <zencore/stream.h>
-#include <zencore/timer.h>
+#include <zenhttp/httpclient.h>
#include <zenutil/packageformat.h>
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <cpr/cpr.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
namespace zen {
using namespace std::literals;
@@ -22,17 +17,16 @@ using namespace std::literals;
class ZenRemoteStore : public RemoteProjectStore
{
public:
- ZenRemoteStore(std::string_view HostAddress,
- std::string_view Project,
- std::string_view Oplog,
- size_t MaxBlockSize,
- size_t MaxChunkEmbedSize)
+ ZenRemoteStore(std::string_view HostAddress,
+ std::string_view Project,
+ std::string_view Oplog,
+ const std::filesystem::path& TempFilePath)
: m_HostAddress(HostAddress)
, m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress))
, m_Project(Project)
, m_Oplog(Oplog)
- , m_MaxBlockSize(MaxBlockSize)
- , m_MaxChunkEmbedSize(MaxChunkEmbedSize)
+ , m_TempFilePath(TempFilePath)
+ , m_Client(m_ProjectStoreUrl, {.LogCategory = "ZenRemoteStore", .RetryCount = 2})
{
}
@@ -47,39 +41,27 @@ public:
virtual SaveResult SaveContainer(const IoBuffer& Payload) override
{
- Stopwatch Timer;
-
- std::unique_ptr<cpr::Session> Session(AllocateSession());
- auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
-
- std::string SaveRequest = fmt::format("{}/{}/oplog/{}/save"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
- Session->SetUrl({SaveRequest});
- Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}});
- MemoryView Data(Payload.GetView());
- Session->SetBody({reinterpret_cast<const char*>(Data.GetData()), Data.GetSize()});
- cpr::Response Response = Session->Post();
- SaveResult Result = SaveResult{ConvertResult(Response)};
+ std::string SaveRequest = fmt::format("/{}/oplog/{}/save"sv, m_Project, m_Oplog);
+ HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCbObject);
+ SaveResult Result = SaveResult{ConvertResult(Response)};
if (Result.ErrorCode)
{
- Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}. Reason: '{}'",
+ Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}. Reason: '{}'",
m_ProjectStoreUrl,
m_Project,
m_Oplog,
Result.Reason);
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
- IoBuffer ResponsePayload(IoBuffer::Wrap, Response.text.data(), Response.text.size());
- CbObject ResponseObject = LoadCompactBinaryObject(ResponsePayload);
+ CbObject ResponseObject = Response.AsObject();
if (!ResponseObject)
{
- Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv,
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog);
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv,
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
return Result;
}
CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView();
@@ -89,33 +71,15 @@ public:
Result.Needs.insert(ChunkHash);
}
- Result.RawHash = IoHash::HashBuffer(Payload);
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ Result.RawHash = IoHash::HashBuffer(Payload);
return Result;
}
virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override
{
- Stopwatch Timer;
-
- std::unique_ptr<cpr::Session> Session(AllocateSession());
- auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
-
- std::string SaveRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash);
- Session->SetUrl({SaveRequest});
- Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}});
- uint64_t SizeLeft = Payload.GetSize();
- CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
- auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, SizeLeft);
- MutableMemoryView Data(buffer, size);
- Payload.CopyTo(Data, BufferIt);
- SizeLeft -= size;
- return true;
- };
- Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback));
- cpr::Response Response = Session->Post();
- SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)};
+ std::string SaveRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
+ HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCompressedBinary);
+ SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)};
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'",
@@ -125,14 +89,11 @@ public:
RawHash,
Result.Reason);
}
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
{
- Stopwatch Timer;
-
CbPackage RequestPackage;
{
CbObjectWriter RequestWriter;
@@ -151,26 +112,10 @@ public:
RequestWriter.EndArray(); // "chunks"
RequestPackage.SetObject(RequestWriter.Save());
}
- CompositeBuffer Payload = FormatPackageMessageBuffer(RequestPackage, FormatFlags::kDefault);
-
- std::unique_ptr<cpr::Session> Session(AllocateSession());
- auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
- std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
- Session->SetUrl({SaveRequest});
- Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}});
+ std::string SaveRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog);
+ HttpClient::Response Response = m_Client.Post(SaveRequest, RequestPackage);
- uint64_t SizeLeft = Payload.GetSize();
- CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
- auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, SizeLeft);
- MutableMemoryView Data(buffer, size);
- Payload.CopyTo(Data, BufferIt);
- SizeLeft -= size;
- return true;
- };
- Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback));
- cpr::Response Response = Session->Post();
- SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)};
+ SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)};
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed saving {} oplog attachments to {}/{}/{}. Reason: '{}'",
@@ -180,17 +125,12 @@ public:
m_Oplog,
Result.Reason);
}
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
{
- Stopwatch Timer;
-
- std::unique_ptr<cpr::Session> Session(AllocateSession());
- auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
- std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
+ std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog);
CbObject Request;
{
@@ -206,17 +146,22 @@ public:
RequestWriter.EndArray(); // "chunks"
Request = RequestWriter.Save();
}
- IoBuffer Payload = Request.GetBuffer().AsIoBuffer();
- Session->SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()});
- Session->SetUrl(SaveRequest);
- Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))},
- {"Accept", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}});
- cpr::Response Response = Session->Post();
- LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)};
- if (!Result.ErrorCode)
+ HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage));
+
+ LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
{
- CbPackage Package = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()));
+ Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'",
+ RawHashes.size(),
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog,
+ Result.Reason);
+ }
+ else
+ {
+ CbPackage Package = Response.AsPackage();
std::span<const CbAttachment> Attachments = Package.GetAttachments();
Result.Chunks.reserve(Attachments.size());
for (const CbAttachment& Attachment : Attachments)
@@ -225,42 +170,17 @@ public:
std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()});
}
}
- else
- {
- Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'",
- RawHashes.size(),
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog,
- Result.Reason);
- }
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
};
- virtual FinalizeResult FinalizeContainer(const IoHash&) override
- {
- Stopwatch Timer;
-
- RwLock::ExclusiveLockScope _(SessionsLock);
- Sessions.clear();
- return FinalizeResult{Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}};
- }
+ virtual FinalizeResult FinalizeContainer(const IoHash&) override { return FinalizeResult{Result{}}; }
virtual LoadContainerResult LoadContainer() override
{
- Stopwatch Timer;
+ std::string LoadRequest = fmt::format("/{}/oplog/{}/load"sv, m_Project, m_Oplog);
- std::unique_ptr<cpr::Session> Session(AllocateSession());
- auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
- std::string SaveRequest = fmt::format("{}/{}/oplog/{}/load"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
- Session->SetUrl(SaveRequest);
- Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCbObject))}});
- Session->SetParameters(
- {{"maxblocksize", fmt::format("{}", m_MaxBlockSize)}, {"maxchunkembedsize", fmt::format("{}", m_MaxChunkEmbedSize)}});
- cpr::Response Response = Session->Get();
-
- LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)};
+ HttpClient::Response Response = m_Client.Get(LoadRequest, HttpClient::Accept(ZenContentType::kCbObject));
+ LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)};
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}. Reason: '{}'",
@@ -271,7 +191,7 @@ public:
}
else
{
- Result.ContainerObject = LoadCompactBinaryObject(IoBuffer(IoBuffer::Clone, Response.text.data(), Response.text.size()));
+ Result.ContainerObject = Response.AsObject();
if (!Result.ContainerObject)
{
Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv,
@@ -281,7 +201,6 @@ public:
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
}
}
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
@@ -299,19 +218,14 @@ public:
virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
{
- Stopwatch Timer;
-
- std::unique_ptr<cpr::Session> Session(AllocateSession());
- auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
-
- std::string LoadRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash);
- Session->SetUrl({LoadRequest});
- Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}});
- cpr::Response Response = Session->Get();
- LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)};
+ std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
+ HttpClient::Response Response =
+ m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary));
+ LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)};
if (!Result.ErrorCode)
{
- Result.Bytes = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
+ Result.Bytes = Response.ResponsePayload;
+ Result.Bytes.MakeOwned();
}
if (!Result.ErrorCode)
{
@@ -322,73 +236,40 @@ public:
RawHash,
Result.Reason);
}
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
private:
- std::unique_ptr<cpr::Session> AllocateSession()
- {
- RwLock::ExclusiveLockScope _(SessionsLock);
- if (Sessions.empty())
- {
- Sessions.emplace_back(std::make_unique<cpr::Session>());
- }
- std::unique_ptr<cpr::Session> Session = std::move(Sessions.back());
- Sessions.pop_back();
- return Session;
- }
-
- void ReleaseSession(std::unique_ptr<cpr::Session>&& Session)
+ static Result ConvertResult(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv)
{
- RwLock::ExclusiveLockScope _(SessionsLock);
- Sessions.emplace_back(std::move(Session));
- }
-
- static Result ConvertResult(const cpr::Response& Response)
- {
- std::string Text;
- std::string Reason = Response.reason;
- int32_t ErrorCode = 0;
- if (Response.error.code != cpr::ErrorCode::OK)
+ if (Response.Error)
{
- ErrorCode = static_cast<int32_t>(Response.error.code);
- if (!Response.error.message.empty())
- {
- Reason = Response.error.message;
- }
+ return {.ErrorCode = Response.Error.value().ErrorCode,
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .Reason = Response.ErrorMessage(""),
+ .Text = Response.ToText()};
}
- else if (!IsHttpSuccessCode(Response.status_code))
+ if (!Response.IsSuccess())
{
- ErrorCode = static_cast<int32_t>(Response.status_code);
-
- if (auto It = Response.header.find("Content-Type"); It != Response.header.end())
- {
- zen::HttpContentType ContentType = zen::ParseContentType(It->second);
- if (ContentType == zen::HttpContentType::kText)
- {
- Text = Response.text;
- }
- }
-
- Reason = fmt::format("{}"sv, Response.status_code);
+ return {.ErrorCode = static_cast<int32_t>(Response.StatusCode),
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .Reason = Response.ErrorMessage(ErrorPrefix),
+ .Text = Response.ToText()};
}
- return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.elapsed, .Reason = Reason, .Text = Text};
+ return {.ErrorCode = 0, .ElapsedSeconds = Response.ElapsedSeconds};
}
- RwLock SessionsLock;
- std::vector<std::unique_ptr<cpr::Session>> Sessions;
+ const std::string m_HostAddress;
+ const std::string m_ProjectStoreUrl;
+ const std::string m_Project;
+ const std::string m_Oplog;
+ const std::filesystem::path m_TempFilePath;
- const std::string m_HostAddress;
- const std::string m_ProjectStoreUrl;
- const std::string m_Project;
- const std::string m_Oplog;
- const size_t m_MaxBlockSize;
- const size_t m_MaxChunkEmbedSize;
+ HttpClient m_Client;
};
std::shared_ptr<RemoteProjectStore>
-CreateZenRemoteStore(const ZenRemoteStoreOptions& Options)
+CreateZenRemoteStore(const ZenRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath)
{
std::string Url = Options.Url;
if (Url.find("://"sv) == std::string::npos)
@@ -397,7 +278,7 @@ CreateZenRemoteStore(const ZenRemoteStoreOptions& Options)
Url = fmt::format("http://{}"sv, Url);
}
std::shared_ptr<RemoteProjectStore> RemoteStore =
- std::make_shared<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize);
+ std::make_shared<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, TempFilePath);
return RemoteStore;
}
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.h b/src/zenserver/projectstore/zenremoteprojectstore.h
index 9f079ee74..7c81a597d 100644
--- a/src/zenserver/projectstore/zenremoteprojectstore.h
+++ b/src/zenserver/projectstore/zenremoteprojectstore.h
@@ -13,6 +13,6 @@ struct ZenRemoteStoreOptions : RemoteStoreOptions
std::string OplogId;
};
-std::shared_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options);
+std::shared_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath);
} // namespace zen