diff options
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 28 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 3 | ||||
| -rw-r--r-- | zenserver/config.cpp | 2 | ||||
| -rw-r--r-- | zenserver/experimental/usnjournal.cpp | 2 | ||||
| -rw-r--r-- | zenserver/projectstore.cpp | 4 | ||||
| -rw-r--r-- | zenserver/testing/httptest.cpp | 17 | ||||
| -rw-r--r-- | zenserver/testing/httptest.h | 6 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 103 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 15 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 144 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 4 |
11 files changed, 298 insertions, 30 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 74cee6614..dc96aecae 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -6,6 +6,7 @@ #include <zencore/compress.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/scopeguard.h> #include <zencore/stream.h> #include <zencore/timer.h> #include <zenhttp/httpserver.h> @@ -193,10 +194,17 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { CacheRef Ref; + metrics::OperationTiming::Scope $(m_HttpRequests); + if (!ValidateKeyUri(Request, /* out */ Ref)) { std::string_view Key = Request.RelativeUri(); + if (Key.empty()) + { + return HandleStatusRequest(Request); + } + if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); })) { // Bucket reference @@ -262,11 +270,11 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, case kHead: case kGet: { - HandleGetCacheRecord(Request, Ref, Policy); if (Verb == kHead) { Request.SetSuppressResponseBody(); } + HandleGetCacheRecord(Request, Ref, Policy); } break; case kPut: @@ -829,4 +837,22 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& return true; } + +void +HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request) +{ + CbObjectWriter Cbo; + Cbo << "ok" << true; + + EmitSnapshot("requests", m_HttpRequests, Cbo); + if (m_UpstreamCache) + { + Cbo.BeginObject("upstream"); + m_UpstreamCache->GetStatus(Cbo); + Cbo.EndObject(); + } + + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + } // namespace zen diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 3fdaa1236..47fc173e9 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -2,6 +2,7 @@ #pragma once +#include <zencore/stats.h> #include <zenhttp/httpserver.h> #include <memory> @@ -78,6 +79,7 @@ private: void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); + void HandleStatusRequest(zen::HttpServerRequest& Request); spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; @@ -86,6 +88,7 @@ private: zen::CidStore& m_CidStore; std::unique_ptr<UpstreamCache> m_UpstreamCache; uint64_t m_LastScrubTime = 0; + metrics::OperationTiming m_HttpRequests; }; } // namespace zen diff --git a/zenserver/config.cpp b/zenserver/config.cpp index 42f59b26c..254032226 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -227,7 +227,7 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z "", "upstream-stats", "Collect performance metrics for upstream endpoints", - cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("false"), + cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("true"), ""); try diff --git a/zenserver/experimental/usnjournal.cpp b/zenserver/experimental/usnjournal.cpp index 1e765fbe5..d575e1779 100644 --- a/zenserver/experimental/usnjournal.cpp +++ b/zenserver/experimental/usnjournal.cpp @@ -259,7 +259,7 @@ UsnJournalReader::Initialize(std::filesystem::path VolumePath) } } - const auto ElapsedMs = Timer.getElapsedTimeMs(); + const auto ElapsedMs = Timer.GetElapsedTimeMs(); ZEN_INFO("MFT enumeration of {} completed after {} ({})", zen::NiceBytes(MftBytesProcessed), diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 1a9eb2c67..7870f9559 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -191,7 +191,7 @@ struct ProjectStore::OplogStorage : public RefCounted }); ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}", - NiceTimeSpanMs(Timer.getElapsedTimeMs()), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), m_MaxLsn, m_NextOpsOffset); } @@ -502,7 +502,7 @@ ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry } } - ZEN_DEBUG("added {} file(s) in {}", FileCount, NiceTimeSpanMs(Timer.getElapsedTimeMs())); + ZEN_DEBUG("added {} file(s) in {}", FileCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } for (CbFieldView& Entry : Core["meta"sv]) diff --git a/zenserver/testing/httptest.cpp b/zenserver/testing/httptest.cpp index 18d63a6ef..01866a63b 100644 --- a/zenserver/testing/httptest.cpp +++ b/zenserver/testing/httptest.cpp @@ -15,6 +15,23 @@ HttpTestingService::HttpTestingService() HttpVerb::kGet); m_Router.RegisterRoute( + "metrics", + [this](HttpRouterRequest& Req) { + metrics::OperationTiming::Scope _(m_TimingStats); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "get_metrics", + [this](HttpRouterRequest& Req) { + CbObjectWriter Cbo; + EmitSnapshot("requests", m_TimingStats, Cbo); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( "json", [this](HttpRouterRequest& Req) { CbObjectWriter Obj; diff --git a/zenserver/testing/httptest.h b/zenserver/testing/httptest.h index f55780d05..f7ea0c31c 100644 --- a/zenserver/testing/httptest.h +++ b/zenserver/testing/httptest.h @@ -3,6 +3,7 @@ #pragma once #include <zencore/logging.h> +#include <zencore/stats.h> #include <zenhttp/httpserver.h> #include <atomic> @@ -39,8 +40,9 @@ public: }; private: - HttpRequestRouter m_Router; - std::atomic<uint32_t> m_Counter{0}; + HttpRequestRouter m_Router; + std::atomic<uint32_t> m_Counter{0}; + metrics::OperationTiming m_TimingStats; RwLock m_RwLock; std::unordered_map<uint32_t, Ref<PackageHandler>> m_HandlerMap; diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index b93635e76..0397ddaa0 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -299,13 +299,16 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, return PutDerivedData(BucketId, Key.ToHexString(), DerivedData); } -CloudCacheResult +PutRefResult CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + PutRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; } IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); @@ -328,16 +331,102 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer if (Response.error) { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + PutRefResult Result; + Result.ErrorCode = static_cast<int32_t>(Response.error.code); + Result.Reason = std::move(Response.error.message); + return Result; } else if (!VerifyAccessToken(Response.status_code)) { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + PutRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; } - return {.Bytes = Response.uploaded_bytes, - .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; + PutRefResult Result; + Result.Success = (Response.status_code == 200 || Response.status_code == 201); + Result.Bytes = Response.uploaded_bytes; + Result.ElapsedSeconds = Response.elapsed; + + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.text, JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + + return Result; +} + +FinalizeRefResult +CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + FinalizeRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" + << Key.ToHexString() << "/finalize/" << RefHash.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, + {"X-Jupiter-IoHash", RefHash.ToHexString()}, + {"Content-Type", "application/x-ue-cb"}}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + FinalizeRefResult Result; + Result.ErrorCode = static_cast<int32_t>(Response.error.code); + Result.Reason = std::move(Response.error.message); + return Result; + } + else if (!VerifyAccessToken(Response.status_code)) + { + FinalizeRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; + } + + FinalizeRefResult Result; + Result.Success = (Response.status_code == 200 || Response.status_code == 201); + Result.Bytes = Response.uploaded_bytes; + Result.ElapsedSeconds = Response.elapsed; + + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.text, JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + + return Result; } CloudCacheResult diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index d8844279e..9573a1631 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -2,6 +2,7 @@ #pragma once +#include <zencore/iohash.h> #include <zencore/logging.h> #include <zencore/refcount.h> #include <zencore/thread.h> @@ -53,6 +54,16 @@ struct CloudCacheResult bool Success = false; }; +struct PutRefResult : CloudCacheResult +{ + std::vector<IoHash> Needs; +}; + +struct FinalizeRefResult : CloudCacheResult +{ + std::vector<IoHash> Needs; +}; + /** * Context for performing Jupiter operations * @@ -76,11 +87,13 @@ public: CloudCacheResult PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData); CloudCacheResult PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData); - CloudCacheResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); + PutRefResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); CloudCacheResult PutBlob(const IoHash& Key, IoBuffer Blob); CloudCacheResult PutCompressedBlob(const IoHash& Key, IoBuffer Blob); CloudCacheResult PutObject(const IoHash& Key, IoBuffer Object); + FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); + CloudCacheResult DerivedDataExists(std::string_view BucketId, std::string_view Key); CloudCacheResult DerivedDataExists(std::string_view BucketId, const IoHash& Key); CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 0dd16cd06..03054b542 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -9,6 +9,7 @@ #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/fmtutils.h> +#include <zencore/stats.h> #include <zencore/stream.h> #include <zencore/timer.h> @@ -45,6 +46,7 @@ namespace detail { { std::lock_guard Lock(m_Lock); m_Queue.emplace_back(std::move(Item)); + m_Size++; } m_NewItemSignal.notify_one(); @@ -64,6 +66,7 @@ namespace detail { { Item = std::move(m_Queue.front()); m_Queue.pop_front(); + m_Size--; return true; } @@ -80,7 +83,7 @@ namespace detail { } } - std::size_t Num() const + std::size_t Size() const { std::unique_lock Lock(m_Lock); return m_Queue.size(); @@ -91,12 +94,15 @@ namespace detail { std::condition_variable m_NewItemSignal; std::deque<T> m_Queue; std::atomic_bool m_CompleteAdding{false}; + std::atomic_uint32_t m_Size; }; class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: - JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc) + JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) + : m_Log(zen::logging::Get("upstream")) + , m_UseLegacyDdc(Options.UseLegacyDdc) { using namespace fmt::literals; m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl); @@ -277,16 +283,82 @@ namespace detail { Success = false; for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) { - if (CloudCacheResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - RecordValue, - ZenContentType::kCbObject); + if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + RecordValue, + ZenContentType::kCbObject); Result.Success) { TotalBytes += Result.Bytes; TotalElapsedSeconds += Result.ElapsedSeconds; Success = true; - break; + + if (!Result.Needs.empty()) + { + for (const IoHash& NeededHash : Result.Needs) + { + Success = false; + + if (auto It = + std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash); + It != std::end(CacheRecord.PayloadIds)) + { + const size_t Idx = It - std::begin(CacheRecord.PayloadIds); + + if (CloudCacheResult BlobResult = + Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + BlobResult.Success) + { + TotalBytes += BlobResult.Bytes; + TotalElapsedSeconds += BlobResult.ElapsedSeconds; + Success = true; + } + else + { + ZEN_WARN("upload missing payload '{}/{}/{}' FAILED", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + NeededHash); + } + } + else + { + ZEN_WARN("needed payload '{}/{}/{}' MISSING", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + NeededHash); + } + } + + const IoHash RefHash = IoHash::HashBuffer(RecordValue); + + if (FinalizeRefResult FinalizeResult = + Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + FinalizeResult.Success) + { + TotalBytes += FinalizeResult.Bytes; + TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + Success = true; + + for (const IoHash& MissingHash : FinalizeResult.Needs) + { + ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + MissingHash); + } + } + else + { + ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); + Success = false; + } + } + + if (Success) + { + break; + } } } @@ -302,6 +374,9 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: + spdlog::logger& Log() { return m_Log; } + + spdlog::logger& m_Log; bool m_UseLegacyDdc; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; @@ -515,7 +590,12 @@ struct UpstreamStats } UpstreamEndpointStats& Stats = Endpoint.Stats(); - if (Result.Success) + + if (Result.Error) + { + Stats.ErrorCount++; + } + else if (Result.Success) { Stats.HitCount++; Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); @@ -549,6 +629,10 @@ struct UpstreamStats Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); Stats.SecondsUp.fetch_add(Result.ElapsedSeconds); } + else + { + Stats.ErrorCount++; + } if (m_SampleCount++ % MaxSampleCount) { @@ -575,13 +659,13 @@ struct UpstreamStats const uint64_t TotalCount = HitCount + MissCount; const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0; - Logger.info("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", - Ep->DisplayName(), - HitRate, - DownBytes, - DownSpeed, - UpBytes, - UpSpeed); + Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", + Ep->DisplayName(), + HitRate, + DownBytes, + DownSpeed, + UpBytes, + UpSpeed); } } @@ -700,6 +784,36 @@ public: return {}; } + virtual void GetStatus(CbObjectWriter& Status) override + { + Status << "reading" << m_Options.ReadUpstream; + Status << "writing" << m_Options.WriteUpstream; + Status << "worker_threads" << m_Options.ThreadCount; + Status << "queue_count" << m_UpstreamQueue.Size(); + + Status.BeginArray("endpoints"); + for (const auto& Ep : m_Endpoints) + { + Status.BeginObject(); + Status << "name" << Ep->DisplayName(); + Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); + + UpstreamEndpointStats& Stats = Ep->Stats(); + const uint64_t HitCount = Stats.HitCount; + const uint64_t MissCount = Stats.MissCount; + const uint64_t TotalCount = HitCount + MissCount; + const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0; + + Status << "hit_ratio" << HitRate; + Status << "downloaded_mb" << Stats.DownBytes; + Status << "uploaded_mb" << Stats.UpBytes; + Status << "error_count" << Stats.ErrorCount; + + Status.EndObject(); + } + Status.EndArray(); + } + private: void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) { diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 0e736480b..a6b1e9784 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -12,6 +12,7 @@ namespace zen { +class CbObjectWriter; class CidStore; class ZenCacheStore; struct CloudCacheClientOptions; @@ -86,6 +87,7 @@ struct UpstreamEndpointStats std::atomic_uint64_t HitCount = {}; std::atomic_uint64_t MissCount = {}; std::atomic_uint64_t UpCount = {}; + std::atomic_uint64_t ErrorCount = {}; std::atomic<double> UpBytes = {}; std::atomic<double> DownBytes = {}; std::atomic<double> SecondsUp = {}; @@ -139,6 +141,8 @@ public: }; virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; + + virtual void GetStatus(CbObjectWriter& CbO) = 0; }; std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore); |