aboutsummaryrefslogtreecommitdiff
path: root/zenutil
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2022-01-28 13:30:07 +0100
committerStefan Boberg <[email protected]>2022-01-28 13:30:07 +0100
commit31a2c8a818a904969f17d24dbec7c50dcd688638 (patch)
tree3c1e58e1cc24c86134acfd40c8bf12f22b4f16d6 /zenutil
parentStructured cache PUTs now preserve content type for binary and compressed binary (diff)
parentCompile fix (diff)
downloadzen-31a2c8a818a904969f17d24dbec7c50dcd688638.tar.xz
zen-31a2c8a818a904969f17d24dbec7c50dcd688638.zip
Merge branch 'main' of https://github.com/EpicGames/zen
Diffstat (limited to 'zenutil')
-rw-r--r--zenutil/cache/cachepolicy.cpp20
-rw-r--r--zenutil/include/zenutil/cache/cachekey.h6
-rw-r--r--zenutil/include/zenutil/cache/cachepolicy.h4
-rw-r--r--zenutil/include/zenutil/zenserverprocess.h9
-rw-r--r--zenutil/zenserverprocess.cpp28
5 files changed, 47 insertions, 20 deletions
diff --git a/zenutil/cache/cachepolicy.cpp b/zenutil/cache/cachepolicy.cpp
index ba345485a..3bf7a0c67 100644
--- a/zenutil/cache/cachepolicy.cpp
+++ b/zenutil/cache/cachepolicy.cpp
@@ -4,6 +4,7 @@
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
+#include <zencore/enumflags.h>
#include <zencore/string.h>
#include <algorithm>
@@ -206,6 +207,25 @@ CacheRecordPolicy::Load(CbObjectView Object, CachePolicy DefaultPolicy)
return Builder.Build();
}
+CacheRecordPolicy
+CacheRecordPolicy::ConvertToUpstream() const
+{
+ auto DownstreamToUpstream = [](CachePolicy P) {
+ // Remote|Local -> Set Remote
+ // Delete Skip Flags
+ // Maintain Remaining Flags
+ return (EnumHasAllFlags(P, CachePolicy::QueryRemote) ? CachePolicy::QueryLocal : CachePolicy::None) |
+ (EnumHasAllFlags(P, CachePolicy::StoreRemote) ? CachePolicy::StoreLocal : CachePolicy::None) |
+ (P & ~(CachePolicy::SkipData | CachePolicy::SkipMeta));
+ };
+ CacheRecordPolicyBuilder Builder(DownstreamToUpstream(GetDefaultValuePolicy()));
+ for (const CacheValuePolicy& ValuePolicy : GetValuePolicies())
+ {
+ Builder.AddValuePolicy(ValuePolicy.Id, DownstreamToUpstream(ValuePolicy.Policy));
+ }
+ return Builder.Build();
+}
+
void
CacheRecordPolicyBuilder::AddValuePolicy(const CacheValuePolicy& Policy)
{
diff --git a/zenutil/include/zenutil/cache/cachekey.h b/zenutil/include/zenutil/cache/cachekey.h
index fb36c7759..a0a83a883 100644
--- a/zenutil/include/zenutil/cache/cachekey.h
+++ b/zenutil/include/zenutil/cache/cachekey.h
@@ -44,7 +44,7 @@ struct CacheChunkRequest
{
CacheKey Key;
IoHash ChunkId;
- Oid PayloadId;
+ Oid ValueId;
uint64_t RawOffset = 0ull;
uint64_t RawSize = ~uint64_t(0);
CachePolicy Policy = CachePolicy::Default;
@@ -69,11 +69,11 @@ operator<(const CacheChunkRequest& A, const CacheChunkRequest& B)
{
return false;
}
- if (A.PayloadId < B.PayloadId)
+ if (A.ValueId < B.ValueId)
{
return true;
}
- if (B.PayloadId < A.PayloadId)
+ if (B.ValueId < A.ValueId)
{
return false;
}
diff --git a/zenutil/include/zenutil/cache/cachepolicy.h b/zenutil/include/zenutil/cache/cachepolicy.h
index f967f707b..b3602edbd 100644
--- a/zenutil/include/zenutil/cache/cachepolicy.h
+++ b/zenutil/include/zenutil/cache/cachepolicy.h
@@ -144,6 +144,10 @@ public:
*/
static CacheRecordPolicy Load(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default);
+ /** Return *this converted into the equivalent policy that the upstream should use when forwarding a put or get to an upstream server.
+ */
+ CacheRecordPolicy ConvertToUpstream() const;
+
private:
friend class CacheRecordPolicyBuilder;
diff --git a/zenutil/include/zenutil/zenserverprocess.h b/zenutil/include/zenutil/zenserverprocess.h
index 55b9a50cd..2a3146e2d 100644
--- a/zenutil/include/zenutil/zenserverprocess.h
+++ b/zenutil/include/zenutil/zenserverprocess.h
@@ -100,11 +100,12 @@ public:
// additional state. For example, you can use the session ID
// to introduce additional named objects
std::atomic<uint32_t> Pid;
- std::atomic<uint16_t> ListenPort;
+ std::atomic<uint16_t> DesiredListenPort;
std::atomic<uint16_t> Flags;
uint8_t SessionId[12];
std::atomic<uint32_t> SponsorPids[8];
- uint8_t Padding[12];
+ std::atomic<uint16_t> EffectiveListenPort;
+ uint8_t Padding[10];
enum class FlagsEnum : uint16_t
{
@@ -125,8 +126,8 @@ public:
void Initialize();
[[nodiscard]] bool InitializeReadOnly();
- [[nodiscard]] ZenServerEntry* Lookup(int ListenPort);
- ZenServerEntry* Register(int ListenPort);
+ [[nodiscard]] ZenServerEntry* Lookup(int DesiredListenPort);
+ ZenServerEntry* Register(int DesiredListenPort);
void Sweep();
void Snapshot(std::function<void(const ZenServerEntry&)>&& Callback);
inline bool IsReadOnly() const { return m_IsReadOnly; }
diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp
index fe6236d18..5bddc72bc 100644
--- a/zenutil/zenserverprocess.cpp
+++ b/zenutil/zenserverprocess.cpp
@@ -230,11 +230,11 @@ ZenServerState::InitializeReadOnly()
}
ZenServerState::ZenServerEntry*
-ZenServerState::Lookup(int ListenPort)
+ZenServerState::Lookup(int DesiredListenPort)
{
for (int i = 0; i < m_MaxEntryCount; ++i)
{
- if (m_Data[i].ListenPort == ListenPort)
+ if (m_Data[i].DesiredListenPort == DesiredListenPort)
{
return &m_Data[i];
}
@@ -244,7 +244,7 @@ ZenServerState::Lookup(int ListenPort)
}
ZenServerState::ZenServerEntry*
-ZenServerState::Register(int ListenPort)
+ZenServerState::Register(int DesiredListenPort)
{
if (m_Data == nullptr)
{
@@ -259,17 +259,18 @@ ZenServerState::Register(int ListenPort)
{
ZenServerEntry& Entry = m_Data[i];
- if (Entry.ListenPort.load(std::memory_order_relaxed) == 0)
+ if (Entry.DesiredListenPort.load(std::memory_order_relaxed) == 0)
{
uint16_t Expected = 0;
- if (Entry.ListenPort.compare_exchange_strong(Expected, uint16_t(ListenPort)))
+ if (Entry.DesiredListenPort.compare_exchange_strong(Expected, uint16_t(DesiredListenPort)))
{
// Successfully allocated entry
m_OurEntry = &Entry;
- Entry.Pid = Pid;
- Entry.Flags = 0;
+ Entry.Pid = Pid;
+ Entry.EffectiveListenPort = 0;
+ Entry.Flags = 0;
const Oid SesId = GetSessionId();
memcpy(Entry.SessionId, &SesId, sizeof SesId);
@@ -296,11 +297,11 @@ ZenServerState::Sweep()
{
ZenServerEntry& Entry = m_Data[i];
- if (Entry.ListenPort)
+ if (Entry.DesiredListenPort)
{
if (IsProcessRunning(Entry.Pid) == false)
{
- ZEN_DEBUG("Sweep - pid {} not running, reclaiming entry (port {})", Entry.Pid, Entry.ListenPort);
+ ZEN_DEBUG("Sweep - pid {} not running, reclaiming entry (port {})", Entry.Pid, Entry.DesiredListenPort);
Entry.Reset();
}
@@ -320,7 +321,7 @@ ZenServerState::Snapshot(std::function<void(const ZenServerEntry&)>&& Callback)
{
ZenServerEntry& Entry = m_Data[i];
- if (Entry.ListenPort)
+ if (Entry.DesiredListenPort)
{
Callback(Entry);
}
@@ -330,9 +331,10 @@ ZenServerState::Snapshot(std::function<void(const ZenServerEntry&)>&& Callback)
void
ZenServerState::ZenServerEntry::Reset()
{
- Pid = 0;
- ListenPort = 0;
- Flags = 0;
+ Pid = 0;
+ DesiredListenPort = 0;
+ Flags = 0;
+ EffectiveListenPort = 0;
}
void