aboutsummaryrefslogtreecommitdiff
path: root/zenstore/filecas.cpp
diff options
context:
space:
mode:
author Dan Engelbrecht <[email protected]> 2022-06-17 07:06:21 -0700
committer GitHub <[email protected]> 2022-06-17 07:06:21 -0700
commit c7e22a4ef1cce7103b9afbeec487461cb32f8dbe (patch)
tree 8b99d51bf496c96f82161c18fbdcfd5c6f8f31fd /zenstore/filecas.cpp
parent fixed merge mistake which caused a build error (diff)
download zen-0.1.4-pre6.tar.xz
zen-0.1.4-pre6.zip
Make cas storage a hidden implementation detail of CidStore (#130) v0.1.4-pre6 v0.1.4-pre5
- Bumped ZEN_SCHEMA_VERSION
- CasStore is no longer a public API, it is hidden behind CidStore
- Moved cas.h from public header folder
- CidStore no longer maps from Cid -> Cas, we store entries in Cas under RawHash
- CasStore now decompresses data to validate content (matching against RawHash)
- CasChunkSet renamed to HashKeySet and put in a separate header/cpp file
- Disabled "Chunk" command for now as it relied on CAS being exposed as a service
- Changed CAS http service to Cid http server
- Moved "Run" command completely inside ZEN_WITH_EXEC_SERVICES define
- Removed "cas.basic" test
- Uncommented ".exec.basic" test and added return-skip at start of test
- Moved ScrubContext to separate header file
- Renamed CasGC to GcManager
- Cleaned up configuration passing in cas store classes
- Removed CAS stuff from GcContext and clarified naming in class
- Removed migration code
Diffstat (limited to 'zenstore/filecas.cpp')
-rw-r--r-- zenstore/filecas.cpp 111
1 files changed, 63 insertions, 48 deletions
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index d074a906f..23e3f4cd8 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -2,6 +2,7 @@
#include "filecas.h"
+#include <zencore/compress.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
@@ -16,6 +17,7 @@
#include <zencore/uid.h>
#include <zenstore/basicfile.h>
#include <zenstore/gc.h>
+#include <zenstore/scrubcontext.h>
#if ZEN_WITH_TESTS
# include <zencore/compactbinarybuilder.h>
@@ -71,10 +73,7 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo
//////////////////////////////////////////////////////////////////////////
-FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc)
-: GcStorage(Gc)
-, m_Config(Config)
-, m_Log(logging::Get("filecas"))
+FileCasStrategy::FileCasStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("filecas"))
{
}
@@ -83,17 +82,19 @@ FileCasStrategy::~FileCasStrategy()
}
void
-FileCasStrategy::Initialize(bool IsNewStore)
+FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore)
{
m_IsInitialized = true;
- CreateDirectories(m_Config.RootDirectory);
+ m_RootDirectory = RootDirectory;
- m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
+ CreateDirectories(m_RootDirectory);
+
+ m_CasLog.Open(m_RootDirectory / "cas.ulog", IsNewStore ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
- ZEN_INFO("read log {} containing {}", m_Config.RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed)));
+ ZEN_INFO("read log {} containing {}", m_RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed)));
});
std::unordered_set<IoHash> FoundEntries;
@@ -127,13 +128,17 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
{
ZEN_ASSERT(m_IsInitialized);
+#if !ZEN_WITH_TESTS
+ ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
+#endif
+
// File-based chunks have special case handling whereby we move the file into
// place in the file store directory, thus avoiding unnecessary copying
IoBufferFileReference FileRef;
if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef))
{
- ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
RwLock::ExclusiveLockScope _(LockForHash(ChunkHash));
@@ -340,7 +345,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
{
ZEN_ASSERT(m_IsInitialized);
- ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
// See if file already exists
//
@@ -485,7 +490,7 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash)
{
ZEN_ASSERT(m_IsInitialized);
- ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
@@ -497,7 +502,7 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
{
ZEN_ASSERT(m_IsInitialized);
- ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
@@ -513,7 +518,7 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
void
FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
{
- ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(Name.ShardedPath.c_str(), Ec));
if (Ec)
@@ -534,7 +539,7 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
}
void
-FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks)
+FileCasStrategy::FilterChunks(HashKeySet& InOutChunks)
{
ZEN_ASSERT(m_IsInitialized);
@@ -546,7 +551,7 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks)
// a caller, this is something which needs to be taken into account by anyone consuming
// this functionality in any case
- InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
+ InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
}
void
@@ -602,12 +607,12 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile&
const std::filesystem::path& RootDirectory;
std::function<void(const IoHash& Hash, BasicFile& PayloadFile)> Callback;
- } CasVisitor{m_Config.RootDirectory};
+ } CasVisitor{m_RootDirectory};
CasVisitor.Callback = std::move(Callback);
FileSystemTraversal Traversal;
- Traversal.TraverseFileSystem(m_Config.RootDirectory, CasVisitor);
+ Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor);
}
void
@@ -630,21 +635,34 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
{
ZEN_ASSERT(m_IsInitialized);
- std::vector<IoHash> BadHashes;
- std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0};
+ std::vector<IoHash> BadHashes;
+ uint64_t ChunkCount{0}, ChunkBytes{0};
IterateChunks([&](const IoHash& Hash, BasicFile& Payload) {
+ ++ChunkCount;
+ ChunkBytes += Payload.FileSize();
+
+ IoBuffer Buffer(IoBuffer::BorrowedFile, Payload.Handle(), 0, Payload.FileSize());
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); Compressed)
+ {
+ if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash)
+ {
+ // Hash mismatch
+ BadHashes.push_back(Hash);
+ return;
+ }
+ return;
+ }
+#if ZEN_WITH_TESTS
IoHashStream Hasher;
- Payload.StreamFile([&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
+ Payload.StreamByteRange(0, Payload.FileSize(), [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
IoHash ComputedHash = Hasher.GetHash();
-
- if (ComputedHash != Hash)
+ if (ComputedHash == Hash)
{
- BadHashes.push_back(Hash);
+ return;
}
-
- ++ChunkCount;
- ChunkBytes.fetch_add(Payload.FileSize());
+#endif
+ BadHashes.push_back(Hash);
});
Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
@@ -670,9 +688,12 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
}
}
- Ctx.ReportBadCasChunks(BadHashes);
+ // Let whomever it concerns know about the bad chunks. This could
+ // be used to invalidate higher level data structures more efficiently
+ // than a full validation pass might be able to do
+ Ctx.ReportBadCidChunks(BadHashes);
- ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes));
+ ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
}
void
@@ -680,7 +701,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
{
ZEN_ASSERT(m_IsInitialized);
- ZEN_INFO("collecting garbage from {}", m_Config.RootDirectory);
+ ZEN_INFO("collecting garbage from {}", m_RootDirectory);
std::vector<IoHash> ChunksToDelete;
std::atomic<uint64_t> ChunksToDeleteBytes{0};
@@ -694,7 +715,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
Stopwatch TotalTimer;
const auto _ = MakeGuard([&] {
ZEN_INFO("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}",
- m_Config.RootDirectory,
+ m_RootDirectory,
NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
DeletedCount,
ChunkCount,
@@ -706,7 +727,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
bool KeepThis = false;
CandidateCas.clear();
CandidateCas.push_back(Hash);
- GcCtx.FilterCas(CandidateCas, [&](const IoHash& Hash) {
+ GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) {
ZEN_UNUSED(Hash);
KeepThis = true;
});
@@ -725,12 +746,12 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
if (ChunksToDelete.empty())
{
- ZEN_INFO("gc for '{}' SKIPPED, nothing to delete", m_Config.RootDirectory);
+ ZEN_INFO("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory);
return;
}
ZEN_INFO("deleting file CAS garbage for '{}': {} out of {} chunks ({})",
- m_Config.RootDirectory,
+ m_RootDirectory,
ChunksToDelete.size(),
ChunkCount.load(),
NiceBytes(ChunksToDeleteBytes));
@@ -751,13 +772,13 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
if (Ec)
{
- ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_Config.RootDirectory, Hash, Ec.message());
+ ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_RootDirectory, Hash, Ec.message());
continue;
}
DeletedCount++;
}
- GcCtx.DeletedCas(ChunksToDelete);
+ GcCtx.AddDeletedCids(ChunksToDelete);
}
//////////////////////////////////////////////////////////////////////////
@@ -769,13 +790,10 @@ TEST_CASE("cas.file.move")
// specifying an absolute path here can be helpful when using procmon to dig into things
ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"};
- CasGc Gc;
+ GcManager Gc;
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path() / "cas";
-
- FileCasStrategy FileCas(CasConfig, Gc);
- FileCas.Initialize(/* IsNewStore */ true);
+ FileCasStrategy FileCas(Gc);
+ FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);
{
std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};
@@ -850,12 +868,9 @@ TEST_CASE("cas.file.gc")
// specifying an absolute path here can be helpful when using procmon to dig into things
ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"};
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path() / "cas";
-
- CasGc Gc;
- FileCasStrategy FileCas(CasConfig, Gc);
- FileCas.Initialize(/* IsNewStore */ true);
+ GcManager Gc;
+ FileCasStrategy FileCas(Gc);
+ FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);
const int kIterationCount = 1000;
std::vector<IoHash> Keys{kIterationCount};
@@ -903,7 +918,7 @@ TEST_CASE("cas.file.gc")
{
if (Key.Hash[0] & 1)
{
- Ctx.ContributeCas(std::vector<IoHash>{Key});
+ Ctx.AddRetainedCids(std::vector<IoHash>{Key});
}
}