aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-06-09 09:03:39 +0200
committerGitHub Enterprise <[email protected]>2025-06-09 09:03:39 +0200
commit6f2d68d2c11011d541259d0037908dd76eadeb8a (patch)
tree4fa165343dd42544dded51fad0e13ebae44dd442
parent5.6.10-pre0 (diff)
downloadzen-6f2d68d2c11011d541259d0037908dd76eadeb8a.tar.xz
zen-6f2d68d2c11011d541259d0037908dd76eadeb8a.zip
missing chunks bugfix (#424)
* make sure to close log file when resetting log * drop entries that refer to missing blocks * Don't scrub keys that have been rewritten * correctly count added bytes / m_TotalSize * fix negative sleep time in BlockStoreFile::Open() * be defensive when fetching log position * append to log files *after* we updated all state successfully * explicitly close stuff in destructors with exception catching * clean up empty size block store files
-rw-r--r--CHANGELOG.md1
-rw-r--r--src/zenserver/projectstore/projectstore.cpp2
-rw-r--r--src/zenstore/blockstore.cpp53
-rw-r--r--src/zenstore/buildstore/buildstore.cpp98
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp84
-rw-r--r--src/zenstore/compactcas.cpp306
-rw-r--r--src/zenstore/filecas.cpp9
-rw-r--r--src/zenstore/include/zenstore/blockstore.h6
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h7
9 files changed, 481 insertions, 85 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9f90dfc19..b35eb650d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@
- Bugfix: Zen CLI now initializes Sentry after command line options parsing, so that the options can be properly taken into account during init
- Bugfix: Revert: `zen builds upload` now use the system temp directory for temporary files leaving the source folder untouched
- Bugfix: Use selected subcommand when displaying help for failed command line options in zen builds
+- Bugfix: Make sure we recreate the log file in the CAS/cache bucket when creating a snapshot at startup, which previously caused lost changes. UE-291196
## 5.6.9
- Bugfix: Remove long running exclusive namespace wide locks when dropping buckets or namespaces
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index a2e73380f..6359b9db9 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1010,8 +1010,8 @@ struct ProjectStore::OplogStorage : public RefCounted
.OpCoreHash = OpData.OpCoreHash,
.OpKeyHash = OpData.KeyHash};
- m_Oplog.Append(Entry);
m_OpBlobs.Write(OpData.Buffer.GetData(), WriteSize, WriteOffset);
+ m_Oplog.Append(Entry);
return Entry;
}
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 5081ae65d..7b56c64bd 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -70,7 +70,7 @@ BlockStoreFile::Open()
return false;
}
ZEN_WARN("Failed to open cas block '{}', reason: '{}', retries left: {}.", m_Path, Ec.message(), RetriesLeft);
- Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms
+ Sleep(100 + (3 - RetriesLeft) * 100); // Total 600 ms
RetriesLeft--;
return true;
});
@@ -286,6 +286,14 @@ BlockStore::BlockStore()
BlockStore::~BlockStore()
{
+ try
+ {
+ Close();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~BlockStore() failed with: ", Ex.what());
+ }
}
void
@@ -307,6 +315,7 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max
if (IsDir(m_BlocksBasePath))
{
+ std::vector<std::filesystem::path> EmptyBlockFiles;
uint32_t NextBlockIndex = 0;
std::vector<std::filesystem::path> FoldersToScan;
FoldersToScan.push_back(m_BlocksBasePath);
@@ -334,6 +343,12 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max
{
continue;
}
+ if (Entry.file_size() == 0)
+ {
+ EmptyBlockFiles.push_back(Path);
+ continue;
+ }
+
Ref<BlockStoreFile> BlockFile{new BlockStoreFile(Path)};
BlockFile->Open();
m_TotalSize.fetch_add(BlockFile->TotalSize(), std::memory_order::relaxed);
@@ -347,6 +362,17 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max
}
++FolderOffset;
}
+
+ for (const std::filesystem::path& EmptyBlockFile : EmptyBlockFiles)
+ {
+ std::error_code Ec;
+ RemoveFile(EmptyBlockFile, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Unable to remove empty block file {}. Reason: {}", EmptyBlockFile, Ec.message());
+ }
+ }
+
m_WriteBlockIndex.store(NextBlockIndex, std::memory_order_release);
}
else
@@ -355,7 +381,7 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max
}
}
-void
+BlockStore::BlockIndexSet
BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks)
{
ZEN_MEMSCOPE(GetBlocksTag());
@@ -363,8 +389,8 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks)
RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
- tsl::robin_set<uint32_t> MissingBlocks;
- tsl::robin_set<uint32_t> DeleteBlocks;
+ BlockIndexSet MissingBlocks;
+ BlockIndexSet DeleteBlocks;
DeleteBlocks.reserve(m_ChunkBlocks.size());
for (auto It : m_ChunkBlocks)
{
@@ -383,13 +409,6 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks)
MissingBlocks.insert(BlockIndex);
}
}
- for (std::uint32_t BlockIndex : MissingBlocks)
- {
- std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex);
- Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath));
- NewBlockFile->Create(0);
- m_ChunkBlocks[BlockIndex] = NewBlockFile;
- }
for (std::uint32_t BlockIndex : DeleteBlocks)
{
std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex);
@@ -400,6 +419,7 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks)
}
m_ChunkBlocks.erase(BlockIndex);
}
+ return MissingBlocks;
}
BlockStore::BlockEntryCountMap
@@ -1037,6 +1057,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
{
Continue = ChangeCallback(MovedChunks, ScrubbedChunks, RemovedSize > AddedSize ? RemovedSize - AddedSize : 0);
DeletedSize += RemovedSize;
+ m_TotalSize.fetch_add(AddedSize);
RemovedSize = 0;
AddedSize = 0;
MovedCount += MovedChunks.size();
@@ -1220,14 +1241,13 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
NiceBytes(Space.Free + ReclaimedSpace));
}
NewBlockFile->Create(m_MaxBlockSize);
- NewBlockIndex = NextBlockIndex;
- WriteOffset = 0;
- AddedSize += WriteOffset;
+ NewBlockIndex = NextBlockIndex;
WriteOffset = 0;
TargetFileBuffer = std::make_unique<BasicFileWriter>(NewBlockFile->GetBasicFile(), Min(256u * 1024u, m_MaxBlockSize));
}
- WriteOffset = TargetFileBuffer->AlignTo(PayloadAlignment);
+ const uint64_t OldWriteOffset = WriteOffset;
+ WriteOffset = TargetFileBuffer->AlignTo(PayloadAlignment);
TargetFileBuffer->Write(ChunkView.GetData(), ChunkLocation.Size, WriteOffset);
MovedChunks.push_back(
@@ -1235,8 +1255,9 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
WriteOffset += ChunkLocation.Size;
MovedFromBlock += RoundUp(ChunkLocation.Offset + ChunkLocation.Size, PayloadAlignment) - ChunkLocation.Offset;
+ uint64_t WrittenBytes = WriteOffset - OldWriteOffset;
+ AddedSize += WrittenBytes;
}
- AddedSize += WriteOffset;
ZEN_INFO("{}moved {} chunks ({}) from '{}' to new block, freeing {}",
LogPrefix,
KeepChunkIndexes.size(),
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp
index afb7e4bee..c25f762f5 100644
--- a/src/zenstore/buildstore/buildstore.cpp
+++ b/src/zenstore/buildstore/buildstore.cpp
@@ -177,22 +177,60 @@ BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc)
m_Config.SmallBlobBlockStoreAlignement,
IsNew);
m_MetadataBlockStore.Initialize(Config.RootDirectory / "metadata", m_Config.MetadataBlockStoreMaxBlockSize, 1u << 20);
+
+ BlockStore::BlockIndexSet KnownBlocks;
+ for (const BlobEntry& Blob : m_BlobEntries)
{
- BlockStore::BlockIndexSet KnownBlocks;
- for (const BlobEntry& Blob : m_BlobEntries)
+ if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex)
{
- if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex)
- {
- const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex];
- KnownBlocks.insert(Metadata.Location.BlockIndex);
- }
+ const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex];
+ KnownBlocks.insert(Metadata.Location.BlockIndex);
}
- m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
}
+ BlockStore::BlockIndexSet MissingBlocks = m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
m_PayloadlogFile.Open(BlobLogPath, CasLogFile::Mode::kWrite);
m_MetadatalogFile.Open(MetaLogPath, CasLogFile::Mode::kWrite);
+ if (!MissingBlocks.empty())
+ {
+ std::vector<MetadataDiskEntry> MissingMetadatas;
+ for (auto& It : m_BlobLookup)
+ {
+ const IoHash& BlobHash = It.first;
+ const BlobIndex ReadBlobIndex = It.second;
+ const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex];
+ if (ReadBlobEntry.Metadata)
+ {
+ const MetadataEntry& MetaData = m_MetadataEntries[ReadBlobEntry.Metadata];
+ if (MissingBlocks.contains(MetaData.Location.BlockIndex))
+ {
+ MissingMetadatas.push_back(
+ MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = BlobHash});
+ MissingMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone;
+ m_MetadataEntries[ReadBlobEntry.Metadata] = {};
+ m_BlobEntries[ReadBlobIndex].Metadata = {};
+ }
+ }
+ }
+ ZEN_ASSERT(!MissingMetadatas.empty());
+
+ for (const MetadataDiskEntry& Entry : MissingMetadatas)
+ {
+ auto It = m_BlobLookup.find(Entry.BlobHash);
+ ZEN_ASSERT(It != m_BlobLookup.end());
+
+ const BlobIndex ReadBlobIndex = It->second;
+ const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex];
+ if (!ReadBlobEntry.Payload)
+ {
+ m_BlobLookup.erase(It);
+ }
+ }
+ m_MetadatalogFile.Append(MissingMetadatas);
+ CompactState();
+ }
+
m_Gc.AddGcReferencer(*this);
m_Gc.AddGcReferenceLocker(*this);
m_Gc.AddGcStorage(this);
@@ -256,34 +294,36 @@ BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload)
ZEN_UNUSED(Result);
Entry = PayloadEntry(0, PayloadSize);
}
- m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
- RwLock::ExclusiveLockScope _(m_Lock);
- if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
{
- const BlobIndex ExistingBlobIndex = It->second;
- BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex];
- if (Blob.Payload)
+ RwLock::ExclusiveLockScope _(m_Lock);
+ if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
{
- m_PayloadEntries[Blob.Payload] = Entry;
+ const BlobIndex ExistingBlobIndex = It->second;
+ BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex];
+ if (Blob.Payload)
+ {
+ m_PayloadEntries[Blob.Payload] = Entry;
+ }
+ else
+ {
+ Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
+ m_PayloadEntries.push_back(Entry);
+ }
+ Blob.LastAccessTime = GcClock::TickCount();
}
else
{
- Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
+ PayloadIndex NewPayloadIndex = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
m_PayloadEntries.push_back(Entry);
- }
- Blob.LastAccessTime = GcClock::TickCount();
- }
- else
- {
- PayloadIndex NewPayloadIndex = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
- m_PayloadEntries.push_back(Entry);
- const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size()));
- // we only remove during GC and compact this then...
- m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())});
- m_BlobLookup.insert({BlobHash, NewBlobIndex});
+ const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size()));
+ // we only remove during GC and compact this then...
+ m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())});
+ m_BlobLookup.insert({BlobHash, NewBlobIndex});
+ }
}
+ m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
m_LastAccessTimeUpdateCount++;
}
@@ -370,7 +410,6 @@ BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoB
const BlockStoreLocation& Location = Locations[LocationIndex];
MetadataEntry Entry = {.Location = Location, .ContentType = Data.GetContentType(), .Flags = 0};
- m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
{
@@ -396,6 +435,9 @@ BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoB
m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())});
m_BlobLookup.insert({BlobHash, NewBlobIndex});
}
+
+ m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
+
m_LastAccessTimeUpdateCount++;
WriteBlobIndex++;
if (m_TrackedCacheKeys)
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 3f1f0e34a..0ee70890c 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -751,6 +751,16 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
ZenCacheDiskLayer::CacheBucket::~CacheBucket()
{
+ try
+ {
+ m_SlogFile.Flush();
+ m_SlogFile.Close();
+ m_BlockStore.Close();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~CacheBucket() failed with: ", Ex.what());
+ }
m_Gc.RemoveGcReferencer(*this);
}
@@ -824,11 +834,13 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
}
void
-ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const std::function<uint64_t()>& ClaimDiskReserveFunc)
+ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(uint64_t LogPosition,
+ bool ResetLog,
+ const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot");
- if (m_LogFlushPosition == m_SlogFile.GetLogCount())
+ if (m_LogFlushPosition == LogPosition)
{
return;
}
@@ -877,7 +889,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const st
throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir));
}
- const uint64_t IndexLogPosition = ResetLog ? 0 : m_SlogFile.GetLogCount();
+ const uint64_t IndexLogPosition = ResetLog ? 0 : LogPosition;
cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount,
.LogPosition = IndexLogPosition,
@@ -930,12 +942,14 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const st
if (IsFile(LogPath))
{
+ m_SlogFile.Close();
if (!RemoveFile(LogPath, Ec) || Ec)
{
// This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficient but in
// the end it will be the same result
ZEN_WARN("snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message());
}
+ m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
}
}
m_LogFlushPosition = IndexLogPosition;
@@ -1149,13 +1163,6 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco
}
}
- if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0)
- {
- WriteIndexSnapshot(IndexLock, /*Flush log*/ true);
- }
-
- m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
-
BlockStore::BlockIndexSet KnownBlocks;
for (const auto& Entry : m_Index)
{
@@ -1173,7 +1180,53 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco
KnownBlocks.insert(BlockIndex);
}
}
- m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+ BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+ m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ bool RemovedEntries = false;
+ if (!MissingBlocks.empty())
+ {
+ std::vector<DiskIndexEntry> MissingEntries;
+
+ for (auto& It : m_Index)
+ {
+ BucketPayload& Payload = m_Payloads[It.second];
+ DiskLocation Location = Payload.Location;
+ if (!Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ if (MissingBlocks.contains(Location.Location.BlockLocation.GetBlockIndex()))
+ {
+ RemoveMemCachedData(IndexLock, Payload);
+ RemoveMetaData(IndexLock, Payload);
+ }
+ }
+ Location.Flags |= DiskLocation::kTombStone;
+ MissingEntries.push_back(DiskIndexEntry{.Key = It.first, .Location = Location});
+ }
+
+ ZEN_ASSERT(!MissingEntries.empty());
+
+ for (const DiskIndexEntry& Entry : MissingEntries)
+ {
+ m_Index.erase(Entry.Key);
+ }
+ m_SlogFile.Append(MissingEntries);
+ m_SlogFile.Flush();
+ {
+ std::vector<BucketPayload> Payloads;
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketMetaData> MetaDatas;
+ std::vector<MemCacheData> MemCachedPayloads;
+ IndexMap Index;
+ CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index);
+ }
+ RemovedEntries = true;
+ }
+
+ if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0 || RemovedEntries)
+ {
+ WriteIndexSnapshot(IndexLock, m_SlogFile.GetLogCount(), /*Flush log*/ true);
+ }
}
void
@@ -2024,6 +2077,9 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot");
try
{
+ // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock
+ const uint64_t LogPosition = m_SlogFile.GetLogCount();
+
bool UseLegacyScheme = false;
IoBuffer Buffer;
@@ -2038,7 +2094,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
{
RwLock::SharedLockScope IndexLock(m_IndexLock);
- WriteIndexSnapshot(IndexLock, /*Flush log*/ false);
+ WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false);
// Note: this copy could be eliminated on shutdown to
// reduce memory usage and execution time
Index = m_Index;
@@ -2078,7 +2134,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
else
{
RwLock::SharedLockScope IndexLock(m_IndexLock);
- WriteIndexSnapshot(IndexLock, /*Flush log*/ false);
+ WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false);
const uint64_t EntryCount = m_Index.size();
Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount);
uint64_t SidecarSize = ManifestWriter.GetSidecarSize();
@@ -2727,7 +2783,6 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
ZEN_MEMSCOPE(GetCacheDiskTag());
ZEN_TRACE_CPU("Z$::Bucket::UpdateLocation");
DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags);
- m_SlogFile.Append({.Key = HashKey, .Location = Location});
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
if (m_TrackedCacheKeys)
@@ -2757,6 +2812,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
m_AccessTimes.emplace_back(GcClock::TickCount());
m_Index.insert_or_assign(HashKey, EntryIndex);
}
+ m_SlogFile.Append({.Key = HashKey, .Location = Location});
});
}
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 0c9302ec8..2ab5752ff 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -145,6 +145,16 @@ CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : m_Log(logging::Get("
CasContainerStrategy::~CasContainerStrategy()
{
+ try
+ {
+ m_BlockStore.Close();
+ m_CasLog.Flush();
+ m_CasLog.Close();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~CasContainerStrategy failed with: ", Ex.what());
+ }
m_Gc.RemoveGcReferenceStore(*this);
m_Gc.RemoveGcStorage(this);
}
@@ -204,12 +214,12 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
ZEN_TRACE_CPU("CasContainer::UpdateLocation");
BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
- m_CasLog.Append(IndexEntry);
{
RwLock::ExclusiveLockScope _(m_LocationMapLock);
m_LocationMap.emplace(ChunkHash, m_Locations.size());
m_Locations.push_back(DiskLocation);
}
+ m_CasLog.Append(IndexEntry);
});
return CasStore::InsertResult{.New = true};
@@ -273,7 +283,6 @@ CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<c
IndexEntries.emplace_back(
CasDiskIndexEntry{.Key = ChunkHashes[ChunkIndex], .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)});
}
- m_CasLog.Append(IndexEntries);
{
RwLock::ExclusiveLockScope _(m_LocationMapLock);
for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries)
@@ -282,6 +291,7 @@ CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<c
m_Locations.push_back(DiskIndexEntry.Location);
}
}
+ m_CasLog.Append(IndexEntries);
});
return Result;
}
@@ -746,19 +756,27 @@ public:
MovedEntries.push_back(CasDiskIndexEntry{.Key = Key, .Location = Location});
}
}
- for (size_t ScrubbedIndex : ScrubbedArray)
+ for (size_t ChunkIndex : ScrubbedArray)
{
- const IoHash& Key = BlockCompactStateKeys[ScrubbedIndex];
+ const IoHash& Key = BlockCompactStateKeys[ChunkIndex];
if (auto It = m_CasContainerStrategy.m_LocationMap.find(Key);
It != m_CasContainerStrategy.m_LocationMap.end())
{
- BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second];
+ BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second];
+ const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex);
+ if (Location.Get(m_CasContainerStrategy.m_PayloadAlignment) != OldLocation)
+ {
+ // Someone has moved our chunk so lets just skip the new location we were provided, it will be
+ // GC:d at a later time
+ continue;
+ }
MovedEntries.push_back(
CasDiskIndexEntry{.Key = Key, .Location = Location, .Flags = CasDiskIndexEntry::kTombstone});
m_CasContainerStrategy.m_LocationMap.erase(It);
}
}
m_CasContainerStrategy.m_CasLog.Append(MovedEntries);
+ m_CasContainerStrategy.m_CasLog.Flush();
Stats.RemovedDisk += FreedDiskSpace;
if (Ctx.IsCancelledFlag.load())
{
@@ -984,14 +1002,11 @@ CasContainerStrategy::MakeIndexSnapshot(bool ResetLog)
{
// Write the current state of the location map to a new index state
std::vector<CasDiskIndexEntry> Entries;
- uint64_t IndexLogPosition = 0;
+ // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock
+ const uint64_t IndexLogPosition = ResetLog ? 0 : m_CasLog.GetLogCount();
{
RwLock::SharedLockScope ___(m_LocationMapLock);
- if (!ResetLog)
- {
- IndexLogPosition = m_CasLog.GetLogCount();
- }
Entries.resize(m_LocationMap.size());
uint64_t EntryIndex = 0;
@@ -1036,12 +1051,14 @@ CasContainerStrategy::MakeIndexSnapshot(bool ResetLog)
if (IsFile(LogPath))
{
+ m_CasLog.Close();
if (!RemoveFile(LogPath, Ec) || Ec)
{
// This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficient but in
// the end it will be the same result
ZEN_WARN("Snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message());
}
+ m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
}
}
m_LogFlushPosition = IndexLogPosition;
@@ -1154,7 +1171,7 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski
ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
SkipEntryCount = 0;
}
- LogEntryCount = EntryCount - SkipEntryCount;
+ LogEntryCount = SkipEntryCount;
CasLog.Replay(
[&](const CasDiskIndexEntry& Record) {
LogEntryCount++;
@@ -1173,7 +1190,6 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski
m_Locations.push_back(Record.Location);
},
SkipEntryCount);
-
return LogEntryCount;
}
return 0;
@@ -1229,8 +1245,6 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
}
}
- m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
-
BlockStore::BlockIndexSet KnownBlocks;
for (const auto& Entry : m_LocationMap)
@@ -1240,9 +1254,39 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
KnownBlocks.insert(BlockIndex);
}
- m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+ BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+
+ m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ bool RemovedEntries = false;
+ if (!MissingBlocks.empty())
+ {
+ std::vector<CasDiskIndexEntry> MissingEntries;
+ for (auto& It : m_LocationMap)
+ {
+ const uint32_t BlockIndex = m_Locations[It.second].GetBlockIndex();
+ if (MissingBlocks.contains(BlockIndex))
+ {
+ MissingEntries.push_back({.Key = It.first, .Location = m_Locations[It.second], .Flags = CasDiskIndexEntry::kTombstone});
+ }
+ }
+ ZEN_ASSERT(!MissingEntries.empty());
+
+ for (const CasDiskIndexEntry& Entry : MissingEntries)
+ {
+ m_LocationMap.erase(Entry.Key);
+ }
+ m_CasLog.Append(MissingEntries);
+ m_CasLog.Flush();
+
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_LocationMapLock);
+ CompactIndex(IndexLock);
+ }
+ RemovedEntries = true;
+ }
- if (IsNewStore || (LogEntryCount > 0))
+ if (IsNewStore || (LogEntryCount > 0) || RemovedEntries)
{
MakeIndexSnapshot(/*ResetLog*/ true);
}
@@ -1612,6 +1656,236 @@ TEST_CASE("compactcas.threadedinsert")
}
}
+TEST_CASE("compactcas.restart")
+{
+ uint64_t ExpectedSize = 0;
+
+ auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) {
+ WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put");
+
+ Latch WorkLatch(1);
+ tsl::robin_set<IoHash, IoHash::Hasher> ChunkHashesLookup;
+ ChunkHashesLookup.reserve(ChunkCount);
+ RwLock InsertLock;
+ for (size_t Offset = 0; Offset < ChunkCount;)
+ {
+ size_t BatchCount = Min<size_t>(ChunkCount - Offset, 512u);
+ WorkLatch.AddCount(1);
+ ThreadPool.ScheduleWork(
+ [&WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount, ChunkSize]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+
+ std::vector<IoBuffer> BatchBlobs;
+ std::vector<IoHash> BatchHashes;
+ BatchBlobs.reserve(BatchCount);
+ BatchHashes.reserve(BatchCount);
+
+ while (BatchBlobs.size() < BatchCount)
+ {
+ IoBuffer Chunk =
+ CreateSemiRandomBlob(ChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377));
+ IoHash Hash = IoHash::HashBuffer(Chunk);
+ {
+ RwLock::ExclusiveLockScope __(InsertLock);
+ if (ChunkHashesLookup.contains(Hash))
+ {
+ continue;
+ }
+ ChunkHashesLookup.insert(Hash);
+ ExpectedSize += Chunk.Size();
+ }
+
+ BatchBlobs.emplace_back(CompressedBuffer::Compress(SharedBuffer(Chunk)).GetCompressed().Flatten().AsIoBuffer());
+ BatchHashes.push_back(Hash);
+ }
+
+ Cas.InsertChunks(BatchBlobs, BatchHashes);
+ {
+ RwLock::ExclusiveLockScope __(InsertLock);
+ Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end());
+ }
+ });
+ Offset += BatchCount;
+ }
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+ };
+
+ ScopedTemporaryDirectory TempDir;
+ std::filesystem::path CasPath = TempDir.Path();
+ CreateDirectories(CasPath);
+
+ bool Generate = false;
+ if (!Generate)
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ }
+
+ const uint64_t kChunkSize = 1048 + 395;
+ const size_t kChunkCount = 7167;
+
+ std::vector<IoHash> Hashes;
+ Hashes.reserve(kChunkCount);
+
+ auto ValidateChunks = [&](CasContainerStrategy& Cas, std::span<const IoHash> Hashes, bool ShouldExist) {
+ for (const IoHash& Hash : Hashes)
+ {
+ if (ShouldExist)
+ {
+ CHECK(Cas.HaveChunk(Hash));
+ IoBuffer Buffer = Cas.FindChunk(Hash);
+ CHECK(Buffer);
+ IoHash ValidateHash;
+ uint64_t ValidateRawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize);
+ CHECK(Compressed);
+ CHECK(ValidateHash == Hash);
+ }
+ else
+ {
+ CHECK(!Cas.HaveChunk(Hash));
+ IoBuffer Buffer = Cas.FindChunk(Hash);
+ CHECK(!Buffer);
+ }
+ }
+ };
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, true);
+ GenerateChunks(Cas, kChunkCount, kChunkSize, Hashes);
+ ValidateChunks(Cas, Hashes, true);
+ Cas.Flush();
+ ValidateChunks(Cas, Hashes, true);
+ }
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ ValidateChunks(Cas, Hashes, true);
+ GenerateChunks(Cas, kChunkCount, kChunkSize / 4, Hashes);
+ ValidateChunks(Cas, Hashes, true);
+ }
+
+ class GcRefChecker : public GcReferenceChecker
+ {
+ public:
+ explicit GcRefChecker(std::vector<IoHash>&& HashesToKeep) : m_HashesToKeep(std::move(HashesToKeep)) {}
+ ~GcRefChecker() {}
+ std::string GetGcName(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return "test";
+ }
+ void PreCache(GcCtx& Ctx) override { FilterReferences(Ctx, "test", m_HashesToKeep); }
+ void UpdateLockedState(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); }
+ std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override
+ {
+ ZEN_UNUSED(Ctx);
+ return KeepUnusedReferences(m_HashesToKeep, IoCids);
+ }
+
+ private:
+ std::vector<IoHash> m_HashesToKeep;
+ };
+
+ class GcRef : public GcReferencer
+ {
+ public:
+ GcRef(GcManager& Gc, std::span<const IoHash> HashesToKeep) : m_Gc(Gc)
+ {
+ m_HashesToKeep.insert(m_HashesToKeep.begin(), HashesToKeep.begin(), HashesToKeep.end());
+ m_Gc.AddGcReferencer(*this);
+ }
+ ~GcRef() { m_Gc.RemoveGcReferencer(*this); }
+ std::string GetGcName(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return "test";
+ }
+ GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override
+ {
+ ZEN_UNUSED(Ctx, Stats);
+ return nullptr;
+ }
+ std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return {new GcRefChecker(std::move(m_HashesToKeep))};
+ }
+ std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return {};
+ }
+
+ private:
+ GcManager& m_Gc;
+ std::vector<IoHash> m_HashesToKeep;
+ };
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ GenerateChunks(Cas, kChunkCount, kChunkSize / 5, Hashes);
+ }
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ ValidateChunks(Cas, Hashes, true);
+ GenerateChunks(Cas, kChunkCount, kChunkSize / 2, Hashes);
+ ValidateChunks(Cas, Hashes, true);
+ if (true)
+ {
+ std::vector<IoHash> DropHashes;
+ std::vector<IoHash> KeepHashes;
+ for (size_t Index = 0; Index < Hashes.size(); Index++)
+ {
+ if (Index % 5 == 0)
+ {
+ KeepHashes.push_back(Hashes[Index]);
+ }
+ else
+ {
+ DropHashes.push_back(Hashes[Index]);
+ }
+ }
+ // std::span<const IoHash> KeepHashes(Hashes);
+ // ZEN_ASSERT(ExpectedGcCount < Hashes.size());
+ // KeepHashes = KeepHashes.subspan(ExpectedGcCount);
+ GcRef Ref(Gc, KeepHashes);
+ Gc.CollectGarbage(GcSettings{.CollectSmallObjects = true, .IsDeleteMode = true});
+ ValidateChunks(Cas, KeepHashes, true);
+ ValidateChunks(Cas, DropHashes, false);
+ Hashes = KeepHashes;
+ }
+ GenerateChunks(Cas, kChunkCount, kChunkSize / 3, Hashes);
+ }
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ ValidateChunks(Cas, Hashes, true);
+ Cas.Flush();
+ ValidateChunks(Cas, Hashes, true);
+ }
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ ValidateChunks(Cas, Hashes, true);
+ }
+}
+
TEST_CASE("compactcas.iteratechunks")
{
std::atomic<size_t> WorkCompleted = 0;
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 539b5e95b..11a266f1c 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -969,14 +969,11 @@ FileCasStrategy::MakeIndexSnapshot(bool ResetLog)
{
// Write the current state of the location map to a new index state
std::vector<FileCasIndexEntry> Entries;
- uint64_t IndexLogPosition = 0;
+ // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock
+ const uint64_t IndexLogPosition = ResetLog ? 0 : m_CasLog.GetLogCount();
{
RwLock::SharedLockScope __(m_Lock);
- if (!ResetLog)
- {
- IndexLogPosition = m_CasLog.GetLogCount();
- }
Entries.resize(m_Index.size());
uint64_t EntryIndex = 0;
@@ -1019,12 +1016,14 @@ FileCasStrategy::MakeIndexSnapshot(bool ResetLog)
if (IsFile(LogPath))
{
+ m_CasLog.Close();
if (!RemoveFile(LogPath, Ec) || Ec)
{
// This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficient but in
// the end it will be the same result
ZEN_WARN("Snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message());
}
+ m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
}
}
m_LogFlushPosition = IndexLogPosition;
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index 8cbcad11b..fce05766f 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -147,10 +147,10 @@ public:
typedef tsl::robin_set<uint32_t> BlockIndexSet;
- // Ask the store to create empty blocks for all locations that does not have a block
// Remove any block that is not referenced
- void SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks);
- BlockEntryCountMap GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent);
+ // Return a list of blocks that are not present
+ [[nodiscard]] BlockIndexSet SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks);
+ BlockEntryCountMap GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent);
void Close();
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index b67a043df..3cd2d6423 100644
--- a/src/zenstore/include/zenstore/cache/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
@@ -409,19 +409,22 @@ public:
void SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; });
void WriteIndexSnapshot(
RwLock::ExclusiveLockScope&,
+ uint64_t LogPosition,
bool ResetLog,
const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; })
{
- WriteIndexSnapshotLocked(ResetLog, ClaimDiskReserveFunc);
+ WriteIndexSnapshotLocked(LogPosition, ResetLog, ClaimDiskReserveFunc);
}
void WriteIndexSnapshot(
RwLock::SharedLockScope&,
+ uint64_t LogPosition,
bool ResetLog,
const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; })
{
- WriteIndexSnapshotLocked(ResetLog, ClaimDiskReserveFunc);
+ WriteIndexSnapshotLocked(LogPosition, ResetLog, ClaimDiskReserveFunc);
}
void WriteIndexSnapshotLocked(
+ uint64_t LogPosition,
bool ResetLog,
const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; });