diff options
| author | Stefan Boberg <[email protected]> | 2022-01-28 13:30:07 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2022-01-28 13:30:07 +0100 |
| commit | 31a2c8a818a904969f17d24dbec7c50dcd688638 (patch) | |
| tree | 3c1e58e1cc24c86134acfd40c8bf12f22b4f16d6 /zenutil | |
| parent | Structured cache PUTs now preserve content type for binary and compressed binary (diff) | |
| parent | Compile fix (diff) | |
| download | zen-31a2c8a818a904969f17d24dbec7c50dcd688638.tar.xz zen-31a2c8a818a904969f17d24dbec7c50dcd688638.zip | |
Merge branch 'main' of https://github.com/EpicGames/zen
Diffstat (limited to 'zenutil')
| -rw-r--r-- | zenutil/cache/cachepolicy.cpp | 20 | ||||
| -rw-r--r-- | zenutil/include/zenutil/cache/cachekey.h | 6 | ||||
| -rw-r--r-- | zenutil/include/zenutil/cache/cachepolicy.h | 4 | ||||
| -rw-r--r-- | zenutil/include/zenutil/zenserverprocess.h | 9 | ||||
| -rw-r--r-- | zenutil/zenserverprocess.cpp | 28 |
5 files changed, 47 insertions, 20 deletions
diff --git a/zenutil/cache/cachepolicy.cpp b/zenutil/cache/cachepolicy.cpp index ba345485a..3bf7a0c67 100644 --- a/zenutil/cache/cachepolicy.cpp +++ b/zenutil/cache/cachepolicy.cpp @@ -4,6 +4,7 @@ #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> +#include <zencore/enumflags.h> #include <zencore/string.h> #include <algorithm> @@ -206,6 +207,25 @@ CacheRecordPolicy::Load(CbObjectView Object, CachePolicy DefaultPolicy) return Builder.Build(); } +CacheRecordPolicy +CacheRecordPolicy::ConvertToUpstream() const +{ + auto DownstreamToUpstream = [](CachePolicy P) { + // Remote|Local -> Set Remote + // Delete Skip Flags + // Maintain Remaining Flags + return (EnumHasAllFlags(P, CachePolicy::QueryRemote) ? CachePolicy::QueryLocal : CachePolicy::None) | + (EnumHasAllFlags(P, CachePolicy::StoreRemote) ? CachePolicy::StoreLocal : CachePolicy::None) | + (P & ~(CachePolicy::SkipData | CachePolicy::SkipMeta)); + }; + CacheRecordPolicyBuilder Builder(DownstreamToUpstream(GetDefaultValuePolicy())); + for (const CacheValuePolicy& ValuePolicy : GetValuePolicies()) + { + Builder.AddValuePolicy(ValuePolicy.Id, DownstreamToUpstream(ValuePolicy.Policy)); + } + return Builder.Build(); +} + void CacheRecordPolicyBuilder::AddValuePolicy(const CacheValuePolicy& Policy) { diff --git a/zenutil/include/zenutil/cache/cachekey.h b/zenutil/include/zenutil/cache/cachekey.h index fb36c7759..a0a83a883 100644 --- a/zenutil/include/zenutil/cache/cachekey.h +++ b/zenutil/include/zenutil/cache/cachekey.h @@ -44,7 +44,7 @@ struct CacheChunkRequest { CacheKey Key; IoHash ChunkId; - Oid PayloadId; + Oid ValueId; uint64_t RawOffset = 0ull; uint64_t RawSize = ~uint64_t(0); CachePolicy Policy = CachePolicy::Default; @@ -69,11 +69,11 @@ operator<(const CacheChunkRequest& A, const CacheChunkRequest& B) { return false; } - if (A.PayloadId < B.PayloadId) + if (A.ValueId < B.ValueId) { return true; } - if (B.PayloadId < A.PayloadId) + if (B.ValueId < A.ValueId) { return false; } diff --git a/zenutil/include/zenutil/cache/cachepolicy.h b/zenutil/include/zenutil/cache/cachepolicy.h index f967f707b..b3602edbd 100644 --- a/zenutil/include/zenutil/cache/cachepolicy.h +++ b/zenutil/include/zenutil/cache/cachepolicy.h @@ -144,6 +144,10 @@ public: */ static CacheRecordPolicy Load(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default); + /** Return *this converted into the equivalent policy that the upstream should use when forwarding a put or get to an upstream server. + */ + CacheRecordPolicy ConvertToUpstream() const; + private: friend class CacheRecordPolicyBuilder; diff --git a/zenutil/include/zenutil/zenserverprocess.h b/zenutil/include/zenutil/zenserverprocess.h index 55b9a50cd..2a3146e2d 100644 --- a/zenutil/include/zenutil/zenserverprocess.h +++ b/zenutil/include/zenutil/zenserverprocess.h @@ -100,11 +100,12 @@ public: // additional state. For example, you can use the session ID // to introduce additional named objects std::atomic<uint32_t> Pid; - std::atomic<uint16_t> ListenPort; + std::atomic<uint16_t> DesiredListenPort; std::atomic<uint16_t> Flags; uint8_t SessionId[12]; std::atomic<uint32_t> SponsorPids[8]; - uint8_t Padding[12]; + std::atomic<uint16_t> EffectiveListenPort; + uint8_t Padding[10]; enum class FlagsEnum : uint16_t { @@ -125,8 +126,8 @@ public: void Initialize(); [[nodiscard]] bool InitializeReadOnly(); - [[nodiscard]] ZenServerEntry* Lookup(int ListenPort); - ZenServerEntry* Register(int ListenPort); + [[nodiscard]] ZenServerEntry* Lookup(int DesiredListenPort); + ZenServerEntry* Register(int DesiredListenPort); void Sweep(); void Snapshot(std::function<void(const ZenServerEntry&)>&& Callback); inline bool IsReadOnly() const { return m_IsReadOnly; } diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp index fe6236d18..5bddc72bc 100644 --- a/zenutil/zenserverprocess.cpp +++ b/zenutil/zenserverprocess.cpp @@ -230,11 +230,11 @@ ZenServerState::InitializeReadOnly() } ZenServerState::ZenServerEntry* -ZenServerState::Lookup(int ListenPort) +ZenServerState::Lookup(int DesiredListenPort) { for (int i = 0; i < m_MaxEntryCount; ++i) { - if (m_Data[i].ListenPort == ListenPort) + if (m_Data[i].DesiredListenPort == DesiredListenPort) { return &m_Data[i]; } @@ -244,7 +244,7 @@ ZenServerState::Lookup(int ListenPort) } ZenServerState::ZenServerEntry* -ZenServerState::Register(int ListenPort) +ZenServerState::Register(int DesiredListenPort) { if (m_Data == nullptr) { @@ -259,17 +259,18 @@ ZenServerState::Register(int ListenPort) { ZenServerEntry& Entry = m_Data[i]; - if (Entry.ListenPort.load(std::memory_order_relaxed) == 0) + if (Entry.DesiredListenPort.load(std::memory_order_relaxed) == 0) { uint16_t Expected = 0; - if (Entry.ListenPort.compare_exchange_strong(Expected, uint16_t(ListenPort))) + if (Entry.DesiredListenPort.compare_exchange_strong(Expected, uint16_t(DesiredListenPort))) { // Successfully allocated entry m_OurEntry = &Entry; - Entry.Pid = Pid; - Entry.Flags = 0; + Entry.Pid = Pid; + Entry.EffectiveListenPort = 0; + Entry.Flags = 0; const Oid SesId = GetSessionId(); memcpy(Entry.SessionId, &SesId, sizeof SesId); @@ -296,11 +297,11 @@ ZenServerState::Sweep() { ZenServerEntry& Entry = m_Data[i]; - if (Entry.ListenPort) + if (Entry.DesiredListenPort) { if (IsProcessRunning(Entry.Pid) == false) { - ZEN_DEBUG("Sweep - pid {} not running, reclaiming entry (port {})", Entry.Pid, Entry.ListenPort); + ZEN_DEBUG("Sweep - pid {} not running, reclaiming entry (port {})", Entry.Pid, Entry.DesiredListenPort); Entry.Reset(); } @@ -320,7 +321,7 @@ ZenServerState::Snapshot(std::function<void(const ZenServerEntry&)>&& Callback) { ZenServerEntry& Entry = m_Data[i]; - if (Entry.ListenPort) + if (Entry.DesiredListenPort) { Callback(Entry); } @@ -330,9 +331,10 @@ ZenServerState::Snapshot(std::function<void(const ZenServerEntry&)>&& Callback) void ZenServerState::ZenServerEntry::Reset() { - Pid = 0; - ListenPort = 0; - Flags = 0; + Pid = 0; + DesiredListenPort = 0; + Flags = 0; + EffectiveListenPort = 0; } void |