aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-06-09 09:03:39 +0200
committerGitHub Enterprise <[email protected]>2025-06-09 09:03:39 +0200
commit6f2d68d2c11011d541259d0037908dd76eadeb8a (patch)
tree4fa165343dd42544dded51fad0e13ebae44dd442 /src/zenstore/compactcas.cpp
parent5.6.10-pre0 (diff)
downloadzen-6f2d68d2c11011d541259d0037908dd76eadeb8a.tar.xz
zen-6f2d68d2c11011d541259d0037908dd76eadeb8a.zip
missing chunks bugfix (#424)
* make sure to close log file when resetting log * drop entries that refer to missing blocks * Don't scrub keys that have been rewritten * correctly count added bytes / m_TotalSize * fix negative sleep time in BlockStoreFile::Open() * be defensive when fetching log position * append to log files *after* we updated all state successfully * explicitly close stuff in destructors with exception catching * clean up empty size block store files
Diffstat (limited to 'src/zenstore/compactcas.cpp')
-rw-r--r--src/zenstore/compactcas.cpp306
1 files changed, 290 insertions, 16 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 0c9302ec8..2ab5752ff 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -145,6 +145,16 @@ CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : m_Log(logging::Get("
CasContainerStrategy::~CasContainerStrategy()
{
+ try
+ {
+ m_BlockStore.Close();
+ m_CasLog.Flush();
+ m_CasLog.Close();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~CasContainerStrategy failed with: ", Ex.what());
+ }
m_Gc.RemoveGcReferenceStore(*this);
m_Gc.RemoveGcStorage(this);
}
@@ -204,12 +214,12 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
ZEN_TRACE_CPU("CasContainer::UpdateLocation");
BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
- m_CasLog.Append(IndexEntry);
{
RwLock::ExclusiveLockScope _(m_LocationMapLock);
m_LocationMap.emplace(ChunkHash, m_Locations.size());
m_Locations.push_back(DiskLocation);
}
+ m_CasLog.Append(IndexEntry);
});
return CasStore::InsertResult{.New = true};
@@ -273,7 +283,6 @@ CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<c
IndexEntries.emplace_back(
CasDiskIndexEntry{.Key = ChunkHashes[ChunkIndex], .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)});
}
- m_CasLog.Append(IndexEntries);
{
RwLock::ExclusiveLockScope _(m_LocationMapLock);
for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries)
@@ -282,6 +291,7 @@ CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<c
m_Locations.push_back(DiskIndexEntry.Location);
}
}
+ m_CasLog.Append(IndexEntries);
});
return Result;
}
@@ -746,19 +756,27 @@ public:
MovedEntries.push_back(CasDiskIndexEntry{.Key = Key, .Location = Location});
}
}
- for (size_t ScrubbedIndex : ScrubbedArray)
+ for (size_t ChunkIndex : ScrubbedArray)
{
- const IoHash& Key = BlockCompactStateKeys[ScrubbedIndex];
+ const IoHash& Key = BlockCompactStateKeys[ChunkIndex];
if (auto It = m_CasContainerStrategy.m_LocationMap.find(Key);
It != m_CasContainerStrategy.m_LocationMap.end())
{
- BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second];
+ BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second];
+ const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex);
+ if (Location.Get(m_CasContainerStrategy.m_PayloadAlignment) != OldLocation)
+ {
+                        // Someone has moved our chunk, so let's just skip the new location we were provided; it will be
+                        // GC'd at a later time
+ continue;
+ }
MovedEntries.push_back(
CasDiskIndexEntry{.Key = Key, .Location = Location, .Flags = CasDiskIndexEntry::kTombstone});
m_CasContainerStrategy.m_LocationMap.erase(It);
}
}
m_CasContainerStrategy.m_CasLog.Append(MovedEntries);
+ m_CasContainerStrategy.m_CasLog.Flush();
Stats.RemovedDisk += FreedDiskSpace;
if (Ctx.IsCancelledFlag.load())
{
@@ -984,14 +1002,11 @@ CasContainerStrategy::MakeIndexSnapshot(bool ResetLog)
{
// Write the current state of the location map to a new index state
std::vector<CasDiskIndexEntry> Entries;
- uint64_t IndexLogPosition = 0;
+ // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock
+ const uint64_t IndexLogPosition = ResetLog ? 0 : m_CasLog.GetLogCount();
{
RwLock::SharedLockScope ___(m_LocationMapLock);
- if (!ResetLog)
- {
- IndexLogPosition = m_CasLog.GetLogCount();
- }
Entries.resize(m_LocationMap.size());
uint64_t EntryIndex = 0;
@@ -1036,12 +1051,14 @@ CasContainerStrategy::MakeIndexSnapshot(bool ResetLog)
if (IsFile(LogPath))
{
+ m_CasLog.Close();
if (!RemoveFile(LogPath, Ec) || Ec)
{
// This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in
// the end it will be the same result
ZEN_WARN("Snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message());
}
+ m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
}
}
m_LogFlushPosition = IndexLogPosition;
@@ -1154,7 +1171,7 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski
ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
SkipEntryCount = 0;
}
- LogEntryCount = EntryCount - SkipEntryCount;
+ LogEntryCount = SkipEntryCount;
CasLog.Replay(
[&](const CasDiskIndexEntry& Record) {
LogEntryCount++;
@@ -1173,7 +1190,6 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski
m_Locations.push_back(Record.Location);
},
SkipEntryCount);
-
return LogEntryCount;
}
return 0;
@@ -1229,8 +1245,6 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
}
}
- m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
-
BlockStore::BlockIndexSet KnownBlocks;
for (const auto& Entry : m_LocationMap)
@@ -1240,9 +1254,39 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
KnownBlocks.insert(BlockIndex);
}
- m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+ BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+
+ m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ bool RemovedEntries = false;
+ if (!MissingBlocks.empty())
+ {
+ std::vector<CasDiskIndexEntry> MissingEntries;
+ for (auto& It : m_LocationMap)
+ {
+ const uint32_t BlockIndex = m_Locations[It.second].GetBlockIndex();
+ if (MissingBlocks.contains(BlockIndex))
+ {
+ MissingEntries.push_back({.Key = It.first, .Location = m_Locations[It.second], .Flags = CasDiskIndexEntry::kTombstone});
+ }
+ }
+ ZEN_ASSERT(!MissingEntries.empty());
+
+ for (const CasDiskIndexEntry& Entry : MissingEntries)
+ {
+ m_LocationMap.erase(Entry.Key);
+ }
+ m_CasLog.Append(MissingEntries);
+ m_CasLog.Flush();
+
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_LocationMapLock);
+ CompactIndex(IndexLock);
+ }
+ RemovedEntries = true;
+ }
- if (IsNewStore || (LogEntryCount > 0))
+ if (IsNewStore || (LogEntryCount > 0) || RemovedEntries)
{
MakeIndexSnapshot(/*ResetLog*/ true);
}
@@ -1612,6 +1656,236 @@ TEST_CASE("compactcas.threadedinsert")
}
}
+TEST_CASE("compactcas.restart")
+{
+ uint64_t ExpectedSize = 0;
+
+ auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) {
+ WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put");
+
+ Latch WorkLatch(1);
+ tsl::robin_set<IoHash, IoHash::Hasher> ChunkHashesLookup;
+ ChunkHashesLookup.reserve(ChunkCount);
+ RwLock InsertLock;
+ for (size_t Offset = 0; Offset < ChunkCount;)
+ {
+ size_t BatchCount = Min<size_t>(ChunkCount - Offset, 512u);
+ WorkLatch.AddCount(1);
+ ThreadPool.ScheduleWork(
+ [&WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount, ChunkSize]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+
+ std::vector<IoBuffer> BatchBlobs;
+ std::vector<IoHash> BatchHashes;
+ BatchBlobs.reserve(BatchCount);
+ BatchHashes.reserve(BatchCount);
+
+ while (BatchBlobs.size() < BatchCount)
+ {
+ IoBuffer Chunk =
+ CreateSemiRandomBlob(ChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377));
+ IoHash Hash = IoHash::HashBuffer(Chunk);
+ {
+ RwLock::ExclusiveLockScope __(InsertLock);
+ if (ChunkHashesLookup.contains(Hash))
+ {
+ continue;
+ }
+ ChunkHashesLookup.insert(Hash);
+ ExpectedSize += Chunk.Size();
+ }
+
+ BatchBlobs.emplace_back(CompressedBuffer::Compress(SharedBuffer(Chunk)).GetCompressed().Flatten().AsIoBuffer());
+ BatchHashes.push_back(Hash);
+ }
+
+ Cas.InsertChunks(BatchBlobs, BatchHashes);
+ {
+ RwLock::ExclusiveLockScope __(InsertLock);
+ Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end());
+ }
+ });
+ Offset += BatchCount;
+ }
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+ };
+
+ ScopedTemporaryDirectory TempDir;
+ std::filesystem::path CasPath = TempDir.Path();
+ CreateDirectories(CasPath);
+
+ bool Generate = false;
+ if (!Generate)
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ }
+
+ const uint64_t kChunkSize = 1048 + 395;
+ const size_t kChunkCount = 7167;
+
+ std::vector<IoHash> Hashes;
+ Hashes.reserve(kChunkCount);
+
+ auto ValidateChunks = [&](CasContainerStrategy& Cas, std::span<const IoHash> Hashes, bool ShouldExist) {
+ for (const IoHash& Hash : Hashes)
+ {
+ if (ShouldExist)
+ {
+ CHECK(Cas.HaveChunk(Hash));
+ IoBuffer Buffer = Cas.FindChunk(Hash);
+ CHECK(Buffer);
+ IoHash ValidateHash;
+ uint64_t ValidateRawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize);
+ CHECK(Compressed);
+ CHECK(ValidateHash == Hash);
+ }
+ else
+ {
+ CHECK(!Cas.HaveChunk(Hash));
+ IoBuffer Buffer = Cas.FindChunk(Hash);
+ CHECK(!Buffer);
+ }
+ }
+ };
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, true);
+ GenerateChunks(Cas, kChunkCount, kChunkSize, Hashes);
+ ValidateChunks(Cas, Hashes, true);
+ Cas.Flush();
+ ValidateChunks(Cas, Hashes, true);
+ }
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ ValidateChunks(Cas, Hashes, true);
+ GenerateChunks(Cas, kChunkCount, kChunkSize / 4, Hashes);
+ ValidateChunks(Cas, Hashes, true);
+ }
+
+ class GcRefChecker : public GcReferenceChecker
+ {
+ public:
+ explicit GcRefChecker(std::vector<IoHash>&& HashesToKeep) : m_HashesToKeep(std::move(HashesToKeep)) {}
+ ~GcRefChecker() {}
+ std::string GetGcName(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return "test";
+ }
+ void PreCache(GcCtx& Ctx) override { FilterReferences(Ctx, "test", m_HashesToKeep); }
+ void UpdateLockedState(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); }
+ std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override
+ {
+ ZEN_UNUSED(Ctx);
+ return KeepUnusedReferences(m_HashesToKeep, IoCids);
+ }
+
+ private:
+ std::vector<IoHash> m_HashesToKeep;
+ };
+
+ class GcRef : public GcReferencer
+ {
+ public:
+ GcRef(GcManager& Gc, std::span<const IoHash> HashesToKeep) : m_Gc(Gc)
+ {
+ m_HashesToKeep.insert(m_HashesToKeep.begin(), HashesToKeep.begin(), HashesToKeep.end());
+ m_Gc.AddGcReferencer(*this);
+ }
+ ~GcRef() { m_Gc.RemoveGcReferencer(*this); }
+ std::string GetGcName(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return "test";
+ }
+ GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override
+ {
+ ZEN_UNUSED(Ctx, Stats);
+ return nullptr;
+ }
+ std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return {new GcRefChecker(std::move(m_HashesToKeep))};
+ }
+ std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return {};
+ }
+
+ private:
+ GcManager& m_Gc;
+ std::vector<IoHash> m_HashesToKeep;
+ };
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ GenerateChunks(Cas, kChunkCount, kChunkSize / 5, Hashes);
+ }
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ ValidateChunks(Cas, Hashes, true);
+ GenerateChunks(Cas, kChunkCount, kChunkSize / 2, Hashes);
+ ValidateChunks(Cas, Hashes, true);
+ if (true)
+ {
+ std::vector<IoHash> DropHashes;
+ std::vector<IoHash> KeepHashes;
+ for (size_t Index = 0; Index < Hashes.size(); Index++)
+ {
+ if (Index % 5 == 0)
+ {
+ KeepHashes.push_back(Hashes[Index]);
+ }
+ else
+ {
+ DropHashes.push_back(Hashes[Index]);
+ }
+ }
+ // std::span<const IoHash> KeepHashes(Hashes);
+ // ZEN_ASSERT(ExpectedGcCount < Hashes.size());
+ // KeepHashes = KeepHashes.subspan(ExpectedGcCount);
+ GcRef Ref(Gc, KeepHashes);
+ Gc.CollectGarbage(GcSettings{.CollectSmallObjects = true, .IsDeleteMode = true});
+ ValidateChunks(Cas, KeepHashes, true);
+ ValidateChunks(Cas, DropHashes, false);
+ Hashes = KeepHashes;
+ }
+ GenerateChunks(Cas, kChunkCount, kChunkSize / 3, Hashes);
+ }
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ ValidateChunks(Cas, Hashes, true);
+ Cas.Flush();
+ ValidateChunks(Cas, Hashes, true);
+ }
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ ValidateChunks(Cas, Hashes, true);
+ }
+}
+
TEST_CASE("compactcas.iteratechunks")
{
std::atomic<size_t> WorkCompleted = 0;