aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-10-01 11:50:43 +0200
committerMartin Ridgers <[email protected]>2021-10-01 11:50:43 +0200
commita2dc648979bad70037e19ce75ae506f9455e8fd3 (patch)
tree4d902270db982a24df05eecdf53d0535b601d582
parentRemoved `-fshort-wchar` compiler flag on Linux (diff)
parentAdded upstream cache perf metrics. (diff)
downloadzen-a2dc648979bad70037e19ce75ae506f9455e8fd3.tar.xz
zen-a2dc648979bad70037e19ce75ae506f9455e8fd3.zip
Merged main
-rw-r--r--zen/chunk/chunk.cpp6
-rw-r--r--zen/cmds/copy.cpp2
-rw-r--r--zen/cmds/dedup.cpp4
-rw-r--r--zen/cmds/hash.cpp2
-rw-r--r--zencore/compactbinary.cpp29
-rw-r--r--zencore/filesystem.cpp6
-rw-r--r--zencore/include/zencore/iobuffer.h9
-rw-r--r--zencore/include/zencore/refcount.h2
-rw-r--r--zencore/include/zencore/stats.h146
-rw-r--r--zencore/include/zencore/thread.h20
-rw-r--r--zencore/include/zencore/timer.h20
-rw-r--r--zencore/iobuffer.cpp37
-rw-r--r--zencore/memory.cpp15
-rw-r--r--zencore/stats.cpp367
-rw-r--r--zencore/timer.cpp21
-rw-r--r--zenserver-test/zenserver-test.cpp12
-rw-r--r--zenserver/cache/structuredcache.cpp28
-rw-r--r--zenserver/cache/structuredcache.h3
-rw-r--r--zenserver/config.cpp2
-rw-r--r--zenserver/experimental/usnjournal.cpp2
-rw-r--r--zenserver/projectstore.cpp4
-rw-r--r--zenserver/testing/httptest.cpp17
-rw-r--r--zenserver/testing/httptest.h6
-rw-r--r--zenserver/upstream/jupiter.cpp103
-rw-r--r--zenserver/upstream/jupiter.h15
-rw-r--r--zenserver/upstream/upstreamcache.cpp144
-rw-r--r--zenserver/upstream/upstreamcache.h4
27 files changed, 905 insertions, 121 deletions
diff --git a/zen/chunk/chunk.cpp b/zen/chunk/chunk.cpp
index 18748e921..3283a8b66 100644
--- a/zen/chunk/chunk.cpp
+++ b/zen/chunk/chunk.cpp
@@ -1015,7 +1015,7 @@ ChunkCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
#endif
- uint64_t ElapsedMs = timer.getElapsedTimeMs();
+ uint64_t ElapsedMs = timer.GetElapsedTimeMs();
ReportSummary(Chunker, ElapsedMs);
}
@@ -1041,7 +1041,7 @@ ChunkCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
});
- uint64_t ElapsedMs = timer.getElapsedTimeMs();
+ uint64_t ElapsedMs = timer.GetElapsedTimeMs();
ReportSummary(Chunker, ElapsedMs);
}
@@ -1142,7 +1142,7 @@ TEST_CASE("chunking")
}
double Avg = double(BoundarySum) / BoundaryCount;
- const uint64_t ElapsedTimeMs = timer.getElapsedTimeMs();
+ const uint64_t ElapsedTimeMs = timer.GetElapsedTimeMs();
ZEN_INFO("{:9} : Avg {:9} - {:2.5} ({:6}, {})",
i,
diff --git a/zen/cmds/copy.cpp b/zen/cmds/copy.cpp
index 947d54e07..c9b40408e 100644
--- a/zen/cmds/copy.cpp
+++ b/zen/cmds/copy.cpp
@@ -91,7 +91,7 @@ CopyCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
CopyOptions.EnableClone = !m_NoClone;
zen::CopyFile(FromPath, ToPath, CopyOptions);
- ZEN_INFO("Copy completed in {}", zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
+ ZEN_INFO("Copy completed in {}", zen::NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
return 0;
diff --git a/zen/cmds/dedup.cpp b/zen/cmds/dedup.cpp
index e71314622..089212ed9 100644
--- a/zen/cmds/dedup.cpp
+++ b/zen/cmds/dedup.cpp
@@ -118,7 +118,7 @@ DedupCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
ZEN_INFO("Gathered {} candidates across {} size buckets. Elapsed: {}",
CandidateCount,
FileSizeMap.size(),
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
+ zen::NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
ZEN_INFO("Sorting buckets by size");
@@ -288,7 +288,7 @@ DedupCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
Size.DirEntries->clear();
}
- ZEN_INFO("Elapsed: {} Deduped: {}", zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), zen::NiceBytes(DupeBytes));
+ ZEN_INFO("Elapsed: {} Deduped: {}", zen::NiceTimeSpanMs(Timer.GetElapsedTimeMs()), zen::NiceBytes(DupeBytes));
return 0;
}
diff --git a/zen/cmds/hash.cpp b/zen/cmds/hash.cpp
index 0a7989ffc..7d234c2da 100644
--- a/zen/cmds/hash.cpp
+++ b/zen/cmds/hash.cpp
@@ -89,7 +89,7 @@ HashCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
TotalBytes.combine_each([&](size_t Total) { TotalByteCount += Total; });
- const uint64_t ElapsedMs = Timer.getElapsedTimeMs();
+ const uint64_t ElapsedMs = Timer.GetElapsedTimeMs();
ZEN_INFO("Scanned {} files in {}", FileList.size(), zen::NiceTimeSpanMs(ElapsedMs));
ZEN_INFO("Total bytes {} ({})", zen::NiceBytes(TotalByteCount), zen::NiceByteRate(TotalByteCount, ElapsedMs));
diff --git a/zencore/compactbinary.cpp b/zencore/compactbinary.cpp
index f71c0aaea..f3fbf312c 100644
--- a/zencore/compactbinary.cpp
+++ b/zencore/compactbinary.cpp
@@ -1474,10 +1474,10 @@ public:
Builder << Accessor.AsIntegerNegative();
break;
case CbFieldType::Float32:
- Builder.Append("%.9g"_format(Accessor.AsFloat32()));
+ Builder.Append("{:.9g}"_format(Accessor.AsFloat32()));
break;
case CbFieldType::Float64:
- Builder.Append("%.17g"_format(Accessor.AsFloat64()));
+ Builder.Append("{:.17g}"_format(Accessor.AsFloat64()));
break;
case CbFieldType::BoolFalse:
Builder << "false"sv;
@@ -1831,6 +1831,31 @@ TEST_CASE("uson.json")
CHECK(ValueOne == "ValueOne");
CHECK(ValueTwo == "ValueTwo");
}
+
+ SUBCASE("number")
+ {
+ const double ExpectedFloatValue = 21.21f;
+ const double ExpectedDoubleValue = 42.42;
+
+ CbObjectWriter Writer;
+ Writer << "Float" << ExpectedFloatValue;
+ Writer << "Double" << ExpectedDoubleValue;
+
+ CbObject Obj = Writer.Save();
+
+ StringBuilder<128> Sb;
+ const std::string_view JsonText = Obj.ToJson(Sb).ToView();
+
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(JsonText.data(), JsonError);
+
+ const float FloatValue = float(Json["Float"].number_value());
+ const double DoubleValue = Json["Double"].number_value();
+
+ CHECK(JsonError.empty());
+ CHECK(FloatValue == doctest::Approx(ExpectedFloatValue));
+ CHECK(DoubleValue == doctest::Approx(ExpectedDoubleValue));
+ }
}
#endif
diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp
index 7a1f6c336..2d2603434 100644
--- a/zencore/filesystem.cpp
+++ b/zencore/filesystem.cpp
@@ -138,9 +138,9 @@ WipeDirectory(const wchar_t* DirPath)
}
}
} while (FindNextFileW(hFind, &FindData) == TRUE);
- }
- FindClose(hFind);
+ FindClose(hFind);
+ }
return true;
}
@@ -819,7 +819,7 @@ TEST_CASE("filesystem")
using namespace std::filesystem;
// GetExePath
- path BinPath = GetRunningExecutablePath();
+ path BinPath = GetRunningExecutablePath();
const bool ExpectedExe = BinPath.stem() == "zencore-test" || BinPath.stem() == "zenserver-test";
CHECK(ExpectedExe);
CHECK(is_regular_file(BinPath));
diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h
index 54801f9ac..36ecbd9a7 100644
--- a/zencore/include/zencore/iobuffer.h
+++ b/zencore/include/zencore/iobuffer.h
@@ -290,10 +290,6 @@ IoBufferCore::ExtendedCore() const
class IoBuffer
{
public:
- enum EAssumeOwnershipTag
- {
- AssumeOwnership
- };
enum ECloneTag
{
Clone
@@ -339,11 +335,6 @@ public:
memcpy(const_cast<void*>(m_Core->DataPointer()), DataPtr, SizeBytes);
}
- inline IoBuffer(EAssumeOwnershipTag, const void* DataPtr, size_t Sz) : m_Core(new IoBufferCore(DataPtr, Sz))
- {
- m_Core->SetIsOwnedByThis(true);
- }
-
ZENCORE_API IoBuffer(EFileTag, void* FileHandle, uint64_t ChunkFileOffset, uint64_t ChunkSize);
ZENCORE_API IoBuffer(EBorrowedFileTag, void* FileHandle, uint64_t ChunkFileOffset, uint64_t ChunkSize);
diff --git a/zencore/include/zencore/refcount.h b/zencore/include/zencore/refcount.h
index 0a1e15614..320718f5b 100644
--- a/zencore/include/zencore/refcount.h
+++ b/zencore/include/zencore/refcount.h
@@ -17,7 +17,7 @@ namespace zen {
class RefCounted
{
public:
- RefCounted() = default;
+ RefCounted() = default;
virtual ~RefCounted() = default;
inline uint32_t AddRef() const { return AtomicIncrement(const_cast<RefCounted*>(this)->m_RefCount); }
diff --git a/zencore/include/zencore/stats.h b/zencore/include/zencore/stats.h
index 0554f620d..dfa8dac34 100644
--- a/zencore/include/zencore/stats.h
+++ b/zencore/include/zencore/stats.h
@@ -2,11 +2,16 @@
#pragma once
-#include <atomic>
-#include <type_traits>
#include "zencore.h"
+#include <atomic>
+#include <random>
+
namespace zen {
+class CbObjectWriter;
+}
+
+namespace zen::metrics {
template<typename T>
class Gauge
@@ -76,18 +81,19 @@ public:
Meter();
~Meter();
- double Rate1(); // One-minute rate
- double Rate5(); // Five-minute rate
- double Rate15(); // Fifteen-minute rate
- double MeanRate(); // Mean rate since instantiation of this meter
- void Mark(uint64_t Count = 1); // Register one or more events
+ inline uint64_t Count() const { return m_TotalCount; }
+ double Rate1(); // One-minute rate
+ double Rate5(); // Five-minute rate
+ double Rate15(); // Fifteen-minute rate
+ double MeanRate() const; // Mean rate since instantiation of this meter
+ void Mark(uint64_t Count = 1); // Register one or more events
private:
std::atomic<uint64_t> m_TotalCount{0}; // Accumulator counting number of marks since beginning
std::atomic<uint64_t> m_PendingCount{0}; // Pending EWMA update accumulator
std::atomic<uint64_t> m_StartTick{0}; // Time this was instantiated (for mean)
std::atomic<uint64_t> m_LastTick{0}; // Timestamp of last EWMA tick
- std::atomic<int64_t> m_Remain{0}; // Tracks the "modulo" of tick time
+ std::atomic<int64_t> m_Remainder{0}; // Tracks the "modulo" of tick time
bool m_IsFirstTick = true;
RawEWMA m_RateM1;
RawEWMA m_RateM5;
@@ -97,6 +103,130 @@ private:
void Tick();
};
+/** Moment-in-time snapshot of a distribution
+ */
+class SampleSnapshot
+{
+public:
+ SampleSnapshot(std::vector<double>&& Values);
+ ~SampleSnapshot();
+
+ uint32_t Size() const { return (uint32_t)m_Values.size(); }
+ double GetQuantileValue(double Quantile);
+ double GetMedian() { return GetQuantileValue(0.5); }
+ double Get75Percentile() { return GetQuantileValue(0.75); }
+ double Get95Percentile() { return GetQuantileValue(0.95); }
+ double Get98Percentile() { return GetQuantileValue(0.98); }
+ double Get99Percentile() { return GetQuantileValue(0.99); }
+ double Get999Percentile() { return GetQuantileValue(0.999); }
+ const std::vector<double>& GetValues() const;
+
+private:
+ std::vector<double> m_Values;
+};
+
+/** Randomly selects samples from a stream. Uses Vitter's
+ Algorithm R to produce a statistically representative sample.
+
+ http://www.cs.umd.edu/~samir/498/vitter.pdf - Random Sampling with a Reservoir
+ */
+
+class UniformSample
+{
+public:
+ UniformSample(uint32_t ReservoirSize);
+ ~UniformSample();
+
+ void Clear();
+ uint32_t Size() const;
+ void Update(int64_t Value);
+ SampleSnapshot Snapshot() const;
+
+ template<std::invocable<int64_t> T>
+ void IterateValues(T Callback) const
+ {
+ for (const auto& Value : m_Values)
+ {
+ Callback(Value);
+ }
+ }
+
+private:
+ std::atomic<uint64_t> m_SampleCounter{0};
+ std::vector<std::atomic<int64_t>> m_Values;
+};
+
+/** Track (probabilistic) sample distribution along with min/max
+ */
+class Histogram
+{
+public:
+ Histogram(int32_t SampleCount = 1028);
+ ~Histogram();
+
+ void Clear();
+ void Update(int64_t Value);
+ int64_t Max() const;
+ int64_t Min() const;
+ double Mean() const;
+ uint64_t Count() const;
+ SampleSnapshot Snapshot() const { return m_Sample.Snapshot(); }
+
+private:
+ UniformSample m_Sample;
+ std::atomic<int64_t> m_Min{0};
+ std::atomic<int64_t> m_Max{0};
+ std::atomic<int64_t> m_Sum{0};
+ std::atomic<int64_t> m_Count{0};
+};
+
+/** Track timing and frequency of some operation
+
+ Example usage would be to track frequency and duration of network
+ requests, or function calls.
+
+ */
+class OperationTiming
+{
+public:
+ OperationTiming(int32_t SampleCount = 514);
+ ~OperationTiming();
+
+ void Update(int64_t Duration);
+ int64_t Max() const;
+ int64_t Min() const;
+ double Mean() const;
+ uint64_t Count() const;
+ SampleSnapshot Snapshot() const { return m_Histogram.Snapshot(); }
+
+ double Rate1() { return m_Meter.Rate1(); }
+ double Rate5() { return m_Meter.Rate5(); }
+ double Rate15() { return m_Meter.Rate15(); }
+ double MeanRate() const { return m_Meter.MeanRate(); }
+
+ struct Scope
+ {
+ Scope(OperationTiming& Outer);
+ ~Scope();
+
+ private:
+ OperationTiming& m_Outer;
+ uint64_t m_StartTick;
+ };
+
+private:
+ Meter m_Meter;
+ Histogram m_Histogram;
+};
+
+void EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo);
+void EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo);
+void EmitSnapshot(std::string_view Tag, Meter& Stat, CbObjectWriter& Cbo);
+
+} // namespace zen::metrics
+
+namespace zen {
+
extern void stats_forcelink();
} // namespace zen
diff --git a/zencore/include/zencore/thread.h b/zencore/include/zencore/thread.h
index 7889682cd..410ffbd1e 100644
--- a/zencore/include/zencore/thread.h
+++ b/zencore/include/zencore/thread.h
@@ -75,12 +75,12 @@ public:
ZENCORE_API Event();
ZENCORE_API ~Event();
- Event(Event&& Rhs) : m_EventHandle(Rhs.m_EventHandle) { Rhs.m_EventHandle = nullptr; }
+ Event(Event&& Rhs) noexcept : m_EventHandle(Rhs.m_EventHandle) { Rhs.m_EventHandle = nullptr; }
Event(const Event& Rhs) = delete;
Event& operator=(const Event& Rhs) = delete;
- inline Event& operator=(Event&& Rhs)
+ inline Event& operator=(Event&& Rhs) noexcept
{
std::swap(m_EventHandle, Rhs.m_EventHandle);
return *this;
@@ -133,14 +133,14 @@ public:
ZENCORE_API ~ProcessHandle();
- ZENCORE_API void Initialize(int Pid);
- ZENCORE_API void Initialize(void* ProcessHandle); /// Initialize with an existing handle - takes ownership of the handle
- ZENCORE_API bool IsRunning() const;
- ZENCORE_API bool IsValid() const;
- ZENCORE_API bool Wait(int TimeoutMs = -1);
- ZENCORE_API void Terminate(int ExitCode);
- ZENCORE_API void Reset();
- inline int Pid() const { return m_Pid; }
+ ZENCORE_API void Initialize(int Pid);
+ ZENCORE_API void Initialize(void* ProcessHandle); /// Initialize with an existing handle - takes ownership of the handle
+ ZENCORE_API [[nodiscard]] bool IsRunning() const;
+ ZENCORE_API [[nodiscard]] bool IsValid() const;
+ ZENCORE_API bool Wait(int TimeoutMs = -1);
+ ZENCORE_API void Terminate(int ExitCode);
+ ZENCORE_API void Reset();
+ inline [[nodiscard]] int Pid() const { return m_Pid; }
private:
void* m_ProcessHandle = nullptr;
diff --git a/zencore/include/zencore/timer.h b/zencore/include/zencore/timer.h
index eb284eaee..693b6daaa 100644
--- a/zencore/include/zencore/timer.h
+++ b/zencore/include/zencore/timer.h
@@ -18,30 +18,26 @@ namespace zen {
ZENCORE_API uint64_t GetHifreqTimerValue();
ZENCORE_API uint64_t GetHifreqTimerFrequency();
+ZENCORE_API double GetHifreqTimerToSeconds();
ZENCORE_API uint64_t GetHifreqTimerFrequencySafe(); // May be used during static init
class Stopwatch
{
public:
- Stopwatch() : m_StartValue(GetHifreqTimerValue()) {}
+ inline Stopwatch() : m_StartValue(GetHifreqTimerValue()) {}
- inline uint64_t getElapsedTimeMs() { return (GetHifreqTimerValue() - m_StartValue) * 1000 / GetHifreqTimerFrequency(); }
+ inline uint64_t GetElapsedTimeMs() const { return (GetHifreqTimerValue() - m_StartValue) * 1'000 / GetHifreqTimerFrequency(); }
+ inline uint64_t GetElapsedTimeUs() const { return (GetHifreqTimerValue() - m_StartValue) * 1'000'000 / GetHifreqTimerFrequency(); }
+ inline uint64_t GetElapsedTicks() const { return GetHifreqTimerValue() - m_StartValue; }
+ inline void Reset() { m_StartValue = GetHifreqTimerValue(); }
- inline void reset() { m_StartValue = GetHifreqTimerValue(); }
+ static inline uint64_t GetElapsedTimeMs(uint64_t Ticks) { return Ticks * 1'000 / GetHifreqTimerFrequency(); }
+ static inline uint64_t GetElapsedTimeUs(uint64_t Ticks) { return Ticks * 1'000'000 / GetHifreqTimerFrequency(); }
private:
uint64_t m_StartValue;
};
-// CPU timers
-
-inline uint64_t
-GetCpuTimerValue()
-{
- unsigned int foo;
- return __rdtscp(&foo);
-}
-
void timer_forcelink(); // internal
} // namespace zen
diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp
index a730a316f..244425761 100644
--- a/zencore/iobuffer.cpp
+++ b/zencore/iobuffer.cpp
@@ -14,6 +14,10 @@
#include <memory.h>
#include <system_error>
+#if ZEN_USE_MIMALLOC
+# include <mimalloc.h>
+#endif
+
#if ZEN_PLATFORM_WINDOWS
# include <atlfile.h>
#else
@@ -36,26 +40,41 @@ IoBufferCore::AllocateBuffer(size_t InSize, size_t Alignment)
m_Flags |= kLowLevelAlloc;
return VirtualAlloc(nullptr, InSize, MEM_COMMIT, PAGE_READWRITE);
}
- else
#endif // ZEN_PLATFORM_WINDOWS
- {
- return Memory::Alloc(InSize, Alignment);
- }
+
+#if ZEN_USE_MIMALLOC && 0
+ void* Ptr = mi_aligned_alloc(Alignment, RoundUp(InSize, Alignment));
+#else
+ void* Ptr = Memory::Alloc(InSize, Alignment);
+#endif
+
+ ZEN_ASSERT(Ptr);
+
+ return Ptr;
}
void
IoBufferCore::FreeBuffer()
{
+ if (!m_DataPtr)
+ {
+ return;
+ }
+
#if ZEN_PLATFORM_WINDOWS
if (m_Flags & kLowLevelAlloc)
{
VirtualFree(const_cast<void*>(m_DataPtr), 0, MEM_DECOMMIT);
+
+ return;
}
- else
#endif // ZEN_PLATFORM_WINDOWS
- {
- return Memory::Free(const_cast<void*>(m_DataPtr));
- }
+
+#if ZEN_USE_MIMALLOC && 0
+ return mi_free(const_cast<void*>(m_DataPtr));
+#else
+ return Memory::Free(const_cast<void*>(m_DataPtr));
+#endif
}
//////////////////////////////////////////////////////////////////////////
@@ -436,7 +455,7 @@ IoBufferBuilder::MakeFromTemporaryFile(const path_char_t* FileName)
Handle = DataFile.Detach();
#else
- int Fd = open(FileName, O_RDONLY);
+ int Fd = open(FileName, O_RDONLY);
if (Fd < 0)
{
return {};
diff --git a/zencore/memory.cpp b/zencore/memory.cpp
index 613b6ba67..da78ae3a8 100644
--- a/zencore/memory.cpp
+++ b/zencore/memory.cpp
@@ -6,6 +6,7 @@
#ifdef ZEN_PLATFORM_WINDOWS
# include <malloc.h>
+# include <mimalloc.h>
#else
# include <cstdlib>
#endif
@@ -18,8 +19,11 @@ static void*
AlignedAllocImpl(size_t size, size_t alignment)
{
#if ZEN_PLATFORM_WINDOWS
- // return _aligned_malloc(size, alignment); // MSVC alternative
- return _mm_malloc(size, alignment);
+# if ZEN_USE_MIMALLOC && 0 /* this path is not functional */
+ return mi_aligned_alloc(alignment, size);
+# else
+ return _aligned_malloc(size, alignment);
+# endif
#else
// posix_memalign(&Ret, Alignment, Size); // Apple, AndroidApi<28
return std::aligned_alloc(alignment, size);
@@ -33,8 +37,11 @@ AlignedFreeImpl(void* ptr)
return;
#if ZEN_PLATFORM_WINDOWS
- // _aligned_free(ptr); MSVC alternative
- _mm_free(ptr);
+# if ZEN_USE_MIMALLOC && 0 /* this path is not functional */
+ return mi_free(ptr);
+# else
+ _aligned_free(ptr);
+# endif
#else
// free(ptr) // Apple :)
std::free(ptr);
diff --git a/zencore/stats.cpp b/zencore/stats.cpp
index 9ae2ddd28..34dc2828f 100644
--- a/zencore/stats.cpp
+++ b/zencore/stats.cpp
@@ -1,10 +1,15 @@
// Copyright Epic Games, Inc. All Rights Reserved.
#include "zencore/stats.h"
-#include <cmath>
+
+#include <zencore/compactbinarybuilder.h>
+#include "zencore/intmath.h"
#include "zencore/thread.h"
#include "zencore/timer.h"
+#include <cmath>
+#include <gsl/gsl-lite.hpp>
+
#if ZEN_WITH_TESTS
# include <zencore/testing.h>
#endif
@@ -13,7 +18,7 @@
// Derived from https://github.com/dln/medida/blob/master/src/medida/stats/ewma.cc
//
-namespace zen {
+namespace zen::metrics {
static constinit int kTickIntervalInSeconds = 5;
static constinit double kSecondsPerMinute = 60.0;
@@ -76,18 +81,18 @@ Meter::TickIfNecessary()
if (m_LastTick.compare_exchange_strong(OldTick, NewTick))
{
- m_Remain.fetch_add(Age);
+ m_Remainder.fetch_add(Age);
do
{
- int64_t Remain = m_Remain.load(std::memory_order_relaxed);
+ int64_t Remain = m_Remainder.load(std::memory_order_relaxed);
if (Remain < 0)
{
return;
}
- if (m_Remain.compare_exchange_strong(Remain, Remain - CountPerTick))
+ if (m_Remainder.compare_exchange_strong(Remain, Remain - CountPerTick))
{
Tick();
}
@@ -137,7 +142,7 @@ Meter::Rate15()
}
double
-Meter::MeanRate()
+Meter::MeanRate() const
{
const uint64_t Count = m_TotalCount.load(std::memory_order_relaxed);
@@ -162,9 +167,354 @@ Meter::Mark(uint64_t Count)
//////////////////////////////////////////////////////////////////////////
+// TODO: should consider a cheaper RNG here, this will run for every thread
+// that gets created
+
+thread_local std::mt19937_64 ThreadLocalRng;
+
+UniformSample::UniformSample(uint32_t ReservoirSize) : m_Values(ReservoirSize)
+{
+}
+
+UniformSample::~UniformSample()
+{
+}
+
+void
+UniformSample::Clear()
+{
+ for (auto& Value : m_Values)
+ {
+ Value.store(0);
+ }
+ m_SampleCounter = 0;
+}
+
+uint32_t
+UniformSample::Size() const
+{
+ return gsl::narrow_cast<uint32_t>(Min(m_SampleCounter.load(), m_Values.size()));
+}
+
+void
+UniformSample::Update(int64_t Value)
+{
+ const uint64_t Count = m_SampleCounter++;
+ const uint64_t Size = m_Values.size();
+
+ if (Count < Size)
+ {
+ m_Values[Count] = Value;
+ }
+ else
+ {
+ // Randomly choose an old entry to potentially replace (the probability
+ // of replacing an entry diminishes with time)
+
+ std::uniform_int_distribution<uint64_t> UniformDist(0, Count);
+ uint64_t SampleIndex = UniformDist(ThreadLocalRng);
+
+ if (SampleIndex < Size)
+ {
+ m_Values[SampleIndex].store(Value, std::memory_order_release);
+ }
+ }
+}
+
+SampleSnapshot
+UniformSample::Snapshot() const
+{
+ uint64_t ValuesSize = Size();
+ std::vector<double> Values(ValuesSize);
+
+ for (int i = 0; i < ValuesSize; ++i)
+ {
+ Values[i] = double(m_Values[i]);
+ }
+
+ return SampleSnapshot(std::move(Values));
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+Histogram::Histogram(int32_t SampleCount) : m_Sample(SampleCount)
+{
+}
+
+Histogram::~Histogram()
+{
+}
+
+void
+Histogram::Clear()
+{
+ m_Min = m_Max = m_Sum = m_Count = 0;
+ m_Sample.Clear();
+}
+
+void
+Histogram::Update(int64_t Value)
+{
+ m_Sample.Update(Value);
+
+ if (m_Count == 0)
+ {
+ m_Min = m_Max = Value;
+ }
+ else
+ {
+ int64_t CurrentMax = m_Max.load(std::memory_order_relaxed);
+
+ while ((CurrentMax < Value) && !m_Max.compare_exchange_weak(CurrentMax, Value))
+ {
+ }
+
+ int64_t CurrentMin = m_Min.load(std::memory_order_relaxed);
+
+ while ((CurrentMin > Value) && !m_Min.compare_exchange_weak(CurrentMin, Value))
+ {
+ }
+ }
+
+ m_Sum += Value;
+ ++m_Count;
+}
+
+int64_t
+Histogram::Max() const
+{
+ return m_Max.load(std::memory_order_relaxed);
+}
+
+int64_t
+Histogram::Min() const
+{
+ return m_Min.load(std::memory_order_relaxed);
+}
+
+double
+Histogram::Mean() const
+{
+ return double(m_Sum.load(std::memory_order_relaxed)) / m_Count;
+}
+
+uint64_t
+Histogram::Count() const
+{
+ return m_Count.load(std::memory_order_relaxed);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+SampleSnapshot::SampleSnapshot(std::vector<double>&& Values) : m_Values(std::move(Values))
+{
+ std::sort(begin(m_Values), end(m_Values));
+}
+
+SampleSnapshot::~SampleSnapshot()
+{
+}
+
+double
+SampleSnapshot::GetQuantileValue(double Quantile)
+{
+ ZEN_ASSERT((Quantile >= 0.0) && (Quantile <= 1.0));
+
+ if (m_Values.empty())
+ {
+ return 0.0;
+ }
+
+ const double Pos = Quantile * (m_Values.size() + 1);
+
+ if (Pos < 1)
+ {
+ return m_Values.front();
+ }
+
+ if (Pos >= m_Values.size())
+ {
+ return m_Values.back();
+ }
+
+ const int32_t Index = (int32_t)Pos;
+ const double Lower = m_Values[Index - 1];
+ const double Upper = m_Values[Index];
+
+ // Lerp
+ return Lower + (Pos - std::floor(Pos)) * (Upper - Lower);
+}
+
+const std::vector<double>&
+SampleSnapshot::GetValues() const
+{
+ return m_Values;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+OperationTiming::OperationTiming(int32_t SampleCount) : m_Histogram{SampleCount}
+{
+}
+
+OperationTiming::~OperationTiming()
+{
+}
+
+void
+OperationTiming::Update(int64_t Duration)
+{
+ m_Meter.Mark(1);
+ m_Histogram.Update(Duration);
+}
+
+int64_t
+OperationTiming::Max() const
+{
+ return m_Histogram.Max();
+}
+
+int64_t
+OperationTiming::Min() const
+{
+ return m_Histogram.Min();
+}
+
+double
+OperationTiming::Mean() const
+{
+ return m_Histogram.Mean();
+}
+
+uint64_t
+OperationTiming::Count() const
+{
+ return m_Meter.Count();
+}
+
+OperationTiming::Scope::Scope(OperationTiming& Outer) : m_Outer(Outer), m_StartTick(GetHifreqTimerValue())
+{
+}
+
+OperationTiming::Scope::~Scope()
+{
+ m_Outer.Update(GetHifreqTimerValue() - m_StartTick);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo)
+{
+ Cbo.BeginObject(Tag);
+
+ SampleSnapshot Snap = Stat.Snapshot();
+
+ Cbo << "count" << Stat.Count();
+
+ Cbo << "rate_mean" << Stat.MeanRate();
+ Cbo << "rate_1" << Stat.Rate1() << "rate_5" << Stat.Rate5() << "rate_15" << Stat.Rate15();
+
+ const double ToSeconds = GetHifreqTimerToSeconds();
+
+ Cbo << "t_avg" << Stat.Mean() * ToSeconds;
+ Cbo << "t_min" << Stat.Min() * ToSeconds << "t_max" << Stat.Max() * ToSeconds;
+ Cbo << "t_p75" << Snap.Get75Percentile() * ToSeconds << "t_p95" << Snap.Get95Percentile() * ToSeconds << "t_p99"
+ << Snap.Get99Percentile() * ToSeconds << "t_p999" << Snap.Get999Percentile() * ToSeconds;
+
+ Cbo.EndObject();
+}
+
+void
+EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo)
+{
+ Cbo.BeginObject(Tag);
+
+ SampleSnapshot Snap = Stat.Snapshot();
+
+ Cbo << "count" << Stat.Count() << "avg" << Stat.Mean();
+ Cbo << "min" << Stat.Min() << "max" << Stat.Max();
+ Cbo << "p75" << Snap.Get75Percentile() << "p95" << Snap.Get95Percentile() << "p99" << Snap.Get99Percentile() << "p999"
+ << Snap.Get999Percentile();
+
+ Cbo.EndObject();
+}
+
+void
+EmitSnapshot(std::string_view Tag, Meter& Stat, CbObjectWriter& Cbo)
+{
+ Cbo.BeginObject(Tag);
+
+ Cbo << "count" << Stat.Count() << "rate_mean" << Stat.MeanRate();
+ Cbo << "rate_1" << Stat.Rate1() << "rate_5" << Stat.Rate5() << "rate_15" << Stat.Rate15();
+
+ Cbo.EndObject();
+}
+
+//////////////////////////////////////////////////////////////////////////
+
#if ZEN_WITH_TESTS
-TEST_CASE("EWMA")
+TEST_CASE("Core.Stats.Histogram")
+{
+ Histogram Histo{258};
+
+ SampleSnapshot Snap1 = Histo.Snapshot();
+ CHECK_EQ(Snap1.Size(), 0);
+ CHECK_EQ(Snap1.GetMedian(), 0);
+
+ Histo.Update(1);
+ CHECK_EQ(Histo.Min(), 1);
+ CHECK_EQ(Histo.Max(), 1);
+
+ SampleSnapshot Snap2 = Histo.Snapshot();
+ CHECK_EQ(Snap2.Size(), 1);
+
+ Histo.Update(2);
+ CHECK_EQ(Histo.Min(), 1);
+ CHECK_EQ(Histo.Max(), 2);
+
+ SampleSnapshot Snap3 = Histo.Snapshot();
+ CHECK_EQ(Snap3.Size(), 2);
+
+ Histo.Update(-2);
+ CHECK_EQ(Histo.Min(), -2);
+ CHECK_EQ(Histo.Max(), 2);
+ CHECK_EQ(Histo.Mean(), 1 / 3.0);
+
+ SampleSnapshot Snap4 = Histo.Snapshot();
+ CHECK_EQ(Snap4.Size(), 3);
+ CHECK_EQ(Snap4.GetMedian(), 1);
+ CHECK_EQ(Snap4.Get999Percentile(), 2);
+ CHECK_EQ(Snap4.GetQuantileValue(0), -2);
+}
+
+TEST_CASE("Core.Stats.UniformSample")
+{
+ UniformSample Sample1{100};
+
+ for (int i = 0; i < 100; ++i)
+ {
+ for (int j = 1; j <= 100; ++j)
+ {
+ Sample1.Update(j);
+ }
+ }
+
+ int64_t Sum = 0;
+ int64_t Count = 0;
+
+ Sample1.IterateValues([&](int64_t Value) {
+ ++Count;
+ Sum += Value;
+ });
+
+ double Average = double(Sum) / Count;
+
+ CHECK(fabs(Average - 50) < 10); // What's the right test here? The result could vary massively and still be technically correct
+}
+
+TEST_CASE("Core.Stats.EWMA")
{
SUBCASE("Simple_1")
{
@@ -262,6 +612,9 @@ TEST_CASE("Meter")
[[maybe_unused]] double Rate = Meter1.MeanRate();
}
# endif
+}
+
+namespace zen {
void
stats_forcelink()
diff --git a/zencore/timer.cpp b/zencore/timer.cpp
index 88ec89cb7..5d30d9b29 100644
--- a/zencore/timer.cpp
+++ b/zencore/timer.cpp
@@ -42,7 +42,8 @@ InternalGetHifreqTimerFrequency()
#endif
}
-static uint64_t QpcFreq = InternalGetHifreqTimerFrequency();
+uint64_t QpcFreq = InternalGetHifreqTimerFrequency();
+static const double QpcFactor = 1.0 / InternalGetHifreqTimerFrequency();
uint64_t
GetHifreqTimerFrequency()
@@ -50,6 +51,12 @@ GetHifreqTimerFrequency()
return QpcFreq;
}
+double
+GetHifreqTimerToSeconds()
+{
+ return QpcFactor;
+}
+
uint64_t
GetHifreqTimerFrequencySafe()
{
@@ -73,18 +80,6 @@ timer_forcelink()
{
}
-TEST_CASE("Timer")
-{
- uint64_t s0 = GetHifreqTimerValue();
- uint64_t t0 = GetCpuTimerValue();
- zen::Sleep(1000);
- uint64_t s1 = GetHifreqTimerValue();
- uint64_t t1 = GetCpuTimerValue();
- // double r = double(t1 - t0) / (s1 - s0);
- CHECK_NE(t0, t1);
- CHECK_NE(s0, s1);
-}
-
#endif
} // namespace zen
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 6461b76c2..0e5e73ffc 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -645,8 +645,8 @@ main()
ZEN_INFO("{} requests in {} ({})",
RequestCount,
- zen::NiceTimeSpanMs(timer.getElapsedTimeMs()),
- zen::NiceRate(RequestCount, (uint32_t)timer.getElapsedTimeMs(), "req"));
+ zen::NiceTimeSpanMs(timer.GetElapsedTimeMs()),
+ zen::NiceRate(RequestCount, (uint32_t)timer.GetElapsedTimeMs(), "req"));
return 0;
}
@@ -775,7 +775,7 @@ TEST_CASE("default.single")
IssueTestRequests,
IssueTestRequests);
- uint64_t Elapsed = timer.getElapsedTimeMs();
+ uint64_t Elapsed = timer.GetElapsedTimeMs();
ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req"));
}
@@ -826,7 +826,7 @@ TEST_CASE("multi.basic")
[&] { IssueTestRequests(13337); },
[&] { IssueTestRequests(13338); });
- uint64_t Elapsed = timer.getElapsedTimeMs();
+ uint64_t Elapsed = timer.GetElapsedTimeMs();
ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req"));
}
@@ -899,7 +899,7 @@ TEST_CASE("cas.basic")
++RequestCount;
}
- uint64_t Elapsed = timer.getElapsedTimeMs();
+ uint64_t Elapsed = timer.GetElapsedTimeMs();
ZEN_INFO("{} requests in {} ({})",
RequestCount,
@@ -1074,7 +1074,7 @@ TEST_CASE("project.basic")
SUBCASE("build store op commit") { ZEN_INFO("-------"); }
}
- const uint64_t Elapsed = timer.getElapsedTimeMs();
+ const uint64_t Elapsed = timer.GetElapsedTimeMs();
ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req"));
}
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 74cee6614..dc96aecae 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -6,6 +6,7 @@
#include <zencore/compress.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zenhttp/httpserver.h>
@@ -193,10 +194,17 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
{
CacheRef Ref;
+ metrics::OperationTiming::Scope $(m_HttpRequests);
+
if (!ValidateKeyUri(Request, /* out */ Ref))
{
std::string_view Key = Request.RelativeUri();
+ if (Key.empty())
+ {
+ return HandleStatusRequest(Request);
+ }
+
if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); }))
{
// Bucket reference
@@ -262,11 +270,11 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request,
case kHead:
case kGet:
{
- HandleGetCacheRecord(Request, Ref, Policy);
if (Verb == kHead)
{
Request.SetSuppressResponseBody();
}
+ HandleGetCacheRecord(Request, Ref, Policy);
}
break;
case kPut:
@@ -829,4 +837,22 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
return true;
}
+
+void
+HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request)
+{
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+
+ EmitSnapshot("requests", m_HttpRequests, Cbo);
+ if (m_UpstreamCache)
+ {
+ Cbo.BeginObject("upstream");
+ m_UpstreamCache->GetStatus(Cbo);
+ Cbo.EndObject();
+ }
+
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
} // namespace zen
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 3fdaa1236..47fc173e9 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -2,6 +2,7 @@
#pragma once
+#include <zencore/stats.h>
#include <zenhttp/httpserver.h>
#include <memory>
@@ -78,6 +79,7 @@ private:
void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
+ void HandleStatusRequest(zen::HttpServerRequest& Request);
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
@@ -86,6 +88,7 @@ private:
zen::CidStore& m_CidStore;
std::unique_ptr<UpstreamCache> m_UpstreamCache;
uint64_t m_LastScrubTime = 0;
+ metrics::OperationTiming m_HttpRequests;
};
} // namespace zen
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index 42f59b26c..254032226 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -227,7 +227,7 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
"",
"upstream-stats",
"Collect performance metrics for upstream endpoints",
- cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("false"),
+ cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("true"),
"");
try
diff --git a/zenserver/experimental/usnjournal.cpp b/zenserver/experimental/usnjournal.cpp
index 1e765fbe5..d575e1779 100644
--- a/zenserver/experimental/usnjournal.cpp
+++ b/zenserver/experimental/usnjournal.cpp
@@ -259,7 +259,7 @@ UsnJournalReader::Initialize(std::filesystem::path VolumePath)
}
}
- const auto ElapsedMs = Timer.getElapsedTimeMs();
+ const auto ElapsedMs = Timer.GetElapsedTimeMs();
ZEN_INFO("MFT enumeration of {} completed after {} ({})",
zen::NiceBytes(MftBytesProcessed),
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 1a9eb2c67..7870f9559 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -191,7 +191,7 @@ struct ProjectStore::OplogStorage : public RefCounted
});
ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}",
- NiceTimeSpanMs(Timer.getElapsedTimeMs()),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
m_MaxLsn,
m_NextOpsOffset);
}
@@ -502,7 +502,7 @@ ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry
}
}
- ZEN_DEBUG("added {} file(s) in {}", FileCount, NiceTimeSpanMs(Timer.getElapsedTimeMs()));
+ ZEN_DEBUG("added {} file(s) in {}", FileCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
for (CbFieldView& Entry : Core["meta"sv])
diff --git a/zenserver/testing/httptest.cpp b/zenserver/testing/httptest.cpp
index 18d63a6ef..01866a63b 100644
--- a/zenserver/testing/httptest.cpp
+++ b/zenserver/testing/httptest.cpp
@@ -15,6 +15,23 @@ HttpTestingService::HttpTestingService()
HttpVerb::kGet);
m_Router.RegisterRoute(
+ "metrics",
+ [this](HttpRouterRequest& Req) {
+ metrics::OperationTiming::Scope _(m_TimingStats);
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK);
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "get_metrics",
+ [this](HttpRouterRequest& Req) {
+ CbObjectWriter Cbo;
+ EmitSnapshot("requests", m_TimingStats, Cbo);
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
"json",
[this](HttpRouterRequest& Req) {
CbObjectWriter Obj;
diff --git a/zenserver/testing/httptest.h b/zenserver/testing/httptest.h
index f55780d05..f7ea0c31c 100644
--- a/zenserver/testing/httptest.h
+++ b/zenserver/testing/httptest.h
@@ -3,6 +3,7 @@
#pragma once
#include <zencore/logging.h>
+#include <zencore/stats.h>
#include <zenhttp/httpserver.h>
#include <atomic>
@@ -39,8 +40,9 @@ public:
};
private:
- HttpRequestRouter m_Router;
- std::atomic<uint32_t> m_Counter{0};
+ HttpRequestRouter m_Router;
+ std::atomic<uint32_t> m_Counter{0};
+ metrics::OperationTiming m_TimingStats;
RwLock m_RwLock;
std::unordered_map<uint32_t, Ref<PackageHandler>> m_HandlerMap;
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index b93635e76..0397ddaa0 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -299,13 +299,16 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key,
return PutDerivedData(BucketId, Key.ToHexString(), DerivedData);
}
-CloudCacheResult
+PutRefResult
CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
{
const CloudCacheAccessToken& AccessToken = GetAccessToken();
if (!AccessToken.IsValid())
{
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ PutRefResult Result;
+ Result.ErrorCode = 401;
+ Result.Reason = "Invalid access token"sv;
+ return Result;
}
IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
@@ -328,16 +331,102 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
if (Response.error)
{
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ PutRefResult Result;
+ Result.ErrorCode = static_cast<int32_t>(Response.error.code);
+ Result.Reason = std::move(Response.error.message);
+ return Result;
}
else if (!VerifyAccessToken(Response.status_code))
{
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ PutRefResult Result;
+ Result.ErrorCode = 401;
+ Result.Reason = "Invalid access token"sv;
+ return Result;
}
- return {.Bytes = Response.uploaded_bytes,
- .ElapsedSeconds = Response.elapsed,
- .Success = (Response.status_code == 200 || Response.status_code == 201)};
+ PutRefResult Result;
+ Result.Success = (Response.status_code == 200 || Response.status_code == 201);
+ Result.Bytes = Response.uploaded_bytes;
+ Result.ElapsedSeconds = Response.elapsed;
+
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.text, JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ }
+
+ return Result;
+}
+
+FinalizeRefResult
+CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHash)
+{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ FinalizeRefResult Result;
+ Result.ErrorCode = 401;
+ Result.Reason = "Invalid access token"sv;
+ return Result;
+ }
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
+ << Key.ToHexString() << "/finalize/" << RefHash.ToHexString();
+
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value},
+ {"X-Jupiter-IoHash", RefHash.ToHexString()},
+ {"Content-Type", "application/x-ue-cb"}});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ FinalizeRefResult Result;
+ Result.ErrorCode = static_cast<int32_t>(Response.error.code);
+ Result.Reason = std::move(Response.error.message);
+ return Result;
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ FinalizeRefResult Result;
+ Result.ErrorCode = 401;
+ Result.Reason = "Invalid access token"sv;
+ return Result;
+ }
+
+ FinalizeRefResult Result;
+ Result.Success = (Response.status_code == 200 || Response.status_code == 201);
+ Result.Bytes = Response.uploaded_bytes;
+ Result.ElapsedSeconds = Response.elapsed;
+
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.text, JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ }
+
+ return Result;
}
CloudCacheResult
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index d8844279e..9573a1631 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -2,6 +2,7 @@
#pragma once
+#include <zencore/iohash.h>
#include <zencore/logging.h>
#include <zencore/refcount.h>
#include <zencore/thread.h>
@@ -53,6 +54,16 @@ struct CloudCacheResult
bool Success = false;
};
+struct PutRefResult : CloudCacheResult
+{
+ std::vector<IoHash> Needs;
+};
+
+struct FinalizeRefResult : CloudCacheResult
+{
+ std::vector<IoHash> Needs;
+};
+
/**
* Context for performing Jupiter operations
*
@@ -76,11 +87,13 @@ public:
CloudCacheResult PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData);
CloudCacheResult PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData);
- CloudCacheResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
+ PutRefResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
CloudCacheResult PutBlob(const IoHash& Key, IoBuffer Blob);
CloudCacheResult PutCompressedBlob(const IoHash& Key, IoBuffer Blob);
CloudCacheResult PutObject(const IoHash& Key, IoBuffer Object);
+ FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
+
CloudCacheResult DerivedDataExists(std::string_view BucketId, std::string_view Key);
CloudCacheResult DerivedDataExists(std::string_view BucketId, const IoHash& Key);
CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key);
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 0dd16cd06..03054b542 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -9,6 +9,7 @@
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
+#include <zencore/stats.h>
#include <zencore/stream.h>
#include <zencore/timer.h>
@@ -45,6 +46,7 @@ namespace detail {
{
std::lock_guard Lock(m_Lock);
m_Queue.emplace_back(std::move(Item));
+ m_Size++;
}
m_NewItemSignal.notify_one();
@@ -64,6 +66,7 @@ namespace detail {
{
Item = std::move(m_Queue.front());
m_Queue.pop_front();
+ m_Size--;
return true;
}
@@ -80,7 +83,7 @@ namespace detail {
}
}
- std::size_t Num() const
+ std::size_t Size() const
{
std::unique_lock Lock(m_Lock);
return m_Queue.size();
@@ -91,12 +94,15 @@ namespace detail {
std::condition_variable m_NewItemSignal;
std::deque<T> m_Queue;
std::atomic_bool m_CompleteAdding{false};
+ std::atomic_uint32_t m_Size;
};
class JupiterUpstreamEndpoint final : public UpstreamEndpoint
{
public:
- JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc)
+ JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
+ : m_Log(zen::logging::Get("upstream"))
+ , m_UseLegacyDdc(Options.UseLegacyDdc)
{
using namespace fmt::literals;
m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl);
@@ -277,16 +283,82 @@ namespace detail {
Success = false;
for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
{
- if (CloudCacheResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- RecordValue,
- ZenContentType::kCbObject);
+ if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ RecordValue,
+ ZenContentType::kCbObject);
Result.Success)
{
TotalBytes += Result.Bytes;
TotalElapsedSeconds += Result.ElapsedSeconds;
Success = true;
- break;
+
+ if (!Result.Needs.empty())
+ {
+ for (const IoHash& NeededHash : Result.Needs)
+ {
+ Success = false;
+
+ if (auto It =
+ std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash);
+ It != std::end(CacheRecord.PayloadIds))
+ {
+ const size_t Idx = It - std::begin(CacheRecord.PayloadIds);
+
+ if (CloudCacheResult BlobResult =
+ Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ BlobResult.Success)
+ {
+ TotalBytes += BlobResult.Bytes;
+ TotalElapsedSeconds += BlobResult.ElapsedSeconds;
+ Success = true;
+ }
+ else
+ {
+ ZEN_WARN("upload missing payload '{}/{}/{}' FAILED",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ NeededHash);
+ }
+ }
+ else
+ {
+ ZEN_WARN("needed payload '{}/{}/{}' MISSING",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ NeededHash);
+ }
+ }
+
+ const IoHash RefHash = IoHash::HashBuffer(RecordValue);
+
+ if (FinalizeRefResult FinalizeResult =
+ Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
+ FinalizeResult.Success)
+ {
+ TotalBytes += FinalizeResult.Bytes;
+ TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+ Success = true;
+
+ for (const IoHash& MissingHash : FinalizeResult.Needs)
+ {
+ ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ MissingHash);
+ }
+ }
+ else
+ {
+ ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash);
+ Success = false;
+ }
+ }
+
+ if (Success)
+ {
+ break;
+ }
}
}
@@ -302,6 +374,9 @@ namespace detail {
virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
private:
+ spdlog::logger& Log() { return m_Log; }
+
+ spdlog::logger& m_Log;
bool m_UseLegacyDdc;
std::string m_DisplayName;
RefPtr<CloudCacheClient> m_Client;
@@ -515,7 +590,12 @@ struct UpstreamStats
}
UpstreamEndpointStats& Stats = Endpoint.Stats();
- if (Result.Success)
+
+ if (Result.Error)
+ {
+ Stats.ErrorCount++;
+ }
+ else if (Result.Success)
{
Stats.HitCount++;
Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
@@ -549,6 +629,10 @@ struct UpstreamStats
Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
Stats.SecondsUp.fetch_add(Result.ElapsedSeconds);
}
+ else
+ {
+ Stats.ErrorCount++;
+ }
if (m_SampleCount++ % MaxSampleCount)
{
@@ -575,13 +659,13 @@ struct UpstreamStats
const uint64_t TotalCount = HitCount + MissCount;
const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0;
- Logger.info("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
- Ep->DisplayName(),
- HitRate,
- DownBytes,
- DownSpeed,
- UpBytes,
- UpSpeed);
+ Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
+ Ep->DisplayName(),
+ HitRate,
+ DownBytes,
+ DownSpeed,
+ UpBytes,
+ UpSpeed);
}
}
@@ -700,6 +784,36 @@ public:
return {};
}
+ virtual void GetStatus(CbObjectWriter& Status) override
+ {
+ Status << "reading" << m_Options.ReadUpstream;
+ Status << "writing" << m_Options.WriteUpstream;
+ Status << "worker_threads" << m_Options.ThreadCount;
+ Status << "queue_count" << m_UpstreamQueue.Size();
+
+ Status.BeginArray("endpoints");
+ for (const auto& Ep : m_Endpoints)
+ {
+ Status.BeginObject();
+ Status << "name" << Ep->DisplayName();
+ Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv);
+
+ UpstreamEndpointStats& Stats = Ep->Stats();
+ const uint64_t HitCount = Stats.HitCount;
+ const uint64_t MissCount = Stats.MissCount;
+ const uint64_t TotalCount = HitCount + MissCount;
+ const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0;
+
+ Status << "hit_ratio" << HitRate;
+ Status << "downloaded_mb" << Stats.DownBytes;
+ Status << "uploaded_mb" << Stats.UpBytes;
+ Status << "error_count" << Stats.ErrorCount;
+
+ Status.EndObject();
+ }
+ Status.EndArray();
+ }
+
private:
void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
{
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 0e736480b..a6b1e9784 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -12,6 +12,7 @@
namespace zen {
+class CbObjectWriter;
class CidStore;
class ZenCacheStore;
struct CloudCacheClientOptions;
@@ -86,6 +87,7 @@ struct UpstreamEndpointStats
std::atomic_uint64_t HitCount = {};
std::atomic_uint64_t MissCount = {};
std::atomic_uint64_t UpCount = {};
+ std::atomic_uint64_t ErrorCount = {};
std::atomic<double> UpBytes = {};
std::atomic<double> DownBytes = {};
std::atomic<double> SecondsUp = {};
@@ -139,6 +141,8 @@ public:
};
virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
+
+ virtual void GetStatus(CbObjectWriter& CbO) = 0;
};
std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore);