diff options
| author | Stefan Boberg <[email protected]> | 2021-09-19 23:19:06 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-09-19 23:19:06 +0200 |
| commit | 3cf9dedfd08fe4d7a049e51b14a937f7a34afce3 (patch) | |
| tree | d68b5203818665f6f54aceea586f550dc6aadca4 | |
| parent | Added BasicFile::StreamFile helper function to support reading large files in... (diff) | |
| download | zen-3cf9dedfd08fe4d7a049e51b14a937f7a34afce3.tar.xz zen-3cf9dedfd08fe4d7a049e51b14a937f7a34afce3.zip | |
Implemented basic scrubbing / detection of disk corruption. Still needs more code to propagate errors and make adjustments to account for them in higher-level data structures.
| -rw-r--r-- | zenstore/CAS.cpp | 24 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 93 | ||||
| -rw-r--r-- | zenstore/compactcas.h | 7 | ||||
| -rw-r--r-- | zenstore/filecas.cpp | 95 | ||||
| -rw-r--r-- | zenstore/filecas.h | 15 | ||||
| -rw-r--r-- | zenstore/include/zenstore/CAS.h | 21 |
6 files changed, 207 insertions, 48 deletions
diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp index af0fcc609..b3ef7a320 100644 --- a/zenstore/CAS.cpp +++ b/zenstore/CAS.cpp @@ -25,6 +25,11 @@ namespace zen { +void +ScrubContext::ReportBadChunks(std::span<IoHash> BadChunks) +{ +} + /** * CAS store implementation * @@ -44,7 +49,7 @@ public: virtual IoBuffer FindChunk(const IoHash& ChunkHash) override; virtual void FilterChunks(CasChunkSet& InOutChunks) override; virtual void Flush() override; - virtual void Scrub() override; + virtual void Scrub(ScrubContext& Ctx) override; private: CasContainerStrategy m_TinyStrategy; @@ -52,7 +57,7 @@ private: FileCasStrategy m_LargeStrategy; }; -CasImpl::CasImpl() : m_TinyStrategy(m_Config, m_Stats), m_SmallStrategy(m_Config, m_Stats), m_LargeStrategy(m_Config, m_Stats) +CasImpl::CasImpl() : m_TinyStrategy(m_Config), m_SmallStrategy(m_Config), m_LargeStrategy(m_Config) { } @@ -105,6 +110,9 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig) m_TinyStrategy.Initialize("tobs", 16, IsNewStore); m_SmallStrategy.Initialize("sobs", 4096, IsNewStore); + + ScrubContext Ctx; + Scrub(Ctx); } CasStore::InsertResult @@ -165,11 +173,11 @@ CasImpl::Flush() } void -CasImpl::Scrub() +CasImpl::Scrub(ScrubContext& Ctx) { - m_SmallStrategy.Scrub(); - m_TinyStrategy.Scrub(); - m_LargeStrategy.Scrub(); + m_SmallStrategy.Scrub(Ctx); + m_TinyStrategy.Scrub(Ctx); + m_LargeStrategy.Scrub(Ctx); } ////////////////////////////////////////////////////////////////////////// @@ -194,7 +202,9 @@ TEST_CASE("CasStore") std::unique_ptr<zen::CasStore> Store{CreateCasStore()}; Store->Initialize(config); - Store->Scrub(); + + ScrubContext Ctx; + Store->Scrub(Ctx); IoBuffer Value1{16}; memcpy(Value1.MutableData(), "1234567890123456", 16); diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index fd223e284..0f9349ab0 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -18,6 +18,14 @@ namespace zen { +CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& 
Config) : m_Config(Config) +{ +} + +CasContainerStrategy::~CasContainerStrategy() +{ +} + void CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore) { @@ -101,9 +109,8 @@ IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); - auto KeyIt = m_LocationMap.find(ChunkHash); - if (KeyIt != m_LocationMap.end()) + if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { const CasDiskLocation& Location = KeyIt->second; return zen::IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.Offset, Location.Size); @@ -118,9 +125,8 @@ bool CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); - auto KeyIt = m_LocationMap.find(ChunkHash); - if (KeyIt != m_LocationMap.end()) + if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { return true; } @@ -163,9 +169,84 @@ CasContainerStrategy::Flush() } void -CasContainerStrategy::Scrub() +CasContainerStrategy::Scrub(ScrubContext& Ctx) { - RwLock::SharedLockScope _(m_LocationMapLock); + const uint64_t WindowSize = 4 * 1024 * 1024; + uint64_t WindowStart = 0; + uint64_t WindowEnd = WindowSize; + const uint64_t FileSize = m_SmallObjectFile.FileSize(); + + std::vector<CasDiskIndexEntry> BigChunks; + std::vector<CasDiskIndexEntry> BadChunks; + + // We do a read sweep through the payloads file and validate + // any entries that are contained within each segment, with + // the assumption that most entries will be checked in this + // pass. An alternative strategy would be to use memory mapping. 
+ + { + IoBuffer ReadBuffer{WindowSize}; + void* BufferBase = ReadBuffer.MutableData(); + + RwLock::SharedLockScope _(m_LocationMapLock); + + do + { + const uint64_t ChunkSize = zen::Min(WindowSize, FileSize - WindowStart); + m_SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart); + + for (auto& Entry : m_LocationMap) + { + const uint64_t EntryOffset = Entry.second.Offset; + + if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) + { + const uint64_t EntryEnd = EntryOffset + Entry.second.Size; + + if (EntryEnd >= WindowEnd) + { + BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); + + continue; + } + + const IoHash ComputedHash = IoHash::HashBuffer(BufferBase, Entry.second.Size); + + if (Entry.first != ComputedHash) + { + // Hash mismatch + + BadChunks.push_back({.Key = Entry.first, .Location = Entry.second}); + } + } + } + + WindowStart += WindowSize; + WindowEnd += WindowSize; + } while (WindowStart < FileSize); + } + + // Deal with large chunks + + for (const CasDiskIndexEntry& Entry : BigChunks) + { + } + + // Deal with bad chunks by removing them from our lookup map + + std::vector<IoHash> BadChunkHashes; + + for (const CasDiskIndexEntry& Entry : BadChunks) + { + BadChunkHashes.push_back(Entry.Key); + m_LocationMap.erase(Entry.Key); + } + + // Let whomever it concerns know about the bad chunks. 
This could + // be used to invalidate higher level data structures more efficiently + // than a full validation pass might be able to do + + Ctx.ReportBadChunks(BadChunkHashes); } void diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h index 9921c9e6c..101e6b1b7 100644 --- a/zenstore/compactcas.h +++ b/zenstore/compactcas.h @@ -50,7 +50,9 @@ static_assert(sizeof(CasDiskIndexEntry) == 32); struct CasContainerStrategy { - CasContainerStrategy(const CasStoreConfiguration& Config, CasStore::Stats& Stats) : m_Config(Config), m_Stats(Stats) {} + CasContainerStrategy(const CasStoreConfiguration& Config); + ~CasContainerStrategy(); + CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& chunkHash); IoBuffer FindChunk(const IoHash& ChunkHash); @@ -58,11 +60,10 @@ struct CasContainerStrategy void FilterChunks(CasChunkSet& InOutChunks); void Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore); void Flush(); - void Scrub(); + void Scrub(ScrubContext& Ctx); private: const CasStoreConfiguration& m_Config; - CasStore::Stats& m_Stats; uint64_t m_PayloadAlignment = 1 << 4; bool m_IsInitialized = false; BasicFile m_SmallObjectFile; diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index 5fdf505d4..3314beb7e 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -10,6 +10,7 @@ #include <zencore/string.h> #include <zencore/thread.h> #include <zencore/uid.h> +#include <zenstore/basicfile.h> #include <gsl/gsl-lite.hpp> @@ -17,6 +18,7 @@ #include <functional> #include <unordered_map> +// clang-format off #include <zencore/prewindows.h> struct IUnknown; // Workaround for "combaseapi.h(229): error C2187: syntax error: 'identifier' was unexpected here" when using /permissive- @@ -24,13 +26,19 @@ struct IUnknown; // Workaround for "combaseapi.h(229): error C2187: syntax erro #include <zencore/postwindows.h> // 
clang-format on -// -////////////////////////////////////////////////////////////////////////// namespace zen { using namespace fmt::literals; +FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config) : m_Config(Config) +{ +} + +FileCasStrategy::~FileCasStrategy() +{ +} + WideStringBuilderBase& FileCasStrategy::MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHash& ChunkHash, size_t& OutShard2len) { @@ -56,7 +64,7 @@ FileCasStrategy::MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHas OutShard2len = ShardedPath.Size(); ShardedPath.Append('\\'); - ShardedPath.AppendAsciiRange(str + 6, str + 64); + ShardedPath.AppendAsciiRange(str + 5, str + 64); return ShardedPath; } @@ -259,12 +267,9 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize } // We cannot rely on RAII to close the file handle since it would be closed - // *after* the lock is released due to the initialization order. + // *after* the lock is released due to the initialization order PayloadFile.Close(); - AtomicIncrement(m_Stats.PutCount); - AtomicAdd(m_Stats.PutBytes, ChunkSize); - return {.New = true}; } @@ -279,15 +284,7 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash) RwLock::SharedLockScope _(LockForHash(ChunkHash)); - auto Chunk = IoBufferBuilder::MakeFromFile(ShardedPath.c_str()); - - if (Chunk) - { - AtomicIncrement(m_Stats.GetCount); - AtomicAdd(m_Stats.GetBytes, Chunk.Size()); - } - - return Chunk; + return IoBufferBuilder::MakeFromFile(ShardedPath.c_str()); } bool @@ -338,6 +335,56 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) } void +FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback) +{ + struct Visitor : public FileSystemTraversal::TreeVisitor + { + Visitor(const std::filesystem::path& RootDir) : RootDirectory(RootDir) {} + virtual void VisitFile(const std::filesystem::path& Parent, const std::wstring_view& File, uint64_t FileSize) override + { + 
std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory); + + std::wstring PathString = RelPath.native(); + + if ((PathString.size() == (3 + 2 + 1)) && (File.size() == (40 - 3 - 2))) + { + if (PathString.at(3) == std::filesystem::path::preferred_separator) + { + PathString.erase(3, 1); + } + PathString.append(File); + + StringBuilder<64> Utf8; + WideToUtf8(PathString, Utf8); + + // TODO: should validate that we're actually dealing with a valid hex string here + + IoHash NameHash = IoHash::FromHexString({Utf8.Data(), Utf8.Size()}); + + BasicFile PayloadFile; + std::error_code Ec; + PayloadFile.Open(Parent / File, false, Ec); + + if (!Ec) + { + Callback(NameHash, PayloadFile); + } + } + } + + virtual bool VisitDirectory(const std::filesystem::path& Parent, const std::wstring_view& DirectoryName) { return true; } + + const std::filesystem::path& RootDirectory; + std::function<void(const IoHash& Hash, BasicFile& PayloadFile)> Callback; + } CasVisitor{m_Config.RootDirectory}; + + CasVisitor.Callback = std::move(Callback); + + FileSystemTraversal Traversal; + Traversal.TraverseFileSystem(m_Config.RootDirectory, CasVisitor); +} + +void FileCasStrategy::Flush() { // Since we don't keep files open after writing there's nothing specific @@ -353,8 +400,22 @@ FileCasStrategy::Flush() } void -FileCasStrategy::Scrub() +FileCasStrategy::Scrub(ScrubContext& Ctx) { + std::vector<IoHash> BadHashes; + + IterateChunks([&](const IoHash& Hash, BasicFile& Payload) { + IoHashStream Hasher; + Payload.StreamFile([&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); + IoHash ComputedHash = Hasher.GetHash(); + + if (ComputedHash != Hash) + { + BadHashes.push_back(Hash); + } + }); + + Ctx.ReportBadChunks(BadHashes); } void diff --git a/zenstore/filecas.h b/zenstore/filecas.h index c7cd9d7ca..885973810 100644 --- a/zenstore/filecas.h +++ b/zenstore/filecas.h @@ -10,11 +10,20 @@ #include <zencore/thread.h> #include <zenstore/cas.h> +#include <functional> + 
namespace zen { +class BasicFile; + +/** CAS storage strategy using a file-per-chunk storage strategy +*/ + struct FileCasStrategy { - FileCasStrategy(const CasStoreConfiguration& Config, CasStore::Stats& Stats) : m_Config(Config), m_Stats(Stats) {} + FileCasStrategy(const CasStoreConfiguration& Config); + ~FileCasStrategy(); + CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); IoBuffer FindChunk(const IoHash& ChunkHash); @@ -22,16 +31,16 @@ struct FileCasStrategy void FilterChunks(CasChunkSet& InOutChunks); void Flush(); void GarbageCollect(GcContext& GcCtx); - void Scrub(); + void Scrub(ScrubContext& Ctx); private: const CasStoreConfiguration& m_Config; - CasStore::Stats& m_Stats; RwLock m_Lock; RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; } static WideStringBuilderBase& MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHash& ChunkHash, size_t& OutShard2len); + void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback); }; } // namespace zen diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h index c6c919593..bb310b179 100644 --- a/zenstore/include/zenstore/CAS.h +++ b/zenstore/include/zenstore/CAS.h @@ -37,6 +37,14 @@ public: private: }; +class ScrubContext +{ +public: + virtual void ReportBadChunks(std::span<IoHash> BadChunks); + +private: +}; + class CasChunkSet { public: @@ -54,17 +62,7 @@ class CasStore public: virtual ~CasStore() = default; - struct Stats - { - uint64_t PutBytes = 0; - uint64_t PutCount = 0; - - uint64_t GetBytes = 0; - uint64_t GetCount = 0; - }; - const CasStoreConfiguration& Config() { return m_Config; } - const Stats& GetStats() const { return m_Stats; } struct InsertResult { @@ -76,11 +74,10 @@ public: 
virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0; virtual void FilterChunks(CasChunkSet& InOutChunks) = 0; virtual void Flush() = 0; - virtual void Scrub() = 0; + virtual void Scrub(ScrubContext& Ctx) = 0; protected: CasStoreConfiguration m_Config; - Stats m_Stats; }; ZENCORE_API CasStore* CreateCasStore(); |