diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 88 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 93 | ||||
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 170 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 62 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cachedisklayer.h | 21 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cacherpc.h | 1 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/structuredcachestore.h | 6 |
7 files changed, 311 insertions, 130 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index b9a9ca380..d5dd28f68 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -996,8 +996,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (Success && StoreLocal) { - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, nullptr); - m_CacheStats.WriteCount++; + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + if (m_CacheStore + .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr)) + { + m_CacheStats.WriteCount++; + } } } else if (AcceptType == ZenContentType::kCbPackage) @@ -1082,30 +1086,34 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (StoreLocal) { - m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - CacheValue, - ReferencedAttachments, - nullptr); - m_CacheStats.WriteCount++; - - if (!WriteAttachmentBuffers.empty()) + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + if (m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr)) { - std::vector<CidStore::InsertResult> InsertResults = - m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); - for (const CidStore::InsertResult& Result : InsertResults) + m_CacheStats.WriteCount++; + + if (!WriteAttachmentBuffers.empty()) { - if (Result.New) + std::vector<CidStore::InsertResult> InsertResults = + m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (const CidStore::InsertResult& Result : InsertResults) { - Count.New++; + if (Result.New) + { + Count.New++; + } } } - } - WriteAttachmentBuffers = {}; - WriteRawHashes = {}; + WriteAttachmentBuffers = {}; + WriteRawHashes = {}; + } } BinaryWriter MemStream; @@ -1224,13 +1232,18 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con { RawHash = IoHash::HashBuffer(SharedBuffer(Body)); } - m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, - {}, - nullptr); + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + if (!m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + nullptr)) + { + return Request.WriteResponse(HttpResponseCode::Conflict); + } m_CacheStats.WriteCount++; if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) @@ -1279,7 +1292,19 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con TotalCount++; }); - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}, ReferencedAttachments, nullptr); + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + + if (!m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body}, + ReferencedAttachments, + Overwrite, + nullptr)) + { + return Request.WriteResponse(HttpResponseCode::Conflict); + } m_CacheStats.WriteCount++; ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}", @@ -1372,10 +1397,15 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); } + const bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal); + ZenCacheValue CacheValue; CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments); + if (!m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite)) + { + return Request.WriteResponse(HttpResponseCode::Conflict); + } m_CacheStats.WriteCount++; if (!WriteAttachmentBuffers.empty()) diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 25f68330a..eaed1f64e 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1793,16 +1793,98 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -void +static bool +ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function<IoHash()>& RawHashProvider) +{ + if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) + { + return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash)); + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t ValueRawSize = 0; + IoHash ValueRawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) && + (RawHashProvider() == ValueRawHash); + } + + return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value)); +} + +static bool +ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2) +{ + if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero)) + { + return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; }); + } + else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t Value1RawSize = 0; + IoHash Value1RawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) && + ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; }); + } + + return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); }); +} + +bool ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::Put"); metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + if (!Overwrite) + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + auto It = m_Index.find(HashKey); + if (It != m_Index.end()) + { + PayloadIndex EntryIndex = It.value(); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + DiskLocation Location = m_Payloads[EntryIndex].Location; + + bool ComparisonComplete = false; + const BucketPayload* Payload = &m_Payloads[EntryIndex]; + if (Payload->MetaData) + { + const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData]; + if (MetaData) + { + if (!ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(false); + } + return false; + } + ComparisonComplete = true; + } + } + + if (!ComparisonComplete) + { + IndexLock.ReleaseNow(); + ZenCacheValue ExistingValue; + if (Get(HashKey, ExistingValue) && !ValueMatchesValue(Value, ExistingValue)) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(false); + } + return false; + } + } + } + } + if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { PutStandaloneCacheValue(HashKey, Value, References); @@ -1817,6 +1899,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, } m_DiskWriteCount++; + return true; } uint64_t @@ -3835,21 +3918,23 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc } } -void +bool ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); - + bool RetVal = false; if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); - Bucket->Put(HashKey, Value, References, BucketBatchHandle); + RetVal = Bucket->Put(HashKey, Value, References, Overwrite, BucketBatchHandle); TryMemCacheTrim(); } + return RetVal; } void diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index cca51e63e..94072d22d 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -422,7 +422,19 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag CacheValue.Value = IoBuffer(Record.GetSize()); Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments, nullptr); + bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) && + !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal); + if (!m_CacheStore.Put(Request.Context, + Request.Namespace, + Request.Key.Bucket, + Request.Key.Hash, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr)) + { + return PutResult::Conflict; + } m_CacheStats.WriteCount++; if (!WriteAttachmentBuffers.empty()) @@ -753,18 +765,23 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (StoreLocal) { + bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::QueryLocal); std::vector<IoHash> ReferencedAttachments; ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { const IoHash ValueHash = HashView.AsHash(); ReferencedAttachments.push_back(ValueHash); }); - m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = {Request.RecordCacheValue}}, - ReferencedAttachments, - nullptr); + if (!m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = {Request.RecordCacheValue}}, + ReferencedAttachments, + Overwrite, + nullptr)) + { + return; + } m_CacheStats.WriteCount++; } ParseValues(Request); @@ -962,20 +979,25 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) { - IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); + bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal); + IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); Value.SetContentType(ZenContentType::kCompressedBinary); if (RawSize == 0) { RawSize = Chunk.DecodeRawSize(); } - m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, - {}, - Batch.get()); - m_CacheStats.WriteCount++; + bool PutSucceeded = m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + Batch.get()); + if (PutSucceeded) + { + m_CacheStats.WriteCount++; + } if (Batch) { BatchResultIndexes.push_back(Results.size()); @@ -983,7 +1005,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con } else { - Results.push_back(true); + Results.push_back(PutSucceeded); } TransferredSize = Chunk.GetCompressedSize(); } @@ -1225,6 +1247,7 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO const bool HasData = IsCompressedBinary(Params.Value.GetContentType()); const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData); const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + const bool Overwrite = StoreData && !EnumHasAllFlags(Request.Policy, CachePolicy::QueryLocal); const bool IsHit = SkipData || HasData; if (IsHit) { @@ -1235,14 +1258,18 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO if (HasData && StoreData) { - m_CacheStore.Put(Context, - *Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, - {}, - nullptr); - m_CacheStats.WriteCount++; + if (m_CacheStore.Put( + Context, + *Namespace, + Request.Key.Bucket, + Request.Key.Hash, + ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, + {}, + Overwrite, + nullptr)) + { + m_CacheStats.WriteCount++; + } } ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", @@ -1510,36 +1537,47 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, if (!UpstreamRecordRequests.empty()) { - const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context]( - CacheRecordGetCompleteParams&& Params) { - if (!Params.Record) - { - return; - } - CacheKeyRequest& RecordKey = Params.Request; - size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); - RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); - RecordBody& Record = Records[RecordIndex]; - - const CacheKey& Key = RecordKey.Key; - Record.Exists = true; - CbObject ObjectBuffer = CbObject::Clone(Params.Record); - Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); - Record.CacheValue.SetContentType(ZenContentType::kCbObject); - Record.Source = Params.Source; - - bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); - if (StoreLocal) - { - std::vector<IoHash> ReferencedAttachments; - ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { - const IoHash ValueHash = HashView.AsHash(); - ReferencedAttachments.push_back(ValueHash); - }); - m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments, nullptr); - m_CacheStats.WriteCount++; - } - }; + const auto OnCacheRecordGetComplete = + [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) { + if (!Params.Record) + { + return; + } + CacheKeyRequest& RecordKey = Params.Request; + size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); + RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); + RecordBody& Record = Records[RecordIndex]; + + const CacheKey& Key = RecordKey.Key; + Record.Exists = true; + CbObject ObjectBuffer = CbObject::Clone(Params.Record); + Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); + Record.CacheValue.SetContentType(ZenContentType::kCbObject); + Record.Source = Params.Source; + + bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) + { + bool Overwrite = !EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal); + std::vector<IoHash> ReferencedAttachments; + ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + ReferencedAttachments.push_back(ValueHash); + }); + if (!m_CacheStore.Put(Context, + Namespace, + Key.Bucket, + Key.Hash, + {.Value = Record.CacheValue}, + ReferencedAttachments, + Overwrite, + nullptr)) + { + return; + } + m_CacheStats.WriteCount++; + } + }; m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } @@ -1748,20 +1786,24 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (StoreLocal) { + bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal); if (Request.IsRecordRequest) { m_CidStore.AddChunk(Params.Value, Params.RawHash); } else { - m_CacheStore.Put(Context, - Namespace, - Key.Key.Bucket, - Key.Key.Hash, - {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, - {}, - nullptr); - m_CacheStats.WriteCount++; + if (m_CacheStore.Put(Context, + Namespace, + Key.Key.Bucket, + Key.Key.Hash, + {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, + {}, + Overwrite, + nullptr)) + { + m_CacheStats.WriteCount++; + } } } if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index 133cb42d7..25fd23b93 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -252,11 +252,12 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc return; } -void +bool ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU(OptionalBatchHandle ? "Z$::Namespace::Put(Batched)" : "Z$::Namespace::Put"); @@ -268,8 +269,12 @@ ZenCacheNamespace::Put(std::string_view InBucket, ZEN_ASSERT(Value.Value.Size()); ZenCacheDiskLayer::PutBatchHandle* DiskLayerBatchHandle = OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr; - m_DiskLayer.Put(InBucket, HashKey, Value, References, DiskLayerBatchHandle); - m_WriteCount++; + bool RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle); + if (RetVal) + { + m_WriteCount++; + } + return RetVal; } bool @@ -716,13 +721,14 @@ ZenCacheStore::Get(const CacheRequestContext& Context, m_MissCount++; } -void +bool ZenCacheStore::Put(const CacheRequestContext& Context, std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatch* OptionalBatchHandle) { // Ad hoc rejection of known bad usage patterns for DDC bucket names @@ -730,7 +736,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context, if (IsKnownBadBucketName(Bucket)) { m_RejectedWriteCount++; - return; + return false; } ZEN_MEMSCOPE(GetCacheStoreTag()); @@ -760,9 +766,16 @@ ZenCacheStore::Put(const CacheRequestContext& Context, if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) { ZenCacheNamespace::PutBatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr; - Store->Put(Bucket, HashKey, Value, References, BatchHandle); - m_WriteCount++; - return; + bool RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle); + if (RetVal) + { + m_WriteCount++; + } + else + { + m_RejectedWriteCount++; + } + return RetVal; } ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put [{}] bucket '{}', key '{}'", @@ -770,6 +783,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context, Namespace, Bucket, HashKey.ToHexString()); + return false; } bool @@ -1363,7 +1377,7 @@ TEST_CASE("cachestore.store") Value.Value = Obj.GetBuffer().AsIoBuffer(); Value.Value.SetContentType(ZenContentType::kCbObject); - Zcs.Put("test_bucket"sv, Key, Value, {}); + Zcs.Put("test_bucket"sv, Key, Value, {}, false); } for (int i = 0; i < kIterationCount; ++i) @@ -1417,7 +1431,7 @@ TEST_CASE("cachestore.size") const size_t Bucket = Key % 4; std::string BucketName = fmt::format("test_bucket-{}", Bucket); IoHash Hash = IoHash::HashBuffer(&Key, sizeof(uint32_t)); - Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}); + Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}, false); Keys.push_back({BucketName, Hash}); } CacheSize = Zcs.StorageSize(); @@ -1471,7 +1485,7 @@ TEST_CASE("cachestore.size") for (size_t Key = 0; Key < Count; ++Key) { const size_t Bucket = Key % 4; - Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {}); + Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {}, false); } CacheSize = Zcs.StorageSize(); @@ -1554,7 +1568,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) for (const auto& Chunk : Chunks) { ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}); + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); WorkCompleted.fetch_add(1); }); } @@ -1635,7 +1649,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) for (const auto& Chunk : NewChunks) { ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}); + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); AddedChunkCount.fetch_add(1); WorkCompleted.fetch_add(1); }); @@ -1740,14 +1754,14 @@ TEST_CASE("cachestore.namespaces") Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {}); + Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {}, false); ZenCacheValue GetValue; CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue)); // This should just be dropped as we don't allow creating of namespaces on the fly - Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {}); + Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {}, false); CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue)); } @@ -1763,7 +1777,7 @@ TEST_CASE("cachestore.namespaces") IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); Buffer2.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue2 = {.Value = Buffer2}; - Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {}); + Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {}, false); ZenCacheValue GetValue; CHECK(!Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue)); @@ -1805,7 +1819,7 @@ TEST_CASE("cachestore.drop.bucket") Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}); + Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false); return Key; }; auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { @@ -1878,7 +1892,7 @@ TEST_CASE("cachestore.drop.namespace") Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}); + Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false); return Key; }; auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { @@ -1964,7 +1978,7 @@ TEST_CASE("cachestore.blocked.disklayer.put") size_t Key = Buffer.Size(); IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t)); - Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {}); + Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {}, false); ZenCacheValue BufferGet; CHECK(Zcs.Get("test_bucket", HashKey, BufferGet)); @@ -1974,7 +1988,7 @@ TEST_CASE("cachestore.blocked.disklayer.put") Buffer2.SetContentType(ZenContentType::kCbObject); // We should be able to overwrite even if the file is open for read - Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {}); + Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {}, false); MemoryView OldView = BufferGet.Value.GetView(); @@ -2065,7 +2079,7 @@ TEST_CASE("cachestore.scrub") AttachmentHashes.push_back(Attachment.DecodeRawHash()); CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), AttachmentHashes.back()); } - Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes); + Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes, false); } }; @@ -2114,7 +2128,8 @@ TEST_CASE("cachestore.newgc.basics") {.Value = Record.second, .RawSize = Record.second.GetSize(), .RawHash = IoHash::HashBuffer(Record.second.GetData(), Record.second.GetSize())}, - AttachmentKeys); + AttachmentKeys, + false); for (const auto& Attachment : Attachments) { CidStore.AddChunk(Attachment.second.GetCompressed().Flatten().AsIoBuffer(), Attachment.second.DecodeRawHash()); @@ -2130,7 +2145,8 @@ TEST_CASE("cachestore.newgc.basics") {.Value = CacheValue.second, .RawSize = CacheValue.second.GetSize(), .RawHash = IoHash::HashBuffer(CacheValue.second.GetData(), CacheValue.second.GetSize())}, - {}); + {}, + false); CacheEntries.insert({Key, CacheEntry{CacheValue.second, {}}}); return Key; }; diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index b0b4f22cb..7de707a7f 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -178,10 +178,11 @@ public: PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult); void EndPutBatch(PutBatchHandle* Batch) noexcept; - void Put(std::string_view Bucket, + bool Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle); bool Drop(); bool DropBucket(std::string_view Bucket); @@ -228,13 +229,17 @@ public: void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult); - void EndPutBatch(PutBatchHandle* Batch) noexcept; - void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, PutBatchHandle* OptionalBatchHandle); - uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); - bool Drop(); - void Flush(); - void ScrubStorage(ScrubContext& Ctx); + PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult); + void EndPutBatch(PutBatchHandle* Batch) noexcept; + bool Put(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle); + uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); + bool Drop(); + void Flush(); + void ScrubStorage(ScrubContext& Ctx); RwLock::SharedLockScope GetGcReferencerLock(); struct ReferencesStats diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h index da8cf69fe..d489a5386 100644 --- a/src/zenstore/include/zenstore/cache/cacherpc.h +++ b/src/zenstore/include/zenstore/cache/cacherpc.h @@ -60,6 +60,7 @@ enum class PutResult { Success, Fail, + Conflict, Invalid, }; diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h index 82fec9b0e..1b5e0b76b 100644 --- a/src/zenstore/include/zenstore/cache/structuredcachestore.h +++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h @@ -91,10 +91,11 @@ public: bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle); - void Put(std::string_view Bucket, + bool Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle = nullptr); bool DropBucket(std::string_view Bucket); @@ -243,12 +244,13 @@ public: const IoHash& HashKey, GetBatch& BatchHandle); - void Put(const CacheRequestContext& Context, + bool Put(const CacheRequestContext& Context, std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatch* OptionalBatchHandle = nullptr); bool DropBucket(std::string_view Namespace, std::string_view Bucket); |