aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp88
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp93
-rw-r--r--src/zenstore/cache/cacherpc.cpp170
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp62
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h21
-rw-r--r--src/zenstore/include/zenstore/cache/cacherpc.h1
-rw-r--r--src/zenstore/include/zenstore/cache/structuredcachestore.h6
7 files changed, 311 insertions, 130 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index b9a9ca380..d5dd28f68 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -996,8 +996,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
if (Success && StoreLocal)
{
- m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, nullptr);
- m_CacheStats.WriteCount++;
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ if (m_CacheStore
+ .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr))
+ {
+ m_CacheStats.WriteCount++;
+ }
}
}
else if (AcceptType == ZenContentType::kCbPackage)
@@ -1082,30 +1086,34 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
if (StoreLocal)
{
- m_CacheStore.Put(RequestContext,
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- CacheValue,
- ReferencedAttachments,
- nullptr);
- m_CacheStats.WriteCount++;
-
- if (!WriteAttachmentBuffers.empty())
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ if (m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ CacheValue,
+ ReferencedAttachments,
+ Overwrite,
+ nullptr))
{
- std::vector<CidStore::InsertResult> InsertResults =
- m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
- for (const CidStore::InsertResult& Result : InsertResults)
+ m_CacheStats.WriteCount++;
+
+ if (!WriteAttachmentBuffers.empty())
{
- if (Result.New)
+ std::vector<CidStore::InsertResult> InsertResults =
+ m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (const CidStore::InsertResult& Result : InsertResults)
{
- Count.New++;
+ if (Result.New)
+ {
+ Count.New++;
+ }
}
}
- }
- WriteAttachmentBuffers = {};
- WriteRawHashes = {};
+ WriteAttachmentBuffers = {};
+ WriteRawHashes = {};
+ }
}
BinaryWriter MemStream;
@@ -1224,13 +1232,18 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
{
RawHash = IoHash::HashBuffer(SharedBuffer(Body));
}
- m_CacheStore.Put(RequestContext,
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- {.Value = Body, .RawSize = RawSize, .RawHash = RawHash},
- {},
- nullptr);
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ if (!m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ {.Value = Body, .RawSize = RawSize, .RawHash = RawHash},
+ {},
+ Overwrite,
+ nullptr))
+ {
+ return Request.WriteResponse(HttpResponseCode::Conflict);
+ }
m_CacheStats.WriteCount++;
if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote))
@@ -1279,7 +1292,19 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
TotalCount++;
});
- m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}, ReferencedAttachments, nullptr);
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+
+ if (!m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ {.Value = Body},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr))
+ {
+ return Request.WriteResponse(HttpResponseCode::Conflict);
+ }
m_CacheStats.WriteCount++;
ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}",
@@ -1372,10 +1397,15 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv);
}
+ const bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal);
+
ZenCacheValue CacheValue;
CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments);
+ if (!m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite))
+ {
+ return Request.WriteResponse(HttpResponseCode::Conflict);
+ }
m_CacheStats.WriteCount++;
if (!WriteAttachmentBuffers.empty())
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 25f68330a..eaed1f64e 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1793,16 +1793,98 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
}
-void
+static bool
+ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function<IoHash()>& RawHashProvider)
+{
+ if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero))
+ {
+ return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash));
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ uint64_t ValueRawSize = 0;
+ IoHash ValueRawHash = IoHash::Zero;
+ return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) &&
+ (RawHashProvider() == ValueRawHash);
+ }
+
+ return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value));
+}
+
+static bool
+ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2)
+{
+ if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero))
+ {
+ return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; });
+ }
+ else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ uint64_t Value1RawSize = 0;
+ IoHash Value1RawHash = IoHash::Zero;
+ return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) &&
+ ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; });
+ }
+
+ return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); });
+}
+
+bool
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::Put");
metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
+ if (!Overwrite)
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ auto It = m_Index.find(HashKey);
+ if (It != m_Index.end())
+ {
+ PayloadIndex EntryIndex = It.value();
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+ DiskLocation Location = m_Payloads[EntryIndex].Location;
+
+ bool ComparisonComplete = false;
+ const BucketPayload* Payload = &m_Payloads[EntryIndex];
+ if (Payload->MetaData)
+ {
+ const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData];
+ if (MetaData)
+ {
+ if (!ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; }))
+ {
+ if (OptionalBatchHandle)
+ {
+ OptionalBatchHandle->OutResults.push_back(false);
+ }
+ return false;
+ }
+ ComparisonComplete = true;
+ }
+ }
+
+ if (!ComparisonComplete)
+ {
+ IndexLock.ReleaseNow();
+ ZenCacheValue ExistingValue;
+ if (Get(HashKey, ExistingValue) && !ValueMatchesValue(Value, ExistingValue))
+ {
+ if (OptionalBatchHandle)
+ {
+ OptionalBatchHandle->OutResults.push_back(false);
+ }
+ return false;
+ }
+ }
+ }
+ }
+
if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
{
PutStandaloneCacheValue(HashKey, Value, References);
@@ -1817,6 +1899,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
}
m_DiskWriteCount++;
+ return true;
}
uint64_t
@@ -3835,21 +3918,23 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc
}
}
-void
+bool
ZenCacheDiskLayer::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Put");
-
+ bool RetVal = false;
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket);
- Bucket->Put(HashKey, Value, References, BucketBatchHandle);
+ RetVal = Bucket->Put(HashKey, Value, References, Overwrite, BucketBatchHandle);
TryMemCacheTrim();
}
+ return RetVal;
}
void
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index cca51e63e..94072d22d 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -422,7 +422,19 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
CacheValue.Value = IoBuffer(Record.GetSize());
Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments, nullptr);
+ bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) &&
+ !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal);
+ if (!m_CacheStore.Put(Request.Context,
+ Request.Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ CacheValue,
+ ReferencedAttachments,
+ Overwrite,
+ nullptr))
+ {
+ return PutResult::Conflict;
+ }
m_CacheStats.WriteCount++;
if (!WriteAttachmentBuffers.empty())
@@ -753,18 +765,23 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed();
if (StoreLocal)
{
+ bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::QueryLocal);
std::vector<IoHash> ReferencedAttachments;
ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
const IoHash ValueHash = HashView.AsHash();
ReferencedAttachments.push_back(ValueHash);
});
- m_CacheStore.Put(Context,
- *Namespace,
- Key.Bucket,
- Key.Hash,
- {.Value = {Request.RecordCacheValue}},
- ReferencedAttachments,
- nullptr);
+ if (!m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = {Request.RecordCacheValue}},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr))
+ {
+ return;
+ }
m_CacheStats.WriteCount++;
}
ParseValues(Request);
@@ -962,20 +979,25 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
{
- IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal);
+ IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
Value.SetContentType(ZenContentType::kCompressedBinary);
if (RawSize == 0)
{
RawSize = Chunk.DecodeRawSize();
}
- m_CacheStore.Put(Context,
- *Namespace,
- Key.Bucket,
- Key.Hash,
- {.Value = Value, .RawSize = RawSize, .RawHash = RawHash},
- {},
- Batch.get());
- m_CacheStats.WriteCount++;
+ bool PutSucceeded = m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = Value, .RawSize = RawSize, .RawHash = RawHash},
+ {},
+ Overwrite,
+ Batch.get());
+ if (PutSucceeded)
+ {
+ m_CacheStats.WriteCount++;
+ }
if (Batch)
{
BatchResultIndexes.push_back(Results.size());
@@ -983,7 +1005,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
}
else
{
- Results.push_back(true);
+ Results.push_back(PutSucceeded);
}
TransferredSize = Chunk.GetCompressedSize();
}
@@ -1225,6 +1247,7 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
const bool HasData = IsCompressedBinary(Params.Value.GetContentType());
const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData);
const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ const bool Overwrite = StoreData && !EnumHasAllFlags(Request.Policy, CachePolicy::QueryLocal);
const bool IsHit = SkipData || HasData;
if (IsHit)
{
@@ -1235,14 +1258,18 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
if (HasData && StoreData)
{
- m_CacheStore.Put(Context,
- *Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
- {},
- nullptr);
- m_CacheStats.WriteCount++;
+ if (m_CacheStore.Put(
+ Context,
+ *Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
+ {},
+ Overwrite,
+ nullptr))
+ {
+ m_CacheStats.WriteCount++;
+ }
}
ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
@@ -1510,36 +1537,47 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
if (!UpstreamRecordRequests.empty())
{
- const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](
- CacheRecordGetCompleteParams&& Params) {
- if (!Params.Record)
- {
- return;
- }
- CacheKeyRequest& RecordKey = Params.Request;
- size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
- RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- RecordBody& Record = Records[RecordIndex];
-
- const CacheKey& Key = RecordKey.Key;
- Record.Exists = true;
- CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- Record.CacheValue.SetContentType(ZenContentType::kCbObject);
- Record.Source = Params.Source;
-
- bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (StoreLocal)
- {
- std::vector<IoHash> ReferencedAttachments;
- ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- ReferencedAttachments.push_back(ValueHash);
- });
- m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments, nullptr);
- m_CacheStats.WriteCount++;
- }
- };
+ const auto OnCacheRecordGetComplete =
+ [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) {
+ if (!Params.Record)
+ {
+ return;
+ }
+ CacheKeyRequest& RecordKey = Params.Request;
+ size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
+ RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
+ RecordBody& Record = Records[RecordIndex];
+
+ const CacheKey& Key = RecordKey.Key;
+ Record.Exists = true;
+ CbObject ObjectBuffer = CbObject::Clone(Params.Record);
+ Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
+ Record.CacheValue.SetContentType(ZenContentType::kCbObject);
+ Record.Source = Params.Source;
+
+ bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
+ {
+ bool Overwrite = !EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal);
+ std::vector<IoHash> ReferencedAttachments;
+ ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ ReferencedAttachments.push_back(ValueHash);
+ });
+ if (!m_CacheStore.Put(Context,
+ Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = Record.CacheValue},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr))
+ {
+ return;
+ }
+ m_CacheStats.WriteCount++;
+ }
+ };
m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
}
@@ -1748,20 +1786,24 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
if (StoreLocal)
{
+ bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal);
if (Request.IsRecordRequest)
{
m_CidStore.AddChunk(Params.Value, Params.RawHash);
}
else
{
- m_CacheStore.Put(Context,
- Namespace,
- Key.Key.Bucket,
- Key.Key.Hash,
- {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
- {},
- nullptr);
- m_CacheStats.WriteCount++;
+ if (m_CacheStore.Put(Context,
+ Namespace,
+ Key.Key.Bucket,
+ Key.Key.Hash,
+ {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
+ {},
+ Overwrite,
+ nullptr))
+ {
+ m_CacheStats.WriteCount++;
+ }
}
}
if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index 133cb42d7..25fd23b93 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -252,11 +252,12 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc
return;
}
-void
+bool
ZenCacheNamespace::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU(OptionalBatchHandle ? "Z$::Namespace::Put(Batched)" : "Z$::Namespace::Put");
@@ -268,8 +269,12 @@ ZenCacheNamespace::Put(std::string_view InBucket,
ZEN_ASSERT(Value.Value.Size());
ZenCacheDiskLayer::PutBatchHandle* DiskLayerBatchHandle = OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr;
- m_DiskLayer.Put(InBucket, HashKey, Value, References, DiskLayerBatchHandle);
- m_WriteCount++;
+ bool RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle);
+ if (RetVal)
+ {
+ m_WriteCount++;
+ }
+ return RetVal;
}
bool
@@ -716,13 +721,14 @@ ZenCacheStore::Get(const CacheRequestContext& Context,
m_MissCount++;
}
-void
+bool
ZenCacheStore::Put(const CacheRequestContext& Context,
std::string_view Namespace,
std::string_view Bucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatch* OptionalBatchHandle)
{
// Ad hoc rejection of known bad usage patterns for DDC bucket names
@@ -730,7 +736,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
if (IsKnownBadBucketName(Bucket))
{
m_RejectedWriteCount++;
- return;
+ return false;
}
ZEN_MEMSCOPE(GetCacheStoreTag());
@@ -760,9 +766,16 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
{
ZenCacheNamespace::PutBatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr;
- Store->Put(Bucket, HashKey, Value, References, BatchHandle);
- m_WriteCount++;
- return;
+ bool RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle);
+ if (RetVal)
+ {
+ m_WriteCount++;
+ }
+ else
+ {
+ m_RejectedWriteCount++;
+ }
+ return RetVal;
}
ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put [{}] bucket '{}', key '{}'",
@@ -770,6 +783,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
Namespace,
Bucket,
HashKey.ToHexString());
+ return false;
}
bool
@@ -1363,7 +1377,7 @@ TEST_CASE("cachestore.store")
Value.Value = Obj.GetBuffer().AsIoBuffer();
Value.Value.SetContentType(ZenContentType::kCbObject);
- Zcs.Put("test_bucket"sv, Key, Value, {});
+ Zcs.Put("test_bucket"sv, Key, Value, {}, false);
}
for (int i = 0; i < kIterationCount; ++i)
@@ -1417,7 +1431,7 @@ TEST_CASE("cachestore.size")
const size_t Bucket = Key % 4;
std::string BucketName = fmt::format("test_bucket-{}", Bucket);
IoHash Hash = IoHash::HashBuffer(&Key, sizeof(uint32_t));
- Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {});
+ Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}, false);
Keys.push_back({BucketName, Hash});
}
CacheSize = Zcs.StorageSize();
@@ -1471,7 +1485,7 @@ TEST_CASE("cachestore.size")
for (size_t Key = 0; Key < Count; ++Key)
{
const size_t Bucket = Key % 4;
- Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {});
+ Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {}, false);
}
CacheSize = Zcs.StorageSize();
@@ -1554,7 +1568,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
for (const auto& Chunk : Chunks)
{
ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {});
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
WorkCompleted.fetch_add(1);
});
}
@@ -1635,7 +1649,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
for (const auto& Chunk : NewChunks)
{
ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {});
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
AddedChunkCount.fetch_add(1);
WorkCompleted.fetch_add(1);
});
@@ -1740,14 +1754,14 @@ TEST_CASE("cachestore.namespaces")
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {});
+ Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {}, false);
ZenCacheValue GetValue;
CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
// This should just be dropped as we don't allow creating of namespaces on the fly
- Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {});
+ Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {}, false);
CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
}
@@ -1763,7 +1777,7 @@ TEST_CASE("cachestore.namespaces")
IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
Buffer2.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue2 = {.Value = Buffer2};
- Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {});
+ Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {}, false);
ZenCacheValue GetValue;
CHECK(!Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue));
@@ -1805,7 +1819,7 @@ TEST_CASE("cachestore.drop.bucket")
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {});
+ Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false);
return Key;
};
auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
@@ -1878,7 +1892,7 @@ TEST_CASE("cachestore.drop.namespace")
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {});
+ Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false);
return Key;
};
auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
@@ -1964,7 +1978,7 @@ TEST_CASE("cachestore.blocked.disklayer.put")
size_t Key = Buffer.Size();
IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t));
- Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {});
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {}, false);
ZenCacheValue BufferGet;
CHECK(Zcs.Get("test_bucket", HashKey, BufferGet));
@@ -1974,7 +1988,7 @@ TEST_CASE("cachestore.blocked.disklayer.put")
Buffer2.SetContentType(ZenContentType::kCbObject);
// We should be able to overwrite even if the file is open for read
- Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {});
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {}, false);
MemoryView OldView = BufferGet.Value.GetView();
@@ -2065,7 +2079,7 @@ TEST_CASE("cachestore.scrub")
AttachmentHashes.push_back(Attachment.DecodeRawHash());
CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), AttachmentHashes.back());
}
- Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes);
+ Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes, false);
}
};
@@ -2114,7 +2128,8 @@ TEST_CASE("cachestore.newgc.basics")
{.Value = Record.second,
.RawSize = Record.second.GetSize(),
.RawHash = IoHash::HashBuffer(Record.second.GetData(), Record.second.GetSize())},
- AttachmentKeys);
+ AttachmentKeys,
+ false);
for (const auto& Attachment : Attachments)
{
CidStore.AddChunk(Attachment.second.GetCompressed().Flatten().AsIoBuffer(), Attachment.second.DecodeRawHash());
@@ -2130,7 +2145,8 @@ TEST_CASE("cachestore.newgc.basics")
{.Value = CacheValue.second,
.RawSize = CacheValue.second.GetSize(),
.RawHash = IoHash::HashBuffer(CacheValue.second.GetData(), CacheValue.second.GetSize())},
- {});
+ {},
+ false);
CacheEntries.insert({Key, CacheEntry{CacheValue.second, {}}});
return Key;
};
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index b0b4f22cb..7de707a7f 100644
--- a/src/zenstore/include/zenstore/cache/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
@@ -178,10 +178,11 @@ public:
PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
void EndPutBatch(PutBatchHandle* Batch) noexcept;
- void Put(std::string_view Bucket,
+ bool Put(std::string_view Bucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle);
bool Drop();
bool DropBucket(std::string_view Bucket);
@@ -228,13 +229,17 @@ public:
void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle);
struct PutBatchHandle;
- PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
- void EndPutBatch(PutBatchHandle* Batch) noexcept;
- void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, PutBatchHandle* OptionalBatchHandle);
- uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
- bool Drop();
- void Flush();
- void ScrubStorage(ScrubContext& Ctx);
+ PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
+ void EndPutBatch(PutBatchHandle* Batch) noexcept;
+ bool Put(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatchHandle* OptionalBatchHandle);
+ uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
+ bool Drop();
+ void Flush();
+ void ScrubStorage(ScrubContext& Ctx);
RwLock::SharedLockScope GetGcReferencerLock();
struct ReferencesStats
diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h
index da8cf69fe..d489a5386 100644
--- a/src/zenstore/include/zenstore/cache/cacherpc.h
+++ b/src/zenstore/include/zenstore/cache/cacherpc.h
@@ -60,6 +60,7 @@ enum class PutResult
{
Success,
Fail,
+ Conflict,
Invalid,
};
diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h
index 82fec9b0e..1b5e0b76b 100644
--- a/src/zenstore/include/zenstore/cache/structuredcachestore.h
+++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h
@@ -91,10 +91,11 @@ public:
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle);
- void Put(std::string_view Bucket,
+ bool Put(std::string_view Bucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle = nullptr);
bool DropBucket(std::string_view Bucket);
@@ -243,12 +244,13 @@ public:
const IoHash& HashKey,
GetBatch& BatchHandle);
- void Put(const CacheRequestContext& Context,
+ bool Put(const CacheRequestContext& Context,
std::string_view Namespace,
std::string_view Bucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatch* OptionalBatchHandle = nullptr);
bool DropBucket(std::string_view Namespace, std::string_view Bucket);