diff options
| author | Dan Engelbrecht <[email protected]> | 2025-06-09 09:03:39 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-06-09 09:03:39 +0200 |
| commit | 6f2d68d2c11011d541259d0037908dd76eadeb8a (patch) | |
| tree | 4fa165343dd42544dded51fad0e13ebae44dd442 /src/zenstore/cache/cachedisklayer.cpp | |
| parent | 5.6.10-pre0 (diff) | |
| download | zen-6f2d68d2c11011d541259d0037908dd76eadeb8a.tar.xz zen-6f2d68d2c11011d541259d0037908dd76eadeb8a.zip | |
missing chunks bugfix (#424)
* make sure to close log file when resetting log
* drop entries that refers to missing blocks
* Don't scrub keys that has been rewritten
* currectly count added bytes / m_TotalSize
* fix negative sleep time in BlockStoreFile::Open()
* be defensive when fetching log position
* append to log files *after* we updated all state successfully
* explicitly close stuff in destructors with exception catching
* clean up empty size block store files
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 84 |
1 files changed, 70 insertions, 14 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 3f1f0e34a..0ee70890c 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -751,6 +751,16 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, ZenCacheDiskLayer::CacheBucket::~CacheBucket() { + try + { + m_SlogFile.Flush(); + m_SlogFile.Close(); + m_BlockStore.Close(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~CacheBucket() failed with: ", Ex.what()); + } m_Gc.RemoveGcReferencer(*this); } @@ -824,11 +834,13 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo } void -ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const std::function<uint64_t()>& ClaimDiskReserveFunc) +ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(uint64_t LogPosition, + bool ResetLog, + const std::function<uint64_t()>& ClaimDiskReserveFunc) { ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot"); - if (m_LogFlushPosition == m_SlogFile.GetLogCount()) + if (m_LogFlushPosition == LogPosition) { return; } @@ -877,7 +889,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const st throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir)); } - const uint64_t IndexLogPosition = ResetLog ? 0 : m_SlogFile.GetLogCount(); + const uint64_t IndexLogPosition = ResetLog ? 0 : LogPosition; cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount, .LogPosition = IndexLogPosition, @@ -930,12 +942,14 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const st if (IsFile(LogPath)) { + m_SlogFile.Close(); if (!RemoveFile(LogPath, Ec) || Ec) { // This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in // the end it will be the same result ZEN_WARN("snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message()); } + m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); } } m_LogFlushPosition = IndexLogPosition; @@ -1149,13 +1163,6 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco } } - if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0) - { - WriteIndexSnapshot(IndexLock, /*Flush log*/ true); - } - - m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); - BlockStore::BlockIndexSet KnownBlocks; for (const auto& Entry : m_Index) { @@ -1173,7 +1180,53 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco KnownBlocks.insert(BlockIndex); } } - m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); + + bool RemovedEntries = false; + if (!MissingBlocks.empty()) + { + std::vector<DiskIndexEntry> MissingEntries; + + for (auto& It : m_Index) + { + BucketPayload& Payload = m_Payloads[It.second]; + DiskLocation Location = Payload.Location; + if (!Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + if (MissingBlocks.contains(Location.Location.BlockLocation.GetBlockIndex())) + { + RemoveMemCachedData(IndexLock, Payload); + RemoveMetaData(IndexLock, Payload); + } + } + Location.Flags |= DiskLocation::kTombStone; + MissingEntries.push_back(DiskIndexEntry{.Key = It.first, .Location = Location}); + } + + ZEN_ASSERT(!MissingEntries.empty()); + + for (const DiskIndexEntry& Entry : MissingEntries) + { + m_Index.erase(Entry.Key); + } + m_SlogFile.Append(MissingEntries); + m_SlogFile.Flush(); + { + std::vector<BucketPayload> Payloads; + std::vector<AccessTime> AccessTimes; + std::vector<BucketMetaData> MetaDatas; + std::vector<MemCacheData> MemCachedPayloads; + IndexMap Index; + CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index); + } + RemovedEntries = true; + } + + if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0 || RemovedEntries) + { + WriteIndexSnapshot(IndexLock, m_SlogFile.GetLogCount(), /*Flush log*/ true); + } } void @@ -2024,6 +2077,9 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot"); try { + // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock + const uint64_t LogPosition = m_SlogFile.GetLogCount(); + bool UseLegacyScheme = false; IoBuffer Buffer; @@ -2038,7 +2094,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl { RwLock::SharedLockScope IndexLock(m_IndexLock); - WriteIndexSnapshot(IndexLock, /*Flush log*/ false); + WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false); // Note: this copy could be eliminated on shutdown to // reduce memory usage and execution time Index = m_Index; @@ -2078,7 +2134,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl else { RwLock::SharedLockScope IndexLock(m_IndexLock); - WriteIndexSnapshot(IndexLock, /*Flush log*/ false); + WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false); const uint64_t EntryCount = m_Index.size(); Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount); uint64_t SidecarSize = ManifestWriter.GetSidecarSize(); @@ -2727,7 +2783,6 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, ZEN_MEMSCOPE(GetCacheDiskTag()); ZEN_TRACE_CPU("Z$::Bucket::UpdateLocation"); DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags); - m_SlogFile.Append({.Key = HashKey, .Location = Location}); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); if (m_TrackedCacheKeys) @@ -2757,6 +2812,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert_or_assign(HashKey, EntryIndex); } + m_SlogFile.Append({.Key = HashKey, .Location = Location}); }); } |