aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache
diff options
context:
space:
mode:
authorzousar <[email protected]>2025-06-24 16:26:29 -0600
committerzousar <[email protected]>2025-06-24 16:26:29 -0600
commitbb298631ba35a323827dda0b8cd6158e276b5f61 (patch)
tree7ba8db91c44ce83f2c518f80f80ab14910eefa6f /src/zenstore/cache
parentChange to PutResult structure (diff)
parent5.6.14 (diff)
downloadzen-bb298631ba35a323827dda0b8cd6158e276b5f61.tar.xz
zen-bb298631ba35a323827dda0b8cd6158e276b5f61.zip
Merge branch 'main' into zs/put-overwrite-policy
Diffstat (limited to 'src/zenstore/cache')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp671
-rw-r--r--src/zenstore/cache/cacherpc.cpp116
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp51
3 files changed, 530 insertions, 308 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 72a767645..1ebb8f144 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -14,6 +14,7 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zencore/xxhash.h>
+#include <zenutil/parallelwork.h>
#include <zenutil/referencemetadata.h>
#include <zenutil/workerpools.h>
@@ -195,34 +196,33 @@ namespace cache::impl {
return true;
}
- bool MoveAndDeleteDirectory(const std::filesystem::path& Dir)
+ std::filesystem::path MoveDroppedDirectory(const std::filesystem::path& Dir)
{
int DropIndex = 0;
do
{
- if (!std::filesystem::exists(Dir))
+ if (!IsDir(Dir))
{
- return false;
+ return {};
}
std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex);
std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName;
- if (std::filesystem::exists(DroppedBucketPath))
+ if (IsDir(DroppedBucketPath))
{
DropIndex++;
continue;
}
std::error_code Ec;
- std::filesystem::rename(Dir, DroppedBucketPath, Ec);
+ RenameDirectory(Dir, DroppedBucketPath, Ec);
if (!Ec)
{
- DeleteDirectories(DroppedBucketPath);
- return true;
+ return DroppedBucketPath;
}
- // TODO: Do we need to bail at some point?
zen::Sleep(100);
- } while (true);
+ } while (DropIndex < 10);
+ return {};
}
} // namespace cache::impl
@@ -373,10 +373,10 @@ private:
#pragma pack(4)
struct ManifestData
{
- uint32_t RawSize; // 4
- AccessTime Timestamp; // 4
- IoHash RawHash; // 20
- IoHash Key; // 20
+ uint32_t RawSize; // 4
+ uint32_t SecondsSinceEpoch; // 4
+ IoHash RawHash; // 20
+ IoHash Key; // 20
};
#pragma pack(pop)
@@ -658,7 +658,7 @@ BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& B
ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex];
- AccessTimes[PlIndex] = Entry->Timestamp;
+ AccessTimes[PlIndex].SetSecondsSinceEpoch(Entry->SecondsSinceEpoch);
if (Entry->RawSize && Entry->RawHash != IoHash::Zero)
{
@@ -685,6 +685,16 @@ BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&,
{
ZEN_TRACE_CPU("Z$::WriteSidecarFile");
+ ZEN_DEBUG("writing store sidecar for '{}'", SidecarPath);
+ const uint64_t EntryCount = Index.size();
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("wrote store sidecar for '{}' containing {} entries in {}",
+ SidecarPath,
+ EntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
BucketMetaHeader Header;
Header.EntryCount = m_ManifestEntryCount;
Header.LogPosition = SnapshotLogPosition;
@@ -702,43 +712,44 @@ BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&,
SidecarFile.Write(&Header, sizeof Header, 0);
- // TODO: make this batching for better performance
{
uint64_t WriteOffset = sizeof Header;
- // BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024);
+ const size_t MaxManifestDataBufferCount = (512u * 1024u) / sizeof(ManifestData);
- std::vector<ManifestData> ManifestDataBuffer;
- const size_t MaxManifestDataBufferCount = Min(Index.size(), 8192u); // 512 Kb
- ManifestDataBuffer.reserve(MaxManifestDataBufferCount);
+ std::vector<ManifestData> ManifestDataBuffer(Min(m_ManifestEntryCount, MaxManifestDataBufferCount));
+ auto WriteIt = ManifestDataBuffer.begin();
for (auto& Kv : Index)
{
- const IoHash& Key = Kv.first;
- const PayloadIndex PlIndex = Kv.second;
+ ManifestData& Data = *WriteIt++;
- IoHash RawHash = IoHash::Zero;
- uint32_t RawSize = 0;
+ const PayloadIndex PlIndex = Kv.second;
+ Data.Key = Kv.first;
+ Data.SecondsSinceEpoch = AccessTimes[PlIndex].GetSecondsSinceEpoch();
if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData)
{
- RawHash = MetaDatas[MetaIndex].RawHash;
- RawSize = MetaDatas[MetaIndex].RawSize;
+ Data.RawHash = MetaDatas[MetaIndex].RawHash;
+ Data.RawSize = MetaDatas[MetaIndex].RawSize;
+ }
+ else
+ {
+ Data.RawHash = IoHash::Zero;
+ Data.RawSize = 0;
}
- ManifestDataBuffer.emplace_back(
- ManifestData{.RawSize = RawSize, .Timestamp = AccessTimes[PlIndex], .RawHash = RawHash, .Key = Key});
- if (ManifestDataBuffer.size() == MaxManifestDataBufferCount)
+ if (WriteIt == ManifestDataBuffer.end())
{
- const uint64_t WriteSize = sizeof(ManifestData) * ManifestDataBuffer.size();
+ uint64_t WriteSize = std::distance(ManifestDataBuffer.begin(), WriteIt) * sizeof(ManifestData);
SidecarFile.Write(ManifestDataBuffer.data(), WriteSize, WriteOffset);
WriteOffset += WriteSize;
- ManifestDataBuffer.clear();
- ManifestDataBuffer.reserve(MaxManifestDataBufferCount);
+ WriteIt = ManifestDataBuffer.begin();
}
}
- if (ManifestDataBuffer.size() > 0)
+ if (WriteIt != ManifestDataBuffer.begin())
{
- SidecarFile.Write(ManifestDataBuffer.data(), sizeof(ManifestData) * ManifestDataBuffer.size(), WriteOffset);
+ uint64_t WriteSize = std::distance(ManifestDataBuffer.begin(), WriteIt) * sizeof(ManifestData);
+ SidecarFile.Write(ManifestDataBuffer.data(), WriteSize, WriteOffset);
}
}
@@ -763,11 +774,11 @@ namespace zen {
ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
std::atomic_uint64_t& OuterCacheMemoryUsage,
- std::string BucketName,
+ std::string_view BucketName,
const BucketConfiguration& Config)
: m_Gc(Gc)
, m_OuterCacheMemoryUsage(OuterCacheMemoryUsage)
-, m_BucketName(std::move(BucketName))
+, m_BucketName(BucketName)
, m_Configuration(Config)
, m_BucketId(Oid::Zero)
{
@@ -795,6 +806,16 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
ZenCacheDiskLayer::CacheBucket::~CacheBucket()
{
+ try
+ {
+ m_SlogFile.Flush();
+ m_SlogFile.Close();
+ m_BlockStore.Close();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~CacheBucket() failed with: ", Ex.what());
+ }
m_Gc.RemoveGcReferencer(*this);
}
@@ -868,12 +889,13 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
}
void
-ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition, const std::function<uint64_t()>& ClaimDiskReserveFunc)
+ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(uint64_t LogPosition,
+ bool ResetLog,
+ const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot");
- const uint64_t LogCount = FlushLockPosition ? 0 : m_SlogFile.GetLogCount();
- if (m_LogFlushPosition == LogCount)
+ if (m_LogFlushPosition == LogPosition)
{
return;
}
@@ -890,7 +912,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition,
namespace fs = std::filesystem;
- fs::path IndexPath = cache::impl::GetIndexPath(m_BucketDir, m_BucketName);
+ const fs::path IndexPath = cache::impl::GetIndexPath(m_BucketDir, m_BucketName);
try
{
@@ -922,66 +944,70 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition,
throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir));
}
- {
- // This is in a separate scope just to ensure IndexWriter goes out
- // of scope before the file is flushed/closed, in order to ensure
- // all data is written to the file
- BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024);
+ const uint64_t IndexLogPosition = ResetLog ? 0 : LogPosition;
- cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount,
- .LogPosition = LogCount,
- .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
+ cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount,
+ .LogPosition = IndexLogPosition,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
- Header.Checksum = cache::impl::CacheBucketIndexHeader::ComputeChecksum(Header);
- IndexWriter.Write(&Header, sizeof(cache::impl::CacheBucketIndexHeader), 0);
+ Header.Checksum = cache::impl::CacheBucketIndexHeader::ComputeChecksum(Header);
+ ObjectIndexFile.Write(&Header, sizeof(cache::impl::CacheBucketIndexHeader), 0);
+ if (EntryCount > 0)
+ {
uint64_t IndexWriteOffset = sizeof(cache::impl::CacheBucketIndexHeader);
+ size_t MaxWriteEntryCount = (512u * 1024u) / sizeof(DiskIndexEntry);
+ std::vector<DiskIndexEntry> DiskEntryBuffer(Min(m_Index.size(), MaxWriteEntryCount));
+
+ auto WriteIt = DiskEntryBuffer.begin();
for (auto& Entry : m_Index)
{
- DiskIndexEntry IndexEntry;
- IndexEntry.Key = Entry.first;
- IndexEntry.Location = m_Payloads[Entry.second].Location;
- IndexWriter.Write(&IndexEntry, sizeof(DiskIndexEntry), IndexWriteOffset);
-
- IndexWriteOffset += sizeof(DiskIndexEntry);
+ *WriteIt++ = {.Key = Entry.first, .Location = m_Payloads[Entry.second].Location};
+ if (WriteIt == DiskEntryBuffer.end())
+ {
+ uint64_t WriteSize = std::distance(DiskEntryBuffer.begin(), WriteIt) * sizeof(DiskIndexEntry);
+ ObjectIndexFile.Write(DiskEntryBuffer.data(), WriteSize, IndexWriteOffset);
+ IndexWriteOffset += WriteSize;
+ WriteIt = DiskEntryBuffer.begin();
+ }
}
- IndexWriter.Flush();
+ if (WriteIt != DiskEntryBuffer.begin())
+ {
+ uint64_t WriteSize = std::distance(DiskEntryBuffer.begin(), WriteIt) * sizeof(DiskIndexEntry);
+ ObjectIndexFile.Write(DiskEntryBuffer.data(), WriteSize, IndexWriteOffset);
+ }
}
ObjectIndexFile.Flush();
ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec);
if (Ec)
{
- std::filesystem::path TempFilePath = ObjectIndexFile.GetPath();
- ZEN_WARN("snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", TempFilePath, IndexPath, Ec.message());
+ throw std::system_error(Ec,
+ fmt::format("Snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'",
+ ObjectIndexFile.GetPath(),
+ IndexPath,
+ Ec.message()));
}
- else
+
+ if (ResetLog)
{
- // We must only update the log flush position once the snapshot write succeeds
- if (FlushLockPosition)
- {
- std::filesystem::path LogPath = cache::impl::GetLogPath(m_BucketDir, m_BucketName);
+ const std::filesystem::path LogPath = cache::impl::GetLogPath(m_BucketDir, m_BucketName);
- if (std::filesystem::is_regular_file(LogPath))
+ if (IsFile(LogPath))
+ {
+ m_SlogFile.Close();
+ if (!RemoveFile(LogPath, Ec) || Ec)
{
- if (!std::filesystem::remove(LogPath, Ec) || Ec)
- {
- ZEN_WARN("snapshot failed to clean log file '{}', removing index at '{}', reason: '{}'",
- LogPath,
- IndexPath,
- Ec.message());
- std::error_code RemoveIndexEc;
- std::filesystem::remove(IndexPath, RemoveIndexEc);
- }
+ // This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in
+ // the end it will be the same result
+ ZEN_WARN("snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message());
}
- }
- if (!Ec)
- {
- m_LogFlushPosition = LogCount;
+ m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
}
}
+ m_LogFlushPosition = IndexLogPosition;
}
catch (const std::exception& Err)
{
@@ -994,7 +1020,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const
{
ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile");
- if (!std::filesystem::is_regular_file(IndexPath))
+ if (!IsFile(IndexPath))
{
return 0;
}
@@ -1078,7 +1104,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::
{
ZEN_TRACE_CPU("Z$::Bucket::ReadLog");
- if (!std::filesystem::is_regular_file(LogPath))
+ if (!IsFile(LogPath))
{
return 0;
}
@@ -1158,47 +1184,40 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco
if (IsNew)
{
- fs::remove(LogPath);
- fs::remove(IndexPath);
- fs::remove_all(m_BlocksBasePath);
+ RemoveFile(LogPath);
+ RemoveFile(IndexPath);
+ DeleteDirectories(m_BlocksBasePath);
}
CreateDirectories(m_BucketDir);
m_BlockStore.Initialize(m_BlocksBasePath, m_Configuration.MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1);
- if (std::filesystem::is_regular_file(IndexPath))
+ if (IsFile(IndexPath))
{
uint32_t IndexVersion = 0;
m_LogFlushPosition = ReadIndexFile(IndexLock, IndexPath, IndexVersion);
if (IndexVersion == 0)
{
ZEN_WARN("removing invalid index file at '{}'", IndexPath);
- std::filesystem::remove(IndexPath);
+ RemoveFile(IndexPath);
}
}
uint64_t LogEntryCount = 0;
- if (std::filesystem::is_regular_file(LogPath))
+ if (IsFile(LogPath))
{
if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath))
{
LogEntryCount = ReadLog(IndexLock, LogPath, m_LogFlushPosition);
}
- else if (fs::is_regular_file(LogPath))
+ else if (IsFile(LogPath))
{
ZEN_WARN("removing invalid log at '{}'", LogPath);
- std::filesystem::remove(LogPath);
+ RemoveFile(LogPath);
}
}
- if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0)
- {
- WriteIndexSnapshot(IndexLock, /*Flush log*/ true);
- }
-
- m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
-
BlockStore::BlockIndexSet KnownBlocks;
for (const auto& Entry : m_Index)
{
@@ -1216,7 +1235,53 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco
KnownBlocks.insert(BlockIndex);
}
}
- m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+ BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+ m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ bool RemovedEntries = false;
+ if (!MissingBlocks.empty())
+ {
+ std::vector<DiskIndexEntry> MissingEntries;
+
+ for (auto& It : m_Index)
+ {
+ BucketPayload& Payload = m_Payloads[It.second];
+ DiskLocation Location = Payload.Location;
+ if (!Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ if (MissingBlocks.contains(Location.Location.BlockLocation.GetBlockIndex()))
+ {
+ RemoveMemCachedData(IndexLock, Payload);
+ RemoveMetaData(IndexLock, Payload);
+ }
+ }
+ Location.Flags |= DiskLocation::kTombStone;
+ MissingEntries.push_back(DiskIndexEntry{.Key = It.first, .Location = Location});
+ }
+
+ ZEN_ASSERT(!MissingEntries.empty());
+
+ for (const DiskIndexEntry& Entry : MissingEntries)
+ {
+ m_Index.erase(Entry.Key);
+ }
+ m_SlogFile.Append(MissingEntries);
+ m_SlogFile.Flush();
+ {
+ std::vector<BucketPayload> Payloads;
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketMetaData> MetaDatas;
+ std::vector<MemCacheData> MemCachedPayloads;
+ IndexMap Index;
+ CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index);
+ }
+ RemovedEntries = true;
+ }
+
+ if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0 || RemovedEntries)
+ {
+ WriteIndexSnapshot(IndexLock, m_SlogFile.GetLogCount(), /*Flush log*/ true);
+ }
}
void
@@ -1384,7 +1449,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle
{
- GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults)
+ GetBatchHandle(ZenCacheValueVec_t& OutResults) : OutResults(OutResults)
{
Keys.reserve(OutResults.capacity());
ResultIndexes.reserve(OutResults.capacity());
@@ -1395,11 +1460,11 @@ struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle
std::vector<IoHash> Keys;
std::vector<size_t> ResultIndexes;
- std::vector<ZenCacheValue>& OutResults;
+ ZenCacheValueVec_t& OutResults;
};
ZenCacheDiskLayer::CacheBucket::GetBatchHandle*
-ZenCacheDiskLayer::CacheBucket::BeginGetBatch(std::vector<ZenCacheValue>& OutResult)
+ZenCacheDiskLayer::CacheBucket::BeginGetBatch(ZenCacheValueVec_t& OutResult)
{
ZEN_TRACE_CPU("Z$::Bucket::BeginGetBatch");
return new GetBatchHandle(OutResult);
@@ -1419,13 +1484,13 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
if (!Batch->ResultIndexes.empty())
{
- std::vector<DiskLocation> StandaloneDiskLocations;
- std::vector<size_t> StandaloneKeyIndexes;
- std::vector<size_t> MemCachedKeyIndexes;
- std::vector<DiskLocation> InlineDiskLocations;
- std::vector<BlockStoreLocation> InlineBlockLocations;
- std::vector<size_t> InlineKeyIndexes;
- std::vector<bool> FillRawHashAndRawSize(Batch->Keys.size(), false);
+ eastl::fixed_vector<DiskLocation, 16> StandaloneDiskLocations;
+ eastl::fixed_vector<size_t, 16> StandaloneKeyIndexes;
+ eastl::fixed_vector<size_t, 16> MemCachedKeyIndexes;
+ eastl::fixed_vector<DiskLocation, 16> InlineDiskLocations;
+ eastl::fixed_vector<BlockStoreLocation, 16> InlineBlockLocations;
+ eastl::fixed_vector<size_t, 16> InlineKeyIndexes;
+ eastl::fixed_vector<bool, 16> FillRawHashAndRawSize(Batch->Keys.size(), false);
{
RwLock::SharedLockScope IndexLock(m_IndexLock);
for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++)
@@ -1479,6 +1544,13 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
}
}
}
+ else
+ {
+ if (m_Configuration.MemCacheSizeThreshold > 0)
+ {
+ m_MemoryMissCount++;
+ }
+ }
}
}
@@ -1487,7 +1559,7 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
// Often we will find the metadata due to the thread setting the mem cached part doing it before us so it is worth
// checking if it is present once more before spending time fetching and setting the RawHash and RawSize in metadata
- auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value) {
+ auto FillOne = [&](const DiskLocation& Location, size_t KeyIndex, IoBuffer&& Value, bool UsesTemporaryMemory) {
if (!Value)
{
return;
@@ -1510,6 +1582,12 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
}
}
+ if (AddToMemCache || UsesTemporaryMemory)
+ {
+ // We need to own it if we want to add it to the memcache or the buffer is just a range of the block iteration buffer
+ OutValue.Value.MakeOwned();
+ }
+
if (SetMetaInfo)
{
// See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the
@@ -1581,33 +1659,42 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
if (!InlineDiskLocations.empty())
{
ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline");
- m_BlockStore.IterateChunks(InlineBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
- // Only read into memory the IoBuffers we could potentially add to memcache
- const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 1u * 1024u);
- m_BlockStore.IterateBlock(
- InlineBlockLocations,
- ChunkIndexes,
- [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
- const void* Data,
- uint64_t Size) -> bool {
- if (Data != nullptr)
- {
- FillOne(InlineDiskLocations[ChunkIndex],
- InlineKeyIndexes[ChunkIndex],
- IoBufferBuilder::MakeCloneFromMemory(Data, Size));
- }
- return true;
- },
- [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
- BlockStoreFile& File,
- uint64_t Offset,
- uint64_t Size) -> bool {
- FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size));
- return true;
- },
- LargeChunkSizeLimit);
- return true;
- });
+ m_BlockStore.IterateChunks(std::span{begin(InlineBlockLocations), end(InlineBlockLocations)},
+ [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
+ // Up to 8KB or m_Configuration.MemCacheSizeThreshold depending on configuration
+ const uint64_t LargeChunkSizeLimit =
+ m_Configuration.MemCacheSizeThreshold == 0
+ ? Min(m_Configuration.LargeObjectThreshold, 8u * 1024u)
+ : Max(m_Configuration.MemCacheSizeThreshold, 8u * 1024u);
+
+ m_BlockStore.IterateBlock(
+ std::span{begin(InlineBlockLocations), end(InlineBlockLocations)},
+ ChunkIndexes,
+ [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
+ const void* Data,
+ uint64_t Size) -> bool {
+ if (Data != nullptr)
+ {
+ FillOne(InlineDiskLocations[ChunkIndex],
+ InlineKeyIndexes[ChunkIndex],
+ IoBufferBuilder::MakeFromMemory(MemoryView(Data, Size)),
+ /*UsesTemporaryMemory*/ true);
+ }
+ return true;
+ },
+ [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
+ BlockStoreFile& File,
+ uint64_t Offset,
+ uint64_t Size) -> bool {
+ FillOne(InlineDiskLocations[ChunkIndex],
+ InlineKeyIndexes[ChunkIndex],
+ File.GetChunk(Offset, Size),
+ /*UsesTemporaryMemory*/ false);
+ return true;
+ },
+ LargeChunkSizeLimit);
+ return true;
+ });
}
if (!StandaloneDiskLocations.empty())
@@ -1617,7 +1704,7 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
{
size_t KeyIndex = StandaloneKeyIndexes[Index];
const DiskLocation& Location = StandaloneDiskLocations[Index];
- FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex]));
+ FillOne(Location, KeyIndex, GetStandaloneCacheValue(Location, Batch->Keys[KeyIndex]), /*UsesTemporaryMemory*/ false);
}
}
@@ -1697,10 +1784,6 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
else
{
m_DiskMissCount++;
- if (m_Configuration.MemCacheSizeThreshold > 0)
- {
- m_MemoryMissCount++;
- }
}
}
}
@@ -2029,11 +2112,13 @@ ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock
}
}
-bool
+std::function<void()>
ZenCacheDiskLayer::CacheBucket::Drop()
{
ZEN_TRACE_CPU("Z$::Bucket::Drop");
+ m_Gc.RemoveGcReferencer(*this);
+
RwLock::ExclusiveLockScope _(m_IndexLock);
std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks;
@@ -2045,7 +2130,7 @@ ZenCacheDiskLayer::CacheBucket::Drop()
m_BlockStore.Close();
m_SlogFile.Close();
- const bool Deleted = cache::impl::MoveAndDeleteDirectory(m_BucketDir);
+ std::filesystem::path DroppedPath = cache::impl::MoveDroppedDirectory(m_BucketDir);
m_Index.clear();
m_Payloads.clear();
@@ -2058,7 +2143,21 @@ ZenCacheDiskLayer::CacheBucket::Drop()
m_OuterCacheMemoryUsage.fetch_sub(m_MemCachedSize.load());
m_MemCachedSize.store(0);
- return Deleted;
+ if (DroppedPath.empty())
+ {
+ return {};
+ }
+ else
+ {
+ return [DroppedPath = std::move(DroppedPath)]() {
+ std::error_code Ec;
+ (void)DeleteDirectories(DroppedPath, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to clean up dropped bucket directory '{}', reason: '{}'", DroppedPath, Ec.message());
+ }
+ };
+ }
}
void
@@ -2093,6 +2192,9 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot");
try
{
+ // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock
+ const uint64_t LogPosition = m_SlogFile.GetLogCount();
+
bool UseLegacyScheme = false;
IoBuffer Buffer;
@@ -2107,7 +2209,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
{
RwLock::SharedLockScope IndexLock(m_IndexLock);
- WriteIndexSnapshot(IndexLock, /*Flush log*/ false);
+ WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false);
// Note: this copy could be eliminated on shutdown to
// reduce memory usage and execution time
Index = m_Index;
@@ -2147,7 +2249,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
else
{
RwLock::SharedLockScope IndexLock(m_IndexLock);
- WriteIndexSnapshot(IndexLock, /*Flush log*/ false);
+ WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false);
const uint64_t EntryCount = m_Index.size();
Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount);
uint64_t SidecarSize = ManifestWriter.GetSidecarSize();
@@ -2257,7 +2359,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
std::error_code Ec;
- uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec);
+ uintmax_t size = FileSizeFromPath(DataFilePath.ToPath(), Ec);
if (Ec)
{
ReportBadKey(HashKey);
@@ -2398,11 +2500,11 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
BuildPath(Path, Entry.Key);
fs::path FilePath = Path.ToPath();
RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key));
- if (fs::is_regular_file(FilePath))
+ if (IsFile(FilePath))
{
ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8());
std::error_code Ec;
- fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file...
+ RemoveFile(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file...
}
}
}
@@ -2535,7 +2637,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
if (CleanUpTempFile)
{
std::error_code Ec;
- std::filesystem::remove(DataFile.GetPath(), Ec);
+ RemoveFile(DataFile.GetPath(), Ec);
if (Ec)
{
ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'",
@@ -2563,7 +2665,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
// We do a speculative remove of the file instead of probing with a exists call and check the error code instead
- std::filesystem::remove(FsPath, Ec);
+ RemoveFile(FsPath, Ec);
if (Ec)
{
if (Ec.value() != ENOENT)
@@ -2571,7 +2673,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message());
Sleep(100);
Ec.clear();
- std::filesystem::remove(FsPath, Ec);
+ RemoveFile(FsPath, Ec);
if (Ec && Ec.value() != ENOENT)
{
throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir));
@@ -2796,7 +2898,6 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
ZEN_MEMSCOPE(GetCacheDiskTag());
ZEN_TRACE_CPU("Z$::Bucket::UpdateLocation");
DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags);
- m_SlogFile.Append({.Key = HashKey, .Location = Location});
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
if (m_TrackedCacheKeys)
@@ -2826,6 +2927,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
m_AccessTimes.emplace_back(GcClock::TickCount());
m_Index.insert_or_assign(HashKey, EntryIndex);
}
+ m_SlogFile.Append({.Key = HashKey, .Location = Location});
});
}
@@ -2842,9 +2944,10 @@ class DiskBucketStoreCompactor : public GcStoreCompactor
using CacheBucket = ZenCacheDiskLayer::CacheBucket;
public:
- DiskBucketStoreCompactor(CacheBucket& Bucket, std::vector<std::pair<IoHash, uint64_t>>&& ExpiredStandaloneKeys)
+ DiskBucketStoreCompactor(CacheBucket& Bucket, std::vector<std::pair<IoHash, uint64_t>>&& ExpiredStandaloneKeys, bool FlushBucket)
: m_Bucket(Bucket)
, m_ExpiredStandaloneKeys(std::move(ExpiredStandaloneKeys))
+ , m_FlushBucket(FlushBucket)
{
m_ExpiredStandaloneKeys.shrink_to_fit();
}
@@ -2902,7 +3005,7 @@ public:
ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': deleting standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8());
std::error_code Ec;
- if (!fs::remove(FilePath, Ec))
+ if (!RemoveFile(FilePath, Ec))
{
continue;
}
@@ -2923,7 +3026,7 @@ public:
ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': checking standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8());
std::error_code Ec;
- bool Existed = std::filesystem::is_regular_file(FilePath, Ec);
+ bool Existed = IsFile(FilePath, Ec);
if (Ec)
{
ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': failed checking cache payload file '{}'. Reason '{}'",
@@ -3023,10 +3126,12 @@ public:
m_Bucket.m_BlockStore.CompactBlocks(
BlockCompactState,
m_Bucket.m_Configuration.PayloadAlignment,
- [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) {
+ [&](const BlockStore::MovedChunksArray& MovedArray,
+ const BlockStore::ChunkIndexArray& ScrubbedArray,
+ uint64_t FreedDiskSpace) {
std::vector<DiskIndexEntry> MovedEntries;
MovedEntries.reserve(MovedArray.size());
- RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock);
+ RwLock::ExclusiveLockScope IndexLock(m_Bucket.m_IndexLock);
for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray)
{
size_t ChunkIndex = Moved.first;
@@ -3048,6 +3153,24 @@ public:
MovedEntries.push_back({.Key = Key, .Location = Payload.Location});
}
}
+
+ for (size_t ScrubbedIndex : ScrubbedArray)
+ {
+ const IoHash& Key = BlockCompactStateKeys[ScrubbedIndex];
+
+ if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end())
+ {
+ BucketPayload& Payload = m_Bucket.m_Payloads[It->second];
+ DiskLocation Location = Payload.Location;
+
+ m_Bucket.RemoveMemCachedData(IndexLock, Payload);
+ m_Bucket.RemoveMetaData(IndexLock, Payload);
+
+ Location.Flags |= DiskLocation::kTombStone;
+ MovedEntries.push_back(DiskIndexEntry{.Key = Key, .Location = Location});
+ }
+ }
+
m_Bucket.m_SlogFile.Append(MovedEntries);
Stats.RemovedDisk += FreedDiskSpace;
if (Ctx.IsCancelledFlag.load())
@@ -3071,6 +3194,10 @@ public:
}
}
}
+ if (m_FlushBucket)
+ {
+ m_Bucket.Flush();
+ }
}
virtual std::string GetGcName(GcCtx& Ctx) override { return m_Bucket.GetGcName(Ctx); }
@@ -3078,6 +3205,7 @@ public:
private:
ZenCacheDiskLayer::CacheBucket& m_Bucket;
std::vector<std::pair<IoHash, uint64_t>> m_ExpiredStandaloneKeys;
+ bool m_FlushBucket = false;
};
GcStoreCompactor*
@@ -3101,24 +3229,6 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
NiceBytes(Stats.FreedMemory),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
- if (Stats.DeletedCount > 0)
- {
- bool Expected = false;
- if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
- {
- return;
- }
- auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
-
- try
- {
- SaveSnapshot([]() { return 0; });
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed to write index and manifest after RemoveExpiredData in '{}'. Reason: '{}'", m_BucketDir, Ex.what());
- }
- }
});
const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count();
@@ -3170,7 +3280,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
return nullptr;
}
- if (Ctx.Settings.IsDeleteMode)
+ if (Ctx.Settings.IsDeleteMode && !ExpiredEntries.empty())
{
for (const DiskIndexEntry& Entry : ExpiredEntries)
{
@@ -3205,7 +3315,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
return nullptr;
}
- return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys));
+ return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys), /*FlushBucket*/ Stats.DeletedCount > 0);
}
bool
@@ -3395,7 +3505,7 @@ ZenCacheDiskLayer::CacheBucket::GetReferences(const LoggerRef& Logger,
CaptureAttachments(ChunkIndex, File.GetChunk(Offset, Size).GetView());
return !IsCancelledFlag.load();
},
- 0);
+ 32u * 1024);
if (Continue)
{
@@ -3698,11 +3808,11 @@ ZenCacheDiskLayer::CacheBucket*
ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
{
ZEN_TRACE_CPU("Z$::GetOrCreateBucket");
- const auto BucketName = std::string(InBucket);
{
RwLock::SharedLockScope SharedLock(m_Lock);
- if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
+ if (auto It = m_Buckets.find_as(InBucket, std::hash<std::string_view>(), eastl::equal_to_2<std::string, std::string_view>());
+ It != m_Buckets.end())
{
return It->second.get();
}
@@ -3710,31 +3820,40 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
// We create the bucket without holding a lock since contructor calls GcManager::AddGcReferencer which takes an exclusive lock.
// This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock
- std::unique_ptr<CacheBucket> Bucket(
- std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
+ BucketConfiguration* BucketConfig = &m_Configuration.BucketConfig;
+ if (auto It = m_Configuration.BucketConfigMap.find_as(InBucket,
+ std::hash<std::string_view>(),
+ eastl::equal_to_2<std::string, std::string_view>());
+ It != m_Configuration.BucketConfigMap.end())
+ {
+ BucketConfig = &It->second;
+ }
+ std::unique_ptr<CacheBucket> Bucket(std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, InBucket, *BucketConfig));
RwLock::ExclusiveLockScope Lock(m_Lock);
- if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
+ if (auto It = m_Buckets.find_as(InBucket, std::hash<std::string_view>(), eastl::equal_to_2<std::string, std::string_view>());
+ It != m_Buckets.end())
{
return It->second.get();
}
std::filesystem::path BucketPath = m_RootDir;
- BucketPath /= BucketName;
+ BucketPath /= InBucket;
try
{
if (!Bucket->OpenOrCreate(BucketPath))
{
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", InBucket, m_RootDir);
return nullptr;
}
}
catch (const std::exception& Err)
{
- ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
+ ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", InBucket, BucketPath, Err.what());
throw;
}
+ std::string BucketName{InBucket};
CacheBucket* Result = Bucket.get();
m_Buckets.emplace(BucketName, std::move(Bucket));
if (m_CapturedBuckets)
@@ -3833,7 +3952,7 @@ ZenCacheDiskLayer::EndPutBatch(PutBatchHandle* Batch) noexcept
struct ZenCacheDiskLayer::GetBatchHandle
{
- GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults) {}
+ GetBatchHandle(ZenCacheValueVec_t& OutResults) : OutResults(OutResults) {}
struct BucketHandle
{
CacheBucket* Bucket;
@@ -3893,13 +4012,13 @@ struct ZenCacheDiskLayer::GetBatchHandle
return NewBucketHandle;
}
- RwLock Lock;
- std::vector<BucketHandle> BucketHandles;
- std::vector<ZenCacheValue>& OutResults;
+ RwLock Lock;
+ eastl::fixed_vector<BucketHandle, 4> BucketHandles;
+ ZenCacheValueVec_t& OutResults;
};
ZenCacheDiskLayer::GetBatchHandle*
-ZenCacheDiskLayer::BeginGetBatch(std::vector<ZenCacheValue>& OutResults)
+ZenCacheDiskLayer::BeginGetBatch(ZenCacheValueVec_t& OutResults)
{
return new GetBatchHandle(OutResults);
}
@@ -3994,7 +4113,11 @@ ZenCacheDiskLayer::DiscoverBuckets()
if (IsKnownBadBucketName(BucketName))
{
BadBucketDirectories.push_back(BucketPath);
-
+ continue;
+ }
+ else if (BucketName.starts_with("[dropped]"))
+ {
+ BadBucketDirectories.push_back(BucketPath);
continue;
}
@@ -4027,50 +4150,66 @@ ZenCacheDiskLayer::DiscoverBuckets()
RwLock SyncLock;
WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst);
- Latch WorkLatch(1);
- for (auto& BucketPath : FoundBucketDirectories)
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
+ try
{
- WorkLatch.AddCount(1);
- Pool.ScheduleWork([this, &WorkLatch, &SyncLock, BucketPath]() {
- ZEN_MEMSCOPE(GetCacheDiskTag());
-
- auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
- const std::string BucketName = PathToUtf8(BucketPath.stem());
- try
- {
- std::unique_ptr<CacheBucket> NewBucket =
- std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig);
+ for (auto& BucketPath : FoundBucketDirectories)
+ {
+ Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) {
+ ZEN_MEMSCOPE(GetCacheDiskTag());
- CacheBucket* Bucket = nullptr;
+ const std::string BucketName = PathToUtf8(BucketPath.stem());
+ try
{
- RwLock::ExclusiveLockScope __(SyncLock);
- auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket));
- Bucket = InsertResult.first->second.get();
- }
- ZEN_ASSERT(Bucket);
+ BucketConfiguration* BucketConfig = &m_Configuration.BucketConfig;
+ if (auto It = m_Configuration.BucketConfigMap.find_as(std::string_view(BucketName),
+ std::hash<std::string_view>(),
+ eastl::equal_to_2<std::string, std::string_view>());
+ It != m_Configuration.BucketConfigMap.end())
+ {
+ BucketConfig = &It->second;
+ }
- if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false))
- {
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+ std::unique_ptr<CacheBucket> NewBucket =
+ std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, *BucketConfig);
+ CacheBucket* Bucket = nullptr;
{
RwLock::ExclusiveLockScope __(SyncLock);
- m_Buckets.erase(BucketName);
+ auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket));
+ Bucket = InsertResult.first->second.get();
+ }
+ ZEN_ASSERT(Bucket);
+
+ if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false))
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+
+ {
+ RwLock::ExclusiveLockScope __(SyncLock);
+ m_Buckets.erase(BucketName);
+ }
}
}
- }
- catch (const std::exception& Err)
- {
- ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
- return;
- }
- });
+ catch (const std::exception& Err)
+ {
+ ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
+ return;
+ }
+ });
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ AbortFlag.store(true);
+ ZEN_WARN("Failed discovering buckets in {}. Reason: '{}'", m_RootDir, Ex.what());
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
+ Work.Wait();
}
-bool
+std::function<void()>
ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
{
ZEN_TRACE_CPU("Z$::DropBucket");
@@ -4088,33 +4227,72 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
return Bucket.Drop();
}
- // Make sure we remove the folder even if we don't know about the bucket
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= std::string(InBucket);
- return cache::impl::MoveAndDeleteDirectory(BucketPath);
+ std::filesystem::path DroppedPath = cache::impl::MoveDroppedDirectory(BucketPath);
+ if (DroppedPath.empty())
+ {
+ return {};
+ }
+ else
+ {
+ return [DroppedPath = std::move(DroppedPath)]() {
+ std::error_code Ec;
+ (void)DeleteDirectories(DroppedPath, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to clean up dropped bucket directory '{}', reason: '{}'", DroppedPath, Ec.message());
+ }
+ };
+ }
}
-bool
+std::function<void()>
ZenCacheDiskLayer::Drop()
{
ZEN_TRACE_CPU("Z$::Drop");
- RwLock::ExclusiveLockScope _(m_Lock);
-
- std::vector<std::unique_ptr<CacheBucket>> Buckets;
- Buckets.reserve(m_Buckets.size());
- while (!m_Buckets.empty())
+ std::vector<std::function<void()>> PostDropOps;
{
- const auto& It = m_Buckets.begin();
- CacheBucket& Bucket = *It->second;
- m_DroppedBuckets.push_back(std::move(It->second));
- m_Buckets.erase(It->first);
- if (!Bucket.Drop())
+ RwLock::ExclusiveLockScope _(m_Lock);
+ PostDropOps.reserve(m_Buckets.size());
+ while (!m_Buckets.empty())
{
- return false;
+ const auto& It = m_Buckets.begin();
+ CacheBucket& Bucket = *It->second;
+ m_DroppedBuckets.push_back(std::move(It->second));
+ m_Buckets.erase(It->first);
+ if (std::function<void()> PostDropOp = Bucket.Drop(); !PostDropOp)
+ {
+ return {};
+ }
+ else
+ {
+ PostDropOps.emplace_back(std::move(PostDropOp));
+ }
}
}
- return cache::impl::MoveAndDeleteDirectory(m_RootDir);
+
+ std::filesystem::path DroppedPath = cache::impl::MoveDroppedDirectory(m_RootDir);
+ if (DroppedPath.empty())
+ {
+ return {};
+ }
+ else
+ {
+ return [DroppedPath = std::move(DroppedPath), PostDropOps = std::move(PostDropOps)]() {
+ for (auto& PostDropOp : PostDropOps)
+ {
+ PostDropOp();
+ }
+ std::error_code Ec;
+ (void)DeleteDirectories(DroppedPath, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to clean up dropped bucket directory '{}', reason: '{}'", DroppedPath, Ec.message());
+ }
+ };
+ }
}
void
@@ -4144,16 +4322,16 @@ ZenCacheDiskLayer::Flush()
}
{
WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst);
- Latch WorkLatch(1);
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
try
{
for (auto& Bucket : Buckets)
{
- WorkLatch.AddCount(1);
- Pool.ScheduleWork([&WorkLatch, Bucket]() {
+ Work.ScheduleWork(Pool, [Bucket](std::atomic<bool>&) {
ZEN_MEMSCOPE(GetCacheDiskTag());
- auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
try
{
Bucket->Flush();
@@ -4167,13 +4345,14 @@ ZenCacheDiskLayer::Flush()
}
catch (const std::exception& Ex)
{
+ AbortFlag.store(true);
ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what());
}
- WorkLatch.CountDown();
- while (!WorkLatch.Wait(1000))
- {
- ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir);
- }
+
+ Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t RemainingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused);
+ ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", RemainingWork, m_RootDir);
+ });
}
}
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 20c244250..436e8a083 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -20,6 +20,8 @@
#include <zencore/memory/llm.h>
+#include <EASTL/fixed_vector.h>
+
//////////////////////////////////////////////////////////////////////////
namespace zen {
@@ -89,7 +91,7 @@ GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key)
return false;
}
IoHash Hash = HashField.AsHash();
- Key = CacheKey::Create(*Bucket, Hash);
+ Key = CacheKey::CreateValidated(std::move(*Bucket), Hash);
return true;
}
@@ -218,6 +220,11 @@ CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context,
ZEN_WARN("Content format not supported, expected package message format");
return RpcResponseCode::BadRequest;
}
+ if (CbValidateError Error = ValidateCompactBinary(Object.GetView(), CbValidateMode::Default); Error != CbValidateError::None)
+ {
+ ZEN_WARN("Content format is corrupt, compact binary format validation failed. Reason: '{}'", ToString(Error));
+ return RpcResponseCode::BadRequest;
+ }
}
if (!UriNamespace.empty())
@@ -305,7 +312,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
}
DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::vector<bool> Results;
+ eastl::fixed_vector<bool, 32> Results;
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
for (CbFieldView RequestField : RequestsArray)
@@ -495,16 +502,15 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
bool Exists = false;
bool ReadFromUpstream = false;
};
- struct RecordRequestData
+ struct RecordRequestData : public CacheKeyRequest
{
- CacheKeyRequest Upstream;
- CbObjectView RecordObject;
- IoBuffer RecordCacheValue;
- CacheRecordPolicy DownstreamPolicy;
- std::vector<ValueRequestData> Values;
- bool Complete = false;
- const UpstreamEndpointInfo* Source = nullptr;
- uint64_t ElapsedTimeUs;
+ CbObjectView RecordObject;
+ IoBuffer RecordCacheValue;
+ CacheRecordPolicy DownstreamPolicy;
+ eastl::fixed_vector<ValueRequestData, 4> Values;
+ bool Complete = false;
+ const UpstreamEndpointInfo* Source = nullptr;
+ uint64_t ElapsedTimeUs;
};
std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
@@ -517,8 +523,8 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
const bool HasUpstream = m_UpstreamCache.IsActive();
- std::vector<RecordRequestData> Requests;
- std::vector<size_t> UpstreamIndexes;
+ eastl::fixed_vector<RecordRequestData, 16> Requests;
+ eastl::fixed_vector<size_t, 16> UpstreamIndexes;
auto ParseValues = [](RecordRequestData& Request) {
CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView();
@@ -549,7 +555,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
CbObjectView RequestObject = RequestField.AsObjectView();
CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
- CacheKey& Key = Request.Upstream.Key;
+ CacheKey& Key = Request.Key;
if (!GetRpcRequestCacheKey(KeyObject, Key))
{
return CbPackage{};
@@ -571,6 +577,13 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
{
FoundLocalInvalid = true;
}
+ else if (CbValidateError Error = ValidateCompactBinary(Request.RecordCacheValue.GetView(), CbValidateMode::Default);
+ Error != CbValidateError::None)
+ {
+ ZEN_WARN("HandleRpcGetCacheRecords stored record is corrupt, compact binary format validation failed. Reason: '{}'",
+ ToString(Error));
+ FoundLocalInvalid = true;
+ }
else
{
Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData());
@@ -654,7 +667,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
{
m_CidStore.IterateChunks(
CidHashes,
- [this, &Request, ValueCount, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool {
+ [this, &Request, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool {
try
{
const size_t ValueIndex = RequestValueIndexes[Index];
@@ -721,7 +734,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
for (size_t Index : UpstreamIndexes)
{
RecordRequestData& Request = Requests[Index];
- UpstreamRequests.push_back(&Request.Upstream);
+ UpstreamRequests.push_back(&Request);
if (Request.Values.size())
{
@@ -735,13 +748,13 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None;
Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy);
}
- Request.Upstream.Policy = Builder.Build();
+ Request.Policy = Builder.Build();
}
else
{
// We don't know which Values exist in the Record; ask the upstrem for all values that the client wants,
// and convert the CacheRecordPolicy to an upstream policy
- Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream();
+ Request.Policy = Request.DownstreamPolicy.ConvertToUpstream();
}
}
@@ -751,10 +764,9 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
return;
}
- RecordRequestData& Request =
- *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream));
+ RecordRequestData& Request = *static_cast<RecordRequestData*>(&Params.Request);
Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- const CacheKey& Key = Request.Upstream.Key;
+ const CacheKey& Key = Request.Key;
Stopwatch Timer;
auto TimeGuard = MakeGuard([&Timer, &Request]() { Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); });
if (!Request.RecordObject)
@@ -852,10 +864,12 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
CbPackage ResponsePackage;
CbObjectWriter ResponseObject{2048};
+ ResponsePackage.ReserveAttachments(Requests.size());
+
ResponseObject.BeginArray("Result"sv);
for (RecordRequestData& Request : Requests)
{
- const CacheKey& Key = Request.Upstream.Key;
+ const CacheKey& Key = Request.Key;
if (Request.Complete ||
(Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)))
{
@@ -930,11 +944,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
const bool HasUpstream = m_UpstreamCache.IsActive();
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- std::vector<ZenCacheStore::PutResult> BatchResults;
- std::vector<size_t> BatchResultIndexes;
- std::vector<ZenCacheStore::PutResult> Results;
- std::vector<CacheKey> UpstreamCacheKeys;
- uint64_t RequestCount = RequestsArray.Num();
+ std::vector<ZenCacheStore::PutResult> BatchResults;
+ eastl::fixed_vector<size_t, 32> BatchResultIndexes;
+ eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results;
+ eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys;
+
+ uint64_t RequestCount = RequestsArray.Num();
{
Results.reserve(RequestCount);
std::unique_ptr<ZenCacheStore::PutBatch> Batch;
@@ -1145,15 +1160,15 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
uint64_t RawSize = 0;
CompressedBuffer Result;
};
- std::vector<RequestData> Requests;
+ eastl::fixed_vector<RequestData, 16> Requests;
- std::vector<size_t> RemoteRequestIndexes;
+ eastl::fixed_vector<size_t, 16> RemoteRequestIndexes;
const bool HasUpstream = m_UpstreamCache.IsActive();
- CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- std::vector<ZenCacheValue> CacheValues;
- const uint64_t RequestCount = RequestsArray.Num();
+ CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+ ZenCacheValueVec_t CacheValues;
+ const uint64_t RequestCount = RequestsArray.Num();
CacheValues.reserve(RequestCount);
{
std::unique_ptr<ZenCacheStore::GetBatch> Batch;
@@ -1182,7 +1197,6 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
CacheKey& Key = Request.Key;
CachePolicy Policy = Request.Policy;
- ZenCacheValue CacheValue;
if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
{
if (Batch)
@@ -1328,6 +1342,9 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Response");
CbPackage RpcResponse;
CbObjectWriter ResponseObject{1024};
+
+ RpcResponse.ReserveAttachments(Requests.size());
+
ResponseObject.BeginArray("Result"sv);
for (const RequestData& Request : Requests)
{
@@ -1622,18 +1639,27 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
Record.ValuesRead = true;
if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject)
{
- CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData());
- CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
- Record.Values.reserve(ValuesArray.Num());
- for (CbFieldView ValueField : ValuesArray)
+ if (CbValidateError Error = ValidateCompactBinary(Record.CacheValue.GetView(), CbValidateMode::Default);
+ Error != CbValidateError::None)
{
- CbObjectView ValueObject = ValueField.AsObjectView();
- Oid ValueId = ValueObject["Id"sv].AsObjectId();
- CbFieldView RawHashField = ValueObject["RawHash"sv];
- IoHash RawHash = RawHashField.AsBinaryAttachment();
- if (ValueId && !RawHashField.HasError())
+ ZEN_WARN("GetLocalCacheRecords stored record for is corrupt, compact binary format validation failed. Reason: '{}'",
+ ToString(Error));
+ }
+ else
+ {
+ CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData());
+ CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
+ Record.Values.reserve(ValuesArray.Num());
+ for (CbFieldView ValueField : ValuesArray)
{
- Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()});
+ CbObjectView ValueObject = ValueField.AsObjectView();
+ Oid ValueId = ValueObject["Id"sv].AsObjectId();
+ CbFieldView RawHashField = ValueObject["RawHash"sv];
+ IoHash RawHash = RawHashField.AsBinaryAttachment();
+ if (ValueId && !RawHashField.HasError())
+ {
+ Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()});
+ }
}
}
}
@@ -1706,7 +1732,7 @@ CacheRpcHandler::GetLocalCacheValues(const CacheRequestContext& Context,
using namespace cache::detail;
const bool HasUpstream = m_UpstreamCache.IsActive();
- std::vector<ZenCacheValue> Chunks;
+ ZenCacheValueVec_t Chunks;
Chunks.reserve(ValueRequests.size());
{
std::unique_ptr<ZenCacheStore::GetBatch> Batch;
@@ -1866,6 +1892,8 @@ CacheRpcHandler::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequest
CbPackage RpcResponse;
CbObjectWriter Writer{1024};
+ RpcResponse.ReserveAttachments(Requests.size());
+
Writer.BeginArray("Result"sv);
for (ChunkRequest& Request : Requests)
{
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index a3f80099f..973af52b2 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -178,13 +178,13 @@ ZenCacheNamespace::EndPutBatch(PutBatchHandle* Batch) noexcept
struct ZenCacheNamespace::GetBatchHandle
{
- GetBatchHandle(std::vector<ZenCacheValue>& OutResult) : Results(OutResult) {}
- std::vector<ZenCacheValue>& Results;
+ GetBatchHandle(ZenCacheValueVec_t& OutResult) : Results(OutResult) {}
+ ZenCacheValueVec_t& Results;
ZenCacheDiskLayer::GetBatchHandle* DiskLayerHandle = nullptr;
};
ZenCacheNamespace::GetBatchHandle*
-ZenCacheNamespace::BeginGetBatch(std::vector<ZenCacheValue>& OutResult)
+ZenCacheNamespace::BeginGetBatch(ZenCacheValueVec_t& OutResult)
{
ZenCacheNamespace::GetBatchHandle* Handle = new ZenCacheNamespace::GetBatchHandle(OutResult);
Handle->DiskLayerHandle = m_DiskLayer.BeginGetBatch(OutResult);
@@ -282,11 +282,14 @@ ZenCacheNamespace::DropBucket(std::string_view Bucket)
{
ZEN_INFO("dropping bucket '{}'", Bucket);
- const bool Dropped = m_DiskLayer.DropBucket(Bucket);
-
- ZEN_INFO("bucket '{}' was {}", Bucket, Dropped ? "dropped" : "not found");
-
- return Dropped;
+ std::function<void()> PostDropOp = m_DiskLayer.DropBucket(Bucket);
+ if (!PostDropOp)
+ {
+ ZEN_INFO("bucket '{}' was not found in {}", Bucket, m_RootDir);
+ return false;
+ }
+ PostDropOp();
+ return true;
}
void
@@ -296,9 +299,10 @@ ZenCacheNamespace::EnumerateBucketContents(std::string_view
m_DiskLayer.EnumerateBucketContents(Bucket, Fn);
}
-bool
+std::function<void()>
ZenCacheNamespace::Drop()
{
+ m_Gc.RemoveGcStorage(this);
return m_DiskLayer.Drop();
}
@@ -585,7 +589,7 @@ ZenCacheStore::PutBatch::~PutBatch()
}
}
-ZenCacheStore::GetBatch::GetBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<ZenCacheValue>& OutResult)
+ZenCacheStore::GetBatch::GetBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, ZenCacheValueVec_t& OutResult)
: m_CacheStore(CacheStore)
, Results(OutResult)
{
@@ -800,16 +804,27 @@ ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket)
bool
ZenCacheStore::DropNamespace(std::string_view InNamespace)
{
- RwLock::SharedLockScope _(m_NamespacesLock);
- if (auto It = m_Namespaces.find(std::string(InNamespace)); It != m_Namespaces.end())
+ std::function<void()> PostDropOp;
+ {
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(InNamespace)); It != m_Namespaces.end())
+ {
+ ZenCacheNamespace& Namespace = *It->second;
+ m_DroppedNamespaces.push_back(std::move(It->second));
+ m_Namespaces.erase(It);
+ PostDropOp = Namespace.Drop();
+ }
+ else
+ {
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropNamespace", InNamespace);
+ return false;
+ }
+ }
+ if (PostDropOp)
{
- ZenCacheNamespace& Namespace = *It->second;
- m_DroppedNamespaces.push_back(std::move(It->second));
- m_Namespaces.erase(It);
- return Namespace.Drop();
+ PostDropOp();
}
- ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropNamespace", InNamespace);
- return false;
+ return true;
}
void