aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp227
1 files changed, 192 insertions, 35 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index fdf879e1f..e0030230f 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -233,6 +233,61 @@ using namespace std::literals;
namespace zen::cache::impl {
+static bool
+GetValueRawSizeAndHash(const ZenCacheValue& Value, uint64_t& OutValueRawSize, IoHash& OutValueRawHash)
+{
+ if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero))
+ {
+ OutValueRawSize = Value.RawSize;
+ OutValueRawHash = Value.RawHash;
+ return true;
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ return CompressedBuffer::ValidateCompressedHeader(Value.Value, OutValueRawHash, OutValueRawSize);
+ }
+
+ OutValueRawSize = Value.Value.GetSize();
+ OutValueRawHash = IoHash::HashBuffer(Value.Value);
+ return true;
+}
+
+static bool
+ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function<IoHash()>& RawHashProvider)
+{
+ if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero))
+ {
+ return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash));
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ uint64_t ValueRawSize = 0;
+ IoHash ValueRawHash = IoHash::Zero;
+ return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) &&
+ (RawHashProvider() == ValueRawHash);
+ }
+
+ return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value));
+}
+
+static bool
+ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2)
+{
+ if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero))
+ {
+ return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; });
+ }
+ else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ uint64_t Value1RawSize = 0;
+ IoHash Value1RawHash = IoHash::Zero;
+ return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) &&
+ ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; });
+ }
+
+ return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); });
+}
+
class BucketManifestSerializer
{
using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex;
@@ -1315,20 +1370,21 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle
{
- PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {}
struct Entry
{
std::vector<IoHash> HashKeyAndReferences;
+ bool Overwrite;
};
std::vector<IoBuffer> Buffers;
std::vector<Entry> Entries;
std::vector<size_t> EntryResultIndexes;
- std::vector<bool>& OutResults;
+ std::vector<ZenCacheDiskLayer::PutResult>& OutResults;
};
ZenCacheDiskLayer::CacheBucket::PutBatchHandle*
-ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults)
+ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResults)
{
ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch");
return new PutBatchHandle(OutResults);
@@ -1344,23 +1400,41 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
ZEN_ASSERT(Batch);
if (!Batch->Buffers.empty())
{
- std::vector<uint8_t> EntryFlags;
- for (const IoBuffer& Buffer : Batch->Buffers)
+ ZEN_ASSERT(Batch->Buffers.size() == Batch->Entries.size());
+ std::vector<uint8_t> EntryFlags;
+ std::vector<size_t> BufferToEntryIndexes;
+ std::vector<IoBuffer> BuffersToCommit;
+ BuffersToCommit.reserve(Batch->Buffers.size());
+ for (size_t Index = 0; Index < Batch->Entries.size(); Index++)
{
- uint8_t Flags = 0;
- if (Buffer.GetContentType() == ZenContentType::kCbObject)
+ const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences;
+ ZEN_ASSERT(HashKeyAndReferences.size() >= 1);
+
+ ZenCacheValue TemporaryValue;
+ TemporaryValue.Value = Batch->Buffers[Index];
+ std::span<const IoHash> ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end());
+ PutResult& OutResult = Batch->OutResults[Batch->EntryResultIndexes[Index]];
+ OutResult = PutResult{zen::PutStatus::Success};
+ if (!ShouldRejectPut(HashKeyAndReferences[0], TemporaryValue, ReferenceSpan, Batch->Entries[Index].Overwrite, OutResult))
{
- Flags |= DiskLocation::kStructured;
- }
- else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary)
- {
- Flags |= DiskLocation::kCompressed;
+ BufferToEntryIndexes.push_back(Index);
+ BuffersToCommit.push_back(TemporaryValue.Value);
+
+ uint8_t Flags = 0;
+ if (TemporaryValue.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ Flags |= DiskLocation::kStructured;
+ }
+ else if (TemporaryValue.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ Flags |= DiskLocation::kCompressed;
+ }
+ EntryFlags.push_back(Flags);
}
- EntryFlags.push_back(Flags);
}
size_t IndexOffset = 0;
- m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
+ m_BlockStore.WriteChunks(BuffersToCommit, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
ZEN_MEMSCOPE(GetCacheDiskTag());
std::vector<DiskIndexEntry> DiskEntries;
{
@@ -1368,7 +1442,8 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
for (size_t Index = 0; Index < Locations.size(); Index++)
{
DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]);
- const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences;
+ const std::vector<IoHash>& HashKeyAndReferences =
+ Batch->Entries[BufferToEntryIndexes[IndexOffset + Index]].HashKeyAndReferences;
ZEN_ASSERT(HashKeyAndReferences.size() >= 1);
const IoHash HashKey = HashKeyAndReferences[0];
DiskEntries.push_back({.Key = HashKey, .Location = Location});
@@ -1404,12 +1479,6 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
}
}
m_SlogFile.Append(DiskEntries);
- for (size_t Index = 0; Index < Locations.size(); Index++)
- {
- size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index];
- ZEN_ASSERT(ResultIndex < Batch->OutResults.size());
- Batch->OutResults[ResultIndex] = true;
- }
IndexOffset += Locations.size();
});
}
@@ -1905,30 +1974,105 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
}
-void
+bool
+ZenCacheDiskLayer::CacheBucket::ShouldRejectPut(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<const IoHash> References,
+ bool Overwrite,
+ ZenCacheDiskLayer::PutResult& OutPutResult)
+{
+ ZEN_UNUSED(References);
+
+ const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite;
+ if (CheckExisting)
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ auto It = m_Index.find(HashKey);
+ if (It != m_Index.end())
+ {
+ PayloadIndex EntryIndex = It.value();
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+ DiskLocation Location = m_Payloads[EntryIndex].Location;
+
+ bool ComparisonComplete = false;
+ const BucketPayload* Payload = &m_Payloads[EntryIndex];
+ if (Payload->MetaData)
+ {
+ const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData];
+ if (MetaData)
+ {
+ if (!cache::impl::ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; }))
+ {
+ OutPutResult = PutResult{
+ zen::PutStatus::Conflict,
+ fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)};
+ return true;
+ }
+ ComparisonComplete = true;
+ }
+ }
+
+ if (!ComparisonComplete)
+ {
+ // We must release the index lock before calling Get as that will acquire it.
+ // Having the Get method not acquire the index lock is not viable because Get has potentially multiple acquisitions
+ // of the index lock, first as a shared lock, then as an exclusive lock. If the caller has an exclusive lock, we
+ // could just accept that, and not have Get do any lock acquisitions, but it creates situations where Get is doing
+ // slow operations (eg: reading a file from disk) while under an exclusive index lock, and this would lead to
+ // responsiveness issues where zenserver could fail to respond to requests because of a long-held exclusive lock.
+ IndexLock.ReleaseNow();
+ ZenCacheValue ExistingValue;
+ if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue))
+ {
+ uint64_t RawSize = 0;
+ IoHash RawHash = IoHash::Zero;
+ cache::impl::GetValueRawSizeAndHash(ExistingValue, RawSize, RawHash);
+ OutPutResult = PutResult{zen::PutStatus::Conflict,
+ fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)};
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+}
+
+ZenCacheDiskLayer::PutResult
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::Put");
metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
+ PutResult Result{zen::PutStatus::Success};
+
if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
{
+ if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result))
+ {
+ if (OptionalBatchHandle)
+ {
+ OptionalBatchHandle->OutResults.push_back(Result);
+ }
+ return Result;
+ }
PutStandaloneCacheValue(HashKey, Value, References);
if (OptionalBatchHandle)
{
- OptionalBatchHandle->OutResults.push_back(true);
+ OptionalBatchHandle->OutResults.push_back({zen::PutStatus::Success});
}
}
else
{
- PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle);
+ Result = PutInlineCacheValue(HashKey, Value, References, Overwrite, OptionalBatchHandle);
}
m_DiskWriteCount++;
+ return Result;
}
uint64_t
@@ -2777,25 +2921,35 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck
return {};
}
-void
+ZenCacheDiskLayer::PutResult
ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue");
+ PutResult Result{zen::PutStatus::Success};
if (OptionalBatchHandle != nullptr)
{
OptionalBatchHandle->Buffers.push_back(Value.Value);
OptionalBatchHandle->Entries.push_back({});
OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size());
- OptionalBatchHandle->OutResults.push_back(false);
+ OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail});
+ PutBatchHandle::Entry& CurrentEntry = OptionalBatchHandle->Entries.back();
+ CurrentEntry.Overwrite = Overwrite;
std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences;
HashKeyAndReferences.reserve(1 + References.size());
HashKeyAndReferences.push_back(HashKey);
HashKeyAndReferences.insert(HashKeyAndReferences.end(), References.begin(), References.end());
- return;
+ return Result;
+ }
+
+ if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result))
+ {
+ return Result;
}
+
uint8_t EntryFlags = 0;
if (Value.Value.GetContentType() == ZenContentType::kCbObject)
@@ -2845,6 +2999,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
}
m_SlogFile.Append({.Key = HashKey, .Location = Location});
});
+ return Result;
}
std::string
@@ -3781,7 +3936,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
struct ZenCacheDiskLayer::PutBatchHandle
{
- PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {}
struct BucketHandle
{
CacheBucket* Bucket;
@@ -3840,13 +3995,13 @@ struct ZenCacheDiskLayer::PutBatchHandle
return NewBucketHandle;
}
- RwLock Lock;
- std::vector<BucketHandle> BucketHandles;
- std::vector<bool>& OutResults;
+ RwLock Lock;
+ std::vector<BucketHandle> BucketHandles;
+ std::vector<ZenCacheDiskLayer::PutResult>& OutResults;
};
ZenCacheDiskLayer::PutBatchHandle*
-ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults)
+ZenCacheDiskLayer::BeginPutBatch(std::vector<PutResult>& OutResults)
{
return new PutBatchHandle(OutResults);
}
@@ -3983,21 +4138,23 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc
}
}
-void
+ZenCacheDiskLayer::PutResult
ZenCacheDiskLayer::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Put");
-
+ PutResult RetVal = {zen::PutStatus::Fail};
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket);
- Bucket->Put(HashKey, Value, References, BucketBatchHandle);
+ RetVal = Bucket->Put(HashKey, Value, References, Overwrite, BucketBatchHandle);
TryMemCacheTrim();
}
+ return RetVal;
}
void