diff options
| author | Martin Ridgers <[email protected]> | 2021-11-03 13:19:44 +0100 |
|---|---|---|
| committer | Martin Ridgers <[email protected]> | 2021-11-03 13:19:44 +0100 |
| commit | f5ab857c933901e02b7918f765d5c9221a755ead (patch) | |
| tree | 6db9b4c9c0ea791a5971d627c2cc7f090235e0cd /zenserver | |
| parent | There is no "StringBuilderImpl<C>::operator << (const C*)". (diff) | |
| parent | Type consistency around signed/unsigned comparison (diff) | |
| download | zen-f5ab857c933901e02b7918f765d5c9221a755ead.tar.xz zen-f5ab857c933901e02b7918f765d5c9221a755ead.zip | |
Merged main
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/diag/formatters.h | 20 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 10 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 197 |
3 files changed, 125 insertions, 102 deletions
diff --git a/zenserver/diag/formatters.h b/zenserver/diag/formatters.h index 42f928efe..759df58d3 100644 --- a/zenserver/diag/formatters.h +++ b/zenserver/diag/formatters.h @@ -2,6 +2,11 @@ #pragma once +#include <zencore/compactbinary.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/iobuffer.h> +#include <zencore/string.h> + ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> #include <fmt/format.h> @@ -17,7 +22,7 @@ struct fmt::formatter<cpr::Response> { using namespace std::literals; - if (Response.status_code == 200) + if (Response.status_code == 200 || Response.status_code == 201) { return fmt::format_to(Ctx.out(), "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s", @@ -32,18 +37,21 @@ struct fmt::formatter<cpr::Response> const auto It = Response.header.find("Content-Type"); const std::string_view ContentType = It != Response.header.end() ? It->second : "<None>"sv; - const bool IsBinary = ContentType == "application/x-ue-cb"sv || ContentType == "application/x-ue-comp"sv || - ContentType == "application/octet-stream"; - - if (IsBinary) + if (ContentType == "application/x-ue-cb"sv) { + zen::IoBuffer Body(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + zen::CbObjectView Obj(Body.Data()); + zen::ExtendableStringBuilder<256> Sb; + std::string_view Json = Obj.ToJson(Sb).ToView(); + return fmt::format_to(Ctx.out(), - "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Reason: '{}'", + "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Response: '{}', Reason: '{}'", Response.url.str(), Response.status_code, Response.uploaded_bytes, Response.downloaded_bytes, Response.elapsed, + Json, Response.reason); } else diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index c59b2fedf..11f8a3171 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -86,12 +86,12 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke } ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key << ".raw"; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -136,6 +136,7 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -171,6 +172,7 @@ CloudCacheSession::GetBlob(const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -202,6 +204,7 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -233,6 +236,7 @@ CloudCacheSession::GetObject(const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -382,6 +386,7 @@ CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, con Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetBody(cpr::Body{}); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); @@ -545,6 +550,7 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index db1981ec7..00555f2ce 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -175,8 +175,10 @@ namespace detail { IoBuffer RecordValue, std::span<IoBuffer const> Payloads) override { + using namespace fmt::literals; + ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); - const uint32_t MaxAttempts = 3; + const int32_t MaxAttempts = 3; try { @@ -204,117 +206,112 @@ namespace detail { } else { - bool Success = false; int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) - { - Success = false; - for (uint32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) + const auto PutBlobs = [&](std::span<IoHash> PayloadIds, std::string& OutReason) -> bool { + for (const IoHash& PayloadId : PayloadIds) { - if (CloudCacheResult Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); - Result.Success) + const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId); + + if (It == std::end(CacheRecord.PayloadIds)) { - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - Success = true; - break; + OutReason = "payload '{}' MISSING from local cache"_format(PayloadId); + return false; } - } - if (!Success) - { - return {.Reason = "Failed to upload payload", - .Bytes = TotalBytes, - .ElapsedSeconds = TotalElapsedSeconds, - .Success = false}; + const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It); + + CloudCacheResult BlobResult; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) + { + BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + } + + if (!BlobResult.Success) + { + OutReason = "upload payload '{}' FAILED, reason '{}'"_format(PayloadId, BlobResult.Reason); + return false; + } + + TotalBytes += BlobResult.Bytes; + TotalElapsedSeconds += BlobResult.ElapsedSeconds; } + + return true; + }; + + PutRefResult RefResult; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) + { + RefResult = + Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject); } - Success = false; - for (uint32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) + if (!RefResult.Success) { - if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - RecordValue, - ZenContentType::kCbObject); - Result.Success) - { - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - Success = true; + return {.Reason = "upload cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + RefResult.Reason), + .Success = false}; + } - if (!Result.Needs.empty()) - { - for (const IoHash& NeededHash : Result.Needs) - { - Success = false; - - if (auto It = - std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash); - It != std::end(CacheRecord.PayloadIds)) - { - const size_t Idx = It - std::begin(CacheRecord.PayloadIds); - - if (CloudCacheResult BlobResult = - Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); - BlobResult.Success) - { - TotalBytes += BlobResult.Bytes; - TotalElapsedSeconds += BlobResult.ElapsedSeconds; - Success = true; - } - else - { - ZEN_WARN("upload missing payload '{}/{}/{}' FAILED", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - NeededHash); - } - } - else - { - ZEN_WARN("needed payload '{}/{}/{}' MISSING", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - NeededHash); - } - } + TotalBytes += RefResult.Bytes; + TotalElapsedSeconds += RefResult.ElapsedSeconds; - const IoHash RefHash = IoHash::HashBuffer(RecordValue); + std::string Reason; + if (!PutBlobs(RefResult.Needs, Reason)) + { + return {.Reason = std::move(Reason), .Success = false}; + } - if (FinalizeRefResult FinalizeResult = - Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); - FinalizeResult.Success) - { - TotalBytes += FinalizeResult.Bytes; - TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; - Success = true; - - for (const IoHash& MissingHash : FinalizeResult.Needs) - { - ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - MissingHash); - } - } - else - { - ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); - Success = false; - } - } + const IoHash RefHash = IoHash::HashBuffer(RecordValue); + FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + + if (!FinalizeResult.Success) + { + return {.Reason = "finalize cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + FinalizeResult.Reason), + .Success = false}; + } - if (Success) + if (!FinalizeResult.Needs.empty()) + { + if (!PutBlobs(FinalizeResult.Needs, Reason)) + { + return {.Reason = std::move(Reason), .Success = false}; + } + + FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + + if (!FinalizeResult.Success) + { + return {.Reason = "finalize '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + FinalizeResult.Reason), + .Success = false}; + } + + if (!FinalizeResult.Needs.empty()) + { + ExtendableStringBuilder<256> Sb; + for (const IoHash& MissingHash : FinalizeResult.Needs) { - break; + Sb << MissingHash.ToHexString() << ","; } + + return {.Reason = "finalize '{}/{}' FAILED, still needs payload(s) '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Sb.ToString()), + .Success = false}; } } - return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Success}; + TotalBytes += FinalizeResult.Bytes; + TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + + return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true}; } } catch (std::exception& Err) @@ -482,7 +479,7 @@ namespace detail { std::span<IoBuffer const> Payloads) override { ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); - const uint32_t MaxAttempts = 3; + const int32_t MaxAttempts = 3; try { @@ -890,6 +887,15 @@ private: { const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + + if (!Result.Success) + { + ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Endpoint->DisplayName(), + Result.Reason); + } } } } @@ -907,7 +913,10 @@ private: } catch (std::exception& e) { - ZEN_WARN("process upstream ({}/{}) FAILED '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, e.what()); + ZEN_WARN("upload cache record '{}/{}' FAILED, reason '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + e.what()); } } |