diff options
| author | Martin Ridgers <[email protected]> | 2021-09-21 11:06:13 +0200 |
|---|---|---|
| committer | Martin Ridgers <[email protected]> | 2021-09-21 11:06:13 +0200 |
| commit | 68c951e0f440ffd483795dced737e88152c1a581 (patch) | |
| tree | 5c0910ca2a85b45fb05dba3ce457b7d156213894 /zenserver/cache | |
| parent | Merge main into linux-mac (diff) | |
| parent | Trigger storage scrubbing pass at startup (diff) | |
| download | zen-68c951e0f440ffd483795dced737e88152c1a581.tar.xz zen-68c951e0f440ffd483795dced737e88152c1a581.zip | |
Merged main into linux-mac
Diffstat (limited to 'zenserver/cache')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 384 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 11 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 129 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 43 |
4 files changed, 369 insertions, 198 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index cf7deaa93..7f1fe7b44 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -9,6 +9,7 @@ #include <zencore/stream.h> #include <zencore/timer.h> #include <zenhttp/httpserver.h> +#include <zenstore/CAS.h> #include "structuredcache.h" #include "structuredcachestore.h" @@ -25,17 +26,130 @@ #include <queue> #include <thread> +#include <gsl/gsl-lite.hpp> + namespace zen { using namespace std::literals; ////////////////////////////////////////////////////////////////////////// -HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InCacheStore, - zen::CasStore& InStore, - zen::CidStore& InCidStore, +namespace detail { namespace cacheopt { + constexpr std::string_view Local = "local"sv; + constexpr std::string_view Remote = "remote"sv; + constexpr std::string_view Data = "data"sv; + constexpr std::string_view Meta = "meta"sv; + constexpr std::string_view Value = "value"sv; + constexpr std::string_view Attachments = "attachments"sv; +}} // namespace detail::cacheopt + +////////////////////////////////////////////////////////////////////////// + +enum class CachePolicy : uint8_t +{ + None = 0, + QueryLocal = 1 << 0, + QueryRemote = 1 << 1, + Query = QueryLocal | QueryRemote, + StoreLocal = 1 << 2, + StoreRemote = 1 << 3, + Store = StoreLocal | StoreRemote, + SkipMeta = 1 << 4, + SkipValue = 1 << 5, + SkipAttachments = 1 << 6, + SkipData = SkipMeta | SkipValue | SkipAttachments, + SkipLocalCopy = 1 << 7, + Local = QueryLocal | StoreLocal, + Remote = QueryRemote | StoreRemote, + Default = Query | Store, + Disable = None, +}; + +gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy); + +CachePolicy +ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) +{ + CachePolicy QueryPolicy = CachePolicy::Query; + + { + std::string_view Opts = QueryParams.GetValue("query"sv); + if (!Opts.empty()) + { + QueryPolicy = CachePolicy::None; + ForEachStrTok(Opts, ',', [&QueryPolicy](const std::string_view& Opt) { + if (Opt == detail::cacheopt::Local) + { + QueryPolicy |= CachePolicy::QueryLocal; + } + if (Opt == detail::cacheopt::Remote) + { + QueryPolicy |= CachePolicy::QueryRemote; + } + return true; + }); + } + } + + CachePolicy StorePolicy = CachePolicy::Store; + + { + std::string_view Opts = QueryParams.GetValue("store"sv); + if (!Opts.empty()) + { + StorePolicy = CachePolicy::None; + ForEachStrTok(Opts, ',', [&StorePolicy](const std::string_view& Opt) { + if (Opt == detail::cacheopt::Local) + { + StorePolicy |= CachePolicy::StoreLocal; + } + if (Opt == detail::cacheopt::Remote) + { + StorePolicy |= CachePolicy::StoreRemote; + } + return true; + }); + } + } + + CachePolicy SkipPolicy = CachePolicy::None; + + { + std::string_view Opts = QueryParams.GetValue("skip"sv); + if (!Opts.empty()) + { + ForEachStrTok(Opts, ',', [&SkipPolicy](const std::string_view& Opt) { + if (Opt == detail::cacheopt::Meta) + { + SkipPolicy |= CachePolicy::SkipMeta; + } + if (Opt == detail::cacheopt::Value) + { + SkipPolicy |= CachePolicy::SkipValue; + } + if (Opt == detail::cacheopt::Attachments) + { + SkipPolicy |= CachePolicy::SkipAttachments; + } + if (Opt == detail::cacheopt::Data) + { + SkipPolicy |= CachePolicy::SkipData; + } + return true; + }); + } + } + + return QueryPolicy | StorePolicy | SkipPolicy; +} + +////////////////////////////////////////////////////////////////////////// + +HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, + CasStore& InStore, + CidStore& InCidStore, std::unique_ptr<UpstreamCache> UpstreamCache) -: m_Log(zen::logging::Get("cache")) +: m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) , m_CasStore(InStore) , m_CidStore(InCidStore) @@ -59,8 +173,14 @@ HttpStructuredCacheService::Flush() { } +void +HttpStructuredCacheService::Scrub(ScrubContext& Ctx) +{ + ZEN_UNUSED(Ctx); +} + void -HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) +HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { CacheRef Ref; @@ -75,28 +195,31 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) return HandleCacheBucketRequest(Request, Key); } - return Request.WriteResponse(zen::HttpResponseCode::BadRequest); // invalid URL + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } + const auto QueryParams = Request.GetQueryParams(); + CachePolicy Policy = ParseCachePolicy(QueryParams); + if (Ref.PayloadId == IoHash::Zero) { - return HandleCacheRecordRequest(Request, Ref); + return HandleCacheRecordRequest(Request, Ref, Policy); } else { - return HandleCachePayloadRequest(Request, Ref); + return HandleCachePayloadRequest(Request, Ref, Policy); } return; } void -HttpStructuredCacheService::HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket) +HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Bucket) { ZEN_UNUSED(Request, Bucket); switch (auto Verb = Request.RequestVerb()) { - using enum zen::HttpVerb; + using enum HttpVerb; case kHead: case kGet: @@ -110,22 +233,22 @@ HttpStructuredCacheService::HandleCacheBucketRequest(zen::HttpServerRequest& Req if (m_CacheStore.DropBucket(Bucket)) { - return Request.WriteResponse(zen::HttpResponseCode::OK); + return Request.WriteResponse(HttpResponseCode::OK); } else { - return Request.WriteResponse(zen::HttpResponseCode::NotFound); + return Request.WriteResponse(HttpResponseCode::NotFound); } break; } } void -HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref) +HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy) { switch (auto Verb = Request.RequestVerb()) { - using enum zen::HttpVerb; + using enum HttpVerb; case kHead: case kGet: @@ -136,7 +259,9 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); bool InUpstreamCache = false; - if (!Success && m_UpstreamCache) + const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote)); + + if (QueryUpstream) { const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage @@ -153,14 +278,13 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req { if (CacheRecordType == ZenContentType::kCbObject) { - const zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(UpstreamResult.Value, zen::CbValidateMode::All); + const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); if (ValidationResult == CbValidateError::None) { - zen::CbObjectView CacheRecord(UpstreamResult.Value.Data()); + CbObjectView CacheRecord(UpstreamResult.Value.Data()); - zen::CbObjectWriter IndexData; + CbObjectWriter IndexData; IndexData.BeginArray("references"); CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); }); IndexData.EndArray(); @@ -214,6 +338,18 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req if (FoundCount == AttachmentCount) { m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); + + if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments)) + { + CbPackage PackageWithoutAttachments; + PackageWithoutAttachments.SetObject(CacheRecord); + + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + PackageWithoutAttachments.Save(Writer); + + Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + } } else { @@ -236,7 +372,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req { ZEN_DEBUG("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey); - return Request.WriteResponse(zen::HttpResponseCode::NotFound); + return Request.WriteResponse(HttpResponseCode::NotFound); } if (Verb == kHead) @@ -248,41 +384,45 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req { CbObjectView CacheRecord(Value.Value.Data()); - const zen::CbValidateError ValidationResult = zen::ValidateCompactBinary(Value.Value, zen::CbValidateMode::All); + const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All); if (ValidationResult != CbValidateError::None) { ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey); - return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); } - uint32_t AttachmentCount = 0; - uint32_t FoundCount = 0; - uint64_t AttachmentBytes = 0ull; + const bool SkipAttachments = zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments); + uint32_t AttachmentCount = 0; + uint32_t FoundCount = 0; + uint64_t AttachmentBytes = 0ull; CbPackage Package; - CacheRecord.IterateAttachments( - [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) - { - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); - AttachmentBytes += Chunk.Size(); - FoundCount++; - } - AttachmentCount++; - }); - - if (FoundCount != AttachmentCount) + if (!SkipAttachments) { - ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments", - Ref.BucketSegment, - Ref.HashKey, - FoundCount, - AttachmentCount); + CacheRecord.IterateAttachments( + [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + AttachmentBytes += Chunk.Size(); + FoundCount++; + } + AttachmentCount++; + }); + + if (FoundCount != AttachmentCount) + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments", + Ref.BucketSegment, + Ref.HashKey, + FoundCount, + AttachmentCount); - return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + } } Package.SetObject(LoadCompactBinaryObject(Value.Value)); @@ -300,7 +440,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - return Request.WriteResponse(zen::HttpResponseCode::OK, HttpContentType::kCbPackage, Response); + return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response); } else { @@ -310,41 +450,43 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req NiceBytes(Value.Value.Size()), InUpstreamCache ? "UPSTREAM" : "LOCAL"); - return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + return Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); } } break; case kPut: { - zen::IoBuffer Body = Request.ReadPayload(); + IoBuffer Body = Request.ReadPayload(); if (!Body || Body.Size() == 0) { - return Request.WriteResponse(zen::HttpResponseCode::BadRequest); + return Request.WriteResponse(HttpResponseCode::BadRequest); } const HttpContentType ContentType = Request.RequestContentType(); + const bool StoreUpstream = m_UpstreamCache && (CachePolicy::StoreRemote == (Policy & CachePolicy::StoreRemote)); + if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType) { // TODO: create a cache record and put value in CAS? m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size())); - if (m_UpstreamCache) + if (StoreUpstream) { auto Result = m_UpstreamCache->EnqueueUpstream( {.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}}); } - return Request.WriteResponse(zen::HttpResponseCode::Created); + return Request.WriteResponse(HttpResponseCode::Created); } else if (ContentType == HttpContentType::kCbObject) { // Validate payload before accessing it - const zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); + const CbValidateError ValidationResult = + ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), CbValidateMode::All); if (ValidationResult != CbValidateError::None) { @@ -360,7 +502,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req } // Extract referenced payload hashes - zen::CbObjectView Cbo(Body.Data()); + CbObjectView Cbo(Body.Data()); std::vector<IoHash> References; std::vector<IoHash> MissingRefs; @@ -371,7 +513,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req if (!References.empty()) { - zen::CbObjectWriter Idx; + CbObjectWriter Idx; Idx.BeginArray("references"); for (const IoHash& Hash : References) @@ -393,26 +535,23 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} attachments missing", Ref.BucketSegment, Ref.HashKey, - zen::NiceBytes(CacheValue.Value.Size()), + NiceBytes(CacheValue.Value.Size()), MissingRefs.size(), References.size()); - if (MissingRefs.empty()) + if (MissingRefs.empty() && StoreUpstream) { - // Only enqueue valid cache records, i.e. all referenced payloads exists - if (m_UpstreamCache) - { - auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, - .CacheKey = {Ref.BucketSegment, Ref.HashKey}, - .PayloadIds = std::move(References)}); - } + ZEN_ASSERT(m_UpstreamCache); + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(References)}); - return Request.WriteResponse(zen::HttpResponseCode::Created); + return Request.WriteResponse(HttpResponseCode::Created); } else { // TODO: Binary attachments? - zen::CbObjectWriter Response; + CbObjectWriter Response; Response.BeginArray("needs"); for (const IoHash& MissingRef : MissingRefs) { @@ -422,7 +561,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Response.EndArray(); // Return Created | BadRequest? - return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save()); + return Request.WriteResponse(HttpResponseCode::Created, Response.Save()); } } else if (ContentType == HttpContentType::kCbPackage) @@ -437,26 +576,22 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req CbObject CacheRecord = Package.GetObject(); - int32_t AttachmentCount = 0; - int32_t NewAttachmentCount = 0; - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalNewBytes = 0; - bool AttachmentsOk = true; - + struct AttachmentInsertResult + { + int32_t Count = 0; + int32_t NewCount = 0; + uint64_t Bytes = 0; + uint64_t NewBytes = 0; + bool Ok = false; + }; + + AttachmentInsertResult AttachmentResult{.Ok = true}; std::span<const CbAttachment> Attachments = Package.GetAttachments(); + std::vector<IoHash> PayloadIds; - std::vector<IoHash> PayloadIds; PayloadIds.reserve(Attachments.size()); - CacheRecord.IterateAttachments([this, - &Ref, - &Package, - &AttachmentsOk, - &AttachmentCount, - &TotalAttachmentBytes, - &TotalNewBytes, - &NewAttachmentCount, - &PayloadIds](CbFieldView AttachmentHash) { + CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentResult, &PayloadIds](CbFieldView AttachmentHash) { if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) { if (Attachment->IsCompressedBinary()) @@ -469,12 +604,12 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req if (InsertResult.New) { - TotalNewBytes += ChunkSize; - ++NewAttachmentCount; + AttachmentResult.NewBytes += ChunkSize; + AttachmentResult.NewCount++; } - TotalAttachmentBytes += ChunkSize; - AttachmentCount++; + AttachmentResult.Bytes += ChunkSize; + AttachmentResult.Count++; } else { @@ -482,7 +617,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Ref.BucketSegment, Ref.HashKey, AttachmentHash.AsHash()); - AttachmentsOk = false; + AttachmentResult.Ok = false; } } else @@ -491,23 +626,24 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Ref.BucketSegment, Ref.HashKey, AttachmentHash.AsHash()); - AttachmentsOk = false; + AttachmentResult.Ok = false; } }); - if (!AttachmentsOk) + if (!AttachmentResult.Ok) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"); } IoBuffer CacheRecordChunk = CacheRecord.GetBuffer().AsIoBuffer(); - const uint64_t TotalPackageBytes = TotalAttachmentBytes + CacheRecordChunk.Size(); + const uint64_t TotalPackageBytes = AttachmentResult.Bytes + CacheRecordChunk.Size(); ZenCacheValue CacheValue{.Value = CacheRecordChunk}; m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); - if (m_UpstreamCache) + if (StoreUpstream) { + ZEN_ASSERT(m_UpstreamCache); auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, .CacheKey = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(PayloadIds)}); @@ -516,17 +652,17 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} ({}/{}) new attachments", Ref.BucketSegment, Ref.HashKey, - zen::NiceBytes(TotalPackageBytes), - NewAttachmentCount, - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - zen::NiceBytes(TotalAttachmentBytes)); + NiceBytes(TotalPackageBytes), + AttachmentResult.NewCount, + AttachmentResult.Count, + NiceBytes(AttachmentResult.NewBytes), + NiceBytes(AttachmentResult.Bytes)); - return Request.WriteResponse(zen::HttpResponseCode::Created); + return Request.WriteResponse(HttpResponseCode::Created); } else { - return Request.WriteResponse(zen::HttpResponseCode::BadRequest); + return Request.WriteResponse(HttpResponseCode::BadRequest); } } break; @@ -540,7 +676,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req } void -HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref) +HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy) { // Note: the URL references the uncompressed payload hash - so this maintains the mapping // from uncompressed CAS identity (aka CID/Content ID) to the stored payload hash @@ -548,27 +684,29 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re // this is a PITA but a consequence of the fact that the client side code is not able to // address data by compressed hash + ZEN_UNUSED(Policy); + switch (auto Verb = Request.RequestVerb()) { - using enum zen::HttpVerb; + using enum HttpVerb; case kHead: case kGet: { - zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); - bool InUpstreamCache = false; + IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); + bool InUpstreamCache = false; if (!Payload && m_UpstreamCache) { if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); UpstreamResult.Success) { - if (zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { - Payload = UpstreamResult.Value; - zen::IoHash ChunkHash = zen::IoHash::HashBuffer(Payload); - zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); - InUpstreamCache = true; + Payload = UpstreamResult.Value; + IoHash ChunkHash = IoHash::HashBuffer(Payload); + CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); + InUpstreamCache = true; m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); } @@ -582,7 +720,7 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re if (!Payload) { ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId); - return Request.WriteResponse(zen::HttpResponseCode::NotFound); + return Request.WriteResponse(HttpResponseCode::NotFound); } ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})", @@ -598,29 +736,27 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re Request.SetSuppressResponseBody(); } - return Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, Payload); + return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); } break; case kPut: { - if (zen::IoBuffer Body = Request.ReadPayload()) + if (IoBuffer Body = Request.ReadPayload()) { if (Body.Size() == 0) { - return Request.WriteResponse(zen::HttpResponseCode::BadRequest, - HttpContentType::kText, - "Empty payload not permitted"); + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Empty payload not permitted"); } - zen::IoHash ChunkHash = zen::IoHash::HashBuffer(Body); + IoHash ChunkHash = IoHash::HashBuffer(Body); - zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(Body)); + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); if (!Compressed) { // All attachment payloads need to be in compressed buffer format - return Request.WriteResponse(zen::HttpResponseCode::BadRequest, + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"); } @@ -632,7 +768,7 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re return Request.WriteResponse(HttpResponseCode::BadRequest); } - zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); + CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); @@ -646,11 +782,11 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re if (Result.New) { - return Request.WriteResponse(zen::HttpResponseCode::Created); + return Request.WriteResponse(HttpResponseCode::Created); } else { - return Request.WriteResponse(zen::HttpResponseCode::OK); + return Request.WriteResponse(HttpResponseCode::OK); } } } @@ -666,7 +802,7 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re } bool -HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef) +HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& OutRef) { std::string_view Key = Request.RelativeUri(); std::string_view::size_type BucketSplitOffset = Key.find_first_of('/'); @@ -702,14 +838,14 @@ HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, Cach PayloadSegment = Key.substr(PayloadSplitOffset + 1); } - if (HashSegment.size() != zen::IoHash::StringLength) + if (HashSegment.size() != IoHash::StringLength) { return false; } - if (!PayloadSegment.empty() && PayloadSegment.size() == zen::IoHash::StringLength) + if (!PayloadSegment.empty() && PayloadSegment.size() == IoHash::StringLength) { - const bool IsOk = zen::ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); + const bool IsOk = ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); if (!IsOk) { @@ -718,10 +854,10 @@ HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, Cach } else { - OutRef.PayloadId = zen::IoHash::Zero; + OutRef.PayloadId = IoHash::Zero; } - const bool IsOk = zen::ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); + const bool IsOk = ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); if (!IsOk) { diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 8289fd700..bd163dd1d 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -10,13 +10,13 @@ namespace spdlog { class logger; } -class ZenCacheStore; - namespace zen { class CasStore; class CidStore; class UpstreamCache; +class ZenCacheStore; +enum class CachePolicy : uint8_t; /** * Structured cache service. Imposes constraints on keys, supports blobs and @@ -60,6 +60,7 @@ public: virtual void HandleRequest(zen::HttpServerRequest& Request) override; void Flush(); + void Scrub(ScrubContext& Ctx); private: struct CacheRef @@ -70,13 +71,13 @@ private: }; [[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef); - void HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref); - void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref); + void HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy); + void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; - ZenCacheStore& m_CacheStore; + zen::ZenCacheStore& m_CacheStore; zen::CasStore& m_CasStore; zen::CidStore& m_CidStore; std::unique_ptr<UpstreamCache> m_UpstreamCache; diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 018955e65..502ca6605 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -24,15 +24,16 @@ #include <atlfile.h> -using namespace zen; -using namespace fmt::literals; - ////////////////////////////////////////////////////////////////////////// -ZenCacheStore::ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir} +namespace zen { + +using namespace fmt::literals; + +ZenCacheStore::ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir} { ZEN_INFO("initializing structured cache at '{}'", RootDir); - zen::CreateDirectories(RootDir); + CreateDirectories(RootDir); } ZenCacheStore::~ZenCacheStore() @@ -40,7 +41,7 @@ ZenCacheStore::~ZenCacheStore() } bool -ZenCacheStore::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheStore::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue); @@ -68,7 +69,7 @@ ZenCacheStore::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCac } void -ZenCacheStore::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { // Store value and index @@ -104,6 +105,12 @@ ZenCacheStore::Flush() m_DiskLayer.Flush(); } +void +ZenCacheStore::Scrub(ScrubContext& Ctx) +{ + m_DiskLayer.Scrub(Ctx); + m_MemLayer.Scrub(Ctx); +} ////////////////////////////////////////////////////////////////////////// ZenCacheMemoryLayer::ZenCacheMemoryLayer() @@ -115,7 +122,7 @@ ZenCacheMemoryLayer::~ZenCacheMemoryLayer() } bool -ZenCacheMemoryLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { CacheBucket* Bucket = nullptr; @@ -139,7 +146,7 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, } void -ZenCacheMemoryLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { CacheBucket* Bucket = nullptr; @@ -178,8 +185,14 @@ ZenCacheMemoryLayer::DropBucket(std::string_view Bucket) return !!m_Buckets.erase(std::string(Bucket)); } +void +ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) +{ + ZEN_UNUSED(Ctx); +} + bool -ZenCacheMemoryLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { RwLock::SharedLockScope _(m_bucketLock); @@ -196,7 +209,7 @@ ZenCacheMemoryLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& } void -ZenCacheMemoryLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { RwLock::ExclusiveLockScope _(m_bucketLock); @@ -227,7 +240,7 @@ struct DiskLocation struct DiskIndexEntry { - zen::IoHash Key; + IoHash Key; DiskLocation Location; }; @@ -243,8 +256,8 @@ struct ZenCacheDiskLayer::CacheBucket void OpenOrCreate(std::filesystem::path BucketDir); static bool Delete(std::filesystem::path BucketDir); - bool Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value); void Drop(); void Flush(); @@ -260,12 +273,12 @@ private: BasicFile m_SobsFile; TCasLogFile<DiskIndexEntry> m_SlogFile; - void BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey); - void PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value); + void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey); + void PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value); - RwLock m_IndexLock; - tsl::robin_map<zen::IoHash, DiskLocation, zen::IoHash::Hasher> m_Index; - uint64_t m_WriteCursor = 0; + RwLock m_IndexLock; + tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index; + uint64_t m_WriteCursor = 0; }; ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas) @@ -281,7 +294,7 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir) { if (std::filesystem::exists(BucketDir)) { - zen::DeleteDirectories(BucketDir); + DeleteDirectories(BucketDir); return true; } @@ -292,7 +305,7 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir) void ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) { - zen::CreateDirectories(BucketDir); + CreateDirectories(BucketDir); m_BucketDir = BucketDir; @@ -357,7 +370,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) uint64_t MaxFileOffset = 0; - if (zen::RwLock::ExclusiveLockScope _(m_IndexLock); m_Index.empty()) + if (RwLock::ExclusiveLockScope _(m_IndexLock); m_Index.empty()) { m_SlogFile.Replay([&](const DiskIndexEntry& Record) { m_Index[Record.Key] = Record.Location; @@ -372,25 +385,29 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) } void -ZenCacheDiskLayer::CacheBucket::BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey) +ZenCacheDiskLayer::CacheBucket::BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey) { - char hex[sizeof(HashKey.Hash) * 2]; - ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, hex); + char HexString[sizeof(HashKey.Hash) * 2]; + ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString); Path.Append(m_BucketDir.c_str()); + Path.Append(L"/blob/"); + Path.AppendAsciiRange(HexString, HexString + 3); + Path.Append(L"/"); + Path.AppendAsciiRange(HexString + 3, HexString + 5); Path.Append(L"/"); - Path.AppendAsciiRange(hex, hex + sizeof(hex)); + Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString)); } bool -ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { if (!m_Ok) { return false; } - zen::RwLock::SharedLockScope _(m_IndexLock); + RwLock::SharedLockScope _(m_IndexLock); if (auto it = m_Index.find(HashKey); it != m_Index.end()) { @@ -417,7 +434,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& O WideStringBuilder<128> DataFilePath; BuildPath(DataFilePath, HashKey); - if (zen::IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str())) + if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str())) { OutValue.Value = Data; OutValue.Value.SetContentType(ContentType); @@ -431,7 +448,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& O } void -ZenCacheDiskLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { if (!m_Ok) { @@ -453,12 +470,12 @@ ZenCacheDiskLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheVa EntryFlags |= DiskLocation::kStructured; } - zen::RwLock::ExclusiveLockScope _(m_IndexLock); + RwLock::ExclusiveLockScope _(m_IndexLock); DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(m_WriteCursor, EntryFlags), .Size = gsl::narrow<uint32_t>(Value.Value.Size())}; - m_WriteCursor = zen::RoundUp(m_WriteCursor + Loc.Size, 16); + m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size, 16); if (auto it = m_Index.find(HashKey); it == m_Index.end()) { @@ -483,7 +500,7 @@ ZenCacheDiskLayer::CacheBucket::Drop() m_SobsFile.Close(); m_SlogFile.Close(); - zen::DeleteDirectories(m_BucketDir); + DeleteDirectories(m_BucketDir); } void @@ -494,12 +511,22 @@ ZenCacheDiskLayer::CacheBucket::Flush() } void -ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) { - zen::WideStringBuilder<128> DataFilePath; + ZEN_UNUSED(Ctx); +} + +void +ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value) +{ + WideStringBuilder<128> DataFilePath; BuildPath(DataFilePath, HashKey); - // TODO: replace this with a more efficient implementation with proper atomic rename + // TODO: replace this process with a more efficient implementation with proper atomic rename + // and also avoid creating directories if we can + + std::filesystem::path ParentPath = std::filesystem::path(DataFilePath.c_str()).parent_path(); + CreateDirectories(ParentPath); CAtlTemporaryFile DataFile; @@ -507,21 +534,23 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const if (FAILED(hRes)) { - zen::ThrowSystemException(hRes, "Failed to open temporary file for put at '{}'"_format(m_BucketDir)); + ThrowSystemException(hRes, "Failed to open temporary file for put at '{}'"_format(m_BucketDir)); } hRes = DataFile.Write(Value.Value.Data(), gsl::narrow<DWORD>(Value.Value.Size())); if (FAILED(hRes)) { - zen::ThrowSystemException(hRes, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size()))); + ThrowSystemException(hRes, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size()))); } + // Move file into place (note: not fully atomic!) + hRes = DataFile.Close(DataFilePath.c_str()); if (FAILED(hRes)) { - zen::ThrowSystemException(hRes, "Failed to finalize file '{}'"_format(zen::WideToUtf8(DataFilePath))); + ThrowSystemException(hRes, "Failed to finalize file '{}'"_format(WideToUtf8(DataFilePath))); } // Update index @@ -533,7 +562,7 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const EntryFlags |= DiskLocation::kStructured; } - zen::RwLock::ExclusiveLockScope _(m_IndexLock); + RwLock::ExclusiveLockScope _(m_IndexLock); DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(0, EntryFlags), .Size = 0}; @@ -560,12 +589,12 @@ ZenCacheDiskLayer::ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; bool -ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { CacheBucket* Bucket = nullptr; { - zen::RwLock::SharedLockScope _(m_Lock); + RwLock::SharedLockScope _(m_Lock); auto it = m_Buckets.find(std::string(InBucket)); @@ -579,7 +608,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, Ze { // Bucket needs to be opened/created - zen::RwLock::ExclusiveLockScope _(m_Lock); + RwLock::ExclusiveLockScope _(m_Lock); if (auto it = m_Buckets.find(std::string(InBucket)); it != m_Buckets.end()) { @@ -603,12 +632,12 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, Ze } void -ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { CacheBucket* Bucket = nullptr; { - zen::RwLock::SharedLockScope _(m_Lock); + RwLock::SharedLockScope _(m_Lock); auto it = m_Buckets.find(std::string(InBucket)); @@ -622,7 +651,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, co { // New bucket needs to be created - zen::RwLock::ExclusiveLockScope _(m_Lock); + RwLock::ExclusiveLockScope _(m_Lock); if (auto it = m_Buckets.find(std::string(InBucket)); it != m_Buckets.end()) { @@ -651,7 +680,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, co bool ZenCacheDiskLayer::DropBucket(std::string_view InBucket) { - zen::RwLock::ExclusiveLockScope _(m_Lock); + RwLock::ExclusiveLockScope _(m_Lock); auto it = m_Buckets.find(std::string(InBucket)); @@ -679,7 +708,7 @@ ZenCacheDiskLayer::Flush() Buckets.reserve(m_Buckets.size()); { - zen::RwLock::SharedLockScope _(m_Lock); + RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { @@ -705,7 +734,7 @@ ZenCacheTracker::~ZenCacheTracker() } void -ZenCacheTracker::TrackAccess(std::string_view Bucket, const zen::IoHash& HashKey) +ZenCacheTracker::TrackAccess(std::string_view Bucket, const IoHash& HashKey) { ZEN_UNUSED(Bucket); ZEN_UNUSED(HashKey); @@ -715,3 +744,5 @@ void ZenCacheTracker::Flush() { } + +} // namespace zen diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index 48c3cfde9..fdf4a8cfe 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -23,8 +23,6 @@ namespace zen { class WideStringBuilderBase; class CasStore; -} // namespace zen - /****************************************************************************** /$$$$$$$$ /$$$$$$ /$$ @@ -44,8 +42,8 @@ class CasStore; struct ZenCacheValue { - zen::IoBuffer Value; - zen::CbObject IndexData; + IoBuffer Value; + CbObject IndexData; }; class ZenCacheMemoryLayer @@ -54,34 +52,36 @@ public: ZenCacheMemoryLayer(); ~ZenCacheMemoryLayer(); - bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); bool DropBucket(std::string_view Bucket); + void Scrub(ScrubContext& Ctx); private: struct CacheBucket { - zen::RwLock m_bucketLock; - tsl::robin_map<zen::IoHash, zen::IoBuffer> m_cacheMap; + RwLock m_bucketLock; + tsl::robin_map<IoHash, IoBuffer> m_cacheMap; - bool Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value); }; - zen::RwLock m_Lock; + RwLock m_Lock; std::unordered_map<std::string, CacheBucket> m_Buckets; }; class ZenCacheDiskLayer { public: - ZenCacheDiskLayer(zen::CasStore& Cas, const std::filesystem::path& RootDir); + ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir); ~ZenCacheDiskLayer(); - bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); bool DropBucket(std::string_view Bucket); void Flush(); + void Scrub(ScrubContext& Ctx); private: /** A cache bucket manages a single directory containing @@ -89,22 +89,23 @@ private: */ struct CacheBucket; - zen::CasStore& m_CasStore; + CasStore& m_CasStore; std::filesystem::path m_RootDir; - zen::RwLock m_Lock; + RwLock m_Lock; std::unordered_map<std::string, CacheBucket> m_Buckets; // TODO: make this case insensitive }; class ZenCacheStore { public: - ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir); + ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir); ~ZenCacheStore(); - bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); bool DropBucket(std::string_view Bucket); void Flush(); + void Scrub(ScrubContext& Ctx); private: std::filesystem::path m_RootDir; @@ -121,8 +122,10 @@ public: ZenCacheTracker(ZenCacheStore& CacheStore); ~ZenCacheTracker(); - void TrackAccess(std::string_view Bucket, const zen::IoHash& HashKey); + void TrackAccess(std::string_view Bucket, const IoHash& HashKey); void Flush(); private: }; + +} // namespace zen |