aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp270
1 files changed, 246 insertions, 24 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 0d62f297c..792d764cb 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -2,23 +2,39 @@
#pragma once
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/compress.h>
#include <zencore/fmtutils.h>
#include <zencore/httpserver.h>
+#include <zencore/timer.h>
-#include "cachestore.h"
#include "structuredcache.h"
+#include "structuredcachestore.h"
#include "upstream/jupiter.h"
+#include "zenstore/cidstore.h"
#include <spdlog/spdlog.h>
#include <filesystem>
namespace zen {
-HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore)
+using namespace std::literals;
+
+HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore)
: m_CasStore(InStore)
, m_CacheStore(InStore, RootPath)
+, m_CidStore(InCidStore)
{
spdlog::info("initializing structured cache at '{}'", RootPath);
+
+#if 0
+ m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv,
+ "ue4.ddc"sv /* namespace */,
+ "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv /* provider */,
+ "0oao91lrhqPiAlaGD0x7"sv /* client id */,
+ "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */);
+#endif
}
HttpStructuredCacheService::~HttpStructuredCacheService()
@@ -42,6 +58,21 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request)
return Request.WriteResponse(zen::HttpResponse::BadRequest); // invalid URL
}
+ if (Ref.PayloadId == IoHash::Zero)
+ {
+ return HandleCacheRecordRequest(Request, Ref);
+ }
+ else
+ {
+ return HandleCachePayloadRequest(Request, Ref);
+ }
+
+ return;
+}
+
+void
+HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref)
+{
switch (auto Verb = Request.RequestVerb())
{
using enum zen::HttpVerb;
@@ -49,25 +80,20 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request)
case kHead:
case kGet:
{
- CacheValue Value;
- bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
+ ZenCacheValue Value;
+ bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
if (!Success)
{
- Request.WriteResponse(zen::HttpResponse::NotFound);
+ return Request.WriteResponse(zen::HttpResponse::NotFound);
}
- else
+
+ if (Verb == kHead)
{
- if (Verb == kHead)
- {
- Request.SetSuppressResponseBody();
- Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Value.Value);
- }
- else
- {
- Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Value.Value);
- }
+ Request.SetSuppressResponseBody();
}
+
+ return Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Value.Value);
}
break;
@@ -75,12 +101,99 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request)
{
if (zen::IoBuffer Body = Request.ReadPayload())
{
- CacheValue Value;
+ if (Body.Size() == 0)
+ {
+ return Request.WriteResponse(zen::HttpResponse::BadRequest);
+ }
+
+ ZenCacheValue Value;
Value.Value = Body;
+ HttpContentType ContentType = Request.RequestContentType();
+
+ bool IsCompactBinary;
+
+ switch (ContentType)
+ {
+ case HttpContentType::kUnknownContentType:
+ case HttpContentType::kBinary:
+ IsCompactBinary = false;
+ break;
+
+ case HttpContentType::kCbObject:
+ IsCompactBinary = true;
+ break;
+
+ default:
+ return Request.WriteResponse(zen::HttpResponse::BadRequest);
+ }
+
+ // Compute index data
+
+ if (IsCompactBinary)
+ {
+ // Validate payload before accessing it
+ zen::CbValidateError ValidationResult =
+ zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All);
+
+ if (ValidationResult != CbValidateError::None)
+ {
+ // TODO: add details in response
+ return Request.WriteResponse(HttpResponse::BadRequest);
+ }
+
+ // Extract data for index
+ zen::CbObjectView Cbo(Body.Data());
+
+ std::vector<IoHash> References;
+ Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
+
+ if (!References.empty())
+ {
+ zen::CbObjectWriter Idx;
+ Idx.BeginArray("r");
+
+ for (const IoHash& Hash : References)
+ {
+ Idx.AddHash(Hash);
+ }
+
+ Idx.EndArray();
+ }
+
+ // TODO: store references in index
+ }
+
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
- Request.WriteResponse(zen::HttpResponse::Created);
+ // This is currently synchronous for simplicity and debuggability but should be
+ // made asynchronous
+
+ if (m_Cloud)
+ {
+ CloudCacheSession Session(m_Cloud);
+
+ zen::Stopwatch Timer;
+
+ try
+ {
+ Session.Put(Ref.BucketSegment, Ref.HashKey, Value);
+ spdlog::debug("upstream PUT ({}) succeeded after {:5}!",
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()));
+ }
+ catch (std::exception& e)
+ {
+ spdlog::debug("upstream PUT ({}) failed after {:5}: '{}'",
+ Ref.HashKey,
+ zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()),
+ e.what());
+
+ throw;
+ }
+ }
+
+ return Request.WriteResponse(zen::HttpResponse::Created);
}
else
{
@@ -97,26 +210,136 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request)
}
}
-[[nodiscard]] bool
+void
+HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref)
+{
+ // Note: the URL references the uncompressed payload hash - so this maintains the mapping
+ // from uncompressed CAS identity to the stored payload hash
+ //
+ // this is a PITA but a consequence of the fact that the client side code is not able to
+ // address data by compressed hash
+
+ switch (auto Verb = Request.RequestVerb())
+ {
+ using enum zen::HttpVerb;
+
+ case kHead:
+ case kGet:
+ {
+ // TODO: need to map from uncompressed content address into the storage
+ // (compressed) content address
+
+ zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId);
+
+ if (!Payload)
+ {
+ return Request.WriteResponse(zen::HttpResponse::NotFound);
+ }
+
+ if (Verb == kHead)
+ {
+ Request.SetSuppressResponseBody();
+ }
+
+ return Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Payload);
+ }
+ break;
+
+ case kPut:
+ {
+ if (zen::IoBuffer Body = Request.ReadPayload())
+ {
+ if (Body.Size() == 0)
+ {
+ return Request.WriteResponse(zen::HttpResponse::BadRequest);
+ }
+
+ zen::IoHash ChunkHash = zen::IoHash::HashMemory(Body);
+
+ zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(Body));
+
+ if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId)
+ {
+ // the URL specified content id and content hashes don't match!
+ return Request.WriteResponse(HttpResponse::BadRequest);
+ }
+
+ zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash);
+
+ m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash);
+
+ if (Result.New)
+ {
+ return Request.WriteResponse(zen::HttpResponse::Created);
+ }
+ else
+ {
+ return Request.WriteResponse(zen::HttpResponse::OK);
+ }
+ }
+ }
+ break;
+
+ case kPost:
+ break;
+
+ default:
+ break;
+ }
+}
+
+bool
HttpStructuredCacheService::ValidateUri(zen::HttpServerRequest& Request, CacheRef& OutRef)
{
std::string_view Key = Request.RelativeUri();
- std::string_view::size_type BucketSplitOffset = Key.find_last_of('/');
+ std::string_view::size_type BucketSplitOffset = Key.find_first_of('/');
if (BucketSplitOffset == std::string_view::npos)
{
return false;
}
- OutRef.BucketSegment = Key.substr(0, BucketSplitOffset);
- std::string_view HashSegment = Key.substr(BucketSplitOffset + 1);
+ OutRef.BucketSegment = Key.substr(0, BucketSplitOffset);
+
+ std::string_view HashSegment;
+ std::string_view PayloadSegment;
- if (HashSegment.size() != (2 * sizeof OutRef.HashKey.Hash))
+ std::string_view::size_type PayloadSplitOffset = Key.find_last_of('/');
+
+ // We know there is a slash so no need to check for npos return
+
+ if (PayloadSplitOffset == BucketSplitOffset)
+ {
+ // Basic cache record lookup
+ HashSegment = Key.substr(BucketSplitOffset + 1);
+ }
+ else
+ {
+ // Cache record + payload lookup
+ HashSegment = Key.substr(BucketSplitOffset + 1, PayloadSplitOffset - BucketSplitOffset - 1);
+ PayloadSegment = Key.substr(PayloadSplitOffset + 1);
+ }
+
+ if (HashSegment.size() != zen::IoHash::StringLength)
{
return false;
}
- bool IsOk = zen::ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash);
+ if (!PayloadSegment.empty() && PayloadSegment.size() == zen::IoHash::StringLength)
+ {
+ const bool IsOk = zen::ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash);
+
+ if (!IsOk)
+ {
+ return false;
+ }
+ }
+ else
+ {
+ OutRef.PayloadId = zen::IoHash::Zero;
+ }
+
+ const bool IsOk = zen::ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash);
if (!IsOk)
{
@@ -125,5 +348,4 @@ HttpStructuredCacheService::ValidateUri(zen::HttpServerRequest& Request, CacheRe
return true;
}
-
} // namespace zen