aboutsummaryrefslogtreecommitdiff
path: root/zenserver
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-11-03 13:19:44 +0100
committerMartin Ridgers <[email protected]>2021-11-03 13:19:44 +0100
commitf5ab857c933901e02b7918f765d5c9221a755ead (patch)
tree6db9b4c9c0ea791a5971d627c2cc7f090235e0cd /zenserver
parentThere is no "StringBuilderImpl<C>::operator << (const C*)". (diff)
parentType consistency around signed/unsigned comparison (diff)
downloadzen-f5ab857c933901e02b7918f765d5c9221a755ead.tar.xz
zen-f5ab857c933901e02b7918f765d5c9221a755ead.zip
Merged main
Diffstat (limited to 'zenserver')
-rw-r--r--zenserver/diag/formatters.h20
-rw-r--r--zenserver/upstream/jupiter.cpp10
-rw-r--r--zenserver/upstream/upstreamcache.cpp197
3 files changed, 125 insertions, 102 deletions
diff --git a/zenserver/diag/formatters.h b/zenserver/diag/formatters.h
index 42f928efe..759df58d3 100644
--- a/zenserver/diag/formatters.h
+++ b/zenserver/diag/formatters.h
@@ -2,6 +2,11 @@
#pragma once
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/iobuffer.h>
+#include <zencore/string.h>
+
ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
#include <fmt/format.h>
@@ -17,7 +22,7 @@ struct fmt::formatter<cpr::Response>
{
using namespace std::literals;
- if (Response.status_code == 200)
+ if (Response.status_code == 200 || Response.status_code == 201)
{
return fmt::format_to(Ctx.out(),
"Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s",
@@ -32,18 +37,21 @@ struct fmt::formatter<cpr::Response>
const auto It = Response.header.find("Content-Type");
const std::string_view ContentType = It != Response.header.end() ? It->second : "<None>"sv;
- const bool IsBinary = ContentType == "application/x-ue-cb"sv || ContentType == "application/x-ue-comp"sv ||
- ContentType == "application/octet-stream";
-
- if (IsBinary)
+ if (ContentType == "application/x-ue-cb"sv)
{
+ zen::IoBuffer Body(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ zen::CbObjectView Obj(Body.Data());
+ zen::ExtendableStringBuilder<256> Sb;
+ std::string_view Json = Obj.ToJson(Sb).ToView();
+
return fmt::format_to(Ctx.out(),
- "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Reason: '{}'",
+ "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Response: '{}', Reason: '{}'",
Response.url.str(),
Response.status_code,
Response.uploaded_bytes,
Response.downloaded_bytes,
Response.elapsed,
+ Json,
Response.reason);
}
else
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index c59b2fedf..11f8a3171 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -86,12 +86,12 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke
}
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key << ".raw";
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key;
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -136,6 +136,7 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}});
+ Session.SetOption(cpr::Body{});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -171,6 +172,7 @@ CloudCacheSession::GetBlob(const IoHash& Key)
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}});
+ Session.SetOption(cpr::Body{});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -202,6 +204,7 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key)
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}});
+ Session.SetOption(cpr::Body{});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -233,6 +236,7 @@ CloudCacheSession::GetObject(const IoHash& Key)
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
+ Session.SetOption(cpr::Body{});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -382,6 +386,7 @@ CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, con
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value},
{"X-Jupiter-IoHash", RefHash.ToHexString()},
{"Content-Type", "application/x-ue-cb"}});
+ Session.SetBody(cpr::Body{});
cpr::Response Response = Session.Post();
ZEN_DEBUG("POST {}", Response);
@@ -545,6 +550,7 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key)
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
+ Session.SetOption(cpr::Body{});
cpr::Response Response = Session.Head();
ZEN_DEBUG("HEAD {}", Response);
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index db1981ec7..00555f2ce 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -175,8 +175,10 @@ namespace detail {
IoBuffer RecordValue,
std::span<IoBuffer const> Payloads) override
{
+ using namespace fmt::literals;
+
ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
- const uint32_t MaxAttempts = 3;
+ const int32_t MaxAttempts = 3;
try
{
@@ -204,117 +206,112 @@ namespace detail {
}
else
{
- bool Success = false;
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
- {
- Success = false;
- for (uint32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
+ const auto PutBlobs = [&](std::span<IoHash> PayloadIds, std::string& OutReason) -> bool {
+ for (const IoHash& PayloadId : PayloadIds)
{
- if (CloudCacheResult Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
- Result.Success)
+ const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId);
+
+ if (It == std::end(CacheRecord.PayloadIds))
{
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
- Success = true;
- break;
+ OutReason = "payload '{}' MISSING from local cache"_format(PayloadId);
+ return false;
}
- }
- if (!Success)
- {
- return {.Reason = "Failed to upload payload",
- .Bytes = TotalBytes,
- .ElapsedSeconds = TotalElapsedSeconds,
- .Success = false};
+ const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It);
+
+ CloudCacheResult BlobResult;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
+ {
+ BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ }
+
+ if (!BlobResult.Success)
+ {
+ OutReason = "upload payload '{}' FAILED, reason '{}'"_format(PayloadId, BlobResult.Reason);
+ return false;
+ }
+
+ TotalBytes += BlobResult.Bytes;
+ TotalElapsedSeconds += BlobResult.ElapsedSeconds;
}
+
+ return true;
+ };
+
+ PutRefResult RefResult;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++)
+ {
+ RefResult =
+ Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject);
}
- Success = false;
- for (uint32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
+ if (!RefResult.Success)
{
- if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- RecordValue,
- ZenContentType::kCbObject);
- Result.Success)
- {
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
- Success = true;
+ return {.Reason = "upload cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ RefResult.Reason),
+ .Success = false};
+ }
- if (!Result.Needs.empty())
- {
- for (const IoHash& NeededHash : Result.Needs)
- {
- Success = false;
-
- if (auto It =
- std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash);
- It != std::end(CacheRecord.PayloadIds))
- {
- const size_t Idx = It - std::begin(CacheRecord.PayloadIds);
-
- if (CloudCacheResult BlobResult =
- Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
- BlobResult.Success)
- {
- TotalBytes += BlobResult.Bytes;
- TotalElapsedSeconds += BlobResult.ElapsedSeconds;
- Success = true;
- }
- else
- {
- ZEN_WARN("upload missing payload '{}/{}/{}' FAILED",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- NeededHash);
- }
- }
- else
- {
- ZEN_WARN("needed payload '{}/{}/{}' MISSING",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- NeededHash);
- }
- }
+ TotalBytes += RefResult.Bytes;
+ TotalElapsedSeconds += RefResult.ElapsedSeconds;
- const IoHash RefHash = IoHash::HashBuffer(RecordValue);
+ std::string Reason;
+ if (!PutBlobs(RefResult.Needs, Reason))
+ {
+ return {.Reason = std::move(Reason), .Success = false};
+ }
- if (FinalizeRefResult FinalizeResult =
- Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
- FinalizeResult.Success)
- {
- TotalBytes += FinalizeResult.Bytes;
- TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
- Success = true;
-
- for (const IoHash& MissingHash : FinalizeResult.Needs)
- {
- ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- MissingHash);
- }
- }
- else
- {
- ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash);
- Success = false;
- }
- }
+ const IoHash RefHash = IoHash::HashBuffer(RecordValue);
+ FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
+
+ if (!FinalizeResult.Success)
+ {
+ return {.Reason = "finalize cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ FinalizeResult.Reason),
+ .Success = false};
+ }
- if (Success)
+ if (!FinalizeResult.Needs.empty())
+ {
+ if (!PutBlobs(FinalizeResult.Needs, Reason))
+ {
+ return {.Reason = std::move(Reason), .Success = false};
+ }
+
+ FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
+
+ if (!FinalizeResult.Success)
+ {
+ return {.Reason = "finalize '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ FinalizeResult.Reason),
+ .Success = false};
+ }
+
+ if (!FinalizeResult.Needs.empty())
+ {
+ ExtendableStringBuilder<256> Sb;
+ for (const IoHash& MissingHash : FinalizeResult.Needs)
{
- break;
+ Sb << MissingHash.ToHexString() << ",";
}
+
+ return {.Reason = "finalize '{}/{}' FAILED, still needs payload(s) '{}'"_format(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Sb.ToString()),
+ .Success = false};
}
}
- return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Success};
+ TotalBytes += FinalizeResult.Bytes;
+ TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+
+ return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true};
}
}
catch (std::exception& Err)
@@ -482,7 +479,7 @@ namespace detail {
std::span<IoBuffer const> Payloads) override
{
ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
- const uint32_t MaxAttempts = 3;
+ const int32_t MaxAttempts = 3;
try
{
@@ -890,6 +887,15 @@ private:
{
const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+
+ if (!Result.Success)
+ {
+ ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Endpoint->DisplayName(),
+ Result.Reason);
+ }
}
}
}
@@ -907,7 +913,10 @@ private:
}
catch (std::exception& e)
{
- ZEN_WARN("process upstream ({}/{}) FAILED '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, e.what());
+ ZEN_WARN("upload cache record '{}/{}' FAILED, reason '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ e.what());
}
}