diff options
| author | Dan Engelbrecht <[email protected]> | 2022-06-17 07:06:21 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-06-17 07:06:21 -0700 |
| commit | c7e22a4ef1cce7103b9afbeec487461cb32f8dbe (patch) | |
| tree | 8b99d51bf496c96f82161c18fbdcfd5c6f8f31fd /zenstore/filecas.cpp | |
| parent | fixed merge mistake which caused a build error (diff) | |
| download | zen-0.1.4-pre6.tar.xz zen-0.1.4-pre6.zip | |
Make cas storage a hidden implementation detail of CidStore (#130) v0.1.4-pre6 v0.1.4-pre5
- Bumped ZEN_SCHEMA_VERSION
- CasStore no longer a public API, it is hidden behind CidStore
- Moved cas.h from public header folder
- CidStore no longer maps from Cid -> Cas, we store entries in Cas under RawHash
- CasStore now decompresses data to validate content (matching against RawHash)
- CasChunkSet renamed to HashKeySet and moved to a separate header/cpp file
- Disabled "Chunk" command for now as it relied on CAS being exposed as a service
- Changed CAS http service to Cid http server
- Moved "Run" command completely inside ZEN_WITH_EXEC_SERVICES define
- Removed "cas.basic" test
- Uncommented ".exec.basic" test and added return-skip at start of test
- Moved ScrubContext to separate header file
- Renamed CasGC to GcManager
- Cleaned up configuration passing in cas store classes
- Removed CAS stuff from GcContext and clarified naming in class
- Removed migration code
Diffstat (limited to 'zenstore/filecas.cpp')
| -rw-r--r-- | zenstore/filecas.cpp | 111 |
1 files changed, 63 insertions, 48 deletions
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index d074a906f..23e3f4cd8 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -2,6 +2,7 @@ #include "filecas.h" +#include <zencore/compress.h> #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> @@ -16,6 +17,7 @@ #include <zencore/uid.h> #include <zenstore/basicfile.h> #include <zenstore/gc.h> +#include <zenstore/scrubcontext.h> #if ZEN_WITH_TESTS # include <zencore/compactbinarybuilder.h> @@ -71,10 +73,7 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo ////////////////////////////////////////////////////////////////////////// -FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc) -: GcStorage(Gc) -, m_Config(Config) -, m_Log(logging::Get("filecas")) +FileCasStrategy::FileCasStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("filecas")) { } @@ -83,17 +82,19 @@ FileCasStrategy::~FileCasStrategy() } void -FileCasStrategy::Initialize(bool IsNewStore) +FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore) { m_IsInitialized = true; - CreateDirectories(m_Config.RootDirectory); + m_RootDirectory = RootDirectory; - m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); + CreateDirectories(m_RootDirectory); + + m_CasLog.Open(m_RootDirectory / "cas.ulog", IsNewStore ? 
CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); Stopwatch Timer; const auto _ = MakeGuard([&] { - ZEN_INFO("read log {} containing {}", m_Config.RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed))); + ZEN_INFO("read log {} containing {}", m_RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed))); }); std::unordered_set<IoHash> FoundEntries; @@ -127,13 +128,17 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { ZEN_ASSERT(m_IsInitialized); +#if !ZEN_WITH_TESTS + ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); +#endif + // File-based chunks have special case handling whereby we move the file into // place in the file store directory, thus avoiding unnecessary copying IoBufferFileReference FileRef; if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef)) { - ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); RwLock::ExclusiveLockScope _(LockForHash(ChunkHash)); @@ -340,7 +345,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize { ZEN_ASSERT(m_IsInitialized); - ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); // See if file already exists // @@ -485,7 +490,7 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash) { ZEN_ASSERT(m_IsInitialized); - ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); @@ -497,7 +502,7 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash) { ZEN_ASSERT(m_IsInitialized); - ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); @@ -513,7 +518,7 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash) void 
FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) { - ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(Name.ShardedPath.c_str(), Ec)); if (Ec) @@ -534,7 +539,7 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) } void -FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) +FileCasStrategy::FilterChunks(HashKeySet& InOutChunks) { ZEN_ASSERT(m_IsInitialized); @@ -546,7 +551,7 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) // a caller, this is something which needs to be taken into account by anyone consuming // this functionality in any case - InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); + InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } void @@ -602,12 +607,12 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile& const std::filesystem::path& RootDirectory; std::function<void(const IoHash& Hash, BasicFile& PayloadFile)> Callback; - } CasVisitor{m_Config.RootDirectory}; + } CasVisitor{m_RootDirectory}; CasVisitor.Callback = std::move(Callback); FileSystemTraversal Traversal; - Traversal.TraverseFileSystem(m_Config.RootDirectory, CasVisitor); + Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor); } void @@ -630,21 +635,34 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) { ZEN_ASSERT(m_IsInitialized); - std::vector<IoHash> BadHashes; - std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; + std::vector<IoHash> BadHashes; + uint64_t ChunkCount{0}, ChunkBytes{0}; IterateChunks([&](const IoHash& Hash, BasicFile& Payload) { + ++ChunkCount; + ChunkBytes += Payload.FileSize(); + + IoBuffer Buffer(IoBuffer::BorrowedFile, Payload.Handle(), 0, Payload.FileSize()); + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); Compressed) + { + if 
(IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash) + { + // Hash mismatch + BadHashes.push_back(Hash); + return; + } + return; + } +#if ZEN_WITH_TESTS IoHashStream Hasher; - Payload.StreamFile([&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); + Payload.StreamByteRange(0, Payload.FileSize(), [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); IoHash ComputedHash = Hasher.GetHash(); - - if (ComputedHash != Hash) + if (ComputedHash == Hash) { - BadHashes.push_back(Hash); + return; } - - ++ChunkCount; - ChunkBytes.fetch_add(Payload.FileSize()); +#endif + BadHashes.push_back(Hash); }); Ctx.ReportScrubbed(ChunkCount, ChunkBytes); @@ -670,9 +688,12 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) } } - Ctx.ReportBadCasChunks(BadHashes); + // Let whomever it concerns know about the bad chunks. This could + // be used to invalidate higher level data structures more efficiently + // than a full validation pass might be able to do + Ctx.ReportBadCidChunks(BadHashes); - ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes)); + ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); } void @@ -680,7 +701,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) { ZEN_ASSERT(m_IsInitialized); - ZEN_INFO("collecting garbage from {}", m_Config.RootDirectory); + ZEN_INFO("collecting garbage from {}", m_RootDirectory); std::vector<IoHash> ChunksToDelete; std::atomic<uint64_t> ChunksToDeleteBytes{0}; @@ -694,7 +715,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) Stopwatch TotalTimer; const auto _ = MakeGuard([&] { ZEN_INFO("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}", - m_Config.RootDirectory, + m_RootDirectory, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), DeletedCount, ChunkCount, @@ -706,7 +727,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) bool KeepThis = false; CandidateCas.clear(); CandidateCas.push_back(Hash); - 
GcCtx.FilterCas(CandidateCas, [&](const IoHash& Hash) { + GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) { ZEN_UNUSED(Hash); KeepThis = true; }); @@ -725,12 +746,12 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) if (ChunksToDelete.empty()) { - ZEN_INFO("gc for '{}' SKIPPED, nothing to delete", m_Config.RootDirectory); + ZEN_INFO("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory); return; } ZEN_INFO("deleting file CAS garbage for '{}': {} out of {} chunks ({})", - m_Config.RootDirectory, + m_RootDirectory, ChunksToDelete.size(), ChunkCount.load(), NiceBytes(ChunksToDeleteBytes)); @@ -751,13 +772,13 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) if (Ec) { - ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_Config.RootDirectory, Hash, Ec.message()); + ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_RootDirectory, Hash, Ec.message()); continue; } DeletedCount++; } - GcCtx.DeletedCas(ChunksToDelete); + GcCtx.AddDeletedCids(ChunksToDelete); } ////////////////////////////////////////////////////////////////////////// @@ -769,13 +790,10 @@ TEST_CASE("cas.file.move") // specifying an absolute path here can be helpful when using procmon to dig into things ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; - CasGc Gc; + GcManager Gc; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path() / "cas"; - - FileCasStrategy FileCas(CasConfig, Gc); - FileCas.Initialize(/* IsNewStore */ true); + FileCasStrategy FileCas(Gc); + FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); { std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"}; @@ -850,12 +868,9 @@ TEST_CASE("cas.file.gc") // specifying an absolute path here can be helpful when using procmon to dig into things ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path() / "cas"; - - CasGc Gc; - FileCasStrategy FileCas(CasConfig, 
Gc); - FileCas.Initialize(/* IsNewStore */ true); + GcManager Gc; + FileCasStrategy FileCas(Gc); + FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); const int kIterationCount = 1000; std::vector<IoHash> Keys{kIterationCount}; @@ -903,7 +918,7 @@ TEST_CASE("cas.file.gc") { if (Key.Hash[0] & 1) { - Ctx.ContributeCas(std::vector<IoHash>{Key}); + Ctx.AddRetainedCids(std::vector<IoHash>{Key}); } } |