aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-06-09 09:03:39 +0200
committerGitHub Enterprise <[email protected]>2025-06-09 09:03:39 +0200
commit6f2d68d2c11011d541259d0037908dd76eadeb8a (patch)
tree4fa165343dd42544dded51fad0e13ebae44dd442 /src/zenstore/cache/cachedisklayer.cpp
parent5.6.10-pre0 (diff)
downloadzen-6f2d68d2c11011d541259d0037908dd76eadeb8a.tar.xz
zen-6f2d68d2c11011d541259d0037908dd76eadeb8a.zip
missing chunks bugfix (#424)
* make sure to close log file when resetting log * drop entries that refer to missing blocks * Don't scrub keys that have been rewritten * correctly count added bytes / m_TotalSize * fix negative sleep time in BlockStoreFile::Open() * be defensive when fetching log position * append to log files *after* we updated all state successfully * explicitly close stuff in destructors with exception catching * clean up zero-size block store files
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp84
1 files changed, 70 insertions, 14 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 3f1f0e34a..0ee70890c 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -751,6 +751,16 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
ZenCacheDiskLayer::CacheBucket::~CacheBucket()
{
+ try  // explicit shutdown: std::exception-derived failures are logged instead of escaping the destructor
+ {
+ m_SlogFile.Flush();  // flush the log before closing it
+ m_SlogFile.Close();
+ m_BlockStore.Close();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~CacheBucket() failed with: {}", Ex.what());  // '{}' is required, otherwise Ex.what() is never rendered
+ }
m_Gc.RemoveGcReferencer(*this);
}
@@ -824,11 +834,13 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
}
void
-ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const std::function<uint64_t()>& ClaimDiskReserveFunc)
+ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(uint64_t LogPosition,
+ bool ResetLog,
+ const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot");
- if (m_LogFlushPosition == m_SlogFile.GetLogCount())
+ if (m_LogFlushPosition == LogPosition)
{
return;
}
@@ -877,7 +889,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const st
throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir));
}
- const uint64_t IndexLogPosition = ResetLog ? 0 : m_SlogFile.GetLogCount();
+ const uint64_t IndexLogPosition = ResetLog ? 0 : LogPosition;
cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount,
.LogPosition = IndexLogPosition,
@@ -930,12 +942,14 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const st
if (IsFile(LogPath))
{
+ m_SlogFile.Close();
if (!RemoveFile(LogPath, Ec) || Ec)
{
// This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in
// the end it will be the same result
ZEN_WARN("snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message());
}
+ m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
}
}
m_LogFlushPosition = IndexLogPosition;
@@ -1149,13 +1163,6 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco
}
}
- if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0)
- {
- WriteIndexSnapshot(IndexLock, /*Flush log*/ true);
- }
-
- m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
-
BlockStore::BlockIndexSet KnownBlocks;
for (const auto& Entry : m_Index)
{
@@ -1173,7 +1180,53 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco
KnownBlocks.insert(BlockIndex);
}
}
- m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+ BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+ m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ bool RemovedEntries = false;
+ if (!MissingBlocks.empty())
+ {
+ std::vector<DiskIndexEntry> MissingEntries;
+
+ for (auto& It : m_Index)  // collect only the entries whose payload lives in a block that is missing on disk
+ {
+ BucketPayload& Payload = m_Payloads[It.second];
+ DiskLocation Location = Payload.Location;  // local copy; the tombstone flag is set on the copy that goes to the log
+ if (!Location.IsFlagSet(DiskLocation::kStandaloneFile))  // standalone-file entries are skipped by this sweep
+ {
+ if (MissingBlocks.contains(Location.Location.BlockLocation.GetBlockIndex()))
+ {
+ RemoveMemCachedData(IndexLock, Payload);
+ RemoveMetaData(IndexLock, Payload);
+ Location.Flags |= DiskLocation::kTombStone;  // tombstone only entries that point into a missing block
+ MissingEntries.push_back(DiskIndexEntry{.Key = It.first, .Location = Location});  // entries in intact blocks stay in m_Index
+ }
+ }
+ }
+
+ ZEN_ASSERT(!MissingEntries.empty());
+
+ for (const DiskIndexEntry& Entry : MissingEntries)
+ {
+ m_Index.erase(Entry.Key);
+ }
+ m_SlogFile.Append(MissingEntries);
+ m_SlogFile.Flush();
+ {
+ std::vector<BucketPayload> Payloads;
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketMetaData> MetaDatas;
+ std::vector<MemCacheData> MemCachedPayloads;
+ IndexMap Index;
+ CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index);
+ }
+ RemovedEntries = true;
+ }
+
+ if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0 || RemovedEntries)
+ {
+ WriteIndexSnapshot(IndexLock, m_SlogFile.GetLogCount(), /*Flush log*/ true);
+ }
}
void
@@ -2024,6 +2077,9 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot");
try
{
+ // Be defensive regarding the log position: sample it once up front, as the log may be appended to without m_IndexLock being held
+ const uint64_t LogPosition = m_SlogFile.GetLogCount();
+
bool UseLegacyScheme = false;
IoBuffer Buffer;
@@ -2038,7 +2094,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
{
RwLock::SharedLockScope IndexLock(m_IndexLock);
- WriteIndexSnapshot(IndexLock, /*Flush log*/ false);
+ WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false);
// Note: this copy could be eliminated on shutdown to
// reduce memory usage and execution time
Index = m_Index;
@@ -2078,7 +2134,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
else
{
RwLock::SharedLockScope IndexLock(m_IndexLock);
- WriteIndexSnapshot(IndexLock, /*Flush log*/ false);
+ WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false);
const uint64_t EntryCount = m_Index.size();
Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount);
uint64_t SidecarSize = ManifestWriter.GetSidecarSize();
@@ -2727,7 +2783,6 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
ZEN_MEMSCOPE(GetCacheDiskTag());
ZEN_TRACE_CPU("Z$::Bucket::UpdateLocation");
DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags);
- m_SlogFile.Append({.Key = HashKey, .Location = Location});
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
if (m_TrackedCacheKeys)
@@ -2757,6 +2812,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
m_AccessTimes.emplace_back(GcClock::TickCount());
m_Index.insert_or_assign(HashKey, EntryIndex);
}
+ m_SlogFile.Append({.Key = HashKey, .Location = Location});
});
}