diff options
| author | Per Larsson <[email protected]> | 2021-10-01 11:06:32 +0200 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-10-01 11:06:32 +0200 |
| commit | 0d3ce5ffecfd894b3a2b0421c562f2bb06fdc1c8 (patch) | |
| tree | acedad27ed8f61c3694395d18b13240cb401c25b | |
| parent | Clang format fix. (diff) | |
| download | zen-0d3ce5ffecfd894b3a2b0421c562f2bb06fdc1c8.tar.xz zen-0d3ce5ffecfd894b3a2b0421c562f2bb06fdc1c8.zip | |
Try upload missing payloads and finalize Jupiter cache ref.
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 103 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 15 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 114 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 1 |
4 files changed, 210 insertions, 23 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index b93635e76..0397ddaa0 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -299,13 +299,16 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, return PutDerivedData(BucketId, Key.ToHexString(), DerivedData); } -CloudCacheResult +PutRefResult CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + PutRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; } IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); @@ -328,16 +331,102 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer if (Response.error) { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + PutRefResult Result; + Result.ErrorCode = static_cast<int32_t>(Response.error.code); + Result.Reason = std::move(Response.error.message); + return Result; } else if (!VerifyAccessToken(Response.status_code)) { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + PutRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; } - return {.Bytes = Response.uploaded_bytes, - .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; + PutRefResult Result; + Result.Success = (Response.status_code == 200 || Response.status_code == 201); + Result.Bytes = Response.uploaded_bytes; + Result.ElapsedSeconds = Response.elapsed; + + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.text, JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + + return Result; +} + +FinalizeRefResult +CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + FinalizeRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" + << Key.ToHexString() << "/finalize/" << RefHash.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, + {"X-Jupiter-IoHash", RefHash.ToHexString()}, + {"Content-Type", "application/x-ue-cb"}}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + FinalizeRefResult Result; + Result.ErrorCode = static_cast<int32_t>(Response.error.code); + Result.Reason = std::move(Response.error.message); + return Result; + } + else if (!VerifyAccessToken(Response.status_code)) + { + FinalizeRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; + } + + FinalizeRefResult Result; + Result.Success = (Response.status_code == 200 || Response.status_code == 201); + Result.Bytes = Response.uploaded_bytes; + Result.ElapsedSeconds = Response.elapsed; + + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.text, JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + + return Result; } CloudCacheResult diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index d8844279e..9573a1631 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -2,6 +2,7 @@ #pragma once +#include <zencore/iohash.h> #include <zencore/logging.h> #include <zencore/refcount.h> #include <zencore/thread.h> @@ -53,6 +54,16 @@ struct CloudCacheResult bool Success = false; }; +struct PutRefResult : CloudCacheResult +{ + std::vector<IoHash> Needs; +}; + +struct FinalizeRefResult : CloudCacheResult +{ + std::vector<IoHash> Needs; +}; + /** * Context for performing Jupiter operations * @@ -76,11 +87,13 @@ public: CloudCacheResult PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData); CloudCacheResult PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData); - CloudCacheResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); + PutRefResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); CloudCacheResult PutBlob(const IoHash& Key, IoBuffer Blob); CloudCacheResult PutCompressedBlob(const IoHash& Key, IoBuffer Blob); CloudCacheResult PutObject(const IoHash& Key, IoBuffer Object); + FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); + CloudCacheResult DerivedDataExists(std::string_view BucketId, std::string_view Key); CloudCacheResult DerivedDataExists(std::string_view BucketId, const IoHash& Key); CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 0dd16cd06..0329569d8 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -9,6 +9,7 @@ #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/fmtutils.h> +#include <zencore/stats.h> #include <zencore/stream.h> #include <zencore/timer.h> @@ -45,6 +46,7 @@ namespace detail { { std::lock_guard Lock(m_Lock); m_Queue.emplace_back(std::move(Item)); + m_Size++; } m_NewItemSignal.notify_one(); @@ -64,6 +66,7 @@ namespace detail { { Item = std::move(m_Queue.front()); m_Queue.pop_front(); + m_Size--; return true; } @@ -80,7 +83,7 @@ namespace detail { } } - std::size_t Num() const + std::size_t Size() const { std::unique_lock Lock(m_Lock); return m_Queue.size(); @@ -91,12 +94,15 @@ namespace detail { std::condition_variable m_NewItemSignal; std::deque<T> m_Queue; std::atomic_bool m_CompleteAdding{false}; + std::atomic_uint32_t m_Size; }; class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: - JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc) + JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) + : m_Log(zen::logging::Get("upstream")) + , m_UseLegacyDdc(Options.UseLegacyDdc) { using namespace fmt::literals; m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl); @@ -277,16 +283,82 @@ namespace detail { Success = false; for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) { - if (CloudCacheResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - RecordValue, - ZenContentType::kCbObject); + if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + RecordValue, + ZenContentType::kCbObject); Result.Success) { TotalBytes += Result.Bytes; TotalElapsedSeconds += Result.ElapsedSeconds; Success = true; - break; + + if (!Result.Needs.empty()) + { + for (const IoHash& NeededHash : Result.Needs) + { + Success = false; + + if (auto It = + std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash); + It != std::end(CacheRecord.PayloadIds)) + { + const size_t Idx = It - std::begin(CacheRecord.PayloadIds); + + if (CloudCacheResult BlobResult = + Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + BlobResult.Success) + { + TotalBytes += BlobResult.Bytes; + TotalElapsedSeconds += BlobResult.ElapsedSeconds; + Success = true; + } + else + { + ZEN_WARN("upload missing payload '{}/{}/{}' FAILED", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + NeededHash); + } + } + else + { + ZEN_WARN("needed payload '{}/{}/{}' MISSING", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + NeededHash); + } + } + + const IoHash RefHash = IoHash::HashBuffer(RecordValue); + + if (FinalizeRefResult FinalizeResult = + Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + FinalizeResult.Success) + { + TotalBytes += FinalizeResult.Bytes; + TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + Success = true; + + for (const IoHash& MissingHash : FinalizeResult.Needs) + { + ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + MissingHash); + } + } + else + { + ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); + Success = false; + } + } + + if (Success) + { + break; + } } } @@ -302,6 +374,9 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: + spdlog::logger& Log() { return m_Log; } + + spdlog::logger& m_Log; bool m_UseLegacyDdc; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; @@ -515,7 +590,12 @@ struct UpstreamStats } UpstreamEndpointStats& Stats = Endpoint.Stats(); - if (Result.Success) + + if (Result.Error) + { + Stats.ErrorCount++; + } + else if (Result.Success) { Stats.HitCount++; Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); @@ -549,6 +629,10 @@ struct UpstreamStats Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); Stats.SecondsUp.fetch_add(Result.ElapsedSeconds); } + else + { + Stats.ErrorCount++; + } if (m_SampleCount++ % MaxSampleCount) { @@ -575,13 +659,13 @@ struct UpstreamStats const uint64_t TotalCount = HitCount + MissCount; const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0; - Logger.info("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", - Ep->DisplayName(), - HitRate, - DownBytes, - DownSpeed, - UpBytes, - UpSpeed); + Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", + Ep->DisplayName(), + HitRate, + DownBytes, + DownSpeed, + UpBytes, + UpSpeed); } } diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 0e736480b..ce8f28db4 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -86,6 +86,7 @@ struct UpstreamEndpointStats std::atomic_uint64_t HitCount = {}; std::atomic_uint64_t MissCount = {}; std::atomic_uint64_t UpCount = {}; + std::atomic_uint64_t ErrorCount = {}; std::atomic<double> UpBytes = {}; std::atomic<double> DownBytes = {}; std::atomic<double> SecondsUp = {}; |