aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-02-12 08:58:52 +0100
committerGitHub Enterprise <[email protected]>2025-02-12 08:58:52 +0100
commitd39724eb644cab4ec5bbf19a703cb770b34e68c4 (patch)
treee7263163d6bed8ba7f5cd05822f6993a26907b01
parentFix workspace shares reply array (#280) (diff)
downloadzen-d39724eb644cab4ec5bbf19a703cb770b34e68c4.tar.xz
zen-d39724eb644cab4ec5bbf19a703cb770b34e68c4.zip
improved builds api interface in jupiter (#281)
* multipart upload/download iterface in jupiter * review fixes
-rw-r--r--src/zenhttp/httpclient.cpp311
-rw-r--r--src/zenhttp/httpclientauth.cpp3
-rw-r--r--src/zenhttp/include/zenhttp/httpclient.h4
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupitersession.h70
-rw-r--r--src/zenutil/jupiter/jupitersession.cpp293
5 files changed, 518 insertions, 163 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index 8052a8fd5..211a5d05c 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -282,7 +282,7 @@ AsCprBody(const IoBuffer& Obj)
//////////////////////////////////////////////////////////////////////////
static HttpClient::Response
-ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload)
+ResponseWithPayload(std::string_view SessionId, cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload)
{
// This ends up doing a memcpy, would be good to get rid of it by streaming results
// into buffer directly
@@ -297,7 +297,7 @@ ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResp
if (!IsHttpSuccessCode(WorkResponseCode) && WorkResponseCode != HttpResponseCode::NotFound)
{
- ZEN_WARN("HttpClient request failed: {}", HttpResponse);
+ ZEN_WARN("HttpClient request failed (session: {}): {}", SessionId, HttpResponse);
}
return HttpClient::Response{.StatusCode = WorkResponseCode,
@@ -309,12 +309,12 @@ ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResp
}
static HttpClient::Response
-CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {})
+CommonResponse(std::string_view SessionId, cpr::Response&& HttpResponse, IoBuffer&& Payload = {})
{
const HttpResponseCode WorkResponseCode = HttpResponseCode(HttpResponse.status_code);
if (HttpResponse.error)
{
- ZEN_WARN("HttpClient client error: {}", HttpResponse);
+ ZEN_WARN("HttpClient client error (session: {}): {}", SessionId, HttpResponse);
// Client side failure code
return HttpClient::Response{
@@ -339,6 +339,7 @@ CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {})
else
{
return ResponseWithPayload(
+ SessionId,
HttpResponse,
WorkResponseCode,
Payload ? std::move(Payload) : IoBufferBuilder::MakeCloneFromMemory(HttpResponse.text.data(), HttpResponse.text.size()));
@@ -448,7 +449,7 @@ ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile
}
static cpr::Response
-DoWithRetry(std::function<cpr::Response()>&& Func, uint8_t RetryCount)
+DoWithRetry(std::string_view SessionId, std::function<cpr::Response()>&& Func, uint8_t RetryCount)
{
uint8_t Attempt = 0;
cpr::Response Result = Func();
@@ -456,14 +457,17 @@ DoWithRetry(std::function<cpr::Response()>&& Func, uint8_t RetryCount)
{
Sleep(100 * (Attempt + 1));
Attempt++;
- ZEN_INFO("{} Attempt {}/{}", CommonResponse(std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1);
+ ZEN_INFO("{} Attempt {}/{}", CommonResponse(SessionId, std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1);
Result = Func();
}
return Result;
}
static cpr::Response
-DoWithRetry(std::function<cpr::Response()>&& Func, std::unique_ptr<detail::TempPayloadFile>& PayloadFile, uint8_t RetryCount)
+DoWithRetry(std::string_view SessionId,
+ std::function<cpr::Response()>&& Func,
+ std::unique_ptr<detail::TempPayloadFile>& PayloadFile,
+ uint8_t RetryCount)
{
uint8_t Attempt = 0;
cpr::Response Result = Func();
@@ -482,7 +486,7 @@ DoWithRetry(std::function<cpr::Response()>&& Func, std::unique_ptr<detail::TempP
}
Sleep(100 * (Attempt + 1));
Attempt++;
- ZEN_INFO("{} Attempt {}/{}", CommonResponse(std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1);
+ ZEN_INFO("{} Attempt {}/{}", CommonResponse(SessionId, std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1);
Result = Func();
}
return Result;
@@ -829,15 +833,18 @@ HttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap
{
ZEN_TRACE_CPU("HttpClient::Put");
- return CommonResponse(DoWithRetry(
- [&]() {
- Impl::Session Sess =
- m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
- Sess->SetBody(AsCprBody(Payload));
- Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())});
- return Sess.Put();
- },
- m_ConnectionSettings.RetryCount));
+ return CommonResponse(
+ m_SessionId,
+ DoWithRetry(
+ m_SessionId,
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Sess->SetBody(AsCprBody(Payload));
+ Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())});
+ return Sess.Put();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -845,31 +852,36 @@ HttpClient::Put(std::string_view Url, const KeyValueMap& Parameters)
{
ZEN_TRACE_CPU("HttpClient::Put");
- return CommonResponse(DoWithRetry(
- [&]() {
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri,
- Url,
- m_ConnectionSettings,
- {{"Content-Length", "0"}},
- Parameters,
- m_SessionId,
- GetAccessToken());
- return Sess.Put();
- },
- m_ConnectionSettings.RetryCount));
+ return CommonResponse(m_SessionId,
+ DoWithRetry(
+ m_SessionId,
+ [&]() {
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri,
+ Url,
+ m_ConnectionSettings,
+ {{"Content-Length", "0"}},
+ Parameters,
+ m_SessionId,
+ GetAccessToken());
+ return Sess.Put();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
HttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters)
{
ZEN_TRACE_CPU("HttpClient::Get");
- return CommonResponse(DoWithRetry(
- [&]() {
- Impl::Session Sess =
- m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken());
- return Sess.Get();
- },
- m_ConnectionSettings.RetryCount));
+ return CommonResponse(
+ m_SessionId,
+ DoWithRetry(
+ m_SessionId,
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken());
+ return Sess.Get();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -877,13 +889,16 @@ HttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader)
{
ZEN_TRACE_CPU("HttpClient::Head");
- return CommonResponse(DoWithRetry(
- [&]() {
- Impl::Session Sess =
- m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
- return Sess.Head();
- },
- m_ConnectionSettings.RetryCount));
+ return CommonResponse(
+ m_SessionId,
+ DoWithRetry(
+ m_SessionId,
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ return Sess.Head();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -891,13 +906,16 @@ HttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader)
{
ZEN_TRACE_CPU("HttpClient::Delete");
- return CommonResponse(DoWithRetry(
- [&]() {
- Impl::Session Sess =
- m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
- return Sess.Delete();
- },
- m_ConnectionSettings.RetryCount));
+ return CommonResponse(
+ m_SessionId,
+ DoWithRetry(
+ m_SessionId,
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ return Sess.Delete();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -905,13 +923,16 @@ HttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, cons
{
ZEN_TRACE_CPU("HttpClient::PostNoPayload");
- return CommonResponse(DoWithRetry(
- [&]() {
- Impl::Session Sess =
- m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken());
- return Sess.Post();
- },
- m_ConnectionSettings.RetryCount));
+ return CommonResponse(
+ m_SessionId,
+ DoWithRetry(
+ m_SessionId,
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken());
+ return Sess.Post();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -925,16 +946,19 @@ HttpClient::Post(std::string_view Url, const IoBuffer& Payload, ZenContentType C
{
ZEN_TRACE_CPU("HttpClient::PostWithPayload");
- return CommonResponse(DoWithRetry(
- [&]() {
- Impl::Session Sess =
- m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
-
- Sess->SetBody(AsCprBody(Payload));
- Sess->UpdateHeader({HeaderContentType(ContentType)});
- return Sess.Post();
- },
- m_ConnectionSettings.RetryCount));
+ return CommonResponse(
+ m_SessionId,
+ DoWithRetry(
+ m_SessionId,
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+
+ Sess->SetBody(AsCprBody(Payload));
+ Sess->UpdateHeader({HeaderContentType(ContentType)});
+ return Sess.Post();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -942,16 +966,19 @@ HttpClient::Post(std::string_view Url, CbObject Payload, const KeyValueMap& Addi
{
ZEN_TRACE_CPU("HttpClient::PostObjectPayload");
- return CommonResponse(DoWithRetry(
- [&]() {
- Impl::Session Sess =
- m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
-
- Sess->SetBody(AsCprBody(Payload));
- Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbObject)});
- return Sess.Post();
- },
- m_ConnectionSettings.RetryCount));
+ return CommonResponse(
+ m_SessionId,
+ DoWithRetry(
+ m_SessionId,
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+
+ Sess->SetBody(AsCprBody(Payload));
+ Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbObject)});
+ return Sess.Post();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -965,24 +992,27 @@ HttpClient::Post(std::string_view Url, const CompositeBuffer& Payload, ZenConten
{
ZEN_TRACE_CPU("HttpClient::Post");
- return CommonResponse(DoWithRetry(
- [&]() {
- uint64_t SizeLeft = Payload.GetSize();
- CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
- auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, SizeLeft);
- MutableMemoryView Data(buffer, size);
- Payload.CopyTo(Data, BufferIt);
- SizeLeft -= size;
- return true;
- };
- Impl::Session Sess =
- m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
- Sess->UpdateHeader({HeaderContentType(ContentType)});
-
- return Sess.Post(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
- },
- m_ConnectionSettings.RetryCount));
+ return CommonResponse(
+ m_SessionId,
+ DoWithRetry(
+ m_SessionId,
+ [&]() {
+ uint64_t SizeLeft = Payload.GetSize();
+ CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
+ auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, SizeLeft);
+ MutableMemoryView Data(buffer, size);
+ Payload.CopyTo(Data, BufferIt);
+ SizeLeft -= size;
+ return true;
+ };
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Sess->UpdateHeader({HeaderContentType(ContentType)});
+
+ return Sess.Post(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -990,29 +1020,32 @@ HttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValue
{
ZEN_TRACE_CPU("HttpClient::Upload");
- return CommonResponse(DoWithRetry(
- [&]() {
- Impl::Session Sess =
- m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
- Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())});
-
- uint64_t Offset = 0;
- if (Payload.IsWholeFile())
- {
- auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, Payload.GetSize() - Offset);
- IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
- MutableMemoryView Data(buffer, size);
- Data.CopyFrom(PayloadRange.GetView());
- Offset += size;
- return true;
- };
- return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
- }
- Sess->SetBody(AsCprBody(Payload));
- return Sess.Put();
- },
- m_ConnectionSettings.RetryCount));
+ return CommonResponse(
+ m_SessionId,
+ DoWithRetry(
+ m_SessionId,
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())});
+
+ uint64_t Offset = 0;
+ if (Payload.IsWholeFile())
+ {
+ auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, Payload.GetSize() - Offset);
+ IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
+ MutableMemoryView Data(buffer, size);
+ Data.CopyFrom(PayloadRange.GetView());
+ Offset += size;
+ return true;
+ };
+ return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+ }
+ Sess->SetBody(AsCprBody(Payload));
+ return Sess.Put();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -1020,24 +1053,27 @@ HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenCont
{
ZEN_TRACE_CPU("HttpClient::Upload");
- return CommonResponse(DoWithRetry(
- [&]() {
- Impl::Session Sess =
- m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
- Sess->UpdateHeader({HeaderContentType(ContentType)});
-
- uint64_t SizeLeft = Payload.GetSize();
- CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
- auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, SizeLeft);
- MutableMemoryView Data(buffer, size);
- Payload.CopyTo(Data, BufferIt);
- SizeLeft -= size;
- return true;
- };
- return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
- },
- m_ConnectionSettings.RetryCount));
+ return CommonResponse(
+ m_SessionId,
+ DoWithRetry(
+ m_SessionId,
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Sess->UpdateHeader({HeaderContentType(ContentType)});
+
+ uint64_t SizeLeft = Payload.GetSize();
+ CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
+ auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, SizeLeft);
+ MutableMemoryView Data(buffer, size);
+ Payload.CopyTo(Data, BufferIt);
+ SizeLeft -= size;
+ return true;
+ };
+ return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -1048,6 +1084,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
std::string PayloadString;
std::unique_ptr<detail::TempPayloadFile> PayloadFile;
cpr::Response Response = DoWithRetry(
+ m_SessionId,
[&]() {
auto GetHeader = [&](std::string header) -> std::pair<std::string, std::string> {
size_t DelimiterPos = header.find(':');
@@ -1249,7 +1286,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
PayloadFile,
m_ConnectionSettings.RetryCount);
- return CommonResponse(std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{});
+ return CommonResponse(m_SessionId, std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{});
}
//////////////////////////////////////////////////////////////////////////
diff --git a/src/zenhttp/httpclientauth.cpp b/src/zenhttp/httpclientauth.cpp
index 04ac2ad3f..7fb3224f1 100644
--- a/src/zenhttp/httpclientauth.cpp
+++ b/src/zenhttp/httpclientauth.cpp
@@ -2,6 +2,7 @@
#include <zenhttp/httpclientauth.h>
+#include <zencore/logging.h>
#include <zenhttp/auth/authmgr.h>
ZEN_THIRD_PARTY_INCLUDES_START
@@ -41,6 +42,7 @@ namespace zen { namespace httpclientauth {
if (Response.error || Response.status_code != 200)
{
+ ZEN_WARN("Failed fetching OAuth access token {}. Reason: '{}'", OAuthParams.Url, Response.reason);
return HttpClientAccessToken{};
}
@@ -49,6 +51,7 @@ namespace zen { namespace httpclientauth {
if (JsonError.empty() == false)
{
+ ZEN_WARN("Unable to parse OAuth json response from {}. Reason: '{}'", OAuthParams.Url, JsonError);
return HttpClientAccessToken{};
}
diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h
index 1cf77d794..a46b9fd83 100644
--- a/src/zenhttp/include/zenhttp/httpclient.h
+++ b/src/zenhttp/include/zenhttp/httpclient.h
@@ -60,9 +60,6 @@ struct HttpClientSettings
class HttpClient
{
public:
- struct Settings
- {
- };
HttpClient(std::string_view BaseUri, const HttpClientSettings& Connectionsettings = {});
~HttpClient();
@@ -180,6 +177,7 @@ public:
LoggerRef Logger() { return m_Log; }
std::string_view GetBaseUri() const { return m_BaseUri; }
bool Authenticate();
+ std::string_view GetSessionId() const { return m_SessionId; }
private:
const std::optional<HttpClientAccessToken> GetAccessToken();
diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h
index 6a80332f4..075c35b40 100644
--- a/src/zenutil/include/zenutil/jupiter/jupitersession.h
+++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h
@@ -102,29 +102,48 @@ public:
std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
- JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload);
- JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
- JupiterResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
- PutBuildPartResult PutBuildPart(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- std::string_view PartName,
- const IoBuffer& Payload);
- JupiterResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
- JupiterResult PutBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- ZenContentType ContentType,
- const CompositeBuffer& Payload);
- JupiterResult GetBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- std::filesystem::path TempFolderPath);
+ JupiterResult ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload);
+ JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload);
+ JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ JupiterResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ PutBuildPartResult PutBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ std::string_view PartName,
+ const IoBuffer& Payload);
+ JupiterResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
+ JupiterResult PutBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload);
+ JupiterResult GetBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ std::filesystem::path TempFolderPath);
+
+ JupiterResult PutMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ uint64_t PayloadSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
+ std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems);
+ JupiterResult GetMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver,
+ std::vector<std::function<JupiterResult()>>& OutWorkItems);
JupiterResult PutBlockMetadata(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
@@ -137,6 +156,11 @@ public:
const Oid& PartId,
const IoHash& RawHash);
JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
+ JupiterResult GetBlockMetadata(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ IoBuffer Payload);
private:
inline LoggerRef Log() { return m_Log; }
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp
index f706a7efc..d56927b44 100644
--- a/src/zenutil/jupiter/jupitersession.cpp
+++ b/src/zenutil/jupiter/jupitersession.cpp
@@ -3,6 +3,8 @@
#include <zenutil/jupiter/jupitersession.h>
#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compositebuffer.h>
#include <zencore/fmtutils.h>
#include <zencore/trace.h>
@@ -355,6 +357,16 @@ JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view Typ
}
JupiterResult
+JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/search", Namespace, BucketId),
+ Payload,
+ {HttpClient::Accept(ZenContentType::kCbObject)});
+ return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv);
+}
+
+JupiterResult
JupiterSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload)
{
ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
@@ -437,6 +449,271 @@ JupiterSession::PutBuildBlob(std::string_view Namespace,
}
JupiterResult
+JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ uint64_t PayloadSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
+ std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems)
+{
+ struct MultipartUploadResponse
+ {
+ struct Part
+ {
+ uint64_t FirstByte;
+ uint64_t LastByte;
+ std::string PartId;
+ std::string QueryString;
+ };
+
+ std::string UploadId;
+ std::string BlobName;
+ std::vector<Part> Parts;
+
+ static MultipartUploadResponse Parse(CbObject& Payload)
+ {
+ MultipartUploadResponse Result;
+ Result.UploadId = Payload["uploadId"sv].AsString();
+ Result.BlobName = Payload["blobName"sv].AsString();
+ CbArrayView PartsArray = Payload["parts"sv].AsArrayView();
+ Result.Parts.reserve(PartsArray.Num());
+ for (CbFieldView PartView : PartsArray)
+ {
+ CbObjectView PartObject = PartView.AsObjectView();
+ Result.Parts.emplace_back(Part{
+ .FirstByte = PartObject["firstByte"sv].AsUInt64(),
+ .LastByte = PartObject["lastByte"sv].AsUInt64(),
+ .PartId = std::string(PartObject["partId"sv].AsString()),
+ .QueryString = std::string(PartObject["queryString"sv].AsString()),
+ });
+ }
+ return Result;
+ }
+ };
+
+ CbObjectWriter StartMultipartPayloadWriter;
+ StartMultipartPayloadWriter.AddInteger("blobLength"sv, PayloadSize);
+ CbObject StartMultipartPayload = StartMultipartPayloadWriter.Save();
+
+ std::string StartMultipartResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/startMultipartUpload",
+ Namespace,
+ BucketId,
+ BuildId,
+ PartId,
+ Hash.ToHexString());
+ // ZEN_INFO("POST: {}", StartMultipartResponseRequestString);
+ HttpClient::Response StartMultipartResponse =
+ m_HttpClient.Post(StartMultipartResponseRequestString, StartMultipartPayload, HttpClient::Accept(ZenContentType::kCbObject));
+ if (!StartMultipartResponse.IsSuccess())
+ {
+ ZEN_WARN("{}", StartMultipartResponse.ErrorMessage("startMultipartUpload: "));
+ return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+ }
+ CbObject ResponseObject = LoadCompactBinaryObject(StartMultipartResponse.ResponsePayload);
+
+ struct WorkloadData
+ {
+ MultipartUploadResponse PartDescription;
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)> Transmitter;
+ std::atomic<size_t> PartsLeft;
+ };
+
+ std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
+
+ Workload->PartDescription = MultipartUploadResponse::Parse(ResponseObject);
+ Workload->Transmitter = std::move(Transmitter);
+ Workload->PartsLeft = Workload->PartDescription.Parts.size();
+
+ for (size_t PartIndex = 0; PartIndex < Workload->PartDescription.Parts.size(); PartIndex++)
+ {
+ OutWorkItems.emplace_back([this, Namespace, BucketId, BuildId, PartId, Hash, ContentType, Workload, PartIndex](
+ bool& OutIsComplete) -> JupiterResult {
+ const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex];
+ IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte);
+ std::string MultipartUploadResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/uploadMultipart{}",
+ Namespace,
+ BucketId,
+ BuildId,
+ PartId,
+ Hash.ToHexString(),
+ Part.QueryString);
+ // ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString);
+ HttpClient::Response MultipartUploadResponse = m_HttpClient.Put(MultipartUploadResponseRequestString, PartPayload);
+ if (!MultipartUploadResponse.IsSuccess())
+ {
+ ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(MultipartUploadResponseRequestString));
+ }
+ OutIsComplete = Workload->PartsLeft.fetch_sub(1) == 1;
+ if (OutIsComplete)
+ {
+ int64_t TotalUploadedBytes = MultipartUploadResponse.UploadedBytes;
+ int64_t TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes;
+ double TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds;
+ HttpClient::Response MultipartEndResponse = MultipartUploadResponse;
+ while (MultipartEndResponse.IsSuccess())
+ {
+ CbObjectWriter CompletePayloadWriter;
+ CompletePayloadWriter.AddString("blobName"sv, Workload->PartDescription.BlobName);
+ CompletePayloadWriter.AddString("uploadId"sv, Workload->PartDescription.UploadId);
+ CompletePayloadWriter.AddBool("isCompressed"sv, ContentType == ZenContentType::kCompressedBinary);
+ CompletePayloadWriter.BeginArray("partIds"sv);
+ std::unordered_map<std::string, size_t> PartNameToIndex;
+ for (size_t UploadPartIndex = 0; UploadPartIndex < Workload->PartDescription.Parts.size(); UploadPartIndex++)
+ {
+ const MultipartUploadResponse::Part& PartDescription = Workload->PartDescription.Parts[UploadPartIndex];
+ PartNameToIndex.insert({PartDescription.PartId, UploadPartIndex});
+ CompletePayloadWriter.AddString(PartDescription.PartId);
+ }
+ CompletePayloadWriter.EndArray(); // "partIds"
+ CbObject CompletePayload = CompletePayloadWriter.Save();
+
+ std::string MultipartEndResponseRequestString =
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/completeMultipart",
+ Namespace,
+ BucketId,
+ BuildId,
+ PartId,
+ Hash.ToHexString());
+
+ MultipartEndResponse = m_HttpClient.Post(MultipartEndResponseRequestString,
+ CompletePayload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ TotalUploadedBytes += MultipartEndResponse.UploadedBytes;
+ TotalDownloadedBytes += MultipartEndResponse.DownloadedBytes;
+ TotalElapsedSeconds += MultipartEndResponse.ElapsedSeconds;
+ if (MultipartEndResponse.IsSuccess())
+ {
+ CbObject ResponseObject = MultipartEndResponse.AsObject();
+ CbArrayView MissingPartsArrayView = ResponseObject["missingParts"sv].AsArrayView();
+ if (MissingPartsArrayView.Num() == 0)
+ {
+ break;
+ }
+ else
+ {
+ for (CbFieldView PartIdView : MissingPartsArrayView)
+ {
+ std::string RetryPartId(PartIdView.AsString());
+ size_t RetryPartIndex = PartNameToIndex.at(RetryPartId);
+ const MultipartUploadResponse::Part& RetryPart = Workload->PartDescription.Parts[RetryPartIndex];
+ IoBuffer RetryPartPayload =
+ Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte);
+ std::string RetryMultipartUploadResponseRequestString =
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/uploadMultipart{}",
+ Namespace,
+ BucketId,
+ BuildId,
+ RetryPartId,
+ Hash.ToHexString(),
+ RetryPart.QueryString);
+
+ MultipartUploadResponse = m_HttpClient.Put(RetryMultipartUploadResponseRequestString, RetryPartPayload);
+ TotalUploadedBytes = MultipartUploadResponse.UploadedBytes;
+ TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes;
+ TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds;
+ if (!MultipartUploadResponse.IsSuccess())
+ {
+ ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(RetryMultipartUploadResponseRequestString));
+ MultipartEndResponse = MultipartUploadResponse;
+ }
+ }
+ }
+ }
+ else
+ {
+ ZEN_WARN("{}", MultipartEndResponse.ErrorMessage(MultipartEndResponseRequestString));
+ }
+ }
+ MultipartEndResponse.UploadedBytes = TotalUploadedBytes;
+ MultipartEndResponse.DownloadedBytes = TotalDownloadedBytes;
+ MultipartEndResponse.ElapsedSeconds = TotalElapsedSeconds;
+ return detail::ConvertResponse(MultipartEndResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+ }
+ return detail::ConvertResponse(MultipartUploadResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+ });
+ }
+ return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+}
+
+JupiterResult
+JupiterSession::GetMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver,
+ std::vector<std::function<JupiterResult()>>& OutWorkItems)
+{
+ std::string RequestUrl =
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString());
+ HttpClient::Response Response =
+ m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}}));
+ if (Response.IsSuccess())
+ {
+ if (std::string_view ContentRange = Response.Header.Entries["Content-Range"]; !ContentRange.empty())
+ {
+ if (std::string_view::size_type SizeDelimiterPos = ContentRange.find('/'); SizeDelimiterPos != std::string_view::npos)
+ {
+ if (std::optional<uint64_t> TotalSizeMaybe = ParseInt<uint64_t>(ContentRange.substr(SizeDelimiterPos + 1));
+ TotalSizeMaybe.has_value())
+ {
+ uint64_t TotalSize = TotalSizeMaybe.value();
+ uint64_t PayloadSize = Response.ResponsePayload.GetSize();
+
+ Receiver(0, Response.ResponsePayload, TotalSize);
+
+ if (TotalSize > PayloadSize)
+ {
+ struct WorkloadData
+ {
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver;
+ std::atomic<uint64_t> BytesRemaining;
+ };
+
+ std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
+ Workload->Receiver = std::move(Receiver);
+ Workload->BytesRemaining = TotalSize - PayloadSize;
+
+ uint64_t Offset = PayloadSize;
+ while (Offset < TotalSize)
+ {
+ uint64_t PartSize = Min(ChunkSize, TotalSize - Offset);
+ OutWorkItems.emplace_back(
+ [this, Namespace, BucketId, BuildId, PartId, Hash, TotalSize, Workload, Offset, PartSize]()
+ -> JupiterResult {
+ std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}",
+ Namespace,
+ BucketId,
+ BuildId,
+ PartId,
+ Hash.ToHexString());
+ HttpClient::Response Response = m_HttpClient.Get(
+ RequestUrl,
+ HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}}));
+ if (Response.IsSuccess())
+ {
+ uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize());
+ Workload->Receiver(Offset, Response.ResponsePayload, ByteRemaning);
+ }
+ return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
+ });
+ Offset += PartSize;
+ }
+ }
+ return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
+ }
+ }
+ }
+ Receiver(0, Response.ResponsePayload, Response.ResponsePayload.GetSize());
+ }
+ return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
+}
+
+JupiterResult
JupiterSession::GetBuildBlob(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
@@ -447,6 +724,7 @@ JupiterSession::GetBuildBlob(std::string_view Namespace,
HttpClient::Response Response = m_HttpClient.Download(
fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
TempFolderPath);
+
return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv);
}
@@ -502,4 +780,19 @@ JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId
return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv);
}
+JupiterResult
+JupiterSession::GetBlockMetadata(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ IoBuffer Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response =
+ m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/getBlockMetadata", Namespace, BucketId, BuildId, PartId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv);
+}
+
} // namespace zen