aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp343
1 files changed, 167 insertions, 176 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index def1adb90..1e3eb5dcd 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -12,12 +12,16 @@
#include "structuredcache.h"
#include "structuredcachestore.h"
#include "upstream/jupiter.h"
+#include "upstream/upstreamcache.h"
#include "upstream/zen.h"
#include "zenstore/cidstore.h"
#include <spdlog/spdlog.h>
#include <algorithm>
+#include <atomic>
#include <filesystem>
+#include <queue>
+#include <thread>
namespace zen {
@@ -44,31 +48,19 @@ MapToHttpContentType(zen::ZenContentType Type)
}
};
-HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore)
+//////////////////////////////////////////////////////////////////////////
+
+HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InCacheStore,
+ zen::CasStore& InStore,
+ zen::CidStore& InCidStore,
+ std::unique_ptr<UpstreamCache> UpstreamCache)
: m_Log("cache", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks()))
+, m_CacheStore(InCacheStore)
, m_CasStore(InStore)
-, m_CacheStore(InStore, RootPath)
, m_CidStore(InCidStore)
+, m_UpstreamCache(std::move(UpstreamCache))
{
m_Log.set_level(spdlog::level::debug);
-
- m_Log.info("initializing structured cache at '{}'", RootPath);
-
-#if 0
- m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv,
- "ue4.ddc"sv /* namespace */,
- "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv /* provider */,
- "0oao91lrhqPiAlaGD0x7"sv /* client id */,
- "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */);
-#endif
-
-#if 0
- std::string_view UpstreamSpec = "http://arn-wd-15192.epicgames.net"sv;
-
- m_Log.info("Using upstream Zen cache at '{}'", UpstreamSpec);
-
- m_ZenClient = new ZenStructuredCacheClient(UpstreamSpec);
-#endif
}
HttpStructuredCacheService::~HttpStructuredCacheService()
@@ -161,66 +153,55 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
ZenCacheValue Value;
bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
- if (!Success && m_ZenClient)
+ if (!Success && m_UpstreamCache)
{
- ZenStructuredCacheSession Session(*m_ZenClient);
-
- zen::Stopwatch Timer;
+ const ZenContentType CacheRecordType =
+ Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : ZenContentType::kCbObject;
- try
+ if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType);
+ UpstreamResult.Success)
{
- Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey);
+ Value.Value = UpstreamResult.Value;
+ Success = true;
- m_Log.debug("Zen upstream GET ({}/{}) succeeded after {:5}!",
- Ref.BucketSegment,
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
-
- // TODO: this is incomplete and needs to propagate any referenced content
-
- Success = true;
- }
- catch (std::exception& e)
- {
- m_Log.warn("Zen upstream GET ({}/{}) FAILED after {:5}: '{}'",
- Ref.BucketSegment,
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- e.what());
-
- throw;
- }
- }
-
- if (m_Cloud)
- {
- // Note that this is not fully functional, pending implementation work on
- // the Jupiter end
-
- CloudCacheSession Session(m_Cloud);
-
- zen::Stopwatch Timer;
-
- try
- {
- Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey);
-
- m_Log.debug("Cloud upstream GET ({}/{}) succeeded after {:5}!",
- Ref.BucketSegment,
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
+ if (CacheRecordType == ZenContentType::kCbObject)
+ {
+ const zen::CbValidateError ValidationResult =
+ zen::ValidateCompactBinary(MemoryView(UpstreamResult.Value.Data(), UpstreamResult.Value.Size()),
+ zen::CbValidateMode::All);
- // TODO: this is incomplete and needs to propagate any referenced content
- }
- catch (std::exception& e)
- {
- m_Log.warn("Cloud upstream GET ({}/{}) FAILED after {:5}: '{}'",
- Ref.BucketSegment,
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- e.what());
+ if (ValidationResult == CbValidateError::None)
+ {
+ zen::CbObjectView Cbo(UpstreamResult.Value.Data());
+
+ std::vector<IoHash> References;
+ Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
+
+ if (!References.empty())
+ {
+ zen::CbObjectWriter Idx;
+ Idx.BeginArray("references");
+ for (const IoHash& Hash : References)
+ {
+ Idx.AddHash(Hash);
+ }
+ Idx.EndArray();
+
+ Value.IndexData = Idx.Save();
+ }
+ }
+ else
+ {
+ Value.Value = IoBuffer();
+ Success = false;
+ m_Log.warn("Upstream cache record '{}/{}' failed validation", Ref.BucketSegment, Ref.HashKey);
+ }
+ }
- throw;
+ if (Success)
+ {
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
+ }
}
}
@@ -248,140 +229,130 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
case kPut:
{
- if (zen::IoBuffer Body = Request.ReadPayload())
- {
- if (Body.Size() == 0)
- {
- return Request.WriteResponse(zen::HttpResponse::BadRequest);
- }
+ zen::IoBuffer Body = Request.ReadPayload();
- ZenCacheValue Value;
- Value.Value = Body;
+ if (!Body || Body.Size() == 0)
+ {
+ return Request.WriteResponse(zen::HttpResponse::BadRequest);
+ }
- HttpContentType ContentType = Request.RequestContentType();
+ const HttpContentType ContentType = Request.RequestContentType();
- bool IsCompactBinary;
+ bool IsCompactBinary = false;
- switch (ContentType)
- {
- case HttpContentType::kUnknownContentType:
- case HttpContentType::kBinary:
- IsCompactBinary = false;
- break;
+ switch (ContentType)
+ {
+ case HttpContentType::kUnknownContentType:
+ case HttpContentType::kBinary:
+ IsCompactBinary = false;
+ break;
- case HttpContentType::kCbObject:
- IsCompactBinary = true;
- break;
+ case HttpContentType::kCbObject:
+ IsCompactBinary = true;
+ break;
- default:
- return Request.WriteResponse(zen::HttpResponse::BadRequest);
- }
+ default:
+ return Request.WriteResponse(zen::HttpResponse::BadRequest);
+ }
- // Compute index data
+ if (!IsCompactBinary)
+ {
+ // TODO: create a cache record and put value in CAS?
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
+ m_Log.debug("PUT (binary) - '{}/{}' ({} bytes, {})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Body.Size(),
+ Body.GetContentType());
- if (IsCompactBinary)
+ if (m_UpstreamCache)
{
- // Validate payload before accessing it
- zen::CbValidateError ValidationResult =
- zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All);
-
- if (ValidationResult != CbValidateError::None)
- {
- m_Log.warn("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size());
-
- // TODO: add details in response
- return Request.WriteResponse(HttpResponse::BadRequest);
- }
-
- // Extract data for index
- zen::CbObjectView Cbo(Body.Data());
+ auto Result = m_UpstreamCache->EnqueueUpstream(
+ {.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}});
+ ZEN_ASSERT(Result.Success);
+ }
- std::vector<IoHash> References;
- Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
+ return Request.WriteResponse(zen::HttpResponse::Created);
+ }
- if (!References.empty())
- {
- zen::CbObjectWriter Idx;
- Idx.BeginArray("r");
+ // Validate payload before accessing it
+ const zen::CbValidateError ValidationResult =
+ zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All);
- for (const IoHash& Hash : References)
- {
- Idx.AddHash(Hash);
- }
+ if (ValidationResult != CbValidateError::None)
+ {
+ m_Log.warn("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size());
- Idx.EndArray();
+ // TODO: add details in response, kText || kCbObject?
+ return Request.WriteResponse(HttpResponse::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv);
+ }
- // TODO: store references in index
- }
- }
+ // Extract referenced payload hashes
+ zen::CbObjectView Cbo(Body.Data());
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
+ std::vector<IoHash> References;
+ std::vector<IoHash> MissingRefs;
+ Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
- m_Log.debug("PUT - '{}/{}' ({} bytes, {})",
- Ref.BucketSegment,
- Ref.HashKey,
- Value.Value.Size(),
- Value.Value.GetContentType());
+ ZenCacheValue CacheValue;
+ CacheValue.Value = Body;
- // absolutely be made asynchronous. By default these should be deferred
- // because the client should not care if the data has propagated upstream or
- // not
+ if (!References.empty())
+ {
+ zen::CbObjectWriter Idx;
+ Idx.BeginArray("references");
- if (m_ZenClient)
+ for (const IoHash& Hash : References)
{
- ZenStructuredCacheSession Session(*m_ZenClient);
-
- zen::Stopwatch Timer;
-
- try
- {
- Session.Put(Ref.BucketSegment, Ref.HashKey, Value);
-
- m_Log.debug("Zen upstream PUT ({}) succeeded after {:5}!",
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
- }
- catch (std::exception& e)
+ Idx.AddHash(Hash);
+ if (!m_CidStore.ContainsChunk(Hash))
{
- m_Log.warn("Zen upstream PUT ({}) FAILED after {:5}: '{}'",
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- e.what());
-
- throw;
+ MissingRefs.push_back(Hash);
}
}
- if (m_Cloud)
- {
- CloudCacheSession Session(m_Cloud);
+ Idx.EndArray();
- zen::Stopwatch Timer;
+ CacheValue.IndexData = Idx.Save();
+ }
- try
- {
- Session.Put(Ref.BucketSegment, Ref.HashKey, Value);
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
- m_Log.debug("upstream PUT ({}) succeeded after {:5}!",
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
- }
- catch (std::exception& e)
- {
- m_Log.warn("upstream PUT ({}) FAILED after {:5}: '{}'",
- Ref.HashKey,
- zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
- e.what());
+ m_Log.debug("PUT (cache record) - '{}/{}' ({} bytes, {}, ({}/{} refs/missing))",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ CacheValue.Value.Size(),
+ CacheValue.Value.GetContentType(),
+ References.size(),
+ MissingRefs.size());
- throw;
- }
+ if (MissingRefs.empty())
+ {
+ // Only enqueue valid cache records, i.e. all referenced payloads exists
+ if (m_UpstreamCache)
+ {
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ .CacheKey = {Ref.BucketSegment, Ref.HashKey},
+ .PayloadIds = std::move(References)});
+ ZEN_ASSERT(Result.Success);
}
return Request.WriteResponse(zen::HttpResponse::Created);
}
else
{
- return;
+ // TODO: Binary attachments?
+ zen::CbObjectWriter Response;
+ Response.BeginArray("needs");
+ for (const IoHash& MissingRef : MissingRefs)
+ {
+ Response.AddHash(MissingRef);
+ m_Log.debug("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef);
+ }
+ Response.EndArray();
+
+ // Return Created | BadRequest?
+ return Request.WriteResponse(zen::HttpResponse::Created, Response.Save());
}
}
break;
@@ -412,6 +383,26 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
{
zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId);
+ if (!Payload && m_UpstreamCache)
+ {
+ if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId});
+ UpstreamResult.Success)
+ {
+ if (zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Payload)))
+ {
+ Payload = UpstreamResult.Payload;
+ zen::IoHash ChunkHash = zen::IoHash::HashMemory(Payload);
+ zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash);
+
+ m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash);
+ }
+ else
+ {
+ m_Log.warn("got uncompressed upstream cache payload");
+ }
+ }
+ }
+
if (!Payload)
{
m_Log.debug("MISS - '{}/{}/{}' ({} bytes, {})",
@@ -512,7 +503,7 @@ HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, Cach
return false;
}
- OutRef.BucketSegment = Key.substr(0, BucketSplitOffset);
+ OutRef.BucketSegment = ToLower(Key.substr(0, BucketSplitOffset));
if (!std::all_of(begin(OutRef.BucketSegment), end(OutRef.BucketSegment), [](const char c) { return std::isalnum(c); }))
{