From d39724eb644cab4ec5bbf19a703cb770b34e68c4 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 12 Feb 2025 08:58:52 +0100 Subject: improved builds api interface in jupiter (#281) * multipart upload/download iterface in jupiter * review fixes --- src/zenhttp/httpclient.cpp | 311 +++++++++++++++++++++++++-------------------- 1 file changed, 174 insertions(+), 137 deletions(-) (limited to 'src/zenhttp/httpclient.cpp') diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index 8052a8fd5..211a5d05c 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -282,7 +282,7 @@ AsCprBody(const IoBuffer& Obj) ////////////////////////////////////////////////////////////////////////// static HttpClient::Response -ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload) +ResponseWithPayload(std::string_view SessionId, cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload) { // This ends up doing a memcpy, would be good to get rid of it by streaming results // into buffer directly @@ -297,7 +297,7 @@ ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResp if (!IsHttpSuccessCode(WorkResponseCode) && WorkResponseCode != HttpResponseCode::NotFound) { - ZEN_WARN("HttpClient request failed: {}", HttpResponse); + ZEN_WARN("HttpClient request failed (session: {}): {}", SessionId, HttpResponse); } return HttpClient::Response{.StatusCode = WorkResponseCode, @@ -309,12 +309,12 @@ ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResp } static HttpClient::Response -CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) +CommonResponse(std::string_view SessionId, cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) { const HttpResponseCode WorkResponseCode = HttpResponseCode(HttpResponse.status_code); if (HttpResponse.error) { - ZEN_WARN("HttpClient client error: {}", HttpResponse); + ZEN_WARN("HttpClient client error (session: {}): {}", SessionId, HttpResponse); // Client side failure code return HttpClient::Response{ @@ -339,6 +339,7 @@ CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) else { return ResponseWithPayload( + SessionId, HttpResponse, WorkResponseCode, Payload ? std::move(Payload) : IoBufferBuilder::MakeCloneFromMemory(HttpResponse.text.data(), HttpResponse.text.size())); @@ -448,7 +449,7 @@ ValidatePayload(cpr::Response& Response, std::unique_ptr&& Func, uint8_t RetryCount) +DoWithRetry(std::string_view SessionId, std::function&& Func, uint8_t RetryCount) { uint8_t Attempt = 0; cpr::Response Result = Func(); @@ -456,14 +457,17 @@ DoWithRetry(std::function&& Func, uint8_t RetryCount) { Sleep(100 * (Attempt + 1)); Attempt++; - ZEN_INFO("{} Attempt {}/{}", CommonResponse(std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1); + ZEN_INFO("{} Attempt {}/{}", CommonResponse(SessionId, std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1); Result = Func(); } return Result; } static cpr::Response -DoWithRetry(std::function&& Func, std::unique_ptr& PayloadFile, uint8_t RetryCount) +DoWithRetry(std::string_view SessionId, + std::function&& Func, + std::unique_ptr& PayloadFile, + uint8_t RetryCount) { uint8_t Attempt = 0; cpr::Response Result = Func(); @@ -482,7 +486,7 @@ DoWithRetry(std::function&& Func, std::unique_ptrAllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); - return Sess.Put(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->SetBody(AsCprBody(Payload)); + Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -845,31 +852,36 @@ HttpClient::Put(std::string_view Url, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("HttpClient::Put"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, - Url, - m_ConnectionSettings, - {{"Content-Length", "0"}}, - Parameters, - m_SessionId, - GetAccessToken()); - return Sess.Put(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse(m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, + Url, + m_ConnectionSettings, + {{"Content-Length", "0"}}, + Parameters, + m_SessionId, + GetAccessToken()); + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response HttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("HttpClient::Get"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); - return Sess.Get(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); + return Sess.Get(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -877,13 +889,16 @@ HttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Head"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - return Sess.Head(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + return Sess.Head(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -891,13 +906,16 @@ HttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Delete"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - return Sess.Delete(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + return Sess.Delete(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -905,13 +923,16 @@ HttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, cons { ZEN_TRACE_CPU("HttpClient::PostNoPayload"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); - return Sess.Post(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -925,16 +946,19 @@ HttpClient::Post(std::string_view Url, const IoBuffer& Payload, ZenContentType C { ZEN_TRACE_CPU("HttpClient::PostWithPayload"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader({HeaderContentType(ContentType)}); - return Sess.Post(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + + Sess->SetBody(AsCprBody(Payload)); + Sess->UpdateHeader({HeaderContentType(ContentType)}); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -942,16 +966,19 @@ HttpClient::Post(std::string_view Url, CbObject Payload, const KeyValueMap& Addi { ZEN_TRACE_CPU("HttpClient::PostObjectPayload"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbObject)}); - return Sess.Post(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + + Sess->SetBody(AsCprBody(Payload)); + Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbObject)}); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -965,24 +992,27 @@ HttpClient::Post(std::string_view Url, const CompositeBuffer& Payload, ZenConten { ZEN_TRACE_CPU("HttpClient::Post"); - return CommonResponse(DoWithRetry( - [&]() { - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Sess->UpdateHeader({HeaderContentType(ContentType)}); - - return Sess.Post(cpr::ReadCallback(gsl::narrow(Payload.GetSize()), ReadCallback)); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + uint64_t SizeLeft = Payload.GetSize(); + CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + size = Min(size, SizeLeft); + MutableMemoryView Data(buffer, size); + Payload.CopyTo(Data, BufferIt); + SizeLeft -= size; + return true; + }; + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(ContentType)}); + + return Sess.Post(cpr::ReadCallback(gsl::narrow(Payload.GetSize()), ReadCallback)); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -990,29 +1020,32 @@ HttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValue { ZEN_TRACE_CPU("HttpClient::Upload"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); - - uint64_t Offset = 0; - if (Payload.IsWholeFile()) - { - auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min(size, Payload.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; - }; - return Sess.Put(cpr::ReadCallback(gsl::narrow(Payload.GetSize()), ReadCallback)); - } - Sess->SetBody(AsCprBody(Payload)); - return Sess.Put(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); + + uint64_t Offset = 0; + if (Payload.IsWholeFile()) + { + auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { + size = Min(size, Payload.GetSize() - Offset); + IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); + MutableMemoryView Data(buffer, size); + Data.CopyFrom(PayloadRange.GetView()); + Offset += size; + return true; + }; + return Sess.Put(cpr::ReadCallback(gsl::narrow(Payload.GetSize()), ReadCallback)); + } + Sess->SetBody(AsCprBody(Payload)); + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -1020,24 +1053,27 @@ HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenCont { ZEN_TRACE_CPU("HttpClient::Upload"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Sess->UpdateHeader({HeaderContentType(ContentType)}); - - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - return Sess.Put(cpr::ReadCallback(gsl::narrow(Payload.GetSize()), ReadCallback)); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(ContentType)}); + + uint64_t SizeLeft = Payload.GetSize(); + CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + size = Min(size, SizeLeft); + MutableMemoryView Data(buffer, size); + Payload.CopyTo(Data, BufferIt); + SizeLeft -= size; + return true; + }; + return Sess.Put(cpr::ReadCallback(gsl::narrow(Payload.GetSize()), ReadCallback)); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -1048,6 +1084,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold std::string PayloadString; std::unique_ptr PayloadFile; cpr::Response Response = DoWithRetry( + m_SessionId, [&]() { auto GetHeader = [&](std::string header) -> std::pair { size_t DelimiterPos = header.find(':'); @@ -1249,7 +1286,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold PayloadFile, m_ConnectionSettings.RetryCount); - return CommonResponse(std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{}); + return CommonResponse(m_SessionId, std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{}); } ////////////////////////////////////////////////////////////////////////// -- cgit v1.2.3