aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2022-05-20 12:42:56 +0200
committerStefan Boberg <[email protected]>2022-05-20 12:42:56 +0200
commit5b271be0169b842cdc3d576e48bf0ddc2f122852 (patch)
tree16f501d2190f19a7281ce3f30365817464e146cb /zenserver/cache
parentAdded ZEN_USE_CATCH2 define (diff)
parentfix mac compilation error (diff)
downloadzen-5b271be0169b842cdc3d576e48bf0ddc2f122852.tar.xz
zen-5b271be0169b842cdc3d576e48bf0ddc2f122852.zip
Merge branch 'main' into use-catch2
Diffstat (limited to 'zenserver/cache')
-rw-r--r--zenserver/cache/structuredcache.cpp946
-rw-r--r--zenserver/cache/structuredcache.h54
-rw-r--r--zenserver/cache/structuredcachestore.cpp2237
-rw-r--r--zenserver/cache/structuredcachestore.h135
4 files changed, 2614 insertions, 758 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 8ae531720..e11499289 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -35,6 +35,11 @@
#include <gsl/gsl-lite.hpp>
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+#endif
+
namespace zen {
using namespace std::literals;
@@ -65,11 +70,232 @@ struct AttachmentCount
struct PutRequestData
{
+ std::string Namespace;
CacheKey Key;
CbObjectView RecordObject;
CacheRecordPolicy Policy;
};
+namespace {
+ static constexpr std::string_view HttpZCacheRPCPrefix = "$rpc"sv;
+
+ struct HttpRequestData
+ {
+ std::optional<std::string> Namespace;
+ std::optional<std::string> Bucket;
+ std::optional<IoHash> HashKey;
+ std::optional<IoHash> ValueContentId;
+ };
+
+ const char* ValidNamespaceNameCharacters = "abcdefghijklmnopqrstuvwxyz0123456789-_.ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+ const char* ValidBucketNameCharacters = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+ std::optional<std::string> GetValidNamespaceName(std::string_view Name)
+ {
+ if (Name.empty())
+ {
+ return {};
+ }
+ if (Name.length() > 64)
+ {
+ return {};
+ }
+
+ if (Name.find_first_not_of(ValidNamespaceNameCharacters) != std::string::npos)
+ {
+ return {};
+ }
+ return ToLower(Name);
+ }
+
+ std::optional<std::string> GetValidBucketName(std::string_view Name)
+ {
+ if (Name.empty())
+ {
+ return {};
+ }
+ if (Name.find_first_not_of(ValidBucketNameCharacters) != std::string::npos)
+ {
+ return {};
+ }
+ return ToLower(Name);
+ }
+
+ std::optional<IoHash> GetValidIoHash(std::string_view Hash)
+ {
+ if (Hash.length() != IoHash::StringLength)
+ {
+ return {};
+ }
+
+ IoHash KeyHash;
+ if (!ParseHexBytes(Hash.data(), Hash.size(), KeyHash.Hash))
+ {
+ return {};
+ }
+ return KeyHash;
+ }
+
+ bool HttpRequestParseRelativeUri(std::string_view Key, HttpRequestData& Data)
+ {
+ std::vector<std::string_view> Tokens;
+ uint32_t TokenCount = zen::ForEachStrTok(Key, '/', [&](const std::string_view& Token) {
+ Tokens.push_back(Token);
+ return true;
+ });
+
+ switch (TokenCount)
+ {
+ case 1:
+ Data.Namespace = GetValidNamespaceName(Tokens[0]);
+ return Data.Namespace.has_value();
+ case 2:
+ {
+ std::optional<IoHash> PossibleHashKey = GetValidIoHash(Tokens[1]);
+ if (PossibleHashKey.has_value())
+ {
+ // Legacy bucket/key request
+ Data.Bucket = GetValidBucketName(Tokens[0]);
+ if (!Data.Bucket.has_value())
+ {
+ return false;
+ }
+ Data.HashKey = PossibleHashKey;
+ Data.Namespace = ZenCacheStore::DefaultNamespace;
+ return true;
+ }
+ Data.Namespace = GetValidNamespaceName(Tokens[0]);
+ if (!Data.Namespace.has_value())
+ {
+ return false;
+ }
+ Data.Bucket = GetValidBucketName(Tokens[1]);
+ if (!Data.Bucket.has_value())
+ {
+ return false;
+ }
+ return true;
+ }
+ case 3:
+ {
+ std::optional<IoHash> PossibleHashKey = GetValidIoHash(Tokens[1]);
+ if (PossibleHashKey.has_value())
+ {
+ // Legacy bucket/key/valueid request
+ Data.Bucket = GetValidBucketName(Tokens[0]);
+ if (!Data.Bucket.has_value())
+ {
+ return false;
+ }
+ Data.HashKey = PossibleHashKey;
+ Data.ValueContentId = GetValidIoHash(Tokens[2]);
+ if (!Data.ValueContentId.has_value())
+ {
+ return false;
+ }
+ Data.Namespace = ZenCacheStore::DefaultNamespace;
+ return true;
+ }
+ Data.Namespace = GetValidNamespaceName(Tokens[0]);
+ if (!Data.Namespace.has_value())
+ {
+ return false;
+ }
+ Data.Bucket = GetValidBucketName(Tokens[1]);
+ if (!Data.Bucket.has_value())
+ {
+ return false;
+ }
+ Data.HashKey = GetValidIoHash(Tokens[2]);
+ if (!Data.HashKey)
+ {
+ return false;
+ }
+ return true;
+ }
+ case 4:
+ {
+ Data.Namespace = GetValidNamespaceName(Tokens[0]);
+ if (!Data.Namespace.has_value())
+ {
+ return false;
+ }
+
+ Data.Bucket = GetValidBucketName(Tokens[1]);
+ if (!Data.Bucket.has_value())
+ {
+ return false;
+ }
+
+ Data.HashKey = GetValidIoHash(Tokens[2]);
+ if (!Data.HashKey.has_value())
+ {
+ return false;
+ }
+
+ Data.ValueContentId = GetValidIoHash(Tokens[3]);
+ if (!Data.ValueContentId.has_value())
+ {
+ return false;
+ }
+ return true;
+ }
+ default:
+ return false;
+ }
+ }
+
+ std::optional<std::string> GetRpcRequestNamespace(const CbObjectView Params)
+ {
+ CbFieldView NamespaceField = Params["Namespace"sv];
+ if (!NamespaceField)
+ {
+ return std::string(ZenCacheStore::DefaultNamespace);
+ }
+
+ if (NamespaceField.HasError())
+ {
+ return {};
+ }
+ if (!NamespaceField.IsString())
+ {
+ return {};
+ }
+ return GetValidNamespaceName(NamespaceField.AsString());
+ }
+
+ bool GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key)
+ {
+ CbFieldView BucketField = KeyView["Bucket"sv];
+ if (BucketField.HasError())
+ {
+ return false;
+ }
+ if (!BucketField.IsString())
+ {
+ return false;
+ }
+ std::optional<std::string> Bucket = GetValidBucketName(BucketField.AsString());
+ if (!Bucket.has_value())
+ {
+ return false;
+ }
+ CbFieldView HashField = KeyView["Hash"sv];
+ if (HashField.HasError())
+ {
+ return false;
+ }
+ if (!HashField.IsHash())
+ {
+ return false;
+ }
+ IoHash Hash = HashField.AsHash();
+ Key = CacheKey::Create(*Bucket, Hash);
+ return true;
+ }
+
+} // namespace
+
//////////////////////////////////////////////////////////////////////////
HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
@@ -124,45 +350,55 @@ HttpStructuredCacheService::Scrub(ScrubContext& Ctx)
void
HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
{
- CacheRef Ref;
-
metrics::OperationTiming::Scope $(m_HttpRequests);
- if (!ValidateKeyUri(Request, /* out */ Ref))
+ std::string_view Key = Request.RelativeUri();
+ if (Key == HttpZCacheRPCPrefix)
{
- std::string_view Key = Request.RelativeUri();
-
- if (Key == "$rpc")
- {
- return HandleRpcRequest(Request);
- }
-
- if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); }))
- {
- // Bucket reference
-
- return HandleCacheBucketRequest(Request, Key);
- }
+ return HandleRpcRequest(Request);
+ }
+ HttpRequestData RequestData;
+ if (!HttpRequestParseRelativeUri(Key, RequestData))
+ {
return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
}
- CachePolicy PolicyFromURL = ParseCachePolicy(Request.GetQueryParams());
+ if (RequestData.ValueContentId.has_value())
+ {
+ ZEN_ASSERT(RequestData.Namespace.has_value());
+ ZEN_ASSERT(RequestData.Bucket.has_value());
+ ZEN_ASSERT(RequestData.HashKey.has_value());
+ CacheRef Ref = {.Namespace = RequestData.Namespace.value(),
+ .BucketSegment = RequestData.Bucket.value(),
+ .HashKey = RequestData.HashKey.value(),
+ .ValueContentId = RequestData.ValueContentId.value()};
+ return HandleCacheValueRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams()));
+ }
- if (Ref.ValueContentId == IoHash::Zero)
+ if (RequestData.HashKey.has_value())
{
- return HandleCacheRecordRequest(Request, Ref, PolicyFromURL);
+ ZEN_ASSERT(RequestData.Namespace.has_value());
+ ZEN_ASSERT(RequestData.Bucket.has_value());
+ CacheRef Ref = {.Namespace = RequestData.Namespace.value(),
+ .BucketSegment = RequestData.Bucket.value(),
+ .HashKey = RequestData.HashKey.value(),
+ .ValueContentId = IoHash::Zero};
+ return HandleCacheRecordRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams()));
}
- else
+
+ if (RequestData.Bucket.has_value())
{
- return HandleCacheValueRequest(Request, Ref, PolicyFromURL);
+ ZEN_ASSERT(RequestData.Namespace.has_value());
+ return HandleCacheBucketRequest(Request, RequestData.Namespace.value(), RequestData.Bucket.value());
}
- return;
+ ZEN_ASSERT(RequestData.Namespace.has_value());
+ return HandleCacheNamespaceRequest(Request, RequestData.Namespace.value());
}
void
-HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Bucket)
+HttpStructuredCacheService::HandleCacheNamespaceRequest(zen::HttpServerRequest& Request, std::string_view)
{
switch (Request.RequestVerb())
{
@@ -174,15 +410,47 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
break;
case HttpVerb::kDelete:
- // Drop bucket
+ // Drop namespace
+ {
+ // if (m_CacheStore.DropNamespace(Namespace))
+ // {
+ // return Request.WriteResponse(HttpResponseCode::OK);
+ // }
+ // else
+ // {
+ // return Request.WriteResponse(HttpResponseCode::NotFound);
+ // }
+ }
+ break;
+
+ default:
+ break;
+ }
+}
- if (m_CacheStore.DropBucket(Bucket))
+void
+HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket)
+{
+ switch (Request.RequestVerb())
+ {
+ case HttpVerb::kHead:
+ case HttpVerb::kGet:
{
- return Request.WriteResponse(HttpResponseCode::OK);
+ // Query stats
}
- else
+ break;
+
+ case HttpVerb::kDelete:
+ // Drop bucket
{
- return Request.WriteResponse(HttpResponseCode::NotFound);
+ if (m_CacheStore.DropBucket(Namespace, Bucket))
+ {
+ return Request.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ return Request.WriteResponse(HttpResponseCode::NotFound);
+ }
}
break;
@@ -192,19 +460,19 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
}
void
-HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
+HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
{
switch (Request.RequestVerb())
{
case HttpVerb::kHead:
case HttpVerb::kGet:
{
- HandleGetCacheRecord(Request, Ref, PolicyFromURL);
+ HandleGetCacheRecord(Request, Ref, PolicyFromUrl);
}
break;
case HttpVerb::kPut:
- HandlePutCacheRecord(Request, Ref, PolicyFromURL);
+ HandlePutCacheRecord(Request, Ref, PolicyFromUrl);
break;
default:
break;
@@ -212,20 +480,21 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request,
}
void
-HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
+HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
{
const ZenContentType AcceptType = Request.AcceptContentType();
- const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData);
- const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord);
+ const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData);
+ const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord);
bool Success = false;
ZenCacheValue ClientResultValue;
- if (!EnumHasAnyFlags(PolicyFromURL, CachePolicy::Query))
+ if (!EnumHasAnyFlags(PolicyFromUrl, CachePolicy::Query))
{
return Request.WriteResponse(HttpResponseCode::OK);
}
- if (EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal) && m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, ClientResultValue))
+ if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) &&
+ m_CacheStore.Get(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue))
{
Success = true;
ZenContentType ContentType = ClientResultValue.Value.GetContentType();
@@ -286,7 +555,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
if (Success)
{
- ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)",
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' (LOCAL)",
+ Ref.Namespace,
Ref.BucketSegment,
Ref.HashKey,
NiceBytes(ClientResultValue.Value.Size()),
@@ -303,9 +573,9 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value);
}
}
- else if (!EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryRemote))
+ else if (!EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote))
{
- ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
+ ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
m_CacheStats.MissCount++;
return Request.WriteResponse(HttpResponseCode::NotFound);
}
@@ -313,17 +583,18 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
// Issue upstream query asynchronously in order to keep requests flowing without
// hogging I/O servicing threads with blocking work
- Request.WriteResponseAsync([this, AcceptType, PolicyFromURL, Ref](HttpServerRequest& AsyncRequest) {
+ Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref](HttpServerRequest& AsyncRequest) {
bool Success = false;
- const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord);
- const bool QueryLocal = EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal);
- const bool StoreLocal = EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreLocal);
- const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData);
+ const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord);
+ const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal);
+ const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData);
ZenCacheValue ClientResultValue;
metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming);
- if (GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, AcceptType);
+ if (GetUpstreamCacheResult UpstreamResult =
+ m_UpstreamCache.GetCacheRecord(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, AcceptType);
UpstreamResult.Success)
{
Success = true;
@@ -339,7 +610,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
if (ValidationResult != CbValidateError::None)
{
Success = false;
- ZEN_WARN("Get - '{}/{}' '{}' FAILED, invalid compact binary object from upstream",
+ ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid compact binary object from upstream",
+ Ref.Namespace,
Ref.BucketSegment,
Ref.HashKey,
ToString(AcceptType));
@@ -350,7 +622,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
if (Success && StoreLocal)
{
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, ClientResultValue);
+ m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue);
}
}
else if (AcceptType == ZenContentType::kCbPackage)
@@ -404,7 +676,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
if (StoreLocal)
{
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
+ m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
}
BinaryWriter MemStream;
@@ -433,14 +705,19 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
else
{
Success = false;
- ZEN_WARN("Get - '{}/{}' '{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
+ ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid upstream package",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ToString(AcceptType));
}
}
}
if (Success)
{
- ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)",
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' (UPSTREAM)",
+ Ref.Namespace,
Ref.BucketSegment,
Ref.HashKey,
NiceBytes(ClientResultValue.Value.Size()),
@@ -462,7 +739,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
}
else
{
- ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
+ ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
m_CacheStats.MissCount++;
AsyncRequest.WriteResponse(HttpResponseCode::NotFound);
}
@@ -470,7 +747,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
}
void
-HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
+HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
{
IoBuffer Body = Request.ReadPayload();
@@ -485,12 +762,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary)
{
- ZEN_DEBUG("PUT - '{}/{}' {} '{}'", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType));
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
+ ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}'", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType));
+ m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body});
- if (EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreRemote))
+ if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote))
{
- m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Key = {Ref.BucketSegment, Ref.HashKey}});
+ m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}});
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -501,11 +778,15 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (ValidationResult != CbValidateError::None)
{
- ZEN_WARN("PUT - '{}/{}' '{}' FAILED, invalid compact binary", Ref.BucketSegment, Ref.HashKey, ToString(ContentType));
+ ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, invalid compact binary",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ToString(ContentType));
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv);
}
- CachePolicy Policy = PolicyFromURL;
+ CachePolicy Policy = PolicyFromUrl;
CbObjectView CacheRecord(Body.Data());
std::vector<IoHash> ValidAttachments;
int32_t TotalCount = 0;
@@ -519,7 +800,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
TotalCount++;
});
- ZEN_DEBUG("PUT - '{}/{}' {} '{}' attachments '{}/{}' (valid/total)",
+ ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total)",
+ Ref.Namespace,
Ref.BucketSegment,
Ref.HashKey,
NiceBytes(Body.Size()),
@@ -528,13 +810,14 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
ValidAttachments.size());
Body.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
+ m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body});
const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size());
if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ .Namespace = Ref.Namespace,
.Key = {Ref.BucketSegment, Ref.HashKey},
.ValueContentIds = std::move(ValidAttachments)});
}
@@ -547,10 +830,10 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (!Package.TryLoad(Body))
{
- ZEN_WARN("PUT - '{}/{}' '{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey, ToString(ContentType));
+ ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, invalid package", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(ContentType));
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv);
}
- CachePolicy Policy = PolicyFromURL;
+ CachePolicy Policy = PolicyFromUrl;
CbObject CacheRecord = Package.GetObject();
AttachmentCount Count;
@@ -577,7 +860,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
}
else
{
- ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ Ref.Namespace,
Ref.BucketSegment,
Ref.HashKey,
ToString(HttpContentType::kCbPackage),
@@ -598,7 +882,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv);
}
- ZEN_DEBUG("PUT - '{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)",
+ ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)",
+ Ref.Namespace,
Ref.BucketSegment,
Ref.HashKey,
NiceBytes(Body.GetSize()),
@@ -611,13 +896,14 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
+ m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
const bool IsPartialRecord = Count.Valid != Count.Total;
if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
+ .Namespace = Ref.Namespace,
.Key = {Ref.BucketSegment, Ref.HashKey},
.ValueContentIds = std::move(ValidAttachments)});
}
@@ -631,16 +917,16 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
}
void
-HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
+HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
{
switch (Request.RequestVerb())
{
case HttpVerb::kHead:
case HttpVerb::kGet:
- HandleGetCacheValue(Request, Ref, PolicyFromURL);
+ HandleGetCacheValue(Request, Ref, PolicyFromUrl);
break;
case HttpVerb::kPut:
- HandlePutCacheValue(Request, Ref, PolicyFromURL);
+ HandlePutCacheValue(Request, Ref, PolicyFromUrl);
break;
default:
break;
@@ -648,44 +934,56 @@ HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request,
}
void
-HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
+HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
{
+ Stopwatch Timer;
+
IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId);
bool InUpstreamCache = false;
- CachePolicy Policy = PolicyFromURL;
- const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote);
-
- if (QueryUpstream)
+ CachePolicy Policy = PolicyFromUrl;
{
- if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
- UpstreamResult.Success)
+ const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote);
+
+ if (QueryUpstream)
{
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
- {
- m_CidStore.AddChunk(Compressed);
- InUpstreamCache = true;
- }
- else
+ if (auto UpstreamResult = m_UpstreamCache.GetCacheValue(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
+ UpstreamResult.Success)
{
- ZEN_WARN("got uncompressed upstream cache value");
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
+ {
+ m_CidStore.AddChunk(Compressed);
+ InUpstreamCache = true;
+ }
+ else
+ {
+ ZEN_WARN("got uncompressed upstream cache value");
+ }
}
}
}
if (!Value)
{
- ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, ToString(Request.AcceptContentType()));
+ ZEN_DEBUG("MISS - '{}/{}/{}/{}' '{}' in {}",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Ref.ValueContentId,
+ ToString(Request.AcceptContentType()),
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
m_CacheStats.MissCount++;
return Request.WriteResponse(HttpResponseCode::NotFound);
}
- ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
+ ZEN_DEBUG("HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}",
+ Ref.Namespace,
Ref.BucketSegment,
Ref.HashKey,
Ref.ValueContentId,
NiceBytes(Value.Size()),
ToString(Value.GetContentType()),
- InUpstreamCache ? "UPSTREAM" : "LOCAL");
+ InUpstreamCache ? "UPSTREAM" : "LOCAL",
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
m_CacheStats.HitCount++;
if (InUpstreamCache)
@@ -704,10 +1002,12 @@ HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request,
}
void
-HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
+HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
{
// Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored
- ZEN_UNUSED(PolicyFromURL);
+ ZEN_UNUSED(PolicyFromUrl);
+
+ Stopwatch Timer;
IoBuffer Body = Request.ReadPayload();
@@ -734,85 +1034,21 @@ HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request,
CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed);
- ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})",
+ ZEN_DEBUG("PUT - '{}/{}/{}/{}' {} '{}' ({}) in {}",
+ Ref.Namespace,
Ref.BucketSegment,
Ref.HashKey,
Ref.ValueContentId,
NiceBytes(Body.Size()),
ToString(Body.GetContentType()),
- Result.New ? "NEW" : "OLD");
+ Result.New ? "NEW" : "OLD",
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK;
Request.WriteResponse(ResponseCode);
}
-bool
-HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& OutRef)
-{
- std::string_view Key = Request.RelativeUri();
- std::string_view::size_type BucketSplitOffset = Key.find_first_of('/');
-
- if (BucketSplitOffset == std::string_view::npos)
- {
- return false;
- }
-
- OutRef.BucketSegment = ToLower(Key.substr(0, BucketSplitOffset));
-
- if (!std::all_of(begin(OutRef.BucketSegment), end(OutRef.BucketSegment), [](const char c) { return std::isalnum(c); }))
- {
- return false;
- }
-
- std::string_view HashSegment;
- std::string_view ValueSegment;
-
- std::string_view::size_type ValueSplitOffset = Key.find_last_of('/');
-
- // We know there is a slash so no need to check for npos return
-
- if (ValueSplitOffset == BucketSplitOffset)
- {
- // Basic cache record lookup
- HashSegment = Key.substr(BucketSplitOffset + 1);
- }
- else
- {
- // Cache record + valueid lookup
- HashSegment = Key.substr(BucketSplitOffset + 1, ValueSplitOffset - BucketSplitOffset - 1);
- ValueSegment = Key.substr(ValueSplitOffset + 1);
- }
-
- if (HashSegment.size() != IoHash::StringLength)
- {
- return false;
- }
-
- if (!ValueSegment.empty() && ValueSegment.size() == IoHash::StringLength)
- {
- const bool IsOk = ParseHexBytes(ValueSegment.data(), ValueSegment.size(), OutRef.ValueContentId.Hash);
-
- if (!IsOk)
- {
- return false;
- }
- }
- else
- {
- OutRef.ValueContentId = IoHash::Zero;
- }
-
- const bool IsOk = ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash);
-
- if (!IsOk)
- {
- return false;
- }
-
- return true;
-}
-
void
HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
{
@@ -888,23 +1124,27 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
- std::string_view PolicyText = Params["DefaultPolicy"].AsString();
- DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::string_view PolicyText = Params["DefaultPolicy"].AsString();
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
std::vector<bool> Results;
for (CbFieldView RequestField : Params["Requests"sv])
{
CbObjectView RequestObject = RequestField.AsObjectView();
CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView();
CbObjectView KeyView = RecordObject["Key"sv].AsObjectView();
- CbFieldView BucketField = KeyView["Bucket"sv];
- CbFieldView HashField = KeyView["Hash"sv];
- CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash());
- if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty())
+
+ CacheKey Key;
+ if (!GetRpcRequestCacheKey(KeyView, Key))
{
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
- PutRequestData PutRequest{std::move(Key), RecordObject, std::move(Policy)};
+ PutRequestData PutRequest{*Namespace, std::move(Key), RecordObject, std::move(Policy)};
PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
@@ -967,7 +1207,8 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
}
else
{
- ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ Request.Namespace,
Request.Key.Bucket,
Request.Key.Hash,
ToString(HttpContentType::kCbPackage),
@@ -988,7 +1229,8 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
return PutResult::Invalid;
}
- ZEN_DEBUG("PUT - '{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)",
+ ZEN_DEBUG("PUT - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)",
+ Request.Namespace,
Request.Key.Bucket,
Request.Key.Hash,
NiceBytes(TransferredSize),
@@ -1000,14 +1242,16 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
CacheValue.Value = IoBuffer(Record.GetSize());
Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Key.Bucket, Request.Key.Hash, CacheValue);
+ m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue);
const bool IsPartialRecord = Count.Valid != Count.Total;
if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
{
- m_UpstreamCache.EnqueueUpstream(
- {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)});
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
+ .Namespace = Request.Namespace,
+ .Key = Request.Key,
+ .ValueContentIds = std::move(ValidAttachments)});
}
return PutResult::Success;
}
@@ -1040,8 +1284,13 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
bool UsedUpstream = false;
};
- std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
std::vector<RecordRequestData> Requests;
std::vector<size_t> UpstreamIndexes;
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
@@ -1069,14 +1318,13 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
RecordRequestData& Request = Requests.emplace_back();
CbObjectView RequestObject = RequestField.AsObjectView();
CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
- CbFieldView BucketField = KeyObject["Bucket"sv];
- CbFieldView HashField = KeyObject["Hash"sv];
- CacheKey& Key = Request.Upstream.Key;
- Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash());
- if (HashField.HasError() || Key.Bucket.empty())
+
+ CacheKey& Key = Request.Upstream.Key;
+ if (!GetRpcRequestCacheKey(KeyObject, Key))
{
return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
}
+
Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
const CacheRecordPolicy& Policy = Request.DownstreamPolicy;
@@ -1085,7 +1333,8 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
bool FoundLocalInvalid = false;
ZenCacheValue RecordCacheValue;
- if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, RecordCacheValue))
+ if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) &&
+ m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, RecordCacheValue))
{
Request.RecordCacheValue = std::move(RecordCacheValue.Value);
if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject)
@@ -1199,7 +1448,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
}
}
- const auto OnCacheRecordGetComplete = [this, &ParseValues](CacheRecordGetCompleteParams&& Params) {
+ const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues](CacheRecordGetCompleteParams&& Params) {
if (!Params.Record)
{
return;
@@ -1216,7 +1465,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
Request.RecordObject = ObjectBuffer;
if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal))
{
- m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}});
+ m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}});
}
ParseValues(Request);
Request.UsedUpstream = true;
@@ -1254,7 +1503,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
}
else
{
- ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'", Value.ContentId, Key.Bucket, Key.Hash);
+ ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}/{}'",
+ Value.ContentId,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash);
}
}
if (!Value.Exists && !EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
@@ -1269,7 +1522,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
}
};
- m_UpstreamCache.GetCacheRecords(UpstreamRequests, std::move(OnCacheRecordGetComplete));
+ m_UpstreamCache.GetCacheRecords(*Namespace, UpstreamRequests, std::move(OnCacheRecordGetComplete));
}
CbPackage ResponsePackage;
@@ -1291,7 +1544,8 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
}
}
- ZEN_DEBUG("HIT - '{}/{}' {}{}{}",
+ ZEN_DEBUG("HIT - '{}/{}/{}' {}{}{}",
+ *Namespace,
Key.Bucket,
Key.Hash,
NiceBytes(Request.RecordCacheValue.Size()),
@@ -1307,11 +1561,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query))
{
// If they requested no query, do not record this as a miss
- ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash);
+ ZEN_DEBUG("DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash);
}
else
{
- ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, Request.RecordObject ? ""sv : "(PARTIAL)"sv);
+ ZEN_DEBUG("MISS - '{}/{}/{}' {}", *Namespace, Key.Bucket, Key.Hash, Request.RecordObject ? ""sv : "(PARTIAL)"sv);
m_CacheStats.MissCount++;
}
}
@@ -1337,20 +1591,25 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheValues"sv);
- std::string_view PolicyText = Params["DefaultPolicy"].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::string_view PolicyText = Params["DefaultPolicy"].AsString();
+ CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
std::vector<bool> Results;
for (CbFieldView RequestField : Params["Requests"sv])
{
CbObjectView RequestObject = RequestField.AsObjectView();
CbObjectView KeyView = RequestObject["Key"sv].AsObjectView();
- CbFieldView BucketField = KeyView["Bucket"sv];
- CbFieldView HashField = KeyView["Hash"sv];
- CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash());
- if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty())
+
+ CacheKey Key;
+ if (!GetRpcRequestCacheKey(KeyView, Key))
{
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
+
PolicyText = RequestObject["Policy"sv].AsString();
CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment();
@@ -1373,21 +1632,22 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
{
IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
Value.SetContentType(ZenContentType::kCompressedBinary);
- m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = Value});
+ m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = Value});
TransferredSize = Chunk.GetCompressedSize();
}
Succeeded = true;
}
else
{
- ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}' FAILED, value is not compressed", Key.Bucket, Key.Hash, RawHash);
+ ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash);
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
}
else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
{
ZenCacheValue ExistingValue;
- if (m_CacheStore.Get(Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType()))
+ if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
+ IsCompressedBinary(ExistingValue.Value.GetContentType()))
{
Succeeded = true;
}
@@ -1397,10 +1657,15 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
{
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Key = Key});
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key});
}
Results.push_back(Succeeded);
- ZEN_DEBUG("PUTCACHEVALUES - '{}/{}' {}, '{}'", Key.Bucket, Key.Hash, NiceBytes(TransferredSize), Succeeded ? "Added"sv : "Invalid");
+ ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}'",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(TransferredSize),
+ Succeeded ? "Added"sv : "Invalid");
}
if (Results.empty())
{
@@ -1431,9 +1696,15 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
{
ZEN_TRACE_CPU("Z$::RpcGetCacheValues");
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
struct RequestData
{
CacheKey Key;
@@ -1444,18 +1715,21 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
+ std::vector<size_t> RemoteRequestIndexes;
+
for (CbFieldView RequestField : Params["Requests"sv])
{
+ Stopwatch Timer;
+
RequestData& Request = Requests.emplace_back();
CbObjectView RequestObject = RequestField.AsObjectView();
CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
- CbFieldView BucketField = KeyObject["Bucket"sv];
- CbFieldView HashField = KeyObject["Hash"sv];
- Request.Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash());
- if (BucketField.HasError() || HashField.HasError() || Request.Key.Bucket.empty())
+
+ if (!GetRpcRequestCacheKey(KeyObject, Request.Key))
{
return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
}
+
PolicyText = RequestObject["Policy"sv].AsString();
Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
@@ -1463,57 +1737,99 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
CachePolicy Policy = Request.Policy;
CompressedBuffer& Result = Request.Result;
- ZenCacheValue CacheValue;
- std::string_view Source;
+ ZenCacheValue CacheValue;
if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
{
- if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType()))
+ if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType()))
{
Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value));
- if (Result)
- {
- Source = "LOCAL"sv;
- }
- }
- }
- if (!Result && EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
- {
- GetUpstreamCacheResult UpstreamResult =
- m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kCompressedBinary);
- if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType()))
- {
- Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value));
- if (Result)
- {
- UpstreamResult.Value.SetContentType(ZenContentType::kCompressedBinary);
- Source = "UPSTREAM"sv;
- // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement
- // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive
- // for us to keep the data only on the upstream server.
- // if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
- {
- m_CacheStore.Put(Key.Bucket, Key.Hash, ZenCacheValue{UpstreamResult.Value});
- }
- }
}
}
-
if (Result)
{
- ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}' {} ({})", Key.Bucket, Key.Hash, NiceBytes(Result.GetCompressed().GetSize()), Source);
+ ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(Result.GetCompressed().GetSize()),
+ "LOCAL"sv,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
m_CacheStats.HitCount++;
}
+ else if (EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
+ {
+ RemoteRequestIndexes.push_back(Requests.size() - 1);
+ }
else if (!EnumHasAnyFlags(Policy, CachePolicy::Query))
{
// If they requested no query, do not record this as a miss
- ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash);
+ ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash);
}
else
{
- ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}'", Key.Bucket, Key.Hash);
+ ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ "LOCAL"sv,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
m_CacheStats.MissCount++;
}
}
+
+ if (!RemoteRequestIndexes.empty())
+ {
+ std::vector<CacheChunkRequest> RequestedRecordsData;
+ std::vector<CacheChunkRequest*> CacheChunkRequests;
+ RequestedRecordsData.reserve(RemoteRequestIndexes.size());
+ CacheChunkRequests.reserve(RemoteRequestIndexes.size());
+ for (size_t Index : RemoteRequestIndexes)
+ {
+ RequestData& Request = Requests[Index];
+ RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}});
+ CacheChunkRequests.push_back(&RequestedRecordsData.back());
+ }
+ Stopwatch Timer;
+ m_UpstreamCache.GetCacheValues(
+ *Namespace,
+ CacheChunkRequests,
+ [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) {
+ CacheChunkRequest& ChunkRequest = Params.Request;
+ if (Params.Value)
+ {
+ size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest);
+ size_t RequestIndex = RemoteRequestIndexes[RequestOffset];
+ RequestData& Request = Requests[RequestIndex];
+ Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value));
+ if (Request.Result && IsCompressedBinary(Params.Value.GetContentType()))
+ {
+ // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement
+ // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive
+ // for us to keep the data only on the upstream server.
+ // if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
+ m_CacheStore.Put(*Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value});
+ ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
+ *Namespace,
+ ChunkRequest.Key.Bucket,
+ ChunkRequest.Key.Hash,
+ NiceBytes(Request.Result.GetCompressed().GetSize()),
+ "UPSTREAM"sv,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount++;
+ return;
+ }
+ }
+ ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
+ *Namespace,
+ ChunkRequest.Key.Bucket,
+ ChunkRequest.Key.Hash,
+ "UPSTREAM"sv,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ m_CacheStats.MissCount++;
+ });
+ }
+
if (Requests.empty())
{
return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
@@ -1596,6 +1912,7 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http
ZEN_TRACE_CPU("Z$::RpcGetCacheChunks");
+ std::string Namespace;
std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream
std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests
std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream
@@ -1605,27 +1922,28 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http
std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream
// Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests
- if (!ParseGetCacheChunksRequest(RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
+ if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
{
return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
}
// For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we
// have it locally, and otherwise append a request for the payload to UpstreamChunks
- GetLocalCacheRecords(RecordKeys, Records, RecordRequests, UpstreamChunks);
+ GetLocalCacheRecords(Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks);
// For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks
- GetLocalCacheValues(ValueRequests, UpstreamChunks);
+ GetLocalCacheValues(Namespace, ValueRequests, UpstreamChunks);
// Call GetCacheChunks on the upstream for any payloads we do not have locally
- GetUpstreamCacheChunks(UpstreamChunks, RequestKeys, Requests);
+ GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests);
// Send the payload and descriptive data about each chunk to the client
- WriteGetCacheChunksResponse(Requests, HttpRequest);
+ WriteGetCacheChunksResponse(Namespace, Requests, HttpRequest);
}
bool
-HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys,
+HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
std::vector<cache::detail::RecordBody>& Records,
std::vector<CacheChunkRequest>& RequestKeys,
std::vector<cache::detail::ChunkRequest>& Requests,
@@ -1637,11 +1955,20 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv);
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
- CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
- size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num());
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
+
+ std::optional<std::string> NamespaceText = GetRpcRequestNamespace(Params);
+ if (!NamespaceText)
+ {
+ ZEN_WARN("GetCacheChunks: Invalid namespace in ChunkRequest.");
+ return false;
+ }
+ Namespace = *NamespaceText;
+
+ CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
+ size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num());
// Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed,
// we will need to change the pointers to indexes to handle reallocations.
@@ -1660,12 +1987,10 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque
CbObjectView RequestObject = RequestView.AsObjectView();
CacheChunkRequest& RequestKey = RequestKeys.emplace_back();
ChunkRequest& Request = Requests.emplace_back();
- Request.Key = &RequestKey;
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
- CbFieldView HashField = KeyObject["Hash"sv];
- RequestKey.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash());
- if (RequestKey.Key.Bucket.empty() || HashField.HasError())
+ Request.Key = &RequestKey;
+ if (!GetRpcRequestCacheKey(KeyObject, Request.Key->Key))
{
ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest.");
return false;
@@ -1730,7 +2055,8 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque
}
void
-HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys,
+HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
std::vector<cache::detail::RecordBody>& Records,
std::vector<cache::detail::ChunkRequest*>& RecordRequests,
std::vector<CacheChunkRequest*>& OutUpstreamChunks)
@@ -1749,7 +2075,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>&
if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal))
{
ZenCacheValue CacheValue;
- if (m_CacheStore.Get(RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue))
+ if (m_CacheStore.Get(Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue))
{
Record.Exists = true;
Record.CacheValue = std::move(CacheValue.Value);
@@ -1766,7 +2092,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>&
if (!UpstreamRecordRequests.empty())
{
- const auto OnCacheRecordGetComplete = [this, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) {
+ const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) {
if (!Params.Record)
{
return;
@@ -1784,10 +2110,10 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>&
if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal))
{
- m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = Record.CacheValue});
+ m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue});
}
};
- m_UpstreamCache.GetCacheRecords(UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
+ m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
}
std::vector<CacheChunkRequest*> UpstreamPayloadRequests;
@@ -1871,7 +2197,8 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>&
}
void
-HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
std::vector<CacheChunkRequest*>& OutUpstreamChunks)
{
using namespace cache::detail;
@@ -1881,7 +2208,7 @@ HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::Chunk
if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
{
ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
+ if (m_CacheStore.Get(Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
{
if (IsCompressedBinary(CacheValue.Value.GetContentType()))
{
@@ -1915,7 +2242,8 @@ HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::Chunk
}
void
-HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks,
+HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Namespace,
+ std::vector<CacheChunkRequest*>& UpstreamChunks,
std::vector<CacheChunkRequest>& RequestKeys,
std::vector<cache::detail::ChunkRequest>& Requests)
{
@@ -1923,7 +2251,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest
if (!UpstreamChunks.empty())
{
- const auto OnCacheValueGetComplete = [this, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) {
+ const auto OnCacheValueGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) {
if (Params.RawHash == Params.RawHash.Zero)
{
return;
@@ -1950,7 +2278,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest
}
else
{
- m_CacheStore.Put(Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value});
+ m_CacheStore.Put(Namespace, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value});
}
}
if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
@@ -1967,12 +2295,13 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest
m_CacheStats.UpstreamHitCount++;
};
- m_UpstreamCache.GetCacheValues(UpstreamChunks, std::move(OnCacheValueGetComplete));
+ m_UpstreamCache.GetCacheValues(Namespace, UpstreamChunks, std::move(OnCacheValueGetComplete));
}
}
void
-HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests,
+HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest>& Requests,
zen::HttpServerRequest& HttpRequest)
{
using namespace cache::detail;
@@ -1997,7 +2326,8 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detai
Writer.AddInteger("RawSize"sv, Request.TotalSize);
}
- ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
+ ZEN_DEBUG("HIT - '{}/{}/{}/{}' {} '{}' ({})",
+ Namespace,
Request.Key->Key.Bucket,
Request.Key->Key.Hash,
Request.Key->ValueId,
@@ -2008,11 +2338,11 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detai
}
else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query))
{
- ZEN_DEBUG("SKIP - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId);
+ ZEN_DEBUG("SKIP - '{}/{}/{}/{}'", Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId);
}
else
{
- ZEN_DEBUG("MISS - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId);
+ ZEN_DEBUG("MISS - '{}/{}/{}/{}'", Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId);
m_CacheStats.MissCount++;
}
}
@@ -2081,4 +2411,94 @@ HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request)
Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
}
+#if ZEN_WITH_TESTS
+
+TEST_CASE("z$service.parse.relative.Uri")
+{
+ HttpRequestData LegacyBucketRequestBecomesNamespaceRequest;
+ CHECK(HttpRequestParseRelativeUri("test", LegacyBucketRequestBecomesNamespaceRequest));
+ CHECK(LegacyBucketRequestBecomesNamespaceRequest.Namespace == "test"sv);
+ CHECK(!LegacyBucketRequestBecomesNamespaceRequest.Bucket.has_value());
+ CHECK(!LegacyBucketRequestBecomesNamespaceRequest.HashKey.has_value());
+ CHECK(!LegacyBucketRequestBecomesNamespaceRequest.ValueContentId.has_value());
+
+ HttpRequestData LegacyHashKeyRequest;
+ CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", LegacyHashKeyRequest));
+ CHECK(LegacyHashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace);
+ CHECK(LegacyHashKeyRequest.Bucket == "test"sv);
+ CHECK(LegacyHashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv));
+ CHECK(!LegacyHashKeyRequest.ValueContentId.has_value());
+
+ HttpRequestData LegacyValueContentIdRequest;
+ CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789",
+ LegacyValueContentIdRequest));
+ CHECK(LegacyValueContentIdRequest.Namespace == ZenCacheStore::DefaultNamespace);
+ CHECK(LegacyValueContentIdRequest.Bucket == "test"sv);
+ CHECK(LegacyValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv));
+ CHECK(LegacyValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv));
+
+ HttpRequestData V2DefaultNamespaceRequest;
+ CHECK(HttpRequestParseRelativeUri("ue4.ddc", V2DefaultNamespaceRequest));
+ CHECK(V2DefaultNamespaceRequest.Namespace == "ue4.ddc");
+ CHECK(!V2DefaultNamespaceRequest.Bucket.has_value());
+ CHECK(!V2DefaultNamespaceRequest.HashKey.has_value());
+ CHECK(!V2DefaultNamespaceRequest.ValueContentId.has_value());
+
+ HttpRequestData V2NamespaceRequest;
+ CHECK(HttpRequestParseRelativeUri("nicenamespace", V2NamespaceRequest));
+ CHECK(V2NamespaceRequest.Namespace == "nicenamespace"sv);
+ CHECK(!V2NamespaceRequest.Bucket.has_value());
+ CHECK(!V2NamespaceRequest.HashKey.has_value());
+ CHECK(!V2NamespaceRequest.ValueContentId.has_value());
+
+ HttpRequestData V2BucketRequestWithDefaultNamespace;
+ CHECK(HttpRequestParseRelativeUri("ue4.ddc/test", V2BucketRequestWithDefaultNamespace));
+ CHECK(V2BucketRequestWithDefaultNamespace.Namespace == "ue4.ddc");
+ CHECK(V2BucketRequestWithDefaultNamespace.Bucket == "test"sv);
+ CHECK(!V2BucketRequestWithDefaultNamespace.HashKey.has_value());
+ CHECK(!V2BucketRequestWithDefaultNamespace.ValueContentId.has_value());
+
+ HttpRequestData V2BucketRequestWithNamespace;
+ CHECK(HttpRequestParseRelativeUri("nicenamespace/test", V2BucketRequestWithNamespace));
+ CHECK(V2BucketRequestWithNamespace.Namespace == "nicenamespace"sv);
+ CHECK(V2BucketRequestWithNamespace.Bucket == "test"sv);
+ CHECK(!V2BucketRequestWithNamespace.HashKey.has_value());
+ CHECK(!V2BucketRequestWithNamespace.ValueContentId.has_value());
+
+ HttpRequestData V2HashKeyRequest;
+ CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", V2HashKeyRequest));
+ CHECK(V2HashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace);
+ CHECK(V2HashKeyRequest.Bucket == "test");
+ CHECK(V2HashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv));
+ CHECK(!V2HashKeyRequest.ValueContentId.has_value());
+
+ HttpRequestData V2ValueContentIdRequest;
+ CHECK(
+ HttpRequestParseRelativeUri("nicenamespace/test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789",
+ V2ValueContentIdRequest));
+ CHECK(V2ValueContentIdRequest.Namespace == "nicenamespace"sv);
+ CHECK(V2ValueContentIdRequest.Bucket == "test"sv);
+ CHECK(V2ValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv));
+ CHECK(V2ValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv));
+
+ HttpRequestData Invalid;
+ CHECK(!HttpRequestParseRelativeUri("", Invalid));
+ CHECK(!HttpRequestParseRelativeUri("/", Invalid));
+ CHECK(!HttpRequestParseRelativeUri("bad\2_namespace", Invalid));
+ CHECK(!HttpRequestParseRelativeUri("nice/\2\1bucket", Invalid));
+ CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789a", Invalid));
+ CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcdef1234", Invalid));
+ CHECK(!HttpRequestParseRelativeUri("namespace/bucket/pppppppp89abcdef12340123456789abcdef1234", Invalid));
+ CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcd", Invalid));
+ CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/ppppppppdef12345678956789abcdef123456789",
+ Invalid));
+}
+
+#endif
+
+void
+z$service_forcelink()
+{
+}
+
} // namespace zen
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 00c4260aa..890a2ebab 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -80,6 +80,7 @@ public:
private:
struct CacheRef
{
+ std::string Namespace;
std::string BucketSegment;
IoHash HashKey;
IoHash ValueContentId;
@@ -98,26 +99,27 @@ private:
Invalid,
};
- [[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef);
- void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
- void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
- void HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
- void HandleCacheValueRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
- void HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
- void HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL);
- void HandleRpcRequest(zen::HttpServerRequest& Request);
- void HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest);
- void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
- void HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest);
- void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
- void HandleRpcGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
- void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
- virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
- virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;
- PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
+ void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandleCacheValueRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandleRpcRequest(zen::HttpServerRequest& Request);
+ void HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest);
+ void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
+ void HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest);
+ void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
+ void HandleRpcGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
+ void HandleCacheNamespaceRequest(zen::HttpServerRequest& Request, std::string_view Namespace);
+ void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket);
+ virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
+ virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;
+ PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
/** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
- bool ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys,
+ bool ParseGetCacheChunksRequest(std::string& Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
std::vector<cache::detail::RecordBody>& Records,
std::vector<CacheChunkRequest>& RequestKeys,
std::vector<cache::detail::ChunkRequest>& Requests,
@@ -125,18 +127,24 @@ private:
std::vector<cache::detail::ChunkRequest*>& ValueRequests,
CbObjectView RpcRequest);
/** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */
- void GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys,
+ void GetLocalCacheRecords(std::string_view Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
std::vector<cache::detail::RecordBody>& Records,
std::vector<cache::detail::ChunkRequest*>& RecordRequests,
std::vector<CacheChunkRequest*>& OutUpstreamChunks);
/** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */
- void GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks);
+ void GetLocalCacheValues(std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks);
/** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */
- void GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks,
+ void GetUpstreamCacheChunks(std::string_view Namespace,
+ std::vector<CacheChunkRequest*>& UpstreamChunks,
std::vector<CacheChunkRequest>& RequestKeys,
std::vector<cache::detail::ChunkRequest>& Requests);
/** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
- void WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests, zen::HttpServerRequest& HttpRequest);
+ void WriteGetCacheChunksResponse(std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest>& Requests,
+ zen::HttpServerRequest& HttpRequest);
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
@@ -159,4 +167,6 @@ IsCompressedBinary(ZenContentType Type)
return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary;
}
+void z$service_forcelink();
+
} // namespace zen
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 738e4c1fd..da948fd72 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -14,13 +14,13 @@
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/string.h>
-#include <zencore/testing.h>
-#include <zencore/testutils.h>
#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zenstore/cidstore.h>
+#include <xxhash.h>
+
#if ZEN_PLATFORM_WINDOWS
# include <zencore/windows.h>
#endif
@@ -30,10 +30,183 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <gsl/gsl-lite.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
+# include <random>
+#endif
+
//////////////////////////////////////////////////////////////////////////
namespace zen {
+namespace {
+
+#pragma pack(push)
+#pragma pack(1)
+
+ struct CacheBucketIndexHeader
+ {
+ static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
+ static constexpr uint32_t CurrentVersion = 1;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ uint64_t EntryCount = 0;
+ uint64_t LogPosition = 0;
+ uint32_t PayloadAlignment = 0;
+ uint32_t Checksum = 0;
+
+ static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
+ };
+
+ static_assert(sizeof(CacheBucketIndexHeader) == 32);
+
+ struct LegacyDiskLocation
+ {
+ inline LegacyDiskLocation() = default;
+
+ inline LegacyDiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
+ : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
+ , LowerSize(ValueSize & 0xFFFFffff)
+ , IndexDataSize(IndexSize)
+ {
+ }
+
+ static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
+ static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize)
+ static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
+ static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file
+ static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary
+ static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value
+ static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format
+
+ static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
+
+ inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
+ inline uint64_t Size() const { return LowerSize; }
+ inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
+ inline ZenContentType GetContentType() const
+ {
+ ZenContentType ContentType = ZenContentType::kBinary;
+
+ if (IsFlagSet(LegacyDiskLocation::kStructured))
+ {
+ ContentType = ZenContentType::kCbObject;
+ }
+
+ if (IsFlagSet(LegacyDiskLocation::kCompressed))
+ {
+ ContentType = ZenContentType::kCompressedBinary;
+ }
+
+ return ContentType;
+ }
+ inline uint64_t Flags() const { return OffsetAndFlags & kFlagsMask; }
+
+ private:
+ uint64_t OffsetAndFlags = 0;
+ uint32_t LowerSize = 0;
+ uint32_t IndexDataSize = 0;
+ };
+
+ struct LegacyDiskIndexEntry
+ {
+ IoHash Key;
+ LegacyDiskLocation Location;
+ };
+
+#pragma pack(pop)
+
+ static_assert(sizeof(LegacyDiskIndexEntry) == 36);
+
+ const char* IndexExtension = ".uidx";
+ const char* LogExtension = ".slog";
+ const char* LegacyDataExtension = ".sobs";
+
+ std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ return BucketDir / (BucketName + IndexExtension);
+ }
+
+ std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ return BucketDir / (BucketName + ".tmp" + IndexExtension);
+ }
+
+ std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ return BucketDir / (BucketName + LogExtension);
+ }
+
+ std::filesystem::path GetLegacyLogPath(const std::filesystem::path& BucketDir)
+ {
+ return BucketDir / (std::string("zen") + LogExtension);
+ }
+
+ std::filesystem::path GetLegacyDataPath(const std::filesystem::path& BucketDir)
+ {
+ return BucketDir / (std::string("zen") + LegacyDataExtension);
+ }
+
+ bool ValidateLegacyEntry(const LegacyDiskIndexEntry& Entry, std::string& OutReason)
+ {
+ if (Entry.Key == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Location.Flags() & ~(LegacyDiskLocation::kStandaloneFile | LegacyDiskLocation::kStructured |
+ LegacyDiskLocation::kTombStone | LegacyDiskLocation::kCompressed))
+ {
+ OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.Flags(), Entry.Key.ToHexString());
+ return false;
+ }
+ if (!Entry.Location.IsFlagSet(LegacyDiskLocation::kTombStone))
+ {
+ return true;
+ }
+ uint64_t Size = Entry.Location.Size();
+ if (Size == 0)
+ {
+ OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+ return false;
+ }
+ return true;
+ }
+
+ bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason)
+ {
+ if (Entry.Key == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Location.GetFlags() &
+ ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed))
+ {
+ OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
+ {
+ return true;
+ }
+ uint64_t Size = Entry.Location.Size();
+ if (Size == 0)
+ {
+ OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+ return false;
+ }
+ return true;
+ }
+
+} // namespace
+
namespace fs = std::filesystem;
static CbObject
@@ -59,7 +232,7 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
WriteFile(Path, Object.GetBuffer().AsIoBuffer());
}
-ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir)
+ZenCacheNamespace::ZenCacheNamespace(CasGc& Gc, const std::filesystem::path& RootDir)
: GcStorage(Gc)
, GcContributor(Gc)
, m_RootDir(RootDir)
@@ -75,12 +248,12 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir)
#endif
}
-ZenCacheStore::~ZenCacheStore()
+ZenCacheNamespace::~ZenCacheNamespace()
{
}
bool
-ZenCacheStore::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
+ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
ZEN_TRACE_CPU("Z$::Get");
@@ -118,7 +291,7 @@ ZenCacheStore::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheVal
}
void
-ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
+ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
{
ZEN_TRACE_CPU("Z$::Put");
@@ -154,7 +327,7 @@ ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCa
}
bool
-ZenCacheStore::DropBucket(std::string_view Bucket)
+ZenCacheNamespace::DropBucket(std::string_view Bucket)
{
ZEN_INFO("dropping bucket '{}'", Bucket);
@@ -170,13 +343,13 @@ ZenCacheStore::DropBucket(std::string_view Bucket)
}
void
-ZenCacheStore::Flush()
+ZenCacheNamespace::Flush()
{
m_DiskLayer.Flush();
}
void
-ZenCacheStore::Scrub(ScrubContext& Ctx)
+ZenCacheNamespace::Scrub(ScrubContext& Ctx)
{
if (m_LastScrubTime == Ctx.ScrubTimestamp())
{
@@ -190,11 +363,11 @@ ZenCacheStore::Scrub(ScrubContext& Ctx)
}
void
-ZenCacheStore::GatherReferences(GcContext& GcCtx)
+ZenCacheNamespace::GatherReferences(GcContext& GcCtx)
{
Stopwatch Timer;
- const auto Guard = MakeGuard(
- [this, &Timer] { ZEN_INFO("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+ const auto Guard =
+ MakeGuard([&] { ZEN_INFO("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
access_tracking::AccessTimes AccessTimes;
m_MemLayer.GatherAccessTimes(AccessTimes);
@@ -204,14 +377,14 @@ ZenCacheStore::GatherReferences(GcContext& GcCtx)
}
void
-ZenCacheStore::CollectGarbage(GcContext& GcCtx)
+ZenCacheNamespace::CollectGarbage(GcContext& GcCtx)
{
m_MemLayer.Reset();
m_DiskLayer.CollectGarbage(GcCtx);
}
GcStorageSize
-ZenCacheStore::StorageSize() const
+ZenCacheNamespace::StorageSize() const
{
return {.DiskSize = m_DiskLayer.TotalSize(), .MemorySize = m_MemLayer.TotalSize()};
}
@@ -425,6 +598,8 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
{
using namespace std::literals;
+ m_BlocksBasePath = BucketDir / "blocks";
+
CreateDirectories(BucketDir);
std::filesystem::path ManifestPath{BucketDir / "zen_manifest"};
@@ -470,48 +645,465 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
}
void
-ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew)
+ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
{
- m_BucketDir = BucketDir;
+ ZEN_INFO("write store snapshot for '{}'", m_BucketDir / m_BucketName);
+ uint64_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}",
+ m_BucketDir / m_BucketName,
+ EntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
- uint64_t MaxFileOffset = 0;
- uint64_t InvalidEntryCount = 0;
- m_SobsCursor = 0;
- m_TotalSize = 0;
+ namespace fs = std::filesystem;
- m_Index.clear();
+ fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+ fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName);
+
+ // Move index away, we keep it if something goes wrong
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(STmpIndexPath);
+ }
+ if (fs::is_regular_file(IndexPath))
+ {
+ fs::rename(IndexPath, STmpIndexPath);
+ }
+
+ try
+ {
+ m_SlogFile.Flush();
- std::filesystem::path SobsPath{BucketDir / "zen.sobs"};
- std::filesystem::path SlogPath{BucketDir / "zen.slog"};
+ // Write the current state of the location map to a new index state
+ uint64_t LogCount = 0;
+ std::vector<DiskIndexEntry> Entries;
- m_SobsFile.Open(SobsPath, IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite);
- m_SlogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
+ {
+ Entries.resize(m_Index.size());
- m_SlogFile.Replay(
- [&](const DiskIndexEntry& Entry) {
- if (Entry.Key == IoHash::Zero)
+ uint64_t EntryIndex = 0;
+ for (auto& Entry : m_Index)
{
- ++InvalidEntryCount;
+ DiskIndexEntry& IndexEntry = Entries[EntryIndex++];
+ IndexEntry.Key = Entry.first;
+ IndexEntry.Location = Entry.second.Location;
}
- else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
+
+ LogCount = m_SlogFile.GetLogCount();
+ }
+
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
+ CacheBucketIndexHeader Header = {.EntryCount = Entries.size(),
+ .LogPosition = LogCount,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
+
+ Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
+
+ ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
+ ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
+ ObjectIndexFile.Flush();
+ ObjectIndexFile.Close();
+ EntryCount = Entries.size();
+ }
+ catch (std::exception& Err)
+ {
+ ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
+
+ // Restore any previous snapshot
+
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(IndexPath);
+ fs::rename(STmpIndexPath, IndexPath);
+ }
+ }
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(STmpIndexPath);
+ }
+}
+
+uint64_t
+ZenCacheDiskLayer::CacheBucket::ReadIndexFile()
+{
+ std::vector<DiskIndexEntry> Entries;
+ std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+ if (std::filesystem::is_regular_file(IndexPath))
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' index containing #{} entries in {}",
+ m_BucketDir / m_BucketName,
+ Entries.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t Size = ObjectIndexFile.FileSize();
+ if (Size >= sizeof(CacheBucketIndexHeader))
+ {
+ uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
+ CacheBucketIndexHeader Header;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+ if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && (Header.Version == CacheBucketIndexHeader::CurrentVersion) &&
+ (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
+ (Header.EntryCount <= ExpectedEntryCount))
{
- m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ Entries.resize(Header.EntryCount);
+ ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
+ m_PayloadAlignment = Header.PayloadAlignment;
+
+ std::string InvalidEntryReason;
+ for (const DiskIndexEntry& Entry : Entries)
+ {
+ if (!ValidateEntry(Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+ continue;
+ }
+ m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
+ }
+
+ return Header.LogPosition;
}
else
{
- m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
- m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed);
+ ZEN_WARN("skipping invalid index file '{}'", IndexPath);
}
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size());
- },
- 0);
+ }
+ }
+ return 0;
+}
+uint64_t
+ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount)
+{
+ std::vector<DiskIndexEntry> Entries;
+ std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ if (std::filesystem::is_regular_file(LogPath))
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' log containing #{} entries in {}", LogPath, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ TCasLogFile<DiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ if (CasLog.Initialize())
+ {
+ uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+ SkipEntryCount = 0;
+ }
+ uint64_t ReadCount = EntryCount - SkipEntryCount;
+ m_Index.reserve(ReadCount);
+ uint64_t InvalidEntryCount = 0;
+ CasLog.Replay(
+ [&](const DiskIndexEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Location.Flags & DiskLocation::kTombStone)
+ {
+ m_Index.erase(Record.Key);
+ return;
+ }
+ if (!ValidateEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
+ }
+ m_Index.insert_or_assign(Record.Key, IndexEntry(Record.Location, GcClock::TickCount()));
+ },
+ SkipEntryCount);
+ if (InvalidEntryCount)
+ {
+ ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
+ }
+ }
+ }
+ return 0;
+};
+
+uint64_t
+ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource)
+{
+ std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir);
+
+ if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0)
+ {
+ return 0;
+ }
+
+ ZEN_INFO("migrating store {}", m_BucketDir / m_BucketName);
+
+ std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir);
+
+ uint64_t MigratedChunkCount = 0;
+ uint32_t MigratedBlockCount = 0;
+ Stopwatch MigrationTimer;
+ uint64_t TotalSize = 0;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})",
+ m_BucketDir / m_BucketName,
+ MigratedChunkCount,
+ MigratedBlockCount,
+ NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()),
+ NiceBytes(TotalSize));
+ });
+
+ uint64_t BlockFileSize = 0;
+ {
+ BasicFile BlockFile;
+ BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
+ BlockFileSize = BlockFile.FileSize();
+ }
+
+ std::unordered_map<IoHash, LegacyDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
+ uint64_t InvalidEntryCount = 0;
+
+ size_t BlockChunkCount = 0;
+ TCasLogFile<LegacyDiskIndexEntry> LegacyCasLog;
+ LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead);
+ {
+ Stopwatch Timer;
+ const auto __ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' legacy log containing #{} entries in {}",
+ LegacyLogPath,
+ LegacyDiskIndex.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ if (LegacyCasLog.Initialize())
+ {
+ LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount());
+ LegacyCasLog.Replay(
+ [&](const LegacyDiskIndexEntry& Record) {
+ if (Record.Location.IsFlagSet(LegacyDiskLocation::kTombStone))
+ {
+ LegacyDiskIndex.erase(Record.Key);
+ return;
+ }
+ std::string InvalidEntryReason;
+ if (!ValidateLegacyEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
+ }
+ if (m_Index.contains(Record.Key))
+ {
+ return;
+ }
+ LegacyDiskIndex[Record.Key] = Record;
+ },
+ 0);
+
+ std::vector<IoHash> BadEntries;
+ for (const auto& Entry : LegacyDiskIndex)
+ {
+ const LegacyDiskIndexEntry& Record(Entry.second);
+ if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
+ {
+ continue;
+ }
+ if (Record.Location.Offset() + Record.Location.Size() <= BlockFileSize)
+ {
+ BlockChunkCount++;
+ continue;
+ }
+ ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath);
+ BadEntries.push_back(Entry.first);
+ }
+ for (const IoHash& BadHash : BadEntries)
+ {
+ LegacyDiskIndex.erase(BadHash);
+ }
+ InvalidEntryCount += BadEntries.size();
+ }
+ }
if (InvalidEntryCount)
{
- ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, SlogPath);
+ ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
}
- m_SobsCursor = (MaxFileOffset + 15) & ~15;
+ if (LegacyDiskIndex.empty())
+ {
+ LegacyCasLog.Close();
+ if (CleanSource)
+ {
+ // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find
+ // a manifest and crashes on startup if they don't.
+ // In order to not break startup when switching back an older version, lets just reset
+ // the legacy data files to zero length.
+
+ BasicFile LegacyLog;
+ LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySobs;
+ LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
+ }
+ return 0;
+ }
+
+ std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ CreateDirectories(LogPath.parent_path());
+ TCasLogFile<DiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash;
+ std::vector<BlockStoreLocation> ChunkLocations;
+ ChunkIndexToChunkHash.reserve(BlockChunkCount);
+ ChunkLocations.reserve(BlockChunkCount);
+
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(LegacyDiskIndex.size() - BlockChunkCount);
+
+ for (const auto& Entry : LegacyDiskIndex)
+ {
+ const IoHash& ChunkHash = Entry.first;
+ const LegacyDiskLocation& Location = Entry.second.Location;
+ if (Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
+ {
+ uint8_t Flags = 0xff & (Location.Flags() >> 56);
+ DiskLocation NewLocation = DiskLocation(Location.Size(), Flags);
+ LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation});
+ continue;
+ }
+ size_t ChunkIndex = ChunkLocations.size();
+ ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.Offset(), .Size = Location.Size()});
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
+ TotalSize += Location.Size();
+ }
+ for (const DiskIndexEntry& Entry : LogEntries)
+ {
+ m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
+ }
+ CasLog.Append(LogEntries);
+
+ m_BlockStore.Split(
+ ChunkLocations,
+ LegacyDataPath,
+ m_BlocksBasePath,
+ MaxBlockSize,
+ BlockStoreDiskLocation::MaxBlockIndex + 1,
+ m_PayloadAlignment,
+ CleanSource,
+ [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount](
+ const BlockStore::MovedChunksArray& MovedChunks) {
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(MovedChunks.size());
+ for (const auto& Entry : MovedChunks)
+ {
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
+ const LegacyDiskLocation& OldLocation = OldEntry.Location;
+ uint8_t Flags = 0xff & (OldLocation.Flags() >> 56);
+ LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, Flags)});
+ }
+ for (const DiskIndexEntry& Entry : LogEntries)
+ {
+ m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
+ }
+ CasLog.Append(LogEntries);
+ CasLog.Flush();
+ if (CleanSource)
+ {
+ std::vector<LegacyDiskIndexEntry> LegacyLogEntries;
+ LegacyLogEntries.reserve(MovedChunks.size());
+ for (const auto& Entry : MovedChunks)
+ {
+ size_t ChunkIndex = Entry.first;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
+ const LegacyDiskLocation& OldLocation = OldEntry.Location;
+ LegacyDiskLocation NewLocation(OldLocation.Offset(),
+ OldLocation.Size(),
+ 0,
+ OldLocation.Flags() | LegacyDiskLocation::kTombStone);
+ LegacyLogEntries.push_back(LegacyDiskIndexEntry{.Key = ChunkHash, .Location = NewLocation});
+ }
+ LegacyCasLog.Append(LegacyLogEntries);
+ LegacyCasLog.Flush();
+ }
+ MigratedBlockCount++;
+ MigratedChunkCount += MovedChunks.size();
+ });
+
+ LegacyCasLog.Close();
+ CasLog.Close();
+
+ if (CleanSource)
+ {
+ // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find
+ // a manifest and crashes on startup if they don't.
+ // In order to not break startup when switching back an older version, lets just reset
+ // the legacy data files to zero length.
+
+ BasicFile LegacyLog;
+ LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySobs;
+ LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
+ }
+ return MigratedChunkCount;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew)
+{
+ m_BucketDir = BucketDir;
+
+ m_TotalSize = 0;
+
+ m_Index.clear();
+
+ std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir);
+ std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+
+ if (IsNew)
+ {
+ std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir);
+ fs::remove(LegacyLogPath);
+ fs::remove(LegacyDataPath);
+ fs::remove(LogPath);
+ fs::remove(IndexPath);
+ fs::remove_all(m_BlocksBasePath);
+ }
+
+ uint64_t LogPosition = ReadIndexFile();
+ uint64_t LogEntryCount = ReadLog(LogPosition);
+ uint64_t LegacyLogEntryCount = MigrateLegacyData(true);
+
+ CreateDirectories(m_BucketDir);
+
+ m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ std::vector<BlockStoreLocation> KnownLocations;
+ KnownLocations.reserve(m_Index.size());
+ for (const auto& Entry : m_Index)
+ {
+ const DiskLocation& Location = Entry.second.Location;
+ m_TotalSize.fetch_add(Location.Size(), std::memory_order::relaxed);
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ continue;
+ }
+ const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment);
+ KnownLocations.push_back(BlockLocation);
+ }
+
+ m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
+
+ if (IsNew || ((LogEntryCount + LegacyLogEntryCount) > 0))
+ {
+ MakeIndexSnapshot();
+ }
+ // TODO: should validate integrity of container files here
}
void
@@ -532,12 +1124,13 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& H
bool
ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue)
{
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment);
+
+ OutValue.Value = m_BlockStore.TryGetChunk(Location);
+ if (!OutValue.Value)
{
return false;
}
-
- OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size());
OutValue.Value.SetContentType(Loc.GetContentType());
return true;
@@ -562,23 +1155,6 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
return false;
}
-void
-ZenCacheDiskLayer::CacheBucket::DeleteStandaloneCacheValue(const DiskLocation& Loc,
- const IoHash& HashKey,
- const fs::path& Path,
- std::error_code& Ec)
-{
- ZEN_DEBUG("deleting standalone cache file '{}'", Path);
- fs::remove(Path, Ec);
-
- if (!Ec)
- {
- m_SlogFile.Append(DiskIndexEntry{.Key = HashKey, .Location = {0, Loc.Size(), 0, DiskLocation::kTombStone}});
- m_Index.erase(HashKey);
- m_TotalSize.fetch_sub(Loc.Size(), std::memory_order::relaxed);
- }
-}
-
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -588,23 +1164,21 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
RwLock::SharedLockScope _(m_IndexLock);
-
- if (auto It = m_Index.find(HashKey); It != m_Index.end())
+ auto It = m_Index.find(HashKey);
+ if (It == m_Index.end())
{
- IndexEntry& Entry = It.value();
- Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
-
- if (GetInlineCacheValue(Entry.Location, OutValue))
- {
- return true;
- }
-
+ return false;
+ }
+ IndexEntry& Entry = It.value();
+ Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
+ DiskLocation Location = Entry.Location;
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ // We don't need to hold the index lock when we read a standalone file
_.ReleaseNow();
-
- return GetStandaloneCacheValue(Entry.Location, HashKey, OutValue);
+ return GetStandaloneCacheValue(Location, HashKey, OutValue);
}
-
- return false;
+ return GetInlineCacheValue(Location, OutValue);
}
void
@@ -619,54 +1193,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
{
return PutStandaloneCacheValue(HashKey, Value);
}
- else
- {
- // Small object put
-
- uint64_t EntryFlags = 0;
-
- if (Value.Value.GetContentType() == ZenContentType::kCbObject)
- {
- EntryFlags |= DiskLocation::kStructured;
- }
- else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
- {
- EntryFlags |= DiskLocation::kCompressed;
- }
-
- RwLock::ExclusiveLockScope _(m_IndexLock);
-
- DiskLocation Loc(m_SobsCursor, Value.Value.Size(), 0, EntryFlags);
-
- m_SobsCursor = RoundUp(m_SobsCursor + Loc.Size(), 16);
-
- if (auto It = m_Index.find(HashKey); It == m_Index.end())
- {
- // Previously unknown object
- m_Index.insert({HashKey, {Loc, GcClock::TickCount()}});
- }
- else
- {
- // TODO: should check if write is idempotent and bail out if it is?
- // this would requiring comparing contents on disk unless we add a
- // content hash to the index entry
- IndexEntry& Entry = It.value();
- Entry.Location = Loc;
- Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
- }
-
- m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset());
- m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed);
- }
+ PutInlineCacheValue(HashKey, Value);
}
void
ZenCacheDiskLayer::CacheBucket::Drop()
{
- // TODO: add error handling
-
- m_SobsFile.Close();
+ m_BlockStore.Close();
m_SlogFile.Close();
DeleteDirectories(m_BucketDir);
}
@@ -674,11 +1207,10 @@ ZenCacheDiskLayer::CacheBucket::Drop()
void
ZenCacheDiskLayer::CacheBucket::Flush()
{
- RwLock::SharedLockScope _(m_IndexLock);
-
- m_SobsFile.Flush();
- m_SlogFile.Flush();
+ m_BlockStore.Flush();
+ RwLock::SharedLockScope _(m_IndexLock);
+ MakeIndexSnapshot();
SaveManifest();
}
@@ -724,20 +1256,22 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
ZenCacheValue Value;
- if (GetInlineCacheValue(Loc, Value))
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- // Validate contents
+ if (GetStandaloneCacheValue(Loc, HashKey, Value))
+ {
+ // Note: we cannot currently validate contents since we don't
+ // have a content hash!
+ continue;
+ }
}
- else if (GetStandaloneCacheValue(Loc, HashKey, Value))
+ else if (GetInlineCacheValue(Loc, Value))
{
- // Note: we cannot currently validate contents since we don't
- // have a content hash!
- }
- else
- {
- // Value not found
- BadKeys.push_back(HashKey);
+ // Validate contents
+ continue;
}
+ // Value not found
+ BadKeys.push_back(HashKey);
}
}
@@ -754,12 +1288,18 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
// Log a tombstone and delete the in-memory index for the bad entry
- const auto It = m_Index.find(BadKey);
- const DiskLocation& Location = It->second.Location;
- m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = {Location.Offset(), Location.Size(), 0, DiskLocation::kTombStone}});
+ const auto It = m_Index.find(BadKey);
+ DiskLocation Location = It->second.Location;
+ Location.Flags |= DiskLocation::kTombStone;
+ m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = Location});
m_Index.erase(BadKey);
}
}
+
+ // Let whomever it concerns know about the bad chunks. This could
+ // be used to invalidate higher level data structures more efficiently
+ // than a full validation pass might be able to do
+ Ctx.ReportBadCasChunks(BadKeys);
}
void
@@ -767,68 +1307,95 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences");
- Stopwatch Timer;
- const auto Guard = MakeGuard(
- [this, &Timer] { ZEN_INFO("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+
+ Stopwatch TotalTimer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})",
+ m_BucketDir / m_BucketName,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceLatencyNs(WriteBlockTimeUs),
+ NiceLatencyNs(WriteBlockLongestTimeUs),
+ NiceLatencyNs(ReadBlockTimeUs),
+ NiceLatencyNs(ReadBlockLongestTimeUs));
+ });
const GcClock::TimePoint ExpireTime =
GcCtx.MaxCacheDuration() == GcClock::Duration::max() ? GcClock::TimePoint::min() : GcCtx.Time() - GcCtx.MaxCacheDuration();
const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
- RwLock::SharedLockScope _(m_IndexLock);
-
- std::vector<IoHash> ValidKeys;
- std::vector<IoHash> ExpiredKeys;
- std::vector<IoHash> Cids;
- std::vector<IndexMap::value_type> Entries(m_Index.begin(), m_Index.end());
-
- std::sort(Entries.begin(), Entries.end(), [](const auto& LHS, const auto& RHS) {
- return LHS.second.LastAccess < RHS.second.LastAccess;
- });
+ IndexMap Index;
+ {
+ RwLock::SharedLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ Index = m_Index;
+ }
- const auto ValidIt = std::lower_bound(Entries.begin(), Entries.end(), ExpireTicks, [](const auto& Kv, auto Ticks) {
- const IndexEntry& Entry = Kv.second;
- return Entry.LastAccess < Ticks;
- });
+ std::vector<IoHash> ExpiredKeys;
+ ExpiredKeys.reserve(1024);
+ std::vector<IoHash> Cids;
Cids.reserve(1024);
- for (auto Kv = ValidIt; Kv != Entries.end(); ++Kv)
+ for (const auto& Entry : Index)
{
- const IoHash& Key = Kv->first;
- const DiskLocation& Loc = Kv->second.Location;
+ const IoHash& Key = Entry.first;
+ if (Entry.second.LastAccess < ExpireTicks)
+ {
+ ExpiredKeys.push_back(Key);
+ continue;
+ }
+
+ const DiskLocation& Loc = Entry.second.Location;
if (Loc.IsFlagSet(DiskLocation::kStructured))
{
- ZenCacheValue CacheValue;
- if (!GetInlineCacheValue(Loc, CacheValue))
+ if (Cids.size() > 1024)
{
- GetStandaloneCacheValue(Loc, Key, CacheValue);
+ GcCtx.ContributeCids(Cids);
+ Cids.clear();
}
- if (CacheValue.Value)
+ ZenCacheValue CacheValue;
{
- ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject);
- if (Cids.size() > 1024)
+ RwLock::SharedLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- GcCtx.ContributeCids(Cids);
- Cids.clear();
+ if (!GetStandaloneCacheValue(Loc, Key, CacheValue))
+ {
+ continue;
+ }
+ }
+ else if (!GetInlineCacheValue(Loc, CacheValue))
+ {
+ continue;
}
- CbObject Obj(SharedBuffer{CacheValue.Value});
- Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
}
+
+ ZEN_ASSERT(CacheValue.Value);
+ ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject);
+ CbObject Obj(SharedBuffer{CacheValue.Value});
+ Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
}
}
- ValidKeys.reserve(std::distance(ValidIt, Entries.end()));
- ExpiredKeys.reserve(std::distance(Entries.begin(), ValidIt));
-
- std::transform(ValidIt, Entries.end(), std::back_inserter(ValidKeys), [](const auto& Kv) { return Kv.first; });
- std::transform(Entries.begin(), ValidIt, std::back_inserter(ExpiredKeys), [](const auto& Kv) { return Kv.first; });
-
GcCtx.ContributeCids(Cids);
- GcCtx.ContributeCacheKeys(m_BucketName, std::move(ValidKeys), std::move(ExpiredKeys));
+ GcCtx.ContributeCacheKeys(m_BucketName, std::move(ExpiredKeys));
}
void
@@ -836,203 +1403,282 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage");
- Flush();
-
- RwLock::ExclusiveLockScope _(m_IndexLock);
-
- const uint64_t OldCount = m_Index.size();
- const uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
-
- ZEN_INFO("collecting garbage from z$ bucket '{}'", m_BucketDir);
-
- Stopwatch Timer;
- const auto Guard = MakeGuard([this, &Timer, &OldCount, &OldTotalSize] {
- const uint64_t NewCount = m_Index.size();
- const uint64_t NewTotalSize = m_TotalSize.load(std::memory_order::relaxed);
- ZEN_INFO("garbage collect from '{}' DONE after {}, collected {} ({}) chunks of total {} ({})",
- m_BucketDir,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- OldCount - NewCount,
- NiceBytes(OldTotalSize - NewTotalSize),
- OldCount,
- NiceBytes(OldTotalSize));
+ ZEN_INFO("collecting garbage from '{}'", m_BucketDir / m_BucketName);
+
+ Stopwatch TotalTimer;
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+ uint64_t TotalChunkCount = 0;
+ uint64_t DeletedSize = 0;
+ uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
+
+ uint64_t DeletedCount = 0;
+ uint64_t MovedCount = 0;
+
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO(
+ "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved "
+ "#{} "
+ "of #{} "
+ "entires ({}).",
+ m_BucketDir / m_BucketName,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceLatencyNs(WriteBlockTimeUs),
+ NiceLatencyNs(WriteBlockLongestTimeUs),
+ NiceLatencyNs(ReadBlockTimeUs),
+ NiceLatencyNs(ReadBlockLongestTimeUs),
+ NiceBytes(DeletedSize),
+ DeletedCount,
+ MovedCount,
+ TotalChunkCount,
+ NiceBytes(OldTotalSize));
+ RwLock::SharedLockScope _(m_IndexLock);
SaveManifest();
});
- if (m_Index.empty())
+ m_SlogFile.Flush();
+
+ std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketName);
+ std::vector<IoHash> DeleteCacheKeys;
+ DeleteCacheKeys.reserve(ExpiredCacheKeys.size());
+ GcCtx.FilterCas(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) {
+ if (Keep)
+ {
+ return;
+ }
+ DeleteCacheKeys.push_back(ChunkHash);
+ });
+ if (DeleteCacheKeys.empty())
{
+ ZEN_INFO("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName);
return;
}
- auto AddEntries = [this](std::span<const IoHash> Keys, std::vector<IndexMap::value_type>& OutEntries) {
- for (const IoHash& Key : Keys)
+ std::vector<DiskIndexEntry> ExpiredStandaloneEntries;
+ IndexMap Index;
+ BlockStore::ReclaimSnapshotState BlockStoreState;
+ {
+ RwLock::SharedLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (m_Index.empty())
{
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- OutEntries.push_back(*It);
- }
+ ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName);
+ return;
}
- };
+ BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
- std::vector<IndexMap::value_type> ValidEntries;
- std::vector<IndexMap::value_type> ExpiredEntries;
-
- AddEntries(GcCtx.ValidCacheKeys(m_BucketName), ValidEntries);
- AddEntries(GcCtx.ExpiredCacheKeys(m_BucketName), ExpiredEntries);
-
- // Remove all standalone file(s)
- // NOTE: This can probably be made asynchronously
- {
- std::error_code Ec;
- ExtendablePathBuilder<256> Path;
+ SaveManifest();
+ Index = m_Index;
- for (const auto& Entry : ExpiredEntries)
+ for (const IoHash& Key : DeleteCacheKeys)
{
- const IoHash& Key = Entry.first;
- const DiskLocation& Loc = Entry.second.Location;
-
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ if (auto It = Index.find(Key); It != Index.end())
{
- Path.Reset();
- BuildPath(Path, Key);
-
- // NOTE: this will update index and log file
- DeleteStandaloneCacheValue(Loc, Key, Path.c_str(), Ec);
-
- if (Ec)
+ DiskIndexEntry Entry = {.Key = It->first, .Location = It->second.Location};
+ if (Entry.Location.Flags & DiskLocation::kStandaloneFile)
{
- ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason '{}'", Path.ToUtf8(), Ec.message());
- Ec.clear();
+ Entry.Location.Flags |= DiskLocation::kTombStone;
+ ExpiredStandaloneEntries.push_back(Entry);
}
}
}
+ if (GcCtx.IsDeletionMode())
+ {
+ for (const auto& Entry : ExpiredStandaloneEntries)
+ {
+ m_Index.erase(Entry.Key);
+ }
+ m_SlogFile.Append(ExpiredStandaloneEntries);
+ }
}
- if (GcCtx.CollectSmallObjects() && !ExpiredEntries.empty())
+ if (GcCtx.IsDeletionMode())
{
- // Naive GC implementation of small objects. Needs enough free
- // disk space to store intermediate sob container along side the
- // old container
-
- const auto ResetSobStorage = [this, &ValidEntries]() {
- m_SobsFile.Close();
- m_SlogFile.Close();
+ std::error_code Ec;
+ ExtendablePathBuilder<256> Path;
- const bool IsNew = true;
- m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite);
- m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
+ for (const auto& Entry : ExpiredStandaloneEntries)
+ {
+ const IoHash& Key = Entry.Key;
+ const DiskLocation& Loc = Entry.Location;
- m_SobsCursor = 0;
- m_TotalSize = 0;
- m_Index.clear();
+ Path.Reset();
+ BuildPath(Path, Key);
+ fs::path FilePath = Path.ToPath();
- for (const auto& Entry : ValidEntries)
{
- const IoHash& Key = Entry.first;
- const DiskLocation& Loc = Entry.second.Location;
+ RwLock::SharedLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (m_Index.contains(Key))
+ {
+ // Someone added it back, let the file on disk be
+ ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8());
+ continue;
+ }
+ __.ReleaseNow();
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(Key));
+ if (fs::is_regular_file(FilePath))
{
- m_SlogFile.Append({.Key = Key, .Location = Loc});
- m_Index.insert({Key, {Loc, GcClock::TickCount()}});
- m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed);
+ ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8());
+ fs::remove(FilePath, Ec);
}
}
- };
- uint64_t NewContainerSize{};
- for (const auto& Entry : ValidEntries)
- {
- const DiskLocation& Loc = Entry.second.Location;
-
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile) == false)
+ if (Ec)
{
- NewContainerSize += (Loc.Size() + sizeof(DiskLocation));
+ ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message());
+ Ec.clear();
+ DiskLocation RestoreLocation = Loc;
+ RestoreLocation.Flags &= ~DiskLocation::kTombStone;
+
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ if (m_Index.contains(Key))
+ {
+ continue;
+ }
+ m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation});
+ m_Index.insert({Key, {Loc, GcClock::TickCount()}});
+ m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed);
+ continue;
}
+ m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ DeletedSize += Entry.Location.Size();
+ DeletedCount++;
}
+ }
- if (NewContainerSize == 0)
- {
- ResetSobStorage();
- return;
- }
+ TotalChunkCount = Index.size();
- const uint64_t DiskSpaceMargin = (256 << 10);
+ std::vector<IoHash> TotalChunkHashes;
+ TotalChunkHashes.reserve(TotalChunkCount);
+ for (const auto& Entry : Index)
+ {
+ const DiskLocation& Location = Entry.second.Location;
- std::error_code Ec;
- DiskSpace Space = DiskSpaceInfo(m_BucketDir, Ec);
- if (Ec || Space.Free < NewContainerSize + DiskSpaceMargin)
+ if (Location.Flags & DiskLocation::kStandaloneFile)
{
- ZEN_WARN("garbage collect z$ bucket '{}' FAILED, not enough disk space {}/{} (required/free)",
- m_BucketDir,
- NiceBytes(NewContainerSize),
- NiceBytes(Space.Free));
- return;
+ continue;
}
+ TotalChunkHashes.push_back(Entry.first);
+ }
- std::filesystem::path TmpSobsPath{m_BucketDir / "zen.sobs.tmp"};
- std::filesystem::path TmpSlogPath{m_BucketDir / "zen.slog.tmp"};
-
- // Copy non expired sob(s) to temporary sob container
-
+ if (TotalChunkHashes.empty())
+ {
+ return;
+ }
+ TotalChunkCount = TotalChunkHashes.size();
+
+ std::vector<BlockStoreLocation> ChunkLocations;
+ BlockStore::ChunkIndexArray KeepChunkIndexes;
+ std::vector<IoHash> ChunkIndexToChunkHash;
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkIndexToChunkHash.reserve(TotalChunkCount);
+
+ GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
+ auto KeyIt = Index.find(ChunkHash);
+ const DiskLocation& DiskLocation = KeyIt->second.Location;
+ BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment);
+ size_t ChunkIndex = ChunkLocations.size();
+ ChunkLocations.push_back(Location);
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
+ if (Keep)
{
- BasicFile TmpSobs;
- TCasLogFile<DiskIndexEntry> TmpLog;
- uint64_t TmpCursor{};
- std::vector<uint8_t> Chunk;
+ KeepChunkIndexes.push_back(ChunkIndex);
+ }
+ });
- TmpSobs.Open(TmpSobsPath, BasicFile::Mode::kTruncate);
- TmpLog.Open(TmpSlogPath, CasLogFile::Mode::kTruncate);
+ size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size();
- for (const auto& Entry : ValidEntries)
- {
- const IoHash& Key = Entry.first;
- const DiskLocation& Loc = Entry.second.Location;
+ const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
+ if (!PerformDelete)
+ {
+ m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
+ uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed);
+ ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}",
+ m_BucketDir / m_BucketName,
+ DeleteCount,
+ 0, // NiceBytes(TotalSize - NewTotalSize),
+ TotalChunkCount,
+ NiceBytes(TotalSize));
+ return;
+ }
- DiskLocation NewLoc;
+ std::vector<IoHash> DeletedChunks;
+ m_BlockStore.ReclaimSpace(
+ BlockStoreState,
+ ChunkLocations,
+ KeepChunkIndexes,
+ m_PayloadAlignment,
+ false,
+ [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
+ for (const auto& Entry : MovedChunks)
+ {
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const DiskLocation& OldDiskLocation = Index[ChunkHash].Location;
+ LogEntries.push_back(
+ {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())});
+ }
+ for (const size_t ChunkIndex : RemovedChunks)
+ {
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const DiskLocation& OldDiskLocation = Index[ChunkHash].Location;
+ LogEntries.push_back({.Key = ChunkHash,
+ .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment),
+ m_PayloadAlignment,
+ OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});
+ DeletedChunks.push_back(ChunkHash);
+ }
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- NewLoc = DiskLocation(0, Loc.Size(), 0, Loc.GetFlags());
- }
- else
+ m_SlogFile.Append(LogEntries);
+ m_SlogFile.Flush();
+ {
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ for (const DiskIndexEntry& Entry : LogEntries)
{
- Chunk.resize(Loc.Size());
- m_SobsFile.Read(Chunk.data(), Chunk.size(), Loc.Offset());
-
- NewLoc = DiskLocation(TmpCursor, Chunk.size(), 0, Loc.GetFlags());
- TmpSobs.Write(Chunk.data(), Chunk.size(), TmpCursor);
- TmpCursor = RoundUp(TmpCursor + Chunk.size(), 16);
+ if (Entry.Location.GetFlags() & DiskLocation::kTombStone)
+ {
+ m_Index.erase(Entry.Key);
+ uint64_t ChunkSize = Entry.Location.GetBlockLocation(m_PayloadAlignment).Size;
+ m_TotalSize.fetch_sub(ChunkSize);
+ continue;
+ }
+ m_Index[Entry.Key].Location = Entry.Location;
}
-
- TmpLog.Append(DiskIndexEntry{.Key = Key, .Location = NewLoc});
}
- }
-
- // Swap state
- try
- {
- fs::path SobsPath{m_BucketDir / "zen.sobs"};
- fs::path SlogPath{m_BucketDir / "zen.slog"};
-
- m_SobsFile.Close();
- m_SlogFile.Close();
-
- fs::remove(SobsPath);
- fs::remove(SlogPath);
-
- fs::rename(TmpSobsPath, SobsPath);
- fs::rename(TmpSlogPath, SlogPath);
+ },
+ [&]() { return GcCtx.CollectSmallObjects(); });
- const bool IsNew = false;
- OpenLog(m_BucketDir, IsNew);
- }
- catch (std::exception& Err)
- {
- ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what());
- ResetSobStorage();
- }
- }
+ GcCtx.DeletedCas(DeletedChunks);
}
void
@@ -1079,96 +1725,187 @@ ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& Ac
void
ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
{
- RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
-
- ExtendablePathBuilder<256> DataFilePath;
- BuildPath(DataFilePath, HashKey);
+ uint64_t NewFileSize = Value.Value.Size();
TemporaryFile DataFile;
std::error_code Ec;
DataFile.CreateTemporary(m_BucketDir.c_str(), Ec);
-
if (Ec)
{
- throw std::system_error(Ec, fmt::format("Failed to open temporary file for put at '{}'", m_BucketDir));
- }
+ throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir));
+ }
+
+ bool CleanUpTempFile = false;
+ auto __ = MakeGuard([&] {
+ if (CleanUpTempFile)
+ {
+ std::error_code Ec;
+ std::filesystem::remove(DataFile.GetPath(), Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'",
+ DataFile.GetPath(),
+ m_BucketDir,
+ Ec.message());
+ }
+ }
+ });
DataFile.WriteAll(Value.Value, Ec);
-
if (Ec)
{
- throw std::system_error(Ec, fmt::format("Failed to write payload ({} bytes) to file", NiceBytes(Value.Value.Size())));
+ throw std::system_error(Ec,
+ fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'",
+ NiceBytes(NewFileSize),
+ DataFile.GetPath().string(),
+ m_BucketDir));
}
- // Move file into place (atomically)
-
+ ExtendablePathBuilder<256> DataFilePath;
+ BuildPath(DataFilePath, HashKey);
std::filesystem::path FsPath{DataFilePath.ToPath()};
- DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
-
- if (Ec)
+ // We retry to move the file since it can be held open for read.
+ // This happens if the server processes a Get request for the file or
+ // if we are busy sending the file upstream
+ int RetryCount = 4;
+ do
{
- int RetryCount = 3;
-
- do
+ Ec.clear();
{
- std::filesystem::path ParentPath = FsPath.parent_path();
- CreateDirectories(ParentPath);
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
- if (!Ec)
+ // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file
+ // will be disabled as the file handle has already been closed
+ CleanUpTempFile = Ec ? true : false;
+
+ if (Ec)
{
- break;
+ std::error_code ExistingEc;
+ uint64_t OldFileSize = std::filesystem::file_size(FsPath, ExistingEc);
+ if (!ExistingEc && (OldFileSize == NewFileSize))
+ {
+ ZEN_INFO(
+ "Failed to move temporary file '{}' to '{}' for '{}'. Target file has same size, assuming concurrent write of same "
+ "value, "
+ "move "
+ "failed with reason '{}'",
+ DataFile.GetPath(),
+ FsPath.string(),
+ m_BucketDir,
+ Ec.message());
+ return;
+ }
}
+ }
- std::error_code InnerEc;
- const uint64_t ExistingFileSize = std::filesystem::file_size(FsPath, InnerEc);
+ if (!Ec)
+ {
+ uint8_t EntryFlags = DiskLocation::kStandaloneFile;
- if (!InnerEc && ExistingFileSize == Value.Value.Size())
+ if (Value.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ EntryFlags |= DiskLocation::kStructured;
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
{
- // Concurrent write of same value?
- return;
+ EntryFlags |= DiskLocation::kCompressed;
}
- // Semi arbitrary back-off
- zen::Sleep(1000 * RetryCount);
- } while (RetryCount--);
+ DiskLocation Loc(NewFileSize, EntryFlags);
+ IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount());
- if (Ec)
- {
- throw std::system_error(Ec, fmt::format("Failed to finalize file '{}'", DataFilePath.ToUtf8()));
+ uint64_t OldFileSize = 0;
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It == m_Index.end())
+ {
+ // Previously unknown object
+ m_Index.insert({HashKey, Entry});
+ }
+ else
+ {
+ // TODO: should check if write is idempotent and bail out if it is?
+ OldFileSize = It.value().Location.Size();
+ It.value() = Entry;
+ }
+
+ m_SlogFile.Append({.Key = HashKey, .Location = Loc});
+ if (OldFileSize <= NewFileSize)
+ {
+ m_TotalSize.fetch_add(NewFileSize - OldFileSize, std::memory_order::relaxed);
+ }
+ else
+ {
+ m_TotalSize.fetch_sub(OldFileSize - NewFileSize, std::memory_order::relaxed);
+ }
+ return;
}
- }
- // Update index
+ std::filesystem::path ParentPath = FsPath.parent_path();
+ if (!std::filesystem::is_directory(ParentPath))
+ {
+ Ec.clear();
+ std::filesystem::create_directories(ParentPath, Ec);
+ if (!Ec)
+ {
+ // Retry without sleep
+ continue;
+ }
+ throw std::system_error(
+ Ec,
+ fmt::format("Failed to create parent directory '{}' for file '{}' for put in '{}'", ParentPath, FsPath, m_BucketDir));
+ }
- uint64_t EntryFlags = DiskLocation::kStandaloneFile;
+ ZEN_INFO("Failed renaming temporary file '{}' to '{}' for put in '{}', pausing and retrying, reason '{}'",
+ DataFile.GetPath().string(),
+ FsPath.string(),
+ m_BucketDir,
+ Ec.message());
- if (Value.Value.GetContentType() == ZenContentType::kCbObject)
- {
- EntryFlags |= DiskLocation::kStructured;
- }
+ // Semi arbitrary back-off
+ zen::Sleep(200 * (5 - RetryCount)); // Sleep at most for a total of 3 seconds
+ } while (RetryCount-- > 0);
- RwLock::ExclusiveLockScope _(m_IndexLock);
+ throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir));
+}
- DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags);
- IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount());
+void
+ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
+{
+ uint8_t EntryFlags = 0;
- if (auto It = m_Index.find(HashKey); It == m_Index.end())
+ if (Value.Value.GetContentType() == ZenContentType::kCbObject)
{
- // Previously unknown object
- m_Index.insert({HashKey, Entry});
+ EntryFlags |= DiskLocation::kStructured;
}
- else
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
{
- // TODO: should check if write is idempotent and bail out if it is?
- It.value() = Entry;
+ EntryFlags |= DiskLocation::kCompressed;
}
- m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed);
+ m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) {
+ DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags);
+ const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location};
+ m_SlogFile.Append(DiskIndexEntry);
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
+ {
+ // TODO: should check if write is idempotent and bail out if it is?
+ // this would requiring comparing contents on disk unless we add a
+ // content hash to the index entry
+ IndexEntry& Entry = It.value();
+ Entry.Location = Location;
+ Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
+ }
+ else
+ {
+ m_Index.insert({HashKey, {Location, GcClock::TickCount()}});
+ }
+ });
+ m_TotalSize.fetch_add(Value.Value.Size(), std::memory_order::relaxed);
}
//////////////////////////////////////////////////////////////////////////
@@ -1255,10 +1992,10 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
auto It = m_Buckets.try_emplace(BucketName, BucketName);
Bucket = &It.first->second;
- std::filesystem::path bucketPath = m_RootDir;
- bucketPath /= BucketName;
+ std::filesystem::path BucketPath = m_RootDir;
+ BucketPath /= BucketName;
- Bucket->OpenOrCreate(bucketPath);
+ Bucket->OpenOrCreate(BucketPath);
}
}
@@ -1273,49 +2010,23 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
void
ZenCacheDiskLayer::DiscoverBuckets()
{
- FileSystemTraversal Traversal;
- struct Visitor : public FileSystemTraversal::TreeVisitor
- {
- virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent,
- [[maybe_unused]] const path_view& File,
- [[maybe_unused]] uint64_t FileSize) override
- {
- }
-
- virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override
- {
- Dirs.push_back((decltype(Dirs)::value_type)(DirectoryName));
- return false;
- }
-
- std::vector<std::filesystem::path::string_type> Dirs;
- } Visit;
-
- Traversal.TraverseFileSystem(m_RootDir, Visit);
+ DirectoryContent DirContent;
+ GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent);
// Initialize buckets
RwLock::ExclusiveLockScope _(m_Lock);
- for (const auto& BucketName : Visit.Dirs)
+ for (const std::filesystem::path& BucketPath : DirContent.Directories)
{
+ std::string BucketName = PathToUtf8(BucketPath.stem());
// New bucket needs to be created
-
-#if ZEN_PLATFORM_WINDOWS
- std::string BucketName8 = WideToUtf8(BucketName);
-#else
- const auto& BucketName8 = BucketName;
-#endif
-
- if (auto It = m_Buckets.find(BucketName8); It != m_Buckets.end())
+ if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
{
}
else
{
- auto InsertResult = m_Buckets.try_emplace(BucketName8, BucketName8);
-
- std::filesystem::path BucketPath = m_RootDir;
- BucketPath /= BucketName8;
+ auto InsertResult = m_Buckets.try_emplace(BucketName, BucketName);
CacheBucket& Bucket = InsertResult.first->second;
@@ -1323,11 +2034,11 @@ ZenCacheDiskLayer::DiscoverBuckets()
if (Bucket.IsOk())
{
- ZEN_INFO("Discovered bucket '{}'", BucketName8);
+ ZEN_INFO("Discovered bucket '{}'", BucketName);
}
else
{
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName8, m_RootDir);
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
m_Buckets.erase(InsertResult.first);
}
@@ -1363,11 +2074,10 @@ void
ZenCacheDiskLayer::Flush()
{
std::vector<CacheBucket*> Buckets;
- Buckets.reserve(m_Buckets.size());
{
RwLock::SharedLockScope _(m_Lock);
-
+ Buckets.reserve(m_Buckets.size());
for (auto& Kv : m_Buckets)
{
Buckets.push_back(&Kv.second);
@@ -1416,6 +2126,176 @@ ZenCacheDiskLayer::TotalSize() const
return TotalSize;
}
+//////////////////////////// ZenCacheStore
+
+static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc";
+
+ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStorage(Gc), GcContributor(Gc)
+{
+ CreateDirectories(BasePath);
+
+ DirectoryContent DirContent;
+ GetDirectoryContent(BasePath, DirectoryContent::IncludeDirsFlag, DirContent);
+
+ std::vector<std::string> LegacyBuckets;
+ std::vector<std::string> Namespaces;
+ for (const std::filesystem::path& DirPath : DirContent.Directories)
+ {
+ std::string DirName = PathToUtf8(DirPath.stem());
+ if (DirName.starts_with(NamespaceDiskPrefix))
+ {
+ Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length()));
+ continue;
+ }
+ LegacyBuckets.push_back(DirName);
+ }
+
+ ZEN_INFO("Found #{} namespaces in '{}' and #{} legacy buckets", Namespaces.size(), BasePath, LegacyBuckets.size());
+
+ if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end())
+ {
+ // default (unspecified) and ue4-ddc namespace points to the same namespace instance
+
+ ZEN_INFO("Moving #{} legacy buckets to '{}' namespace", LegacyBuckets.size(), UE4DDCNamespaceName);
+
+ std::filesystem::path DefaultNamespaceFolder = BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName);
+ CreateDirectories(DefaultNamespaceFolder);
+
+ // Move any non-namespace folders into the default namespace folder
+ for (const std::string& DirName : LegacyBuckets)
+ {
+ std::filesystem::path LegacyFolder = BasePath / DirName;
+ std::filesystem::path NewPath = DefaultNamespaceFolder / DirName;
+ std::error_code Ec;
+ std::filesystem::rename(LegacyFolder, NewPath, Ec);
+ if (Ec)
+ {
+ ZEN_ERROR("Unable to move '{}' to '{}', reason '{}'", LegacyFolder, NewPath, Ec.message());
+ }
+ }
+ Namespaces.push_back(std::string(UE4DDCNamespaceName));
+ }
+
+ for (const std::string& NamespaceName : Namespaces)
+ {
+ m_Namespaces[NamespaceName] =
+ std::make_unique<ZenCacheNamespace>(Gc, BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName));
+ }
+}
+
+ZenCacheStore::~ZenCacheStore()
+{
+ m_Namespaces.clear();
+}
+
+bool
+ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+ {
+ return Store->Get(Bucket, HashKey, OutValue);
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString());
+ return false;
+}
+
+void
+ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value)
+{
+ if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+ {
+ return Store->Put(Bucket, HashKey, Value);
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString());
+}
+
+bool
+ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket)
+{
+ if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
+ {
+ return Store->DropBucket(Bucket);
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}'", Namespace, Bucket);
+ return false;
+}
+
+void
+ZenCacheStore::Flush()
+{
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); });
+}
+
+void
+ZenCacheStore::Scrub(ScrubContext& Ctx)
+{
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); });
+}
+
+ZenCacheNamespace*
+ZenCacheStore::GetNamespace(std::string_view Namespace)
+{
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+ if (Namespace == DefaultNamespace)
+ {
+ if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+ }
+ return nullptr;
+}
+
+void
+ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const
+{
+ std::vector<std::pair<std::string, ZenCacheNamespace&> > Namespaces;
+ {
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ Namespaces.reserve(m_Namespaces.size());
+ for (const auto& Entry : m_Namespaces)
+ {
+ if (Entry.first == DefaultNamespace)
+ {
+ continue;
+ }
+ Namespaces.push_back({Entry.first, *Entry.second});
+ }
+ }
+ for (auto& Entry : Namespaces)
+ {
+ Callback(Entry.first, Entry.second);
+ }
+}
+
+void
+ZenCacheStore::GatherReferences(GcContext& GcCtx)
+{
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.GatherReferences(GcCtx); });
+}
+
+void
+ZenCacheStore::CollectGarbage(GcContext& GcCtx)
+{
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.CollectGarbage(GcCtx); });
+}
+
+GcStorageSize
+ZenCacheStore::StorageSize() const
+{
+ GcStorageSize Size;
+ IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) {
+ GcStorageSize StoreSize = Store.StorageSize();
+ Size.MemorySize += StoreSize.MemorySize;
+ Size.DiskSize += StoreSize.DiskSize;
+ });
+ return Size;
+}
+
//////////////////////////////////////////////////////////////////////////
#if ZEN_WITH_TESTS
@@ -1427,10 +2307,18 @@ namespace testutils {
IoBuffer CreateBinaryCacheValue(uint64_t Size)
{
- std::vector<uint32_t> Data(size_t(Size / sizeof(uint32_t)));
- std::generate(Data.begin(), Data.end(), [Idx = 0]() mutable { return Idx++; });
+ static std::random_device rd;
+ static std::mt19937 g(rd());
+
+ std::vector<uint8_t> Values;
+ Values.resize(Size);
+ for (size_t Idx = 0; Idx < Size; ++Idx)
+ {
+ Values[Idx] = static_cast<uint8_t>(Idx);
+ }
+ std::shuffle(Values.begin(), Values.end(), g);
- IoBuffer Buf(IoBuffer::Clone, Data.data(), Data.size() * sizeof(uint32_t));
+ IoBuffer Buf(IoBuffer::Clone, Values.data(), Values.size());
Buf.SetContentType(ZenContentType::kBinary);
return Buf;
};
@@ -1443,7 +2331,7 @@ TEST_CASE("z$.store")
CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const int kIterationCount = 100;
@@ -1496,8 +2384,8 @@ TEST_CASE("z$.size")
GcStorageSize CacheSize;
{
- CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ CasGc Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256);
@@ -1516,8 +2404,8 @@ TEST_CASE("z$.size")
}
{
- CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ CasGc Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const GcStorageSize SerializedSize = Zcs.StorageSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
@@ -1539,8 +2427,8 @@ TEST_CASE("z$.size")
GcStorageSize CacheSize;
{
- CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ CasGc Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64);
@@ -1559,8 +2447,8 @@ TEST_CASE("z$.size")
}
{
- CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ CasGc Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const GcStorageSize SerializedSize = Zcs.StorageSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
@@ -1597,9 +2485,9 @@ TEST_CASE("z$.gc")
};
{
- CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
- const auto Bucket = "teardrinker"sv;
+ CasGc Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ const auto Bucket = "teardrinker"sv;
// Create a cache record
const IoHash Key = CreateKey(42);
@@ -1635,7 +2523,7 @@ TEST_CASE("z$.gc")
// Expect timestamps to be serialized
{
CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
std::vector<IoHash> Keep;
// Collect garbage with 1 hour max cache duration
@@ -1656,7 +2544,7 @@ TEST_CASE("z$.gc")
{
ScopedTemporaryDirectory TempDir;
CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const auto Bucket = "fortysixandtwo"sv;
const GcClock::TimePoint CurrentTime = GcClock::Now();
@@ -1704,7 +2592,7 @@ TEST_CASE("z$.gc")
{
ScopedTemporaryDirectory TempDir;
CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const auto Bucket = "rightintwo"sv;
const GcClock::TimePoint CurrentTime = GcClock::Now();
@@ -1737,6 +2625,7 @@ TEST_CASE("z$.gc")
GcCtx.MaxCacheDuration(std::chrono::minutes(2));
GcCtx.CollectSmallObjects(true);
+ Zcs.Flush();
Gc.CollectGarbage(GcCtx);
for (const auto& Key : Keys)
@@ -1751,6 +2640,502 @@ TEST_CASE("z$.gc")
}
}
+TEST_CASE("z$.legacyconversion")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ uint64_t ChunkSizes[] = {2041,
+ 1123,
+ 1223,
+ 1239,
+ 341,
+ 1412,
+ 912,
+ 774,
+ 341,
+ 431,
+ 554,
+ 1098,
+ 2048,
+ 339 + 64 * 1024,
+ 561 + 64 * 1024,
+ 16 + 64 * 1024,
+ 16 + 64 * 1024,
+ 2048,
+ 2048};
+ size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t);
+ size_t SingleBlockSize = 0;
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(ChunkCount);
+ for (uint64_t Size : ChunkSizes)
+ {
+ Chunks.push_back(testutils::CreateBinaryCacheValue(Size));
+ SingleBlockSize += Size;
+ }
+
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(ChunkCount);
+ for (const IoBuffer& Chunk : Chunks)
+ {
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ CreateDirectories(TempDir.Path());
+
+ const std::string Bucket = "rightintwo";
+ {
+ CasGc Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path());
+ const GcClock::TimePoint CurrentTime = GcClock::Now();
+
+ for (size_t i = 0; i < ChunkCount; i++)
+ {
+ Zcs.Put(Bucket, ChunkHashes[i], {.Value = Chunks[i]});
+ }
+
+ std::vector<IoHash> KeepChunks;
+ for (size_t i = 0; i < ChunkCount; i += 2)
+ {
+ KeepChunks.push_back(ChunkHashes[i]);
+ }
+ GcContext GcCtx(CurrentTime + std::chrono::hours(2));
+ GcCtx.MaxCacheDuration(std::chrono::minutes(2));
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepChunks);
+ Zcs.Flush();
+ Gc.CollectGarbage(GcCtx);
+ }
+ std::filesystem::path BucketDir = TempDir.Path() / Bucket;
+ std::filesystem::path BlocksBaseDir = BucketDir / "blocks";
+
+ std::filesystem::path CasPath = BlockStore ::GetBlockPath(BlocksBaseDir, 1);
+ std::filesystem::path LegacyDataPath = GetLegacyDataPath(BucketDir);
+ std::filesystem::remove(LegacyDataPath);
+ std::filesystem::rename(CasPath, LegacyDataPath);
+
+ std::vector<DiskIndexEntry> LogEntries;
+ std::filesystem::path IndexPath = GetIndexPath(BucketDir, Bucket);
+ if (std::filesystem::is_regular_file(IndexPath))
+ {
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t Size = ObjectIndexFile.FileSize();
+ if (Size >= sizeof(CacheBucketIndexHeader))
+ {
+ uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
+ CacheBucketIndexHeader Header;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+ if (Header.Magic == CacheBucketIndexHeader::ExpectedMagic && Header.Version == CacheBucketIndexHeader::CurrentVersion &&
+ Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount)
+ {
+ LogEntries.resize(Header.EntryCount);
+ ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
+ }
+ }
+ ObjectIndexFile.Close();
+ std::filesystem::remove(IndexPath);
+ }
+
+ std::filesystem::path LogPath = GetLogPath(BucketDir, Bucket);
+ {
+ TCasLogFile<DiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ LogEntries.reserve(CasLog.GetLogCount());
+ CasLog.Replay([&](const DiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0);
+ }
+ TCasLogFile<LegacyDiskIndexEntry> LegacyLog;
+ std::filesystem::path LegacylogPath = GetLegacyLogPath(BucketDir);
+ LegacyLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate);
+
+ for (const DiskIndexEntry& Entry : LogEntries)
+ {
+ uint64_t Size;
+ uint64_t Offset;
+ if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ Size = Entry.Location.Location.StandaloneSize;
+ Offset = 0;
+ }
+ else
+ {
+ BlockStoreLocation Location = Entry.Location.GetBlockLocation(16);
+ Size = Location.Size;
+ Offset = Location.Offset;
+ }
+ LegacyDiskLocation LegacyLocation(Offset, Size, 0, static_cast<uint64_t>(Entry.Location.Flags) << 56);
+ LegacyDiskIndexEntry LegacyEntry = {.Key = Entry.Key, .Location = LegacyLocation};
+ LegacyLog.Append(LegacyEntry);
+ }
+ LegacyLog.Close();
+
+ std::filesystem::remove_all(BlocksBaseDir);
+ std::filesystem::remove(LogPath);
+ std::filesystem::remove(IndexPath);
+
+ {
+ CasGc Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path());
+
+ for (size_t i = 0; i < ChunkCount; i += 2)
+ {
+ ZenCacheValue Value;
+ CHECK(Zcs.Get(Bucket, ChunkHashes[i], Value));
+ CHECK(ChunkHashes[i] == IoHash::HashBuffer(Value.Value));
+ CHECK(!Zcs.Get(Bucket, ChunkHashes[i + 1], Value));
+ }
+ }
+}
+
+TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
+{
+ // for (uint32_t i = 0; i < 100; ++i)
+ {
+ ScopedTemporaryDirectory TempDir;
+
+ const uint64_t kChunkSize = 1048;
+ const int32_t kChunkCount = 8192;
+
+ struct Chunk
+ {
+ std::string Bucket;
+ IoBuffer Buffer;
+ };
+ std::unordered_map<IoHash, Chunk, IoHash::Hasher> Chunks;
+ Chunks.reserve(kChunkCount);
+
+ const std::string Bucket1 = "rightinone";
+ const std::string Bucket2 = "rightintwo";
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ while (true)
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ if (Chunks.contains(Hash))
+ {
+ continue;
+ }
+ Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
+ break;
+ }
+ while (true)
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ if (Chunks.contains(Hash))
+ {
+ continue;
+ }
+ Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
+ break;
+ }
+ }
+
+ CreateDirectories(TempDir.Path());
+
+ WorkerThreadPool ThreadPool(4);
+ CasGc Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path());
+
+ {
+ std::atomic<size_t> WorkCompleted = 0;
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer});
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < Chunks.size())
+ {
+ Sleep(1);
+ }
+ }
+
+ const uint64_t TotalSize = Zcs.StorageSize().DiskSize;
+ CHECK_EQ(kChunkSize * Chunks.size(), TotalSize);
+
+ {
+ std::atomic<size_t> WorkCompleted = 0;
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
+ std::string Bucket = Chunk.second.Bucket;
+ IoHash ChunkHash = Chunk.first;
+ ZenCacheValue CacheValue;
+
+ CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
+ IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
+ CHECK(ChunkHash == Hash);
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < Chunks.size())
+ {
+ Sleep(1);
+ }
+ }
+ std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes;
+ GcChunkHashes.reserve(Chunks.size());
+ for (const auto& Chunk : Chunks)
+ {
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+ }
+ {
+ std::unordered_map<IoHash, Chunk, IoHash::Hasher> NewChunks;
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
+ }
+ {
+ IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
+ }
+ }
+
+ std::atomic<size_t> WorkCompleted = 0;
+ std::atomic_uint32_t AddedChunkCount = 0;
+ for (const auto& Chunk : NewChunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer});
+ AddedChunkCount.fetch_add(1);
+ WorkCompleted.fetch_add(1);
+ });
+ }
+
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ }
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (AddedChunkCount.load() < NewChunks.size())
+ {
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const auto& Chunk : NewChunks)
+ {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+ }
+ }
+ std::vector<IoHash> KeepHashes;
+ KeepHashes.reserve(GcChunkHashes.size());
+ for (const auto& Entry : GcChunkHashes)
+ {
+ KeepHashes.push_back(Entry.first);
+ }
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 155 == 0)
+ {
+ if (C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ if (C + 3 < KeepHashes.size() - 1)
+ {
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ }
+ C++;
+ }
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepHashes);
+ Zcs.CollectGarbage(GcCtx);
+ CasChunkSet& Deleted = GcCtx.DeletedCas();
+ Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+
+ while (WorkCompleted < NewChunks.size() + Chunks.size())
+ {
+ Sleep(1);
+ }
+
+ {
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const auto& Chunk : NewChunks)
+ {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
+ }
+ }
+ std::vector<IoHash> KeepHashes;
+ KeepHashes.reserve(GcChunkHashes.size());
+ for (const auto& Entry : GcChunkHashes)
+ {
+ KeepHashes.push_back(Entry.first);
+ }
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 155 == 0)
+ {
+ if (C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ if (C + 3 < KeepHashes.size() - 1)
+ {
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ }
+ C++;
+ }
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepHashes);
+ Zcs.CollectGarbage(GcCtx);
+ CasChunkSet& Deleted = GcCtx.DeletedCas();
+ Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+ }
+ {
+ std::atomic<size_t> WorkCompleted = 0;
+ for (const auto& Chunk : GcChunkHashes)
+ {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < GcChunkHashes.size())
+ {
+ Sleep(1);
+ }
+ }
+ }
+}
+
+TEST_CASE("z$.namespaces")
+{
+ using namespace testutils;
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ ScopedTemporaryDirectory TempDir;
+ CreateDirectories(TempDir.Path());
+
+ {
+ CasGc Gc;
+ ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ const auto Bucket = "teardrinker"sv;
+ const auto CustomNamespace = "mynamespace"sv;
+
+ // Create a cache record
+ const IoHash Key = CreateKey(42);
+ CbObject CacheValue = CreateCacheValue(4096);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ ZenCacheValue PutValue = {.Value = Buffer};
+ Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key, PutValue);
+
+ ZenCacheValue GetValue;
+ CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key, GetValue));
+
+ CHECK(!Zcs.Get(CustomNamespace, Bucket, Key, GetValue));
+
+ // This should just be dropped for now until we decide how we add namespaces
+ Zcs.Put(CustomNamespace, Bucket, Key, PutValue);
+ CHECK(!Zcs.Get(CustomNamespace, Bucket, Key, GetValue));
+
+ const IoHash Key2 = CreateKey(43);
+ CbObject CacheValue2 = CreateCacheValue(4096);
+
+ IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
+ Buffer2.SetContentType(ZenContentType::kCbObject);
+ ZenCacheValue PutValue2 = {.Value = Buffer2};
+ Zcs.Put(CustomNamespace, Bucket, Key2, PutValue2);
+
+ CHECK(!Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue));
+ }
+}
+
+TEST_CASE("z$.blocked.disklayer.put")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ GcStorageSize CacheSize;
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ CasGc Gc;
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+
+ CbObject CacheValue = CreateCacheValue(64 * 1024 + 64);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ size_t Key = Buffer.Size();
+ IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t));
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer});
+
+ ZenCacheValue BufferGet;
+ CHECK(Zcs.Get("test_bucket", HashKey, BufferGet));
+
+ // Overwriting with a value of same size should go fine
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer});
+
+ CbObject CacheValue2 = CreateCacheValue(64 * 1024 + 64 + 1);
+ IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
+ Buffer2.SetContentType(ZenContentType::kCbObject);
+# if ZEN_PLATFORM_WINDOWS
+ // On Windows platform, overwriting with different size while we have
+ // it open for read should throw exception if file is held open
+ CHECK_THROWS(Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}));
+# else
+ // Other platforms should handle overwrite just fine
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer2});
+# endif
+
+ BufferGet = ZenCacheValue{};
+
+ // Read access has been removed, we should now be able to overwrite it
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer2});
+}
+
#endif
void
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index f39d01747..34b8d18f4 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -8,6 +8,7 @@
#include <zencore/thread.h>
#include <zencore/uid.h>
#include <zenstore/basicfile.h>
+#include <zenstore/blockstore.h>
#include <zenstore/cas.h>
#include <zenstore/caslog.h>
#include <zenstore/gc.h>
@@ -76,37 +77,32 @@ struct DiskLocation
{
inline DiskLocation() = default;
- inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
- : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
- , LowerSize(ValueSize & 0xFFFFffff)
- , IndexDataSize(IndexSize)
+ inline DiskLocation(uint64_t ValueSize, uint8_t Flags) : Flags(Flags | kStandaloneFile) { Location.StandaloneSize = ValueSize; }
+
+ inline DiskLocation(const BlockStoreLocation& Location, uint64_t PayloadAlignment, uint8_t Flags) : Flags(Flags & ~kStandaloneFile)
{
+ this->Location.BlockLocation = BlockStoreDiskLocation(Location, PayloadAlignment);
}
- static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
- static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize)
- static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
- static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file
- static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary
- static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value
- static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format
-
- static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
+ inline BlockStoreLocation GetBlockLocation(uint64_t PayloadAlignment) const
+ {
+ ZEN_ASSERT(!(Flags & kStandaloneFile));
+ return Location.BlockLocation.Get(PayloadAlignment);
+ }
- inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
- inline uint64_t Size() const { return LowerSize; }
- inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
- inline uint64_t GetFlags() const { return OffsetAndFlags & kFlagsMask; }
+ inline uint64_t Size() const { return (Flags & kStandaloneFile) ? Location.StandaloneSize : Location.BlockLocation.GetSize(); }
+ inline uint8_t IsFlagSet(uint64_t Flag) const { return Flags & Flag; }
+ inline uint8_t GetFlags() const { return Flags; }
inline ZenContentType GetContentType() const
{
ZenContentType ContentType = ZenContentType::kBinary;
- if (IsFlagSet(DiskLocation::kStructured))
+ if (IsFlagSet(kStructured))
{
ContentType = ZenContentType::kCbObject;
}
- if (IsFlagSet(DiskLocation::kCompressed))
+ if (IsFlagSet(kCompressed))
{
ContentType = ZenContentType::kCompressedBinary;
}
@@ -114,21 +110,29 @@ struct DiskLocation
return ContentType;
}
-private:
- uint64_t OffsetAndFlags = 0;
- uint32_t LowerSize = 0;
- uint32_t IndexDataSize = 0;
+ union
+ {
+ BlockStoreDiskLocation BlockLocation; // 10 bytes
+ uint64_t StandaloneSize = 0; // 8 bytes
+ } Location;
+
+ static const uint8_t kStandaloneFile = 0x80u; // Stored as a separate file
+ static const uint8_t kStructured = 0x40u; // Serialized as compact binary
+ static const uint8_t kTombStone = 0x20u; // Represents a deleted key/value
+ static const uint8_t kCompressed = 0x10u; // Stored in compressed buffer format
+
+ uint8_t Flags = 0;
+ uint8_t Reserved = 0;
};
struct DiskIndexEntry
{
- IoHash Key;
- DiskLocation Location;
+ IoHash Key; // 20 bytes
+ DiskLocation Location; // 12 bytes
};
-
#pragma pack(pop)
-static_assert(sizeof(DiskIndexEntry) == 36);
+static_assert(sizeof(DiskIndexEntry) == 32);
/** In-memory cache storage
@@ -245,15 +249,19 @@ private:
inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); }
private:
+ const uint64_t MaxBlockSize = 1ull << 30;
+ uint64_t m_PayloadAlignment = 1ull << 4;
+
std::string m_BucketName;
std::filesystem::path m_BucketDir;
+ std::filesystem::path m_BlocksBasePath;
+ BlockStore m_BlockStore;
Oid m_BucketId;
bool m_IsOk = false;
uint64_t m_LargeObjectThreshold = 64 * 1024;
// These files are used to manage storage of small objects for this bucket
- BasicFile m_SobsFile;
TCasLogFile<DiskIndexEntry> m_SlogFile;
struct IndexEntry
@@ -264,10 +272,12 @@ private:
IndexEntry() : Location(), LastAccess() {}
IndexEntry(const DiskLocation& Loc, const int64_t Timestamp) : Location(Loc), LastAccess(Timestamp) {}
IndexEntry(const IndexEntry& E) : Location(E.Location), LastAccess(E.LastAccess.load(std::memory_order_relaxed)) {}
- IndexEntry(IndexEntry&& E) : Location(std::move(E.Location)), LastAccess(E.LastAccess.load(std::memory_order_relaxed)) {}
+ IndexEntry(IndexEntry&& E) noexcept : Location(std::move(E.Location)), LastAccess(E.LastAccess.load(std::memory_order_relaxed))
+ {
+ }
IndexEntry& operator=(const IndexEntry& E) { return *this = IndexEntry(E); }
- IndexEntry& operator=(IndexEntry&& E)
+ IndexEntry& operator=(IndexEntry&& E) noexcept
{
Location = std::move(E.Location);
LastAccess.store(E.LastAccess.load(), std::memory_order_relaxed);
@@ -277,20 +287,21 @@ private:
using IndexMap = tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher>;
- RwLock m_IndexLock;
- IndexMap m_Index;
- uint64_t m_SobsCursor = 0;
+ RwLock m_IndexLock;
+ IndexMap m_Index;
+
std::atomic_uint64_t m_TotalSize{};
- void BuildPath(PathBuilderBase& Path, const IoHash& HashKey);
- void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
- bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue);
- void DeleteStandaloneCacheValue(const DiskLocation& Loc,
- const IoHash& HashKey,
- const std::filesystem::path& Path,
- std::error_code& Ec);
- bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue);
- void OpenLog(const std::filesystem::path& BucketDir, const bool IsNew);
+ void BuildPath(PathBuilderBase& Path, const IoHash& HashKey);
+ void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
+ bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue);
+ void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
+ bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue);
+ void MakeIndexSnapshot();
+ uint64_t ReadIndexFile();
+ uint64_t ReadLog(uint64_t LogPosition);
+ uint64_t MigrateLegacyData(bool CleanSource);
+ void OpenLog(const std::filesystem::path& BucketDir, const bool IsNew);
// These locks are here to avoid contention on file creation, therefore it's sufficient
// that we take the same lock for the same hash
@@ -311,11 +322,11 @@ private:
ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete;
};
-class ZenCacheStore final : public GcStorage, public GcContributor
+class ZenCacheNamespace final : public RefCounted, public GcStorage, public GcContributor
{
public:
- ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir);
- ~ZenCacheStore();
+ ZenCacheNamespace(CasGc& Gc, const std::filesystem::path& RootDir);
+ ~ZenCacheNamespace();
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
@@ -338,8 +349,38 @@ private:
std::unique_ptr<ZenCacheTracker> m_AccessTracker;
#endif
- ZenCacheStore(const ZenCacheStore&) = delete;
- ZenCacheStore& operator=(const ZenCacheStore&) = delete;
+ ZenCacheNamespace(const ZenCacheNamespace&) = delete;
+ ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete;
+};
+
+class ZenCacheStore final : public GcStorage, public GcContributor
+{
+public:
+ static constexpr std::string_view DefaultNamespace =
+ "!default!"; // This is intentionally not a valid namespace name and will only be used for mapping when no namespace is given
+ static constexpr std::string_view NamespaceDiskPrefix = "ns_";
+
+ ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath);
+ ~ZenCacheStore();
+
+ bool Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
+ bool DropBucket(std::string_view Namespace, std::string_view Bucket);
+ void Flush();
+ void Scrub(ScrubContext& Ctx);
+
+ virtual void GatherReferences(GcContext& GcCtx) override;
+ virtual void CollectGarbage(GcContext& GcCtx) override;
+ virtual GcStorageSize StorageSize() const override;
+
+private:
+ ZenCacheNamespace* GetNamespace(std::string_view Namespace);
+ void IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const;
+
+ typedef std::unordered_map<std::string, std::unique_ptr<ZenCacheNamespace>> NameSpaceMap;
+
+ mutable RwLock m_NamespacesLock;
+ NameSpaceMap m_Namespaces;
};
void z$_forcelink();