aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzousar <[email protected]>2022-01-27 07:51:15 -0700
committerzousar <[email protected]>2022-01-27 07:51:15 -0700
commit8c82202ff7796e08b2ffbe281a837bd4309efa99 (patch)
treed50e31505afa141d3dc612cda74298384c0323c3
parentMerge branch 'main' into non-elevated-asio (diff)
parentFixed ZenServerEntry::*ListenPort compile errors (diff)
downloadzen-8c82202ff7796e08b2ffbe281a837bd4309efa99.tar.xz
zen-8c82202ff7796e08b2ffbe281a837bd4309efa99.zip
Merge branch 'main' into non-elevated-asio
-rw-r--r--thirdparty/trace/trace.h343
-rw-r--r--zen/cmds/top.cpp4
-rw-r--r--zenserver/cache/structuredcache.cpp154
-rw-r--r--zenserver/upstream/upstreamcache.cpp27
-rw-r--r--zenutil/cache/cachepolicy.cpp20
-rw-r--r--zenutil/include/zenutil/cache/cachepolicy.h4
6 files changed, 439 insertions, 113 deletions
diff --git a/thirdparty/trace/trace.h b/thirdparty/trace/trace.h
index fb538b3af..caa862ffe 100644
--- a/thirdparty/trace/trace.h
+++ b/thirdparty/trace/trace.h
@@ -347,6 +347,7 @@ struct FInitializeDesc
};
typedef void* AllocFunc(SIZE_T, uint32);
typedef void FreeFunc(void*, SIZE_T);
+typedef void ChannelIterFunc(const ANSICHAR*, bool, void*);
struct FStatistics
{
uint64 BytesSent;
@@ -357,6 +358,7 @@ struct FStatistics
};
UE_TRACE_API void SetMemoryHooks(AllocFunc Alloc, FreeFunc Free) UE_TRACE_IMPL();
UE_TRACE_API void Initialize(const FInitializeDesc& Desc) UE_TRACE_IMPL();
+UE_TRACE_API void StartWorkerThread() UE_TRACE_IMPL();
UE_TRACE_API void Shutdown() UE_TRACE_IMPL();
UE_TRACE_API void Update() UE_TRACE_IMPL();
UE_TRACE_API void GetStatistics(FStatistics& Out) UE_TRACE_IMPL();
@@ -366,6 +368,7 @@ UE_TRACE_API bool IsTracing() UE_TRACE_IMPL(false);
UE_TRACE_API bool Stop() UE_TRACE_IMPL(false);
UE_TRACE_API bool IsChannel(const TCHAR* ChanneName) UE_TRACE_IMPL(false);
UE_TRACE_API bool ToggleChannel(const TCHAR* ChannelName, bool bEnabled) UE_TRACE_IMPL(false);
+UE_TRACE_API void EnumerateChannels(ChannelIterFunc IterFunc, void* User);
UE_TRACE_API void ThreadRegister(const TCHAR* Name, uint32 SystemId, int32 SortHint) UE_TRACE_IMPL();
UE_TRACE_API void ThreadGroupBegin(const TCHAR* Name) UE_TRACE_IMPL();
UE_TRACE_API void ThreadGroupEnd() UE_TRACE_IMPL();
@@ -388,6 +391,7 @@ UE_TRACE_API void ThreadGroupEnd() UE_TRACE_IMPL();
#if UE_TRACE_ENABLED
namespace UE {
namespace Trace {
+typedef void ChannelIterFunc(const ANSICHAR*, bool, void*);
/*
A named channel which can be used to filter trace events. Channels can be
combined using the '|' operator which allows expressions like
@@ -421,6 +425,7 @@ public:
static bool Toggle(const ANSICHAR* ChannelName, bool bEnabled);
static void ToggleAll(bool bEnabled);
static FChannel* FindChannel(const ANSICHAR* ChannelName);
+ static void EnumerateChannels(ChannelIterFunc Func, void* User);
bool Toggle(bool bEnabled);
bool IsEnabled() const;
explicit operator bool () const;
@@ -2043,7 +2048,7 @@ FChannel* FChannel::FindChannel(const ANSICHAR* ChannelName)
};
for (FChannel* Channel : ChannelLists)
{
- for (; Channel != nullptr; Channel = (FChannel*)(Channel->Next))
+ for (; Channel != nullptr; Channel = Channel->Next)
{
if (Channel->Name.Hash == ChannelNameHash)
{
@@ -2053,10 +2058,26 @@ FChannel* FChannel::FindChannel(const ANSICHAR* ChannelName)
}
return nullptr;
}
+void FChannel::EnumerateChannels(ChannelIterFunc Func, void* User)
+{
+ using namespace Private;
+ FChannel* ChannelLists[] =
+ {
+ AtomicLoadAcquire(&GNewChannelList),
+ AtomicLoadAcquire(&GHeadChannel),
+ };
+ for (FChannel* Channel : ChannelLists)
+ {
+ for (; Channel != nullptr; Channel = Channel->Next)
+ {
+ Func(Channel->Name.Ptr, Channel->IsEnabled(), User);
+ }
+ }
+}
bool FChannel::Toggle(bool bEnabled)
{
using namespace Private;
- AtomicAddRelaxed(&Enabled, bEnabled ? 1 : -1);
+ AtomicStoreRelaxed(&Enabled, bEnabled ? 1 : -1);
UE_TRACE_LOG(Trace, ChannelToggle, TraceLogChannel)
<< ChannelToggle.Id(Name.Hash)
<< ChannelToggle.IsEnabled(IsEnabled());
@@ -3055,6 +3076,7 @@ namespace Private
{
void Writer_MemorySetHooks(AllocFunc, FreeFunc);
void Writer_Initialize(const FInitializeDesc&);
+void Writer_WorkerCreate();
void Writer_Shutdown();
void Writer_Update();
bool Writer_SendTo(const ANSICHAR*, uint32);
@@ -3132,6 +3154,14 @@ bool ToggleChannel(const TCHAR* ChannelName, bool bEnabled)
ToAnsiCheap(ChannelNameA, ChannelName);
return FChannel::Toggle(ChannelNameA, bEnabled);
}
+void EnumerateChannels(ChannelIterFunc IterFunc, void* User)
+{
+ FChannel::EnumerateChannels(IterFunc, User);
+}
+void StartWorkerThread()
+{
+ Private::Writer_WorkerCreate();
+}
UE_TRACE_CHANNEL_EXTERN(TraceLogChannel)
UE_TRACE_EVENT_BEGIN($Trace, ThreadInfo, NoSync|Important)
UE_TRACE_EVENT_FIELD(uint32, ThreadId)
@@ -3533,14 +3563,18 @@ static bool Writer_UpdateConnection()
Writer_DescribeEvents();
Writer_CacheOnConnect();
Writer_TailOnConnect();
- Writer_SendSync();
GSyncPacketCountdown = GNumSyncPackets;
return true;
}
static UPTRINT GWorkerThread; // = 0;
static volatile bool GWorkerThreadQuit; // = false;
+static volatile unsigned int GUpdateInProgress = 1; // Don't allow updates until initalized
static void Writer_WorkerUpdate()
{
+ if (!AtomicCompareExchangeAcquire(&GUpdateInProgress, 1u, 0u))
+ {
+ return;
+ }
Writer_UpdateControl();
Writer_UpdateConnection();
Writer_DescribeAnnounce();
@@ -3554,6 +3588,7 @@ static void Writer_WorkerUpdate()
Writer_FlushSendBuffer();
}
#endif
+ AtomicExchangeRelease(&GUpdateInProgress, 0u);
}
static void Writer_WorkerThread()
{
@@ -3565,7 +3600,7 @@ static void Writer_WorkerThread()
ThreadSleep(SleepMs);
}
}
-static void Writer_WorkerCreate()
+void Writer_WorkerCreate()
{
if (GWorkerThread)
{
@@ -3664,6 +3699,7 @@ void Writer_Initialize(const FInitializeDesc& Desc)
{
Writer_WorkerCreate();
}
+ GUpdateInProgress = 0;
}
void Writer_Shutdown()
{
@@ -3709,7 +3745,7 @@ bool Writer_WriteTo(const ANSICHAR* Path)
}
bool Writer_IsTracing()
{
- return (GDataHandle != 0);
+ return GDataHandle != 0 || GPendingDataHandle != 0;
}
bool Writer_Stop()
{
@@ -4359,7 +4395,13 @@ UPTRINT TcpSocketConnect(const ANSICHAR* Host, uint16 Port)
{
struct FAddrInfoPtr
{
- ~FAddrInfoPtr() { freeaddrinfo(Value); }
+ ~FAddrInfoPtr()
+ {
+ if (Value != nullptr)
+ {
+ freeaddrinfo(Value);
+ }
+ }
addrinfo* operator -> () { return Value; }
addrinfo** operator & () { return &Value; }
addrinfo* Value;
@@ -4371,6 +4413,7 @@ UPTRINT TcpSocketConnect(const ANSICHAR* Host, uint16 Port)
Hints.ai_protocol = IPPROTO_TCP;
if (getaddrinfo(Host, nullptr, &Hints, &Info))
{
+ Info.Value = nullptr;
return 0;
}
if (&Info == nullptr)
@@ -4962,6 +5005,8 @@ void Writer_ShutdownSharedBuffers()
// Copyright Epic Games, Inc. All Rights Reserved.
+// {{{1 amalgamation-tail //////////////////////////////////////////////////////
+
#if PLATFORM_WINDOWS
# pragma warning(pop)
#endif
@@ -4978,6 +5023,10 @@ void Writer_ShutdownSharedBuffers()
#endif // TRACE_UE_COMPAT_LAYER
+
+
+// {{{1 library-setup //////////////////////////////////////////////////////////
+
#include <string_view>
#define TRACE_EVENT_DEFINE UE_TRACE_EVENT_DEFINE
@@ -4998,15 +5047,108 @@ namespace trace = UE::Trace;
#define TRACE_PRIVATE_CONCAT(x, y) TRACE_PRIVATE_CONCAT_(x, y)
#define TRACE_PRIVATE_UNIQUE_VAR(name) TRACE_PRIVATE_CONCAT($trace_##name, __LINE__)
+
+
+// {{{1 session-header /////////////////////////////////////////////////////////
+
+namespace UE {
+namespace Trace {
+
+enum class Build
+{
+ Unknown,
+ Debug,
+ DebugGame,
+ Development,
+ Shipping,
+ Test
+};
+
+void DescribeSession(
+ const std::string_view& AppName,
+ Build Variant=Build::Unknown,
+ const std::string_view& CommandLine="",
+ const std::string_view& BuildVersion="unknown_ver");
+
+} // namespace Trace
+} // namespace UE
+
+// {{{1 session-source /////////////////////////////////////////////////////////
+
+#if TRACE_IMPLEMENT
+
+namespace UE {
+namespace Trace {
+namespace Private {
+
+TRACE_EVENT_BEGIN(Diagnostics, Session2, NoSync|Important)
+ TRACE_EVENT_FIELD(uint8, ConfigurationType)
+ TRACE_EVENT_FIELD(trace::AnsiString, AppName)
+ TRACE_EVENT_FIELD(trace::AnsiString, BuildVersion)
+ TRACE_EVENT_FIELD(trace::AnsiString, Platform)
+ TRACE_EVENT_FIELD(trace::AnsiString, CommandLine)
+TRACE_EVENT_END()
+
+} // namespace Private
+
+////////////////////////////////////////////////////////////////////////////////
+void DescribeSession(
+ const std::string_view& AppName,
+ Build Variant,
+ const std::string_view& CommandLine,
+ const std::string_view& BuildVersion)
+{
+ using namespace Private;
+ using namespace std::literals;
+
+ std::string_view Platform;
+#if PLATFORM_WINDOWS
+ Platform = "Windows"sv;
+#elif PLATFORM_UNIX
+ Platform = "Linux"sv;
+#elif PLATFORM_MAC
+ Platform = "Mac"sv;
+#else
+ Platform = "Unknown"sv;
+#endif
+
+ int DataSize = 0;
+ DataSize += AppName.size();
+ DataSize += BuildVersion.size();
+ DataSize += Platform.size();
+ DataSize += CommandLine.size();
+
+ TRACE_LOG(Diagnostics, Session2, true, DataSize)
+ << Session2.AppName(AppName.data(), int(AppName.size()))
+ << Session2.BuildVersion(BuildVersion.data(), int(BuildVersion.size()))
+ << Session2.Platform(Platform.data(), int(Platform.size()))
+ << Session2.CommandLine(CommandLine.data(), int(CommandLine.size()))
+ << Session2.ConfigurationType(uint8(Variant));
+}
+
+} // namespace Trace
+} // namespace UE
+
+#endif // TRACE_IMPLEMENT
+
+
+
+// {{{1 cpu-header /////////////////////////////////////////////////////////////
+
TRACE_CHANNEL_EXTERN(CpuChannel)
namespace UE {
namespace Trace {
+enum CpuScopeFlags : int
+{
+ CpuFlush = 1 << 0,
+};
+
struct TraceCpuScope
{
~TraceCpuScope();
- void Enter(int ScopeId);
+ void Enter(int ScopeId, int Flags=0);
int _ScopeId = 0;
};
@@ -5015,54 +5157,44 @@ int ScopeNew(const std::string_view& Name);
} // namespace Trace
} // namespace UE
-#define TRACE_CPU_SCOPE(name) \
- using namespace std::literals; \
+#define TRACE_CPU_SCOPE(name, ...) \
trace::TraceCpuScope TRACE_PRIVATE_UNIQUE_VAR(cpu_scope); \
if (CpuChannel) { \
+ using namespace std::literals; \
static int TRACE_PRIVATE_UNIQUE_VAR(scope_id); \
if (0 == TRACE_PRIVATE_UNIQUE_VAR(scope_id)) \
TRACE_PRIVATE_UNIQUE_VAR(scope_id) = trace::ScopeNew(name##sv); \
- TRACE_PRIVATE_UNIQUE_VAR(cpu_scope).Enter(TRACE_PRIVATE_UNIQUE_VAR(scope_id)); \
+ TRACE_PRIVATE_UNIQUE_VAR(cpu_scope).Enter(TRACE_PRIVATE_UNIQUE_VAR(scope_id), ##__VA_ARGS__); \
} \
do {} while (0)
+// {{{1 cpu-source /////////////////////////////////////////////////////////////
+
#if TRACE_IMPLEMENT
-////////////////////////////////////////////////////////////////////////////////
TRACE_CHANNEL_DEFINE(CpuChannel)
+namespace UE {
+namespace Trace {
+namespace Private {
+
TRACE_EVENT_BEGIN(CpuProfiler, EventSpec, NoSync|Important)
TRACE_EVENT_FIELD(uint32, Id)
- TRACE_EVENT_FIELD(UE::Trace::AnsiString, Name)
+ TRACE_EVENT_FIELD(trace::AnsiString, Name)
TRACE_EVENT_END()
TRACE_EVENT_BEGIN(CpuProfiler, EventBatch, NoSync)
TRACE_EVENT_FIELD(uint8[], Data)
TRACE_EVENT_END()
-namespace UE {
-namespace Trace {
-namespace Private {
-
+////////////////////////////////////////////////////////////////////////////////
static int32_t encode32_7bit(int32_t value, void* __restrict out)
{
// Calculate the number of bytes
-#if 0
- int32_t msb_test = (value << sizeof(value)) | 0x10;
-#if _MSC_VER
- unsigned long bit_index;
- _BitScanReverse(&bit_index, msb_test);
-#else
- int32_t leading_zeros = __builtin_clz(msb_test);
- int32_t bit_index = ((sizeof(value) * 8) - 1) - leading_zeros;
-#endif
- int32_t length = (bit_index + 3) / 7;
-#else
int32_t length = 1;
length += (value >= (1 << 7));
length += (value >= (1 << 14));
length += (value >= (1 << 21));
-#endif
// Add a gap every eigth bit for the continuations
int32_t ret = value;
@@ -5079,20 +5211,10 @@ static int32_t encode32_7bit(int32_t value, void* __restrict out)
return length;
}
+////////////////////////////////////////////////////////////////////////////////
static int32_t encode64_7bit(int64_t value, void* __restrict out)
{
// Calculate the output length
-#if 0
- int64_t msb_test = (value << sizeof(value)) | 0x100ull;
-#if _MSC_VER
- unsigned long bit_index;
- _BitScanReverse64(&bit_index, msb_test);
-#else
- int32_t leading_zeros = __builtin_clzll(msb_test);
- int32_t bit_index = ((sizeof(value) * 8) - 1) - leading_zeros;
-#endif
- int32_t length = (bit_index - 1) / 7;
-#else
uint32_t length = 1;
length += (value >= (1ll << 7));
length += (value >= (1ll << 14));
@@ -5101,7 +5223,6 @@ static int32_t encode64_7bit(int64_t value, void* __restrict out)
length += (value >= (1ll << 35));
length += (value >= (1ll << 42));
length += (value >= (1ll << 49));
-#endif
// Add a gap every eigth bit for the continuations
int64_t ret = value;
@@ -5123,13 +5244,13 @@ static int32_t encode64_7bit(int64_t value, void* __restrict out)
class ThreadBuffer
{
public:
- static void Enter(uint64_t Timestamp, uint32_t ScopeId) { TlsInstance.EnterImpl(Timestamp, ScopeId); }
- static void Leave(uint64_t Timestamp) { TlsInstance.LeaveImpl(Timestamp); }
+ static void Enter(uint64_t Timestamp, uint32_t ScopeId, int Flags);
+ static void Leave(uint64_t Timestamp);
private:
~ThreadBuffer();
void Flush(bool Force);
- void EnterImpl(uint64_t Timestamp, uint32_t ScopeId);
+ void EnterImpl(uint64_t Timestamp, uint32_t ScopeId, int Flags);
void LeaveImpl(uint64_t Timestamp);
enum
{
@@ -5148,6 +5269,18 @@ private:
thread_local ThreadBuffer ThreadBuffer::TlsInstance;
////////////////////////////////////////////////////////////////////////////////
+inline void ThreadBuffer::Enter(uint64_t Timestamp, uint32_t ScopeId, int Flags)
+{
+ TlsInstance.EnterImpl(Timestamp, ScopeId, Flags);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+inline void ThreadBuffer::Leave(uint64_t Timestamp)
+{
+ TlsInstance.LeaveImpl(Timestamp);
+}
+
+////////////////////////////////////////////////////////////////////////////////
ThreadBuffer::~ThreadBuffer()
{
Flush(true);
@@ -5156,6 +5289,8 @@ ThreadBuffer::~ThreadBuffer()
////////////////////////////////////////////////////////////////////////////////
void ThreadBuffer::Flush(bool Force)
{
+ using namespace Private;
+
if (!Force && (Cursor <= (Buffer + BufferSize - Overflow)))
return;
@@ -5167,14 +5302,15 @@ void ThreadBuffer::Flush(bool Force)
}
////////////////////////////////////////////////////////////////////////////////
-void ThreadBuffer::EnterImpl(uint64_t Timestamp, uint32_t ScopeId)
+void ThreadBuffer::EnterImpl(uint64_t Timestamp, uint32_t ScopeId, int Flags)
{
Timestamp -= PrevTimestamp;
PrevTimestamp += Timestamp;
Cursor += encode64_7bit((Timestamp) << 1 | EnterLsb, Cursor);
Cursor += encode32_7bit(ScopeId, Cursor);
- Flush(false);
+ bool ShouldFlush = (Flags & CpuScopeFlags::CpuFlush);
+ Flush(ShouldFlush);
}
////////////////////////////////////////////////////////////////////////////////
@@ -5194,8 +5330,10 @@ void ThreadBuffer::LeaveImpl(uint64_t Timestamp)
////////////////////////////////////////////////////////////////////////////////
int ScopeNew(const std::string_view& Name)
{
+ using namespace Private;
+
static int volatile NextSpecId = 1;
- int SpecId = Private::AtomicAddRelaxed(&NextSpecId, 1);
+ int SpecId = AtomicAddRelaxed(&NextSpecId, 1);
uint32 NameSize = uint32(Name.size());
TRACE_LOG(CpuProfiler, EventSpec, true, NameSize)
@@ -5220,15 +5358,124 @@ TraceCpuScope::~TraceCpuScope()
}
////////////////////////////////////////////////////////////////////////////////
-void TraceCpuScope::Enter(int ScopeId)
+void TraceCpuScope::Enter(int ScopeId, int Flags)
{
using namespace Private;
_ScopeId = ScopeId;
uint64 Timestamp = TimeGetTimestamp();
- ThreadBuffer::Enter(Timestamp, ScopeId);
+ ThreadBuffer::Enter(Timestamp, ScopeId, Flags);
+}
+
+} // namespace Trace
+} // namespace UE
+
+#endif // TRACE_IMPLEMENT
+
+
+
+// {{{1 log-header /////////////////////////////////////////////////////////////
+
+TRACE_CHANNEL_EXTERN(LogChannel)
+
+namespace UE {
+namespace Trace {
+namespace Private {
+
+void LogMessageImpl(int Id, const void* ParamBuffer, int ParamSize);
+int LogMessageNew(const std::string_view& Format, const std::string_view& File, int Line);
+
+template <typename... ARGS>
+void LogMessage(int Id, ARGS&&... Args)
+{
+ LogMessageImpl(Id, nullptr, 0);
+}
+
+} // namespace Private
+} // namespace Trace
+} // namespace UE
+
+#define TRACE_LOG_MESSAGE(format, ...) \
+ if (LogChannel) { \
+ using namespace std::literals; \
+ static int message_id; \
+ if (message_id == 0) \
+ message_id = trace::Private::LogMessageNew( \
+ format##sv, \
+ TRACE_PRIVATE_CONCAT(__FILE__, sv), \
+ __LINE__); \
+ trace::Private::LogMessage(message_id, ##__VA_ARGS__); \
+ } \
+ do {} while (0)
+
+// {{{1 log-source /////////////////////////////////////////////////////////////
+
+#if TRACE_IMPLEMENT
+
+TRACE_CHANNEL_DEFINE(LogChannel)
+
+namespace UE {
+namespace Trace {
+namespace Private {
+
+#if 0
+TRACE_EVENT_BEGIN(Logging, LogCategory, NoSync|Important)
+ TRACE_EVENT_FIELD(const void*, CategoryPointer)
+ TRACE_EVENT_FIELD(uint8, DefaultVerbosity)
+ TRACE_EVENT_FIELD(UE::Trace::AnsiString, Name)
+TRACE_EVENT_END()
+#endif
+
+TRACE_EVENT_BEGIN(Logging, LogMessageSpec, NoSync|Important)
+ TRACE_EVENT_FIELD(uint32, LogPoint)
+ //TRACE_EVENT_FIELD(uint16, CategoryPointer)
+ TRACE_EVENT_FIELD(uint16, Line)
+ TRACE_EVENT_FIELD(UE::Trace::AnsiString, FileName)
+ TRACE_EVENT_FIELD(UE::Trace::AnsiString, FormatString)
+TRACE_EVENT_END()
+
+TRACE_EVENT_BEGIN(Logging, LogMessage, NoSync)
+ TRACE_EVENT_FIELD(uint32, LogPoint)
+ TRACE_EVENT_FIELD(uint64, Cycle)
+ TRACE_EVENT_FIELD(uint8[], FormatArgs)
+TRACE_EVENT_END()
+
+////////////////////////////////////////////////////////////////////////////////
+void LogMessageImpl(int Id, const void* ParamBuffer, int ParamSize)
+{
+ (void)ParamBuffer;
+ (void)ParamSize;
+
+ uint64 Timestamp = TimeGetTimestamp();
+
+ TRACE_LOG(Logging, LogMessage, true)
+ << LogMessage.LogPoint(Id)
+ << LogMessage.Cycle(Timestamp);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int LogMessageNew(
+ const std::string_view& Format,
+ const std::string_view& File,
+ int Line)
+{
+ static int volatile NextId = 1;
+ int Id = AtomicAddRelaxed(&NextId, 1);
+
+ int DataSize = 0;
+ DataSize += Format.size();
+ DataSize += File.size();
+
+ TRACE_LOG(Logging, LogMessageSpec, true, DataSize)
+ << LogMessageSpec.LogPoint(Id)
+ << LogMessageSpec.Line(uint16(Line))
+ << LogMessageSpec.FileName(File.data(), int(File.size()))
+ << LogMessageSpec.FormatString(Format.data(), int(Format.size()));
+
+ return Id;
}
+} // namespace Private
} // namespace Trace
} // namespace UE
diff --git a/zen/cmds/top.cpp b/zen/cmds/top.cpp
index 315d8cb38..f5b9d654a 100644
--- a/zen/cmds/top.cpp
+++ b/zen/cmds/top.cpp
@@ -43,7 +43,7 @@ TopCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) {
- ZEN_CONSOLE("{:5} {:6} {:24}", Entry.ListenPort, Entry.Pid, Entry.GetSessionId());
+ ZEN_CONSOLE("{:5} {:6} {:24}", Entry.EffectiveListenPort, Entry.Pid, Entry.GetSessionId());
});
zen::Sleep(1000);
@@ -78,7 +78,7 @@ PsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return 0;
}
- State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) { ZEN_CONSOLE("Port {} : pid {}", Entry.ListenPort, Entry.Pid); });
+ State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) { ZEN_CONSOLE("Port {} : pid {}", Entry.EffectiveListenPort, Entry.Pid); });
return 0;
}
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 228d33202..0f385116b 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -822,19 +822,14 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
CbPackage RpcResponse;
- CacheRecordPolicy Policy;
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ CacheRecordPolicy BatchPolicy = CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView());
std::vector<CacheKey> CacheKeys;
std::vector<IoBuffer> CacheValues;
std::vector<size_t> UpstreamRequests;
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
- Policy = CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView());
-
- const bool PartialRecord = EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::PartialRecord);
- const bool QueryRemote = EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote);
-
for (CbFieldView KeyView : Params["CacheKeys"sv])
{
CbObjectView KeyObject = KeyView.AsObjectView();
@@ -851,44 +846,84 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys)
{
ZenCacheValue CacheValue;
- uint32_t MissingCount = 0;
+ uint32_t MissingCount = 0;
+ uint32_t MissingReadFromUpstreamCount = 0;
- if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
+ if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
{
CbObjectView CacheRecord(CacheValue.Value.Data());
- CacheRecord.IterateAttachments([this, &MissingCount, &RpcResponse](CbFieldView AttachmentHash) {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
- {
- ZEN_ASSERT(Chunk.GetSize() > 0);
- RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
- }
- else
- {
- MissingCount++;
- }
- });
+ CacheRecord.IterateAttachments(
+ [this, &MissingCount, &MissingReadFromUpstreamCount, &RpcResponse, &BatchPolicy](CbFieldView AttachmentHash) {
+ CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy();
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal))
+ {
+ // A value that is requested without the Query flag (such as None/Disable) does not count as missing, because we
+ // didn't ask for it and thus the record is complete in its absence.
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ MissingReadFromUpstreamCount++;
+ MissingCount++;
+ }
+ }
+ else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
+ {
+ if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash()))
+ {
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ MissingReadFromUpstreamCount++;
+ }
+ MissingCount++;
+ }
+ }
+ else
+ {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ ZEN_ASSERT(Chunk.GetSize() > 0);
+ RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ }
+ else
+ {
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ MissingReadFromUpstreamCount++;
+ }
+ MissingCount++;
+ }
+ }
+ });
}
- if (CacheValue.Value && (MissingCount == 0 || PartialRecord))
+ if ((!CacheValue.Value && EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryRemote)) ||
+ MissingReadFromUpstreamCount != 0)
+ {
+ UpstreamRequests.push_back(KeyIndex);
+ }
+ else if (CacheValue.Value && (MissingCount == 0 || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)))
{
ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL) {}",
Key.Bucket,
Key.Hash,
NiceBytes(CacheValue.Value.Size()),
ToString(CacheValue.Value.GetContentType()),
- MissingCount ? "(PARTIAl)" : ""sv);
+ MissingCount ? "(PARTIAL)" : ""sv);
CacheValues[KeyIndex] = std::move(CacheValue.Value);
m_CacheStats.HitCount++;
}
- else if (QueryRemote)
- {
- UpstreamRequests.push_back(KeyIndex);
- }
else
{
- ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAl)"sv : ""sv);
- m_CacheStats.MissCount++;
+ if (!EnumHasAnyFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::Query))
+ {
+ // If they requested no query, do not record this as a miss
+ ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash);
+ }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAL)"sv : ""sv);
+ m_CacheStats.MissCount++;
+ }
}
++KeyIndex;
@@ -896,7 +931,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
if (!UpstreamRequests.empty())
{
- const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, PartialRecord](CacheRecordGetCompleteParams&& Params) {
+ const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, &BatchPolicy](CacheRecordGetCompleteParams&& Params) {
ZEN_ASSERT(Params.KeyIndex < CacheValues.size());
IoBuffer CacheValue;
@@ -904,37 +939,52 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
if (Params.Record)
{
- Params.Record.IterateAttachments([this, &RpcResponse, &Params, &Count](CbFieldView HashView) {
- if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash()))
+ Params.Record.IterateAttachments([this, &RpcResponse, &Params, &Count, &BatchPolicy](CbFieldView HashView) {
+ CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy();
+ bool FoundInUpstream = false;
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
{
- if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
+ if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash()))
{
- auto InsertResult = m_CidStore.AddChunk(Compressed);
- if (InsertResult.New)
+ FoundInUpstream = true;
+ if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
{
- Count.New++;
- }
- Count.Valid++;
+ FoundInUpstream = true;
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
+ {
+ auto InsertResult = m_CidStore.AddChunk(Compressed);
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ }
+ Count.Valid++;
- RpcResponse.AddAttachment(CbAttachment(Compressed));
- }
- else
- {
- ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'",
- HashView.AsHash(),
- Params.Key.Bucket,
- Params.Key.Hash);
- Count.Invalid++;
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
+ {
+ RpcResponse.AddAttachment(CbAttachment(Compressed));
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'",
+ HashView.AsHash(),
+ Params.Key.Bucket,
+ Params.Key.Hash);
+ Count.Invalid++;
+ }
}
}
- else if (m_CidStore.ContainsChunk(HashView.AsHash()))
+ if (!FoundInUpstream && EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal) &&
+ m_CidStore.ContainsChunk(HashView.AsHash()))
{
+ // We added the attachment for this Value in the local loop before calling m_UpstreamCache
Count.Valid++;
}
Count.Total++;
});
- if ((Count.Valid == Count.Total) || PartialRecord)
+ if ((Count.Valid == Count.Total) || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))
{
CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer();
}
@@ -952,9 +1002,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
Count.Total);
CacheValue.SetContentType(ZenContentType::kCbObject);
-
CacheValues[Params.KeyIndex] = CacheValue;
- m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue});
+ if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::StoreLocal))
+ {
+ m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue});
+ }
m_CacheStats.HitCount++;
m_CacheStats.UpstreamHitCount++;
@@ -967,7 +1019,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
}
};
- m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete));
+ m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, BatchPolicy, std::move(OnCacheRecordGetComplete));
}
CbObjectWriter ResponseObject;
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 8b02a437a..091406db3 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -1046,7 +1046,7 @@ public:
virtual void GetCacheRecords(std::span<CacheKey> CacheKeys,
std::span<size_t> KeyIndex,
- const CacheRecordPolicy& Policy,
+ const CacheRecordPolicy& DownstreamPolicy,
OnCacheRecordGetComplete&& OnComplete) override final
{
ZEN_TRACE_CPU("Upstream::GetCacheRecords");
@@ -1057,6 +1057,8 @@ public:
if (m_Options.ReadUpstream)
{
+ CacheRecordPolicy UpstreamPolicy = DownstreamPolicy.ConvertToUpstream();
+
for (auto& Endpoint : m_Endpoints)
{
if (RemainingKeys.empty())
@@ -1075,18 +1077,19 @@ public:
{
metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming);
- Result = Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) {
- if (Params.Record)
- {
- OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
+ Result =
+ Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, UpstreamPolicy, [&](CacheRecordGetCompleteParams&& Params) {
+ if (Params.Record)
+ {
+ OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
- Stats.CacheHitCount.Increment(1);
- }
- else
- {
- Missing.push_back(Params.KeyIndex);
- }
- });
+ Stats.CacheHitCount.Increment(1);
+ }
+ else
+ {
+ Missing.push_back(Params.KeyIndex);
+ }
+ });
}
Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size()));
diff --git a/zenutil/cache/cachepolicy.cpp b/zenutil/cache/cachepolicy.cpp
index ba345485a..3bf7a0c67 100644
--- a/zenutil/cache/cachepolicy.cpp
+++ b/zenutil/cache/cachepolicy.cpp
@@ -4,6 +4,7 @@
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
+#include <zencore/enumflags.h>
#include <zencore/string.h>
#include <algorithm>
@@ -206,6 +207,25 @@ CacheRecordPolicy::Load(CbObjectView Object, CachePolicy DefaultPolicy)
return Builder.Build();
}
+CacheRecordPolicy
+CacheRecordPolicy::ConvertToUpstream() const
+{
+ auto DownstreamToUpstream = [](CachePolicy P) {
+ // Remote|Local -> Set Remote
+ // Delete Skip Flags
+ // Maintain Remaining Flags
+ return (EnumHasAllFlags(P, CachePolicy::QueryRemote) ? CachePolicy::QueryLocal : CachePolicy::None) |
+ (EnumHasAllFlags(P, CachePolicy::StoreRemote) ? CachePolicy::StoreLocal : CachePolicy::None) |
+ (P & ~(CachePolicy::SkipData | CachePolicy::SkipMeta));
+ };
+ CacheRecordPolicyBuilder Builder(DownstreamToUpstream(GetDefaultValuePolicy()));
+ for (const CacheValuePolicy& ValuePolicy : GetValuePolicies())
+ {
+ Builder.AddValuePolicy(ValuePolicy.Id, DownstreamToUpstream(ValuePolicy.Policy));
+ }
+ return Builder.Build();
+}
+
void
CacheRecordPolicyBuilder::AddValuePolicy(const CacheValuePolicy& Policy)
{
diff --git a/zenutil/include/zenutil/cache/cachepolicy.h b/zenutil/include/zenutil/cache/cachepolicy.h
index f967f707b..b3602edbd 100644
--- a/zenutil/include/zenutil/cache/cachepolicy.h
+++ b/zenutil/include/zenutil/cache/cachepolicy.h
@@ -144,6 +144,10 @@ public:
*/
static CacheRecordPolicy Load(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default);
+ /** Return *this converted into the equivalent policy that the upstream should use when forwarding a put or get to an upstream server.
+ */
+ CacheRecordPolicy ConvertToUpstream() const;
+
private:
friend class CacheRecordPolicyBuilder;