aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/compactcas.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore/compactcas.cpp')
-rw-r--r--src/zenstore/compactcas.cpp707
1 files changed, 607 insertions, 100 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 2be0542db..b00abb2cb 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -15,6 +15,7 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenstore/scrubcontext.h>
+#include <zenutil/parallelwork.h>
#include <gsl/gsl-lite.hpp>
@@ -144,6 +145,16 @@ CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : m_Log(logging::Get("
CasContainerStrategy::~CasContainerStrategy()
{
+ try
+ {
+ m_BlockStore.Close();
+ m_CasLog.Flush();
+ m_CasLog.Close();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~CasContainerStrategy failed with: ", Ex.what());
+ }
m_Gc.RemoveGcReferenceStore(*this);
m_Gc.RemoveGcStorage(this);
}
@@ -203,12 +214,12 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
ZEN_TRACE_CPU("CasContainer::UpdateLocation");
BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
- m_CasLog.Append(IndexEntry);
{
RwLock::ExclusiveLockScope _(m_LocationMapLock);
m_LocationMap.emplace(ChunkHash, m_Locations.size());
m_Locations.push_back(DiskLocation);
}
+ m_CasLog.Append(IndexEntry);
});
return CasStore::InsertResult{.New = true};
@@ -226,7 +237,7 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
}
std::vector<CasStore::InsertResult>
-CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ChunkHashes)
+CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<const IoHash> ChunkHashes)
{
ZEN_MEMSCOPE(GetCasContainerTag());
@@ -272,7 +283,6 @@ CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash>
IndexEntries.emplace_back(
CasDiskIndexEntry{.Key = ChunkHashes[ChunkIndex], .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)});
}
- m_CasLog.Append(IndexEntries);
{
RwLock::ExclusiveLockScope _(m_LocationMapLock);
for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries)
@@ -281,6 +291,7 @@ CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash>
m_Locations.push_back(DiskIndexEntry.Location);
}
}
+ m_CasLog.Append(IndexEntries);
});
return Result;
}
@@ -306,7 +317,12 @@ bool
CasContainerStrategy::HaveChunk(const IoHash& ChunkHash)
{
RwLock::SharedLockScope _(m_LocationMapLock);
- return m_LocationMap.contains(ChunkHash);
+ if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
+ {
+ const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment);
+ return m_BlockStore.HasChunk(Location);
+ }
+ return false;
}
void
@@ -323,7 +339,7 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks)
}
bool
-CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
+CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
WorkerThreadPool* OptionalWorkerPool,
uint64_t LargeSizeLimit)
@@ -360,7 +376,11 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
return true;
}
- auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) {
+ auto DoOneBlock = [this](const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
+ uint64_t LargeSizeLimit,
+ std::span<const size_t> FoundChunkIndexes,
+ std::span<const BlockStoreLocation> FoundChunkLocations,
+ std::span<const size_t> ChunkIndexes) {
if (ChunkIndexes.size() < 4)
{
for (size_t ChunkIndex : ChunkIndexes)
@@ -376,57 +396,96 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
return m_BlockStore.IterateBlock(
FoundChunkLocations,
ChunkIndexes,
- [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ [AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) {
if (Data == nullptr)
{
return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer());
}
return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size));
},
- [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ [AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size));
},
LargeSizeLimit);
};
- Latch WorkLatch(1);
- std::atomic_bool AsyncContinue = true;
- bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
- if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
+ std::atomic<bool> AbortFlag;
+ {
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
+ try
{
- WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- if (!AsyncContinue)
- {
- return;
- }
- try
- {
- bool Continue = DoOneBlock(ChunkIndexes);
- if (!Continue)
+ const bool Continue = m_BlockStore.IterateChunks(
+ FoundChunkLocations,
+ [this,
+ &Work,
+ &AbortFlag,
+ &AsyncCallback,
+ LargeSizeLimit,
+ DoOneBlock,
+ &FoundChunkIndexes,
+ &FoundChunkLocations,
+ OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
+ if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
{
- AsyncContinue.store(false);
- }
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
- m_RootDirectory,
+ std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this,
+ &AsyncCallback,
+ LargeSizeLimit,
+ DoOneBlock,
BlockIndex,
- Ex.what());
- }
- });
- return AsyncContinue.load();
+ &FoundChunkIndexes,
+ &FoundChunkLocations,
+ ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
+ try
+ {
+ bool Continue =
+ DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
+ if (!Continue)
+ {
+ AbortFlag.store(true);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
+ m_RootDirectory,
+ BlockIndex,
+ Ex.what());
+ AbortFlag.store(true);
+ }
+ });
+ return !AbortFlag.load();
+ }
+ else
+ {
+ if (!DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes))
+ {
+ AbortFlag.store(true);
+ }
+ return !AbortFlag.load();
+ }
+ });
+ if (!Continue)
+ {
+ AbortFlag.store(true);
+ }
}
- else
+ catch (const std::exception& Ex)
{
- return DoOneBlock(ChunkIndexes);
+ AbortFlag.store(true);
+ ZEN_WARN("Failed iterating chunks for cas root path {}. Reason: '{}'", m_RootDirectory, Ex.what());
}
- });
- WorkLatch.CountDown();
- WorkLatch.Wait();
- return AsyncContinue.load() && Continue;
+
+ Work.Wait();
+ }
+ return !AbortFlag.load();
}
void
@@ -437,7 +496,7 @@ CasContainerStrategy::Flush()
ZEN_TRACE_CPU("CasContainer::Flush");
m_BlockStore.Flush(/*ForceNewBlock*/ false);
m_CasLog.Flush();
- MakeIndexSnapshot();
+ MakeIndexSnapshot(/*ResetLog*/ false);
}
void
@@ -677,7 +736,9 @@ public:
m_CasContainerStrategy.m_BlockStore.CompactBlocks(
BlockCompactState,
m_CasContainerStrategy.m_PayloadAlignment,
- [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) {
+ [&](const BlockStore::MovedChunksArray& MovedArray,
+ const BlockStore::ChunkIndexArray& ScrubbedArray,
+ uint64_t FreedDiskSpace) {
std::vector<CasDiskIndexEntry> MovedEntries;
RwLock::ExclusiveLockScope _(m_CasContainerStrategy.m_LocationMapLock);
for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray)
@@ -702,7 +763,27 @@ public:
MovedEntries.push_back(CasDiskIndexEntry{.Key = Key, .Location = Location});
}
}
+ for (size_t ChunkIndex : ScrubbedArray)
+ {
+ const IoHash& Key = BlockCompactStateKeys[ChunkIndex];
+ if (auto It = m_CasContainerStrategy.m_LocationMap.find(Key);
+ It != m_CasContainerStrategy.m_LocationMap.end())
+ {
+ BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second];
+ const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex);
+ if (Location.Get(m_CasContainerStrategy.m_PayloadAlignment) != OldLocation)
+ {
+ // Someone has moved our chunk so lets just skip the new location we were provided, it will be
+ // GC:d at a later time
+ continue;
+ }
+ MovedEntries.push_back(
+ CasDiskIndexEntry{.Key = Key, .Location = Location, .Flags = CasDiskIndexEntry::kTombstone});
+ m_CasContainerStrategy.m_LocationMap.erase(It);
+ }
+ }
m_CasContainerStrategy.m_CasLog.Append(MovedEntries);
+ m_CasContainerStrategy.m_CasLog.Flush();
Stats.RemovedDisk += FreedDiskSpace;
if (Ctx.IsCancelledFlag.load())
{
@@ -900,13 +981,12 @@ CasContainerStrategy::StorageSize() const
}
void
-CasContainerStrategy::MakeIndexSnapshot()
+CasContainerStrategy::MakeIndexSnapshot(bool ResetLog)
{
ZEN_MEMSCOPE(GetCasContainerTag());
ZEN_TRACE_CPU("CasContainer::MakeIndexSnapshot");
- uint64_t LogCount = m_CasLog.GetLogCount();
- if (m_LogFlushPosition == LogCount)
+ if (m_LogFlushPosition == m_CasLog.GetLogCount())
{
return;
}
@@ -923,34 +1003,17 @@ CasContainerStrategy::MakeIndexSnapshot()
namespace fs = std::filesystem;
- fs::path IndexPath = cas::impl::GetIndexPath(m_RootDirectory, m_ContainerBaseName);
- fs::path TempIndexPath = cas::impl::GetTempIndexPath(m_RootDirectory, m_ContainerBaseName);
-
- // Move index away, we keep it if something goes wrong
- if (fs::is_regular_file(TempIndexPath))
- {
- std::error_code Ec;
- if (!fs::remove(TempIndexPath, Ec) || Ec)
- {
- ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", TempIndexPath, Ec.message());
- return;
- }
- }
+ const fs::path IndexPath = cas::impl::GetIndexPath(m_RootDirectory, m_ContainerBaseName);
try
{
- if (fs::is_regular_file(IndexPath))
- {
- fs::rename(IndexPath, TempIndexPath);
- }
-
// Write the current state of the location map to a new index state
std::vector<CasDiskIndexEntry> Entries;
- uint64_t IndexLogPosition = 0;
+ // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock
+ const uint64_t IndexLogPosition = ResetLog ? 0 : m_CasLog.GetLogCount();
{
RwLock::SharedLockScope ___(m_LocationMapLock);
- IndexLogPosition = m_CasLog.GetLogCount();
Entries.resize(m_LocationMap.size());
uint64_t EntryIndex = 0;
@@ -960,6 +1023,7 @@ CasContainerStrategy::MakeIndexSnapshot()
IndexEntry.Key = Entry.first;
IndexEntry.Location = m_Locations[Entry.second];
}
+ EntryCount = m_LocationMap.size();
}
TemporaryFile ObjectIndexFile;
@@ -969,7 +1033,7 @@ CasContainerStrategy::MakeIndexSnapshot()
{
throw std::system_error(Ec, fmt::format("Failed to create temp file for index snapshot at '{}'", IndexPath));
}
- CasDiskIndexHeader Header = {.EntryCount = Entries.size(),
+ CasDiskIndexHeader Header = {.EntryCount = EntryCount,
.LogPosition = IndexLogPosition,
.PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
@@ -981,35 +1045,34 @@ CasContainerStrategy::MakeIndexSnapshot()
ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec);
if (Ec)
{
- throw std::system_error(Ec, fmt::format("Failed to move temp file '{}' to '{}'", ObjectIndexFile.GetPath(), IndexPath));
+ throw std::system_error(Ec,
+ fmt::format("Snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'",
+ ObjectIndexFile.GetPath(),
+ IndexPath,
+ Ec.message()));
}
- EntryCount = Entries.size();
- m_LogFlushPosition = IndexLogPosition;
- }
- catch (const std::exception& Err)
- {
- ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what());
-
- // Restore any previous snapshot
- if (fs::is_regular_file(TempIndexPath))
+ if (ResetLog)
{
- std::error_code Ec;
- fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless
- fs::rename(TempIndexPath, IndexPath, Ec);
- if (Ec)
+ const std::filesystem::path LogPath = cas::impl::GetLogPath(m_RootDirectory, m_ContainerBaseName);
+
+ if (IsFile(LogPath))
{
- ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", TempIndexPath, Ec.message());
+ m_CasLog.Close();
+ if (!RemoveFile(LogPath, Ec) || Ec)
+ {
+ // This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in
+ // the end it will be the same result
+ ZEN_WARN("Snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message());
+ }
+ m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
}
}
+ m_LogFlushPosition = IndexLogPosition;
}
- if (fs::is_regular_file(TempIndexPath))
+ catch (const std::exception& Err)
{
- std::error_code Ec;
- if (!fs::remove(TempIndexPath, Ec) || Ec)
- {
- ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempIndexPath, Ec.message());
- }
+ ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what());
}
}
@@ -1092,7 +1155,7 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski
if (!TCasLogFile<CasDiskIndexEntry>::IsValid(LogPath))
{
ZEN_WARN("removing invalid cas log at '{}'", LogPath);
- std::filesystem::remove(LogPath);
+ RemoveFile(LogPath);
return 0;
}
@@ -1115,7 +1178,7 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski
ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
SkipEntryCount = 0;
}
- LogEntryCount = EntryCount - SkipEntryCount;
+ LogEntryCount = SkipEntryCount;
CasLog.Replay(
[&](const CasDiskIndexEntry& Record) {
LogEntryCount++;
@@ -1134,7 +1197,6 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski
m_Locations.push_back(Record.Location);
},
SkipEntryCount);
-
return LogEntryCount;
}
return 0;
@@ -1155,7 +1217,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
if (IsNewStore)
{
- std::filesystem::remove_all(BasePath);
+ DeleteDirectories(BasePath);
}
CreateDirectories(BasePath);
@@ -1165,19 +1227,19 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
std::filesystem::path LogPath = cas::impl::GetLogPath(m_RootDirectory, m_ContainerBaseName);
std::filesystem::path IndexPath = cas::impl::GetIndexPath(m_RootDirectory, m_ContainerBaseName);
- if (std::filesystem::is_regular_file(IndexPath))
+ if (IsFile(IndexPath))
{
uint32_t IndexVersion = 0;
m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion);
if (IndexVersion == 0)
{
ZEN_WARN("removing invalid index file at '{}'", IndexPath);
- std::filesystem::remove(IndexPath);
+ RemoveFile(IndexPath);
}
}
uint64_t LogEntryCount = 0;
- if (std::filesystem::is_regular_file(LogPath))
+ if (IsFile(LogPath))
{
if (TCasLogFile<CasDiskIndexEntry>::IsValid(LogPath))
{
@@ -1186,12 +1248,10 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
else
{
ZEN_WARN("removing invalid cas log at '{}'", LogPath);
- std::filesystem::remove(LogPath);
+ RemoveFile(LogPath);
}
}
- m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
-
BlockStore::BlockIndexSet KnownBlocks;
for (const auto& Entry : m_LocationMap)
@@ -1201,11 +1261,41 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
KnownBlocks.insert(BlockIndex);
}
- m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+ BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+
+ m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ bool RemovedEntries = false;
+ if (!MissingBlocks.empty())
+ {
+ std::vector<CasDiskIndexEntry> MissingEntries;
+ for (auto& It : m_LocationMap)
+ {
+ const uint32_t BlockIndex = m_Locations[It.second].GetBlockIndex();
+ if (MissingBlocks.contains(BlockIndex))
+ {
+ MissingEntries.push_back({.Key = It.first, .Location = m_Locations[It.second], .Flags = CasDiskIndexEntry::kTombstone});
+ }
+ }
+ ZEN_ASSERT(!MissingEntries.empty());
+
+ for (const CasDiskIndexEntry& Entry : MissingEntries)
+ {
+ m_LocationMap.erase(Entry.Key);
+ }
+ m_CasLog.Append(MissingEntries);
+ m_CasLog.Flush();
+
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_LocationMapLock);
+ CompactIndex(IndexLock);
+ }
+ RemovedEntries = true;
+ }
- if (IsNewStore || (LogEntryCount > 0))
+ if (IsNewStore || (LogEntryCount > 0) || RemovedEntries)
{
- MakeIndexSnapshot();
+ MakeIndexSnapshot(/*ResetLog*/ true);
}
// TODO: should validate integrity of container files here
@@ -1573,6 +1663,423 @@ TEST_CASE("compactcas.threadedinsert")
}
}
+TEST_CASE("compactcas.restart")
+{
+ uint64_t ExpectedSize = 0;
+
+ auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) {
+ WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put");
+
+ Latch WorkLatch(1);
+ tsl::robin_set<IoHash, IoHash::Hasher> ChunkHashesLookup;
+ ChunkHashesLookup.reserve(ChunkCount);
+ RwLock InsertLock;
+ for (size_t Offset = 0; Offset < ChunkCount;)
+ {
+ size_t BatchCount = Min<size_t>(ChunkCount - Offset, 512u);
+ WorkLatch.AddCount(1);
+ ThreadPool.ScheduleWork(
+ [&WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount, ChunkSize]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+
+ std::vector<IoBuffer> BatchBlobs;
+ std::vector<IoHash> BatchHashes;
+ BatchBlobs.reserve(BatchCount);
+ BatchHashes.reserve(BatchCount);
+
+ while (BatchBlobs.size() < BatchCount)
+ {
+ IoBuffer Chunk =
+ CreateSemiRandomBlob(ChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377));
+ IoHash Hash = IoHash::HashBuffer(Chunk);
+ {
+ RwLock::ExclusiveLockScope __(InsertLock);
+ if (ChunkHashesLookup.contains(Hash))
+ {
+ continue;
+ }
+ ChunkHashesLookup.insert(Hash);
+ ExpectedSize += Chunk.Size();
+ }
+
+ BatchBlobs.emplace_back(CompressedBuffer::Compress(SharedBuffer(Chunk)).GetCompressed().Flatten().AsIoBuffer());
+ BatchHashes.push_back(Hash);
+ }
+
+ Cas.InsertChunks(BatchBlobs, BatchHashes);
+ {
+ RwLock::ExclusiveLockScope __(InsertLock);
+ Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end());
+ }
+ });
+ Offset += BatchCount;
+ }
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+ };
+
+ ScopedTemporaryDirectory TempDir;
+ std::filesystem::path CasPath = TempDir.Path();
+ CreateDirectories(CasPath);
+
+ bool Generate = false;
+ if (!Generate)
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ }
+
+ const uint64_t kChunkSize = 1048 + 395;
+ const size_t kChunkCount = 7167;
+
+ std::vector<IoHash> Hashes;
+ Hashes.reserve(kChunkCount);
+
+ auto ValidateChunks = [&](CasContainerStrategy& Cas, std::span<const IoHash> Hashes, bool ShouldExist) {
+ for (const IoHash& Hash : Hashes)
+ {
+ if (ShouldExist)
+ {
+ CHECK(Cas.HaveChunk(Hash));
+ IoBuffer Buffer = Cas.FindChunk(Hash);
+ CHECK(Buffer);
+ IoHash ValidateHash;
+ uint64_t ValidateRawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize);
+ CHECK(Compressed);
+ CHECK(ValidateHash == Hash);
+ }
+ else
+ {
+ CHECK(!Cas.HaveChunk(Hash));
+ IoBuffer Buffer = Cas.FindChunk(Hash);
+ CHECK(!Buffer);
+ }
+ }
+ };
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, true);
+ GenerateChunks(Cas, kChunkCount, kChunkSize, Hashes);
+ ValidateChunks(Cas, Hashes, true);
+ Cas.Flush();
+ ValidateChunks(Cas, Hashes, true);
+ }
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ ValidateChunks(Cas, Hashes, true);
+ GenerateChunks(Cas, kChunkCount, kChunkSize / 4, Hashes);
+ ValidateChunks(Cas, Hashes, true);
+ }
+
+ class GcRefChecker : public GcReferenceChecker
+ {
+ public:
+ explicit GcRefChecker(std::vector<IoHash>&& HashesToKeep) : m_HashesToKeep(std::move(HashesToKeep)) {}
+ ~GcRefChecker() {}
+ std::string GetGcName(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return "test";
+ }
+ void PreCache(GcCtx& Ctx) override { FilterReferences(Ctx, "test", m_HashesToKeep); }
+ void UpdateLockedState(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); }
+ std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override
+ {
+ ZEN_UNUSED(Ctx);
+ return KeepUnusedReferences(m_HashesToKeep, IoCids);
+ }
+
+ private:
+ std::vector<IoHash> m_HashesToKeep;
+ };
+
+ class GcRef : public GcReferencer
+ {
+ public:
+ GcRef(GcManager& Gc, std::span<const IoHash> HashesToKeep) : m_Gc(Gc)
+ {
+ m_HashesToKeep.insert(m_HashesToKeep.begin(), HashesToKeep.begin(), HashesToKeep.end());
+ m_Gc.AddGcReferencer(*this);
+ }
+ ~GcRef() { m_Gc.RemoveGcReferencer(*this); }
+ std::string GetGcName(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return "test";
+ }
+ GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override
+ {
+ ZEN_UNUSED(Ctx, Stats);
+ return nullptr;
+ }
+ std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return {new GcRefChecker(std::move(m_HashesToKeep))};
+ }
+ std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return {};
+ }
+
+ private:
+ GcManager& m_Gc;
+ std::vector<IoHash> m_HashesToKeep;
+ };
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ GenerateChunks(Cas, kChunkCount, kChunkSize / 5, Hashes);
+ }
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ ValidateChunks(Cas, Hashes, true);
+ GenerateChunks(Cas, kChunkCount, kChunkSize / 2, Hashes);
+ ValidateChunks(Cas, Hashes, true);
+ if (true)
+ {
+ std::vector<IoHash> DropHashes;
+ std::vector<IoHash> KeepHashes;
+ for (size_t Index = 0; Index < Hashes.size(); Index++)
+ {
+ if (Index % 5 == 0)
+ {
+ KeepHashes.push_back(Hashes[Index]);
+ }
+ else
+ {
+ DropHashes.push_back(Hashes[Index]);
+ }
+ }
+ // std::span<const IoHash> KeepHashes(Hashes);
+ // ZEN_ASSERT(ExpectedGcCount < Hashes.size());
+ // KeepHashes = KeepHashes.subspan(ExpectedGcCount);
+ GcRef Ref(Gc, KeepHashes);
+ Gc.CollectGarbage(GcSettings{.CollectSmallObjects = true, .IsDeleteMode = true});
+ ValidateChunks(Cas, KeepHashes, true);
+ ValidateChunks(Cas, DropHashes, false);
+ Hashes = KeepHashes;
+ }
+ GenerateChunks(Cas, kChunkCount, kChunkSize / 3, Hashes);
+ }
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ ValidateChunks(Cas, Hashes, true);
+ Cas.Flush();
+ ValidateChunks(Cas, Hashes, true);
+ }
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ ValidateChunks(Cas, Hashes, true);
+ }
+}
+
+TEST_CASE("compactcas.iteratechunks")
+{
+ std::atomic<size_t> WorkCompleted = 0;
+ WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put");
+
+ const uint64_t kChunkSize = 1048 + 395;
+ const size_t kChunkCount = 63840;
+
+ for (uint32_t N = 0; N < 4; N++)
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ ScopedTemporaryDirectory TempDir;
+ Cas.Initialize(TempDir.Path(), "test", 65536 * 128, 8, true);
+
+ CHECK(Cas.IterateChunks(
+ {},
+ [](size_t Index, const IoBuffer& Payload) {
+ ZEN_UNUSED(Index, Payload);
+ return true;
+ },
+ &ThreadPool,
+ 2048u));
+
+ uint64_t ExpectedSize = 0;
+
+ std::vector<IoHash> Hashes;
+ Hashes.reserve(kChunkCount);
+
+ {
+ Latch WorkLatch(1);
+ tsl::robin_set<IoHash, IoHash::Hasher> ChunkHashesLookup;
+ ChunkHashesLookup.reserve(kChunkCount);
+ RwLock InsertLock;
+ for (size_t Offset = 0; Offset < kChunkCount;)
+ {
+ size_t BatchCount = Min<size_t>(kChunkCount - Offset, 512u);
+ WorkLatch.AddCount(1);
+ ThreadPool.ScheduleWork(
+ [N, &WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+
+ std::vector<IoBuffer> BatchBlobs;
+ std::vector<IoHash> BatchHashes;
+ BatchBlobs.reserve(BatchCount);
+ BatchHashes.reserve(BatchCount);
+
+ while (BatchBlobs.size() < BatchCount)
+ {
+ IoBuffer Chunk = CreateRandomBlob(
+ N + kChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377));
+ IoHash Hash = IoHash::HashBuffer(Chunk);
+ {
+ RwLock::ExclusiveLockScope __(InsertLock);
+ if (ChunkHashesLookup.contains(Hash))
+ {
+ continue;
+ }
+ ChunkHashesLookup.insert(Hash);
+ ExpectedSize += Chunk.Size();
+ }
+
+ BatchBlobs.emplace_back(std::move(Chunk));
+ BatchHashes.push_back(Hash);
+ }
+
+ Cas.InsertChunks(BatchBlobs, BatchHashes);
+ {
+ RwLock::ExclusiveLockScope __(InsertLock);
+ Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end());
+ }
+ });
+ Offset += BatchCount;
+ }
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+ }
+
+ WorkerThreadPool BatchWorkerPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "fetch");
+ {
+ std::vector<std::atomic<bool>> FetchedFlags(Hashes.size());
+ std::atomic<uint64_t> FetchedSize = 0;
+ CHECK(Cas.IterateChunks(
+ Hashes,
+ [&Hashes, &FetchedFlags, &FetchedSize](size_t Index, const IoBuffer& Payload) {
+ CHECK(FetchedFlags[Index].load() == false);
+ FetchedFlags[Index].store(true);
+ const IoHash& Hash = Hashes[Index];
+ CHECK(Hash == IoHash::HashBuffer(Payload));
+ FetchedSize += Payload.GetSize();
+ return true;
+ },
+ &BatchWorkerPool,
+ 2048u));
+ for (const auto& Flag : FetchedFlags)
+ {
+ CHECK(Flag.load());
+ }
+ CHECK(FetchedSize == ExpectedSize);
+ }
+
+ Latch WorkLatch(1);
+ for (size_t I = 0; I < 2; I++)
+ {
+ WorkLatch.AddCount(1);
+ ThreadPool.ScheduleWork([&Cas, &Hashes, &BatchWorkerPool, &WorkLatch, I]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ std::vector<IoHash> PartialHashes;
+ PartialHashes.reserve(Hashes.size() / 4);
+ for (size_t Index = 0; Index < Hashes.size(); Index++)
+ {
+ size_t TestIndex = Index + I;
+ if ((TestIndex % 7 == 1) || (TestIndex % 13 == 1) || (TestIndex % 17 == 1))
+ {
+ PartialHashes.push_back(Hashes[Index]);
+ }
+ }
+ std::reverse(PartialHashes.begin(), PartialHashes.end());
+
+ std::vector<IoHash> NoFoundHashes;
+ std::vector<size_t> NoFindIndexes;
+
+ NoFoundHashes.reserve(9);
+ for (size_t J = 0; J < 9; J++)
+ {
+ std::string Data = fmt::format("oh no, we don't exist {}", J + 1);
+ NoFoundHashes.push_back(IoHash::HashBuffer(Data.data(), Data.length()));
+ }
+
+ NoFindIndexes.reserve(9);
+
+ // Sprinkle in chunks that are not found!
+ auto It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0, NoFoundHashes[0]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0 + 1, NoFoundHashes[1]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1, NoFoundHashes[2]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1 + 1, NoFoundHashes[3]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 2, NoFoundHashes[4]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3, NoFoundHashes[5]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3 + 1, NoFoundHashes[6]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 4, NoFoundHashes[7]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.end(), NoFoundHashes[8]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+
+ std::vector<std::atomic<bool>> FoundFlags(PartialHashes.size() + NoFoundHashes.size());
+ std::vector<std::atomic<uint32_t>> FetchedCounts(PartialHashes.size() + NoFoundHashes.size());
+
+ CHECK(Cas.IterateChunks(
+ PartialHashes,
+ [&PartialHashes, &FoundFlags, &FetchedCounts, &NoFindIndexes](size_t Index, const IoBuffer& Payload) {
+ CHECK_EQ(NoFindIndexes.end(), std::find(NoFindIndexes.begin(), NoFindIndexes.end(), Index));
+ uint32_t PreviousCount = FetchedCounts[Index].fetch_add(1);
+ CHECK(PreviousCount == 0);
+ FoundFlags[Index] = !!Payload;
+ const IoHash& Hash = PartialHashes[Index];
+ CHECK(Hash == IoHash::HashBuffer(Payload));
+ return true;
+ },
+ &BatchWorkerPool,
+ 2048u));
+
+ for (size_t FoundIndex = 0; FoundIndex < PartialHashes.size(); FoundIndex++)
+ {
+ CHECK(FetchedCounts[FoundIndex].load() <= 1);
+ if (std::find(NoFindIndexes.begin(), NoFindIndexes.end(), FoundIndex) == NoFindIndexes.end())
+ {
+ CHECK(FoundFlags[FoundIndex]);
+ }
+ else
+ {
+ CHECK(!FoundFlags[FoundIndex]);
+ }
+ }
+ });
+ }
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+ }
+}
+
#endif
void