diff options
| author | Martin Ridgers <[email protected]> | 2021-10-01 11:50:43 +0200 |
|---|---|---|
| committer | Martin Ridgers <[email protected]> | 2021-10-01 11:50:43 +0200 |
| commit | a2dc648979bad70037e19ce75ae506f9455e8fd3 (patch) | |
| tree | 4d902270db982a24df05eecdf53d0535b601d582 | |
| parent | Removed `-fshort-wchar` compiler flag on Linux (diff) | |
| parent | Added upstream cache perf metrics. (diff) | |
| download | zen-a2dc648979bad70037e19ce75ae506f9455e8fd3.tar.xz zen-a2dc648979bad70037e19ce75ae506f9455e8fd3.zip | |
Merged main
27 files changed, 905 insertions, 121 deletions
diff --git a/zen/chunk/chunk.cpp b/zen/chunk/chunk.cpp index 18748e921..3283a8b66 100644 --- a/zen/chunk/chunk.cpp +++ b/zen/chunk/chunk.cpp @@ -1015,7 +1015,7 @@ ChunkCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } #endif - uint64_t ElapsedMs = timer.getElapsedTimeMs(); + uint64_t ElapsedMs = timer.GetElapsedTimeMs(); ReportSummary(Chunker, ElapsedMs); } @@ -1041,7 +1041,7 @@ ChunkCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } }); - uint64_t ElapsedMs = timer.getElapsedTimeMs(); + uint64_t ElapsedMs = timer.GetElapsedTimeMs(); ReportSummary(Chunker, ElapsedMs); } @@ -1142,7 +1142,7 @@ TEST_CASE("chunking") } double Avg = double(BoundarySum) / BoundaryCount; - const uint64_t ElapsedTimeMs = timer.getElapsedTimeMs(); + const uint64_t ElapsedTimeMs = timer.GetElapsedTimeMs(); ZEN_INFO("{:9} : Avg {:9} - {:2.5} ({:6}, {})", i, diff --git a/zen/cmds/copy.cpp b/zen/cmds/copy.cpp index 947d54e07..c9b40408e 100644 --- a/zen/cmds/copy.cpp +++ b/zen/cmds/copy.cpp @@ -91,7 +91,7 @@ CopyCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) CopyOptions.EnableClone = !m_NoClone; zen::CopyFile(FromPath, ToPath, CopyOptions); - ZEN_INFO("Copy completed in {}", zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); + ZEN_INFO("Copy completed in {}", zen::NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } return 0; diff --git a/zen/cmds/dedup.cpp b/zen/cmds/dedup.cpp index e71314622..089212ed9 100644 --- a/zen/cmds/dedup.cpp +++ b/zen/cmds/dedup.cpp @@ -118,7 +118,7 @@ DedupCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ZEN_INFO("Gathered {} candidates across {} size buckets. Elapsed: {}", CandidateCount, FileSizeMap.size(), - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); + zen::NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } ZEN_INFO("Sorting buckets by size"); @@ -288,7 +288,7 @@ DedupCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) Size.DirEntries->clear(); } - ZEN_INFO("Elapsed: {} Deduped: {}", zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), zen::NiceBytes(DupeBytes)); + ZEN_INFO("Elapsed: {} Deduped: {}", zen::NiceTimeSpanMs(Timer.GetElapsedTimeMs()), zen::NiceBytes(DupeBytes)); return 0; } diff --git a/zen/cmds/hash.cpp b/zen/cmds/hash.cpp index 0a7989ffc..7d234c2da 100644 --- a/zen/cmds/hash.cpp +++ b/zen/cmds/hash.cpp @@ -89,7 +89,7 @@ HashCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) TotalBytes.combine_each([&](size_t Total) { TotalByteCount += Total; }); - const uint64_t ElapsedMs = Timer.getElapsedTimeMs(); + const uint64_t ElapsedMs = Timer.GetElapsedTimeMs(); ZEN_INFO("Scanned {} files in {}", FileList.size(), zen::NiceTimeSpanMs(ElapsedMs)); ZEN_INFO("Total bytes {} ({})", zen::NiceBytes(TotalByteCount), zen::NiceByteRate(TotalByteCount, ElapsedMs)); diff --git a/zencore/compactbinary.cpp b/zencore/compactbinary.cpp index f71c0aaea..f3fbf312c 100644 --- a/zencore/compactbinary.cpp +++ b/zencore/compactbinary.cpp @@ -1474,10 +1474,10 @@ public: Builder << Accessor.AsIntegerNegative(); break; case CbFieldType::Float32: - Builder.Append("%.9g"_format(Accessor.AsFloat32())); + Builder.Append("{:.9g}"_format(Accessor.AsFloat32())); break; case CbFieldType::Float64: - Builder.Append("%.17g"_format(Accessor.AsFloat64())); + Builder.Append("{:.17g}"_format(Accessor.AsFloat64())); break; case CbFieldType::BoolFalse: Builder << "false"sv; @@ -1831,6 +1831,31 @@ TEST_CASE("uson.json") CHECK(ValueOne == "ValueOne"); CHECK(ValueTwo == "ValueTwo"); } + + SUBCASE("number") + { + const double ExpectedFloatValue = 21.21f; + const double ExpectedDoubleValue = 42.42; + + CbObjectWriter Writer; + Writer << "Float" << ExpectedFloatValue; + Writer << "Double" << ExpectedDoubleValue; + + CbObject Obj = Writer.Save(); + + StringBuilder<128> Sb; + const std::string_view JsonText = Obj.ToJson(Sb).ToView(); + + std::string JsonError; + json11::Json Json = json11::Json::parse(JsonText.data(), JsonError); + + const float FloatValue = float(Json["Float"].number_value()); + const double DoubleValue = Json["Double"].number_value(); + + CHECK(JsonError.empty()); + CHECK(FloatValue == doctest::Approx(ExpectedFloatValue)); + CHECK(DoubleValue == doctest::Approx(ExpectedDoubleValue)); + } } #endif diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp index 7a1f6c336..2d2603434 100644 --- a/zencore/filesystem.cpp +++ b/zencore/filesystem.cpp @@ -138,9 +138,9 @@ WipeDirectory(const wchar_t* DirPath) } } } while (FindNextFileW(hFind, &FindData) == TRUE); - } - FindClose(hFind); + FindClose(hFind); + } return true; } @@ -819,7 +819,7 @@ TEST_CASE("filesystem") using namespace std::filesystem; // GetExePath - path BinPath = GetRunningExecutablePath(); + path BinPath = GetRunningExecutablePath(); const bool ExpectedExe = BinPath.stem() == "zencore-test" || BinPath.stem() == "zenserver-test"; CHECK(ExpectedExe); CHECK(is_regular_file(BinPath)); diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h index 54801f9ac..36ecbd9a7 100644 --- a/zencore/include/zencore/iobuffer.h +++ b/zencore/include/zencore/iobuffer.h @@ -290,10 +290,6 @@ IoBufferCore::ExtendedCore() const class IoBuffer { public: - enum EAssumeOwnershipTag - { - AssumeOwnership - }; enum ECloneTag { Clone @@ -339,11 +335,6 @@ public: memcpy(const_cast<void*>(m_Core->DataPointer()), DataPtr, SizeBytes); } - inline IoBuffer(EAssumeOwnershipTag, const void* DataPtr, size_t Sz) : m_Core(new IoBufferCore(DataPtr, Sz)) - { - m_Core->SetIsOwnedByThis(true); - } - ZENCORE_API IoBuffer(EFileTag, void* FileHandle, uint64_t ChunkFileOffset, uint64_t ChunkSize); ZENCORE_API IoBuffer(EBorrowedFileTag, void* FileHandle, uint64_t ChunkFileOffset, uint64_t ChunkSize); diff --git a/zencore/include/zencore/refcount.h b/zencore/include/zencore/refcount.h index 0a1e15614..320718f5b 100644 --- a/zencore/include/zencore/refcount.h +++ b/zencore/include/zencore/refcount.h @@ -17,7 +17,7 @@ namespace zen { class RefCounted { public: - RefCounted() = default; + RefCounted() = default; virtual ~RefCounted() = default; inline uint32_t AddRef() const { return AtomicIncrement(const_cast<RefCounted*>(this)->m_RefCount); } diff --git a/zencore/include/zencore/stats.h b/zencore/include/zencore/stats.h index 0554f620d..dfa8dac34 100644 --- a/zencore/include/zencore/stats.h +++ b/zencore/include/zencore/stats.h @@ -2,11 +2,16 @@ #pragma once -#include <atomic> -#include <type_traits> #include "zencore.h" +#include <atomic> +#include <random> + namespace zen { +class CbObjectWriter; +} + +namespace zen::metrics { template<typename T> class Gauge @@ -76,18 +81,19 @@ public: Meter(); ~Meter(); - double Rate1(); // One-minute rate - double Rate5(); // Five-minute rate - double Rate15(); // Fifteen-minute rate - double MeanRate(); // Mean rate since instantiation of this meter - void Mark(uint64_t Count = 1); // Register one or more events + inline uint64_t Count() const { return m_TotalCount; } + double Rate1(); // One-minute rate + double Rate5(); // Five-minute rate + double Rate15(); // Fifteen-minute rate + double MeanRate() const; // Mean rate since instantiation of this meter + void Mark(uint64_t Count = 1); // Register one or more events private: std::atomic<uint64_t> m_TotalCount{0}; // Accumulator counting number of marks since beginning std::atomic<uint64_t> m_PendingCount{0}; // Pending EWMA update accumulator std::atomic<uint64_t> m_StartTick{0}; // Time this was instantiated (for mean) std::atomic<uint64_t> m_LastTick{0}; // Timestamp of last EWMA tick - std::atomic<int64_t> m_Remain{0}; // Tracks the "modulo" of tick time + std::atomic<int64_t> m_Remainder{0}; // Tracks the "modulo" of tick time bool m_IsFirstTick = true; RawEWMA m_RateM1; RawEWMA m_RateM5; @@ -97,6 +103,130 @@ private: void Tick(); }; +/** Moment-in-time snapshot of a distribution + */ +class SampleSnapshot +{ +public: + SampleSnapshot(std::vector<double>&& Values); + ~SampleSnapshot(); + + uint32_t Size() const { return (uint32_t)m_Values.size(); } + double GetQuantileValue(double Quantile); + double GetMedian() { return GetQuantileValue(0.5); } + double Get75Percentile() { return GetQuantileValue(0.75); } + double Get95Percentile() { return GetQuantileValue(0.95); } + double Get98Percentile() { return GetQuantileValue(0.98); } + double Get99Percentile() { return GetQuantileValue(0.99); } + double Get999Percentile() { return GetQuantileValue(0.999); } + const std::vector<double>& GetValues() const; + +private: + std::vector<double> m_Values; +}; + +/** Randomly selects samples from a stream. Uses Vitter's + Algorithm R to produce a statistically representative sample. + + http://www.cs.umd.edu/~samir/498/vitter.pdf - Random Sampling with a Reservoir + */ + +class UniformSample +{ +public: + UniformSample(uint32_t ReservoirSize); + ~UniformSample(); + + void Clear(); + uint32_t Size() const; + void Update(int64_t Value); + SampleSnapshot Snapshot() const; + + template<std::invocable<int64_t> T> + void IterateValues(T Callback) const + { + for (const auto& Value : m_Values) + { + Callback(Value); + } + } + +private: + std::atomic<uint64_t> m_SampleCounter{0}; + std::vector<std::atomic<int64_t>> m_Values; +}; + +/** Track (probabilistic) sample distribution along with min/max + */ +class Histogram +{ +public: + Histogram(int32_t SampleCount = 1028); + ~Histogram(); + + void Clear(); + void Update(int64_t Value); + int64_t Max() const; + int64_t Min() const; + double Mean() const; + uint64_t Count() const; + SampleSnapshot Snapshot() const { return m_Sample.Snapshot(); } + +private: + UniformSample m_Sample; + std::atomic<int64_t> m_Min{0}; + std::atomic<int64_t> m_Max{0}; + std::atomic<int64_t> m_Sum{0}; + std::atomic<int64_t> m_Count{0}; +}; + +/** Track timing and frequency of some operation + + Example usage would be to track frequency and duration of network + requests, or function calls. + + */ +class OperationTiming +{ +public: + OperationTiming(int32_t SampleCount = 514); + ~OperationTiming(); + + void Update(int64_t Duration); + int64_t Max() const; + int64_t Min() const; + double Mean() const; + uint64_t Count() const; + SampleSnapshot Snapshot() const { return m_Histogram.Snapshot(); } + + double Rate1() { return m_Meter.Rate1(); } + double Rate5() { return m_Meter.Rate5(); } + double Rate15() { return m_Meter.Rate15(); } + double MeanRate() const { return m_Meter.MeanRate(); } + + struct Scope + { + Scope(OperationTiming& Outer); + ~Scope(); + + private: + OperationTiming& m_Outer; + uint64_t m_StartTick; + }; + +private: + Meter m_Meter; + Histogram m_Histogram; +}; + +void EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo); +void EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo); +void EmitSnapshot(std::string_view Tag, Meter& Stat, CbObjectWriter& Cbo); + +} // namespace zen::metrics + +namespace zen { + extern void stats_forcelink(); } // namespace zen diff --git a/zencore/include/zencore/thread.h b/zencore/include/zencore/thread.h index 7889682cd..410ffbd1e 100644 --- a/zencore/include/zencore/thread.h +++ b/zencore/include/zencore/thread.h @@ -75,12 +75,12 @@ public: ZENCORE_API Event(); ZENCORE_API ~Event(); - Event(Event&& Rhs) : m_EventHandle(Rhs.m_EventHandle) { Rhs.m_EventHandle = nullptr; } + Event(Event&& Rhs) noexcept : m_EventHandle(Rhs.m_EventHandle) { Rhs.m_EventHandle = nullptr; } Event(const Event& Rhs) = delete; Event& operator=(const Event& Rhs) = delete; - inline Event& operator=(Event&& Rhs) + inline Event& operator=(Event&& Rhs) noexcept { std::swap(m_EventHandle, Rhs.m_EventHandle); return *this; @@ -133,14 +133,14 @@ public: ZENCORE_API ~ProcessHandle(); - ZENCORE_API void Initialize(int Pid); - ZENCORE_API void Initialize(void* ProcessHandle); /// Initialize with an existing handle - takes ownership of the handle - ZENCORE_API bool IsRunning() const; - ZENCORE_API bool IsValid() const; - ZENCORE_API bool Wait(int TimeoutMs = -1); - ZENCORE_API void Terminate(int ExitCode); - ZENCORE_API void Reset(); - inline int Pid() const { return m_Pid; } + ZENCORE_API void Initialize(int Pid); + ZENCORE_API void Initialize(void* ProcessHandle); /// Initialize with an existing handle - takes ownership of the handle + ZENCORE_API [[nodiscard]] bool IsRunning() const; + ZENCORE_API [[nodiscard]] bool IsValid() const; + ZENCORE_API bool Wait(int TimeoutMs = -1); + ZENCORE_API void Terminate(int ExitCode); + ZENCORE_API void Reset(); + inline [[nodiscard]] int Pid() const { return m_Pid; } private: void* m_ProcessHandle = nullptr; diff --git a/zencore/include/zencore/timer.h b/zencore/include/zencore/timer.h index eb284eaee..693b6daaa 100644 --- a/zencore/include/zencore/timer.h +++ b/zencore/include/zencore/timer.h @@ -18,30 +18,26 @@ namespace zen { ZENCORE_API uint64_t GetHifreqTimerValue(); ZENCORE_API uint64_t GetHifreqTimerFrequency(); +ZENCORE_API double GetHifreqTimerToSeconds(); ZENCORE_API uint64_t GetHifreqTimerFrequencySafe(); // May be used during static init class Stopwatch { public: - Stopwatch() : m_StartValue(GetHifreqTimerValue()) {} + inline Stopwatch() : m_StartValue(GetHifreqTimerValue()) {} - inline uint64_t getElapsedTimeMs() { return (GetHifreqTimerValue() - m_StartValue) * 1000 / GetHifreqTimerFrequency(); } + inline uint64_t GetElapsedTimeMs() const { return (GetHifreqTimerValue() - m_StartValue) * 1'000 / GetHifreqTimerFrequency(); } + inline uint64_t GetElapsedTimeUs() const { return (GetHifreqTimerValue() - m_StartValue) * 1'000'000 / GetHifreqTimerFrequency(); } + inline uint64_t GetElapsedTicks() const { return GetHifreqTimerValue() - m_StartValue; } + inline void Reset() { m_StartValue = GetHifreqTimerValue(); } - inline void reset() { m_StartValue = GetHifreqTimerValue(); } + static inline uint64_t GetElapsedTimeMs(uint64_t Ticks) { return Ticks * 1'000 / GetHifreqTimerFrequency(); } + static inline uint64_t GetElapsedTimeUs(uint64_t Ticks) { return Ticks * 1'000'000 / GetHifreqTimerFrequency(); } private: uint64_t m_StartValue; }; -// CPU timers - -inline uint64_t -GetCpuTimerValue() -{ - unsigned int foo; - return __rdtscp(&foo); -} - void timer_forcelink(); // internal } // namespace zen diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp index a730a316f..244425761 100644 --- a/zencore/iobuffer.cpp +++ b/zencore/iobuffer.cpp @@ -14,6 +14,10 @@ #include <memory.h> #include <system_error> +#if ZEN_USE_MIMALLOC +# include <mimalloc.h> +#endif + #if ZEN_PLATFORM_WINDOWS # include <atlfile.h> #else @@ -36,26 +40,41 @@ IoBufferCore::AllocateBuffer(size_t InSize, size_t Alignment) m_Flags |= kLowLevelAlloc; return VirtualAlloc(nullptr, InSize, MEM_COMMIT, PAGE_READWRITE); } - else #endif // ZEN_PLATFORM_WINDOWS - { - return Memory::Alloc(InSize, Alignment); - } + +#if ZEN_USE_MIMALLOC && 0 + void* Ptr = mi_aligned_alloc(Alignment, RoundUp(InSize, Alignment)); +#else + void* Ptr = Memory::Alloc(InSize, Alignment); +#endif + + ZEN_ASSERT(Ptr); + + return Ptr; } void IoBufferCore::FreeBuffer() { + if (!m_DataPtr) + { + return; + } + #if ZEN_PLATFORM_WINDOWS if (m_Flags & kLowLevelAlloc) { VirtualFree(const_cast<void*>(m_DataPtr), 0, MEM_DECOMMIT); + + return; } - else #endif // ZEN_PLATFORM_WINDOWS - { - return Memory::Free(const_cast<void*>(m_DataPtr)); - } + +#if ZEN_USE_MIMALLOC && 0 + return mi_free(const_cast<void*>(m_DataPtr)); +#else + return Memory::Free(const_cast<void*>(m_DataPtr)); +#endif } ////////////////////////////////////////////////////////////////////////// @@ -436,7 +455,7 @@ IoBufferBuilder::MakeFromTemporaryFile(const path_char_t* FileName) Handle = DataFile.Detach(); #else - int Fd = open(FileName, O_RDONLY); + int Fd = open(FileName, O_RDONLY); if (Fd < 0) { return {}; diff --git a/zencore/memory.cpp b/zencore/memory.cpp index 613b6ba67..da78ae3a8 100644 --- a/zencore/memory.cpp +++ b/zencore/memory.cpp @@ -6,6 +6,7 @@ #ifdef ZEN_PLATFORM_WINDOWS # include <malloc.h> +# include <mimalloc.h> #else # include <cstdlib> #endif @@ -18,8 +19,11 @@ static void* AlignedAllocImpl(size_t size, size_t alignment) { #if ZEN_PLATFORM_WINDOWS - // return _aligned_malloc(size, alignment); // MSVC alternative - return _mm_malloc(size, alignment); +# if ZEN_USE_MIMALLOC && 0 /* this path is not functional */ + return mi_aligned_alloc(alignment, size); +# else + return _aligned_malloc(size, alignment); +# endif #else // posix_memalign(&Ret, Alignment, Size); // Apple, AndroidApi<28 return std::aligned_alloc(alignment, size); @@ -33,8 +37,11 @@ AlignedFreeImpl(void* ptr) return; #if ZEN_PLATFORM_WINDOWS - // _aligned_free(ptr); MSVC alternative - _mm_free(ptr); +# if ZEN_USE_MIMALLOC && 0 /* this path is not functional */ + return mi_free(ptr); +# else + _aligned_free(ptr); +# endif #else // free(ptr) // Apple :) std::free(ptr); diff --git a/zencore/stats.cpp b/zencore/stats.cpp index 9ae2ddd28..34dc2828f 100644 --- a/zencore/stats.cpp +++ b/zencore/stats.cpp @@ -1,10 +1,15 @@ // Copyright Epic Games, Inc. All Rights Reserved. #include "zencore/stats.h" -#include <cmath> + +#include <zencore/compactbinarybuilder.h> +#include "zencore/intmath.h" #include "zencore/thread.h" #include "zencore/timer.h" +#include <cmath> +#include <gsl/gsl-lite.hpp> + #if ZEN_WITH_TESTS # include <zencore/testing.h> #endif @@ -13,7 +18,7 @@ // Derived from https://github.com/dln/medida/blob/master/src/medida/stats/ewma.cc // -namespace zen { +namespace zen::metrics { static constinit int kTickIntervalInSeconds = 5; static constinit double kSecondsPerMinute = 60.0; @@ -76,18 +81,18 @@ Meter::TickIfNecessary() if (m_LastTick.compare_exchange_strong(OldTick, NewTick)) { - m_Remain.fetch_add(Age); + m_Remainder.fetch_add(Age); do { - int64_t Remain = m_Remain.load(std::memory_order_relaxed); + int64_t Remain = m_Remainder.load(std::memory_order_relaxed); if (Remain < 0) { return; } - if (m_Remain.compare_exchange_strong(Remain, Remain - CountPerTick)) + if (m_Remainder.compare_exchange_strong(Remain, Remain - CountPerTick)) { Tick(); } @@ -137,7 +142,7 @@ Meter::Rate15() } double -Meter::MeanRate() +Meter::MeanRate() const { const uint64_t Count = m_TotalCount.load(std::memory_order_relaxed); @@ -162,9 +167,354 @@ Meter::Mark(uint64_t Count) ////////////////////////////////////////////////////////////////////////// +// TODO: should consider a cheaper RNG here, this will run for every thread +// that gets created + +thread_local std::mt19937_64 ThreadLocalRng; + +UniformSample::UniformSample(uint32_t ReservoirSize) : m_Values(ReservoirSize) +{ +} + +UniformSample::~UniformSample() +{ +} + +void +UniformSample::Clear() +{ + for (auto& Value : m_Values) + { + Value.store(0); + } + m_SampleCounter = 0; +} + +uint32_t +UniformSample::Size() const +{ + return gsl::narrow_cast<uint32_t>(Min(m_SampleCounter.load(), m_Values.size())); +} + +void +UniformSample::Update(int64_t Value) +{ + const uint64_t Count = m_SampleCounter++; + const uint64_t Size = m_Values.size(); + + if (Count < Size) + { + m_Values[Count] = Value; + } + else + { + // Randomly choose an old entry to potentially replace (the probability + // of replacing an entry diminishes with time) + + std::uniform_int_distribution<uint64_t> UniformDist(0, Count); + uint64_t SampleIndex = UniformDist(ThreadLocalRng); + + if (SampleIndex < Size) + { + m_Values[SampleIndex].store(Value, std::memory_order_release); + } + } +} + +SampleSnapshot +UniformSample::Snapshot() const +{ + uint64_t ValuesSize = Size(); + std::vector<double> Values(ValuesSize); + + for (int i = 0; i < ValuesSize; ++i) + { + Values[i] = double(m_Values[i]); + } + + return SampleSnapshot(std::move(Values)); +} + +////////////////////////////////////////////////////////////////////////// + +Histogram::Histogram(int32_t SampleCount) : m_Sample(SampleCount) +{ +} + +Histogram::~Histogram() +{ +} + +void +Histogram::Clear() +{ + m_Min = m_Max = m_Sum = m_Count = 0; + m_Sample.Clear(); +} + +void +Histogram::Update(int64_t Value) +{ + m_Sample.Update(Value); + + if (m_Count == 0) + { + m_Min = m_Max = Value; + } + else + { + int64_t CurrentMax = m_Max.load(std::memory_order_relaxed); + + while ((CurrentMax < Value) && !m_Max.compare_exchange_weak(CurrentMax, Value)) + { + } + + int64_t CurrentMin = m_Min.load(std::memory_order_relaxed); + + while ((CurrentMin > Value) && !m_Min.compare_exchange_weak(CurrentMin, Value)) + { + } + } + + m_Sum += Value; + ++m_Count; +} + +int64_t +Histogram::Max() const +{ + return m_Max.load(std::memory_order_relaxed); +} + +int64_t +Histogram::Min() const +{ + return m_Min.load(std::memory_order_relaxed); +} + +double +Histogram::Mean() const +{ + return double(m_Sum.load(std::memory_order_relaxed)) / m_Count; +} + +uint64_t +Histogram::Count() const +{ + return m_Count.load(std::memory_order_relaxed); +} + +////////////////////////////////////////////////////////////////////////// + +SampleSnapshot::SampleSnapshot(std::vector<double>&& Values) : m_Values(std::move(Values)) +{ + std::sort(begin(m_Values), end(m_Values)); +} + +SampleSnapshot::~SampleSnapshot() +{ +} + +double +SampleSnapshot::GetQuantileValue(double Quantile) +{ + ZEN_ASSERT((Quantile >= 0.0) && (Quantile <= 1.0)); + + if (m_Values.empty()) + { + return 0.0; + } + + const double Pos = Quantile * (m_Values.size() + 1); + + if (Pos < 1) + { + return m_Values.front(); + } + + if (Pos >= m_Values.size()) + { + return m_Values.back(); + } + + const int32_t Index = (int32_t)Pos; + const double Lower = m_Values[Index - 1]; + const double Upper = m_Values[Index]; + + // Lerp + return Lower + (Pos - std::floor(Pos)) * (Upper - Lower); +} + +const std::vector<double>& +SampleSnapshot::GetValues() const +{ + return m_Values; +} + +////////////////////////////////////////////////////////////////////////// + +OperationTiming::OperationTiming(int32_t SampleCount) : m_Histogram{SampleCount} +{ +} + +OperationTiming::~OperationTiming() +{ +} + +void +OperationTiming::Update(int64_t Duration) +{ + m_Meter.Mark(1); + m_Histogram.Update(Duration); +} + +int64_t +OperationTiming::Max() const +{ + return m_Histogram.Max(); +} + +int64_t +OperationTiming::Min() const +{ + return m_Histogram.Min(); +} + +double +OperationTiming::Mean() const +{ + return m_Histogram.Mean(); +} + +uint64_t +OperationTiming::Count() const +{ + return m_Meter.Count(); +} + +OperationTiming::Scope::Scope(OperationTiming& Outer) : m_Outer(Outer), m_StartTick(GetHifreqTimerValue()) +{ +} + +OperationTiming::Scope::~Scope() +{ + m_Outer.Update(GetHifreqTimerValue() - m_StartTick); +} + +////////////////////////////////////////////////////////////////////////// + +void +EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo) +{ + Cbo.BeginObject(Tag); + + SampleSnapshot Snap = Stat.Snapshot(); + + Cbo << "count" << Stat.Count(); + + Cbo << "rate_mean" << Stat.MeanRate(); + Cbo << "rate_1" << Stat.Rate1() << "rate_5" << Stat.Rate5() << "rate_15" << Stat.Rate15(); + + const double ToSeconds = GetHifreqTimerToSeconds(); + + Cbo << "t_avg" << Stat.Mean() * ToSeconds; + Cbo << "t_min" << Stat.Min() * ToSeconds << "t_max" << Stat.Max() * ToSeconds; + Cbo << "t_p75" << Snap.Get75Percentile() * ToSeconds << "t_p95" << Snap.Get95Percentile() * ToSeconds << "t_p99" + << Snap.Get99Percentile() * ToSeconds << "t_p999" << Snap.Get999Percentile() * ToSeconds; + + Cbo.EndObject(); +} + +void +EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo) +{ + Cbo.BeginObject(Tag); + + SampleSnapshot Snap = Stat.Snapshot(); + + Cbo << "count" << Stat.Count() << "avg" << Stat.Mean(); + Cbo << "min" << Stat.Min() << "max" << Stat.Max(); + Cbo << "p75" << Snap.Get75Percentile() << "p95" << Snap.Get95Percentile() << "p99" << Snap.Get99Percentile() << "p999" + << Snap.Get999Percentile(); + + Cbo.EndObject(); +} + +void +EmitSnapshot(std::string_view Tag, Meter& Stat, CbObjectWriter& Cbo) +{ + Cbo.BeginObject(Tag); + + Cbo << "count" << Stat.Count() << "rate_mean" << Stat.MeanRate(); + Cbo << "rate_1" << Stat.Rate1() << "rate_5" << Stat.Rate5() << "rate_15" << Stat.Rate15(); + + Cbo.EndObject(); +} + +////////////////////////////////////////////////////////////////////////// + #if ZEN_WITH_TESTS -TEST_CASE("EWMA") +TEST_CASE("Core.Stats.Histogram") +{ + Histogram Histo{258}; + + SampleSnapshot Snap1 = Histo.Snapshot(); + CHECK_EQ(Snap1.Size(), 0); + CHECK_EQ(Snap1.GetMedian(), 0); + + Histo.Update(1); + CHECK_EQ(Histo.Min(), 1); + CHECK_EQ(Histo.Max(), 1); + + SampleSnapshot Snap2 = Histo.Snapshot(); + CHECK_EQ(Snap2.Size(), 1); + + Histo.Update(2); + CHECK_EQ(Histo.Min(), 1); + CHECK_EQ(Histo.Max(), 2); + + SampleSnapshot Snap3 = Histo.Snapshot(); + CHECK_EQ(Snap3.Size(), 2); + + Histo.Update(-2); + CHECK_EQ(Histo.Min(), -2); + CHECK_EQ(Histo.Max(), 2); + CHECK_EQ(Histo.Mean(), 1 / 3.0); + + SampleSnapshot Snap4 = Histo.Snapshot(); + CHECK_EQ(Snap4.Size(), 3); + CHECK_EQ(Snap4.GetMedian(), 1); + CHECK_EQ(Snap4.Get999Percentile(), 2); + CHECK_EQ(Snap4.GetQuantileValue(0), -2); +} + +TEST_CASE("Core.Stats.UniformSample") +{ + UniformSample Sample1{100}; + + for (int i = 0; i < 100; ++i) + { + for (int j = 1; j <= 100; ++j) + { + Sample1.Update(j); + } + } + + int64_t Sum = 0; + int64_t Count = 0; + + Sample1.IterateValues([&](int64_t Value) { + ++Count; + Sum += Value; + }); + + double Average = double(Sum) / Count; + + CHECK(fabs(Average - 50) < 10); // What's the right test here? The result could vary massively and still be technically correct +} + +TEST_CASE("Core.Stats.EWMA") { SUBCASE("Simple_1") { @@ -262,6 +612,9 @@ TEST_CASE("Meter") [[maybe_unused]] double Rate = Meter1.MeanRate(); } # endif +} + +namespace zen { void stats_forcelink() diff --git a/zencore/timer.cpp b/zencore/timer.cpp index 88ec89cb7..5d30d9b29 100644 --- a/zencore/timer.cpp +++ b/zencore/timer.cpp @@ -42,7 +42,8 @@ InternalGetHifreqTimerFrequency() #endif } -static uint64_t QpcFreq = InternalGetHifreqTimerFrequency(); +uint64_t QpcFreq = InternalGetHifreqTimerFrequency(); +static const double QpcFactor = 1.0 / InternalGetHifreqTimerFrequency(); uint64_t GetHifreqTimerFrequency() @@ -50,6 +51,12 @@ GetHifreqTimerFrequency() return QpcFreq; } +double +GetHifreqTimerToSeconds() +{ + return QpcFactor; +} + uint64_t GetHifreqTimerFrequencySafe() { @@ -73,18 +80,6 @@ timer_forcelink() { } -TEST_CASE("Timer") -{ - uint64_t s0 = GetHifreqTimerValue(); - uint64_t t0 = GetCpuTimerValue(); - zen::Sleep(1000); - uint64_t s1 = GetHifreqTimerValue(); - uint64_t t1 = GetCpuTimerValue(); - // double r = double(t1 - t0) / (s1 - s0); - CHECK_NE(t0, t1); - CHECK_NE(s0, s1); -} - #endif } // namespace zen diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 6461b76c2..0e5e73ffc 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -645,8 +645,8 @@ main() ZEN_INFO("{} requests in {} ({})", RequestCount, - zen::NiceTimeSpanMs(timer.getElapsedTimeMs()), - zen::NiceRate(RequestCount, (uint32_t)timer.getElapsedTimeMs(), "req")); + zen::NiceTimeSpanMs(timer.GetElapsedTimeMs()), + zen::NiceRate(RequestCount, (uint32_t)timer.GetElapsedTimeMs(), "req")); return 0; } @@ -775,7 +775,7 @@ TEST_CASE("default.single") IssueTestRequests, IssueTestRequests); - uint64_t Elapsed = timer.getElapsedTimeMs(); + uint64_t Elapsed = timer.GetElapsedTimeMs(); ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); } @@ -826,7 +826,7 @@ TEST_CASE("multi.basic") [&] { IssueTestRequests(13337); }, [&] { IssueTestRequests(13338); }); - uint64_t Elapsed = timer.getElapsedTimeMs(); + uint64_t Elapsed = timer.GetElapsedTimeMs(); ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); } @@ -899,7 +899,7 @@ TEST_CASE("cas.basic") ++RequestCount; } - uint64_t Elapsed = timer.getElapsedTimeMs(); + uint64_t Elapsed = timer.GetElapsedTimeMs(); ZEN_INFO("{} requests in {} ({})", RequestCount, @@ -1074,7 +1074,7 @@ TEST_CASE("project.basic") SUBCASE("build store op commit") { ZEN_INFO("-------"); } } - const uint64_t Elapsed = timer.getElapsedTimeMs(); + const uint64_t Elapsed = timer.GetElapsedTimeMs(); ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); } diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 74cee6614..dc96aecae 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -6,6 +6,7 @@ #include <zencore/compress.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/scopeguard.h> #include <zencore/stream.h> #include <zencore/timer.h> #include <zenhttp/httpserver.h> @@ -193,10 +194,17 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { CacheRef Ref; + metrics::OperationTiming::Scope $(m_HttpRequests); + if (!ValidateKeyUri(Request, /* out */ Ref)) { std::string_view Key = Request.RelativeUri(); + if (Key.empty()) + { + return HandleStatusRequest(Request); + } + if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); })) { // Bucket reference @@ -262,11 +270,11 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, case kHead: case kGet: { - HandleGetCacheRecord(Request, Ref, Policy); if (Verb == kHead) { Request.SetSuppressResponseBody(); } + HandleGetCacheRecord(Request, Ref, Policy); } break; case kPut: @@ -829,4 +837,22 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& return true; } + +void +HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request) +{ + CbObjectWriter Cbo; + Cbo << "ok" << true; + + EmitSnapshot("requests", m_HttpRequests, Cbo); + if (m_UpstreamCache) + { + Cbo.BeginObject("upstream"); + m_UpstreamCache->GetStatus(Cbo); + Cbo.EndObject(); + } + + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + } // namespace zen diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 3fdaa1236..47fc173e9 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -2,6 +2,7 @@ #pragma once +#include <zencore/stats.h> #include <zenhttp/httpserver.h> #include <memory> @@ -78,6 +79,7 @@ private: void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); + void HandleStatusRequest(zen::HttpServerRequest& Request); spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; @@ -86,6 +88,7 @@ private: zen::CidStore& m_CidStore; std::unique_ptr<UpstreamCache> m_UpstreamCache; uint64_t m_LastScrubTime = 0; + metrics::OperationTiming m_HttpRequests; }; } // namespace zen diff --git a/zenserver/config.cpp b/zenserver/config.cpp index 42f59b26c..254032226 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -227,7 +227,7 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z "", "upstream-stats", "Collect performance metrics for upstream endpoints", - cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("false"), + cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("true"), ""); try diff --git a/zenserver/experimental/usnjournal.cpp b/zenserver/experimental/usnjournal.cpp index 1e765fbe5..d575e1779 100644 --- a/zenserver/experimental/usnjournal.cpp +++ b/zenserver/experimental/usnjournal.cpp @@ -259,7 +259,7 @@ UsnJournalReader::Initialize(std::filesystem::path VolumePath) } } - const auto ElapsedMs = Timer.getElapsedTimeMs(); + const auto ElapsedMs = Timer.GetElapsedTimeMs(); ZEN_INFO("MFT enumeration of {} completed after {} ({})", zen::NiceBytes(MftBytesProcessed), diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 1a9eb2c67..7870f9559 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -191,7 +191,7 @@ struct ProjectStore::OplogStorage : public RefCounted }); ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}", - NiceTimeSpanMs(Timer.getElapsedTimeMs()), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), m_MaxLsn, m_NextOpsOffset); } @@ -502,7 +502,7 @@ ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry } } - ZEN_DEBUG("added {} file(s) in {}", FileCount, NiceTimeSpanMs(Timer.getElapsedTimeMs())); + ZEN_DEBUG("added {} file(s) in {}", FileCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } for (CbFieldView& Entry : Core["meta"sv]) diff --git a/zenserver/testing/httptest.cpp b/zenserver/testing/httptest.cpp index 18d63a6ef..01866a63b 100644 --- a/zenserver/testing/httptest.cpp +++ b/zenserver/testing/httptest.cpp @@ -15,6 +15,23 @@ HttpTestingService::HttpTestingService() HttpVerb::kGet); m_Router.RegisterRoute( + "metrics", + [this](HttpRouterRequest& Req) { + metrics::OperationTiming::Scope _(m_TimingStats); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "get_metrics", + [this](HttpRouterRequest& Req) { + CbObjectWriter Cbo; + EmitSnapshot("requests", m_TimingStats, Cbo); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( "json", [this](HttpRouterRequest& Req) { CbObjectWriter Obj; diff --git a/zenserver/testing/httptest.h b/zenserver/testing/httptest.h index f55780d05..f7ea0c31c 100644 --- a/zenserver/testing/httptest.h +++ b/zenserver/testing/httptest.h @@ -3,6 +3,7 @@ #pragma once #include <zencore/logging.h> +#include <zencore/stats.h> #include <zenhttp/httpserver.h> #include <atomic> @@ -39,8 +40,9 @@ public: }; private: - HttpRequestRouter m_Router; - std::atomic<uint32_t> m_Counter{0}; + HttpRequestRouter m_Router; + std::atomic<uint32_t> m_Counter{0}; + metrics::OperationTiming m_TimingStats; RwLock m_RwLock; std::unordered_map<uint32_t, Ref<PackageHandler>> m_HandlerMap; diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index b93635e76..0397ddaa0 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -299,13 +299,16 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, return PutDerivedData(BucketId, Key.ToHexString(), DerivedData); } -CloudCacheResult +PutRefResult CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + PutRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; } IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); @@ -328,16 +331,102 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer if (Response.error) { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + PutRefResult Result; + Result.ErrorCode = static_cast<int32_t>(Response.error.code); + Result.Reason = std::move(Response.error.message); + return Result; } else if (!VerifyAccessToken(Response.status_code)) { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + PutRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; } - return {.Bytes = Response.uploaded_bytes, - .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; + PutRefResult Result; + Result.Success = (Response.status_code == 200 || Response.status_code == 201); + Result.Bytes = Response.uploaded_bytes; + Result.ElapsedSeconds = Response.elapsed; + + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.text, JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + + return Result; +} + +FinalizeRefResult +CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + FinalizeRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" + << Key.ToHexString() << "/finalize/" << RefHash.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, + {"X-Jupiter-IoHash", RefHash.ToHexString()}, + {"Content-Type", "application/x-ue-cb"}}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + FinalizeRefResult Result; + Result.ErrorCode = static_cast<int32_t>(Response.error.code); + Result.Reason = std::move(Response.error.message); + return Result; + } + else if (!VerifyAccessToken(Response.status_code)) + { + FinalizeRefResult Result; + Result.ErrorCode = 401; + Result.Reason = "Invalid access token"sv; + return Result; + } + + FinalizeRefResult Result; + Result.Success = (Response.status_code == 200 || Response.status_code == 201); + Result.Bytes = Response.uploaded_bytes; + Result.ElapsedSeconds = Response.elapsed; + + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.text, JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + + return Result; } CloudCacheResult diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index d8844279e..9573a1631 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -2,6 +2,7 @@ #pragma once +#include <zencore/iohash.h> #include <zencore/logging.h> #include <zencore/refcount.h> #include <zencore/thread.h> @@ -53,6 +54,16 @@ struct CloudCacheResult bool Success = false; }; +struct PutRefResult : CloudCacheResult +{ + std::vector<IoHash> Needs; +}; + +struct FinalizeRefResult : CloudCacheResult +{ + std::vector<IoHash> Needs; +}; + /** * Context for performing Jupiter operations * @@ -76,11 +87,13 @@ public: CloudCacheResult PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData); CloudCacheResult PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData); - CloudCacheResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); + PutRefResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); CloudCacheResult PutBlob(const IoHash& Key, IoBuffer Blob); CloudCacheResult PutCompressedBlob(const IoHash& Key, IoBuffer Blob); CloudCacheResult PutObject(const IoHash& Key, IoBuffer Object); + FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); + CloudCacheResult DerivedDataExists(std::string_view BucketId, std::string_view Key); CloudCacheResult DerivedDataExists(std::string_view BucketId, const IoHash& Key); CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 0dd16cd06..03054b542 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -9,6 +9,7 @@ #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/fmtutils.h> +#include <zencore/stats.h> #include <zencore/stream.h> #include <zencore/timer.h> @@ -45,6 +46,7 @@ namespace detail { { std::lock_guard Lock(m_Lock); m_Queue.emplace_back(std::move(Item)); + m_Size++; } m_NewItemSignal.notify_one(); @@ -64,6 +66,7 @@ namespace detail { { Item = std::move(m_Queue.front()); m_Queue.pop_front(); + m_Size--; return true; } @@ -80,7 +83,7 @@ namespace detail { } } - std::size_t Num() const + std::size_t Size() const { std::unique_lock Lock(m_Lock); return m_Queue.size(); @@ -91,12 +94,15 @@ namespace detail { std::condition_variable m_NewItemSignal; std::deque<T> m_Queue; std::atomic_bool m_CompleteAdding{false}; + std::atomic_uint32_t m_Size; }; class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: - JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc) + JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) + : m_Log(zen::logging::Get("upstream")) + , m_UseLegacyDdc(Options.UseLegacyDdc) { using namespace fmt::literals; m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl); @@ -277,16 +283,82 @@ namespace detail { Success = false; for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) { - if (CloudCacheResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - RecordValue, - ZenContentType::kCbObject); + if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + RecordValue, + ZenContentType::kCbObject); Result.Success) { TotalBytes += Result.Bytes; TotalElapsedSeconds += Result.ElapsedSeconds; Success = true; - break; + + if (!Result.Needs.empty()) + { + for (const IoHash& NeededHash : Result.Needs) + { + Success = false; + + if (auto It = + std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash); + It != std::end(CacheRecord.PayloadIds)) + { + const size_t Idx = It - std::begin(CacheRecord.PayloadIds); + + if (CloudCacheResult BlobResult = + Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + BlobResult.Success) + { + TotalBytes += BlobResult.Bytes; + TotalElapsedSeconds += BlobResult.ElapsedSeconds; + Success = true; + } + else + { + ZEN_WARN("upload missing payload '{}/{}/{}' FAILED", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + NeededHash); + } + } + else + { + ZEN_WARN("needed payload '{}/{}/{}' MISSING", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + NeededHash); + } + } + + const IoHash RefHash = IoHash::HashBuffer(RecordValue); + + if (FinalizeRefResult FinalizeResult = + Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + FinalizeResult.Success) + { + TotalBytes += FinalizeResult.Bytes; + TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + Success = true; + + for (const IoHash& MissingHash : FinalizeResult.Needs) + { + ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + MissingHash); + } + } + else + { + ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); + Success = false; + } + } + + if (Success) + { + break; + } } } @@ -302,6 +374,9 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: + spdlog::logger& Log() { return m_Log; } + + spdlog::logger& m_Log; bool m_UseLegacyDdc; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; @@ -515,7 +590,12 @@ struct UpstreamStats } UpstreamEndpointStats& Stats = Endpoint.Stats(); - if (Result.Success) + + if (Result.Error) + { + Stats.ErrorCount++; + } + else if (Result.Success) { Stats.HitCount++; Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); @@ -549,6 +629,10 @@ struct UpstreamStats Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); Stats.SecondsUp.fetch_add(Result.ElapsedSeconds); } + else + { + Stats.ErrorCount++; + } if (m_SampleCount++ % MaxSampleCount) { @@ -575,13 +659,13 @@ struct UpstreamStats const uint64_t TotalCount = HitCount + MissCount; const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0; - Logger.info("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", - Ep->DisplayName(), - HitRate, - DownBytes, - DownSpeed, - UpBytes, - UpSpeed); + Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", + Ep->DisplayName(), + HitRate, + DownBytes, + DownSpeed, + UpBytes, + UpSpeed); } } @@ -700,6 +784,36 @@ public: return {}; } + virtual void GetStatus(CbObjectWriter& Status) override + { + Status << "reading" << m_Options.ReadUpstream; + Status << "writing" << m_Options.WriteUpstream; + Status << "worker_threads" << m_Options.ThreadCount; + Status << "queue_count" << m_UpstreamQueue.Size(); + + Status.BeginArray("endpoints"); + for (const auto& Ep : m_Endpoints) + { + Status.BeginObject(); + Status << "name" << Ep->DisplayName(); + Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); + + UpstreamEndpointStats& Stats = Ep->Stats(); + const uint64_t HitCount = Stats.HitCount; + const uint64_t MissCount = Stats.MissCount; + const uint64_t TotalCount = HitCount + MissCount; + const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0; + + Status << "hit_ratio" << HitRate; + Status << "downloaded_mb" << Stats.DownBytes; + Status << "uploaded_mb" << Stats.UpBytes; + Status << "error_count" << Stats.ErrorCount; + + Status.EndObject(); + } + Status.EndArray(); + } + private: void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) { diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 0e736480b..a6b1e9784 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -12,6 +12,7 @@ namespace zen { +class CbObjectWriter; class CidStore; class ZenCacheStore; struct CloudCacheClientOptions; @@ -86,6 +87,7 @@ struct UpstreamEndpointStats std::atomic_uint64_t HitCount = {}; std::atomic_uint64_t MissCount = {}; std::atomic_uint64_t UpCount = {}; + std::atomic_uint64_t ErrorCount = {}; std::atomic<double> UpBytes = {}; std::atomic<double> DownBytes = {}; std::atomic<double> SecondsUp = {}; @@ -139,6 +141,8 @@ public: }; virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; + + virtual void GetStatus(CbObjectWriter& CbO) = 0; }; std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore); |