diff options
| author | Stefan Boberg <[email protected]> | 2021-09-21 14:17:23 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-09-21 14:17:23 +0200 |
| commit | c35c36bf81cae52dacf8e3f8dc858bb376ca424b (patch) | |
| tree | 891475f41d4c8be86cbb3f2bd6c269f596ff668d | |
| parent | Removed scrubbing from CasImpl::Initialize since this is triggered by higher ... (diff) | |
| download | zen-c35c36bf81cae52dacf8e3f8dc858bb376ca424b.tar.xz zen-c35c36bf81cae52dacf8e3f8dc858bb376ca424b.zip | |
Wired up scrubbing to more of the higher-level services
Also moved sharding logic for filecas into a function to reduce cut/pasta
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 11 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 1 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 7 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 1 | ||||
| -rw-r--r-- | zenstore/cidstore.cpp | 51 | ||||
| -rw-r--r-- | zenstore/filecas.cpp | 83 | ||||
| -rw-r--r-- | zenstore/filecas.h | 15 | ||||
| -rw-r--r-- | zenstore/include/zenstore/CAS.h | 14 | ||||
| -rw-r--r-- | zenstore/include/zenstore/cidstore.h | 4 |
9 files changed, 142 insertions, 45 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 7f1fe7b44..533fea498 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -176,7 +176,16 @@ HttpStructuredCacheService::Flush() void HttpStructuredCacheService::Scrub(ScrubContext& Ctx) { - ZEN_UNUSED(Ctx); + if (m_LastScrubTime == Ctx.ScrubTimestamp()) + { + return; + } + + m_LastScrubTime = Ctx.ScrubTimestamp(); + + m_CasStore.Scrub(Ctx); + m_CidStore.Scrub(Ctx); + m_CacheStore.Scrub(Ctx); } void diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index bd163dd1d..c673ea1f5 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -81,6 +81,7 @@ private: zen::CasStore& m_CasStore; zen::CidStore& m_CidStore; std::unique_ptr<UpstreamCache> m_UpstreamCache; + uint64_t m_LastScrubTime = 0; }; } // namespace zen diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 502ca6605..3d80bb14c 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -108,6 +108,13 @@ ZenCacheStore::Flush() void ZenCacheStore::Scrub(ScrubContext& Ctx) { + if (m_LastScrubTime == Ctx.ScrubTimestamp()) + { + return; + } + + m_LastScrubTime = Ctx.ScrubTimestamp(); + m_DiskLayer.Scrub(Ctx); m_MemLayer.Scrub(Ctx); } diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index fdf4a8cfe..2cc3abb53 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -112,6 +112,7 @@ private: ZenCacheMemoryLayer m_MemLayer; ZenCacheDiskLayer m_DiskLayer; uint64_t m_DiskLayerSizeThreshold = 4 * 1024; + uint64_t m_LastScrubTime = 0; }; /** Tracks cache entry access, stats and orchestrates cleanup activities diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index 5e266f9d3..9c8d4742c 100644 --- a/zenstore/cidstore.cpp +++ 
b/zenstore/cidstore.cpp @@ -12,9 +12,9 @@ namespace zen { -struct CidStore::CidState +struct CidStore::Impl { - CidState(CasStore& InCasStore) : m_CasStore(InCasStore) {} + Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {} struct IndexEntry { @@ -42,18 +42,26 @@ struct CidStore::CidState void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed) { + ZEN_ASSERT(Compressed != IoHash::Zero); + RwLock::ExclusiveLockScope _(m_Lock); m_CidMap.insert_or_assign(DecompressedId, Compressed); // TODO: it's pretty wasteful to log even idempotent updates // however we can't simply use the boolean returned by insert_or_assign // since there's not a 1:1 mapping between compressed and uncompressed // so if we want a last-write-wins policy then we have to log each update + LogMapping(DecompressedId, Compressed); + } + + void LogMapping(const IoHash& DecompressedId, const IoHash& Compressed) + { m_LogFile.Append({.Uncompressed = DecompressedId, .Compressed = Compressed}); } IoBuffer FindChunkByCid(const IoHash& DecompressedId) { IoHash CompressedHash; + { RwLock::SharedLockScope _(m_Lock); if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) @@ -62,12 +70,9 @@ struct CidStore::CidState } } - if (CompressedHash != IoHash::Zero) - { - return m_CasStore.FindChunk(CompressedHash); - } + ZEN_ASSERT(CompressedHash != IoHash::Zero); - return IoBuffer(); + return m_CasStore.FindChunk(CompressedHash); } bool ContainsChunk(const IoHash& DecompressedId) @@ -75,7 +80,17 @@ struct CidStore::CidState RwLock::SharedLockScope _(m_Lock); // Note that we do not check CAS here. This is optimistic but usually // what we want. 
- return m_CidMap.find(DecompressedId) != m_CidMap.end(); + auto It = m_CidMap.find(DecompressedId); + + if (It == m_CidMap.end()) + { + // Not in map, or tombstone + return false; + } + + ZEN_ASSERT(It->second != IoHash::Zero); + + return true; } void InitializeIndex(const std::filesystem::path& RootDir) @@ -87,6 +102,8 @@ struct CidStore::CidState m_LogFile.Open(SlogPath, IsNew); + uint64_t TombstoneCount = 0; + m_LogFile.Replay([&](const IndexEntry& Ie) { if (Ie.Compressed != IoHash::Zero) { @@ -97,16 +114,24 @@ struct CidStore::CidState { // Tombstone m_CidMap.erase(Ie.Uncompressed); + ++TombstoneCount; } }); - ZEN_DEBUG("CID index initialized: {} entries found", m_CidMap.size()); + ZEN_INFO("CID index initialized: {} entries found ({} tombstones)", m_CidMap.size(), TombstoneCount); } void Flush() { m_LogFile.Flush(); } void Scrub(ScrubContext& Ctx) { + if (Ctx.ScrubTimestamp() == m_LastScrubTime) + { + return; + } + + m_LastScrubTime = Ctx.ScrubTimestamp(); + CasChunkSet ChunkSet; { @@ -126,7 +151,9 @@ struct CidStore::CidState return; } - ZEN_ERROR("Scrubbing found that {} cid mappings mapped to non-existent CAS chunks", ChunkSet.GetChunkSet().size()); + ZEN_ERROR("Scrubbing found that {} cid mappings (out of {}) mapped to non-existent CAS chunks. 
These mappings will be removed", + ChunkSet.GetChunkSet().size(), + m_CidMap.size()); // Erase all mappings to chunks which are not present in the underlying CAS store // we do this by removing mappings from the in-memory lookup structure and also @@ -163,11 +190,13 @@ struct CidStore::CidState Ctx.ReportBadChunks(BadChunks); } + + uint64_t m_LastScrubTime = 0; }; ////////////////////////////////////////////////////////////////////////// -CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique<CidState>(InCasStore)) +CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique<Impl>(InCasStore)) { m_Impl->InitializeIndex(RootDir); } diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index 968c9f3a0..1fcae6d02 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -31,7 +31,16 @@ namespace zen { using namespace fmt::literals; -FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config) : m_Config(Config) +FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash) +{ + ShardedPath.Append(RootPath.c_str()); + ShardedPath.Append(std::filesystem::path::preferred_separator); + MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len); +} + +////////////////////////////////////////////////////////////////////////// + +FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config) : m_Config(Config), m_Log(logging::Get("filecas")) { } @@ -78,11 +87,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) IoBufferFileReference FileRef; if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef)) { - size_t Shard2len = 0; - ExtendableWideStringBuilder<128> ShardedPath; - ShardedPath.Append(m_Config.RootDirectory.c_str()); - ShardedPath.Append(std::filesystem::path::preferred_separator); - MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len); + ShardingHelper 
Name(m_Config.RootDirectory.c_str(), ChunkHash); auto DeletePayloadFileOnClose = [&] { // This will cause the file to be deleted when the last handle to it is closed @@ -105,7 +110,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { CAtlFile PayloadFile; - if (HRESULT hRes = PayloadFile.Create(ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes)) + if (HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes)) { // If we succeeded in opening the target file then we don't need to do anything else because it already exists // and should contain the content we were about to insert @@ -118,7 +123,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) } } - std::filesystem::path FullPath(ShardedPath.c_str()); + std::filesystem::path FullPath(Name.ShardedPath.c_str()); std::filesystem::path FilePath = FullPath.parent_path(); std::wstring FileName = FullPath.native(); @@ -194,11 +199,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) CasStore::InsertResult FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash) { - size_t Shard2len = 0; - ExtendableWideStringBuilder<128> ShardedPath; - ShardedPath.Append(m_Config.RootDirectory.c_str()); - ShardedPath.Append(std::filesystem::path::preferred_separator); - MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); // See if file already exists // @@ -206,7 +207,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize CAtlFile PayloadFile; - HRESULT hRes = PayloadFile.Create(ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); + HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); if (SUCCEEDED(hRes)) { @@ -221,7 +222,7 @@ 
FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize // For now, use double-checked locking to see if someone else was first - hRes = PayloadFile.Create(ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); + hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); if (SUCCEEDED(hRes)) { @@ -235,7 +236,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize ZEN_WARN("Unexpected error code when opening shard file for read: {:#x}", uint32_t(hRes)); } - auto InternalCreateFile = [&] { return PayloadFile.Create(ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); }; + auto InternalCreateFile = [&] { return PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); }; hRes = InternalCreateFile(); @@ -243,14 +244,14 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize { // Ensure parent directories exist and retry file creation - std::filesystem::create_directories(std::wstring_view(ShardedPath.c_str(), Shard2len)); + std::filesystem::create_directories(std::wstring_view(Name.ShardedPath.c_str(), Name.Shard2len)); hRes = InternalCreateFile(); } if (FAILED(hRes)) { - ThrowSystemException(hRes, "Failed to open shard file '{}'"_format(WideToUtf8(ShardedPath))); + ThrowSystemException(hRes, "Failed to open shard file '{}'"_format(WideToUtf8(Name.ShardedPath))); } size_t ChunkRemain = ChunkSize; @@ -276,36 +277,37 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize IoBuffer FileCasStrategy::FindChunk(const IoHash& ChunkHash) { - size_t Shard2len = 0; - ExtendableWideStringBuilder<128> ShardedPath; - ShardedPath.Append(m_Config.RootDirectory.c_str()); - ShardedPath.Append(std::filesystem::path::preferred_separator); - MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); 
RwLock::SharedLockScope _(LockForHash(ChunkHash)); - return IoBufferBuilder::MakeFromFile(ShardedPath.c_str()); + return IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); } bool FileCasStrategy::HaveChunk(const IoHash& ChunkHash) { - size_t Shard2len = 0; - ExtendableWideStringBuilder<128> ShardedPath; - ShardedPath.Append(m_Config.RootDirectory.c_str()); - ShardedPath.Append(std::filesystem::path::preferred_separator); - MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); std::error_code Ec; - if (std::filesystem::exists(ShardedPath.c_str(), Ec)) + if (std::filesystem::exists(Name.ShardedPath.c_str(), Ec)) { return true; } return false; } +void +FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) +{ + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + + ZEN_DEBUG("deleting CAS payload file '{}'", WideToUtf8(Name.ShardedPath)); + + std::filesystem::remove(Name.ShardedPath.c_str(), Ec); +} void FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) @@ -421,6 +423,27 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) } }); + if (!BadHashes.empty()) + { + ZEN_ERROR("file CAS scrubbing: {} bad chunks found", BadHashes.size()); + + if (Ctx.RunRecovery()) + { + ZEN_WARN("recovery: deleting backing files for {} bad chunks which were found to be bad", BadHashes.size()); + + for (const IoHash& Hash : BadHashes) + { + std::error_code Ec; + DeleteChunk(Hash, Ec); + + if (Ec) + { + ZEN_WARN("failed to delete file for chunk {}", Hash); + } + } + } + } + Ctx.ReportBadChunks(BadHashes); } diff --git a/zenstore/filecas.h b/zenstore/filecas.h index 18102968a..2e09367df 100644 --- a/zenstore/filecas.h +++ b/zenstore/filecas.h @@ -12,6 +12,10 @@ #include <functional> +namespace spdlog { +class logger; +} + namespace zen { class BasicFile; @@ -37,10 +41,21 @@ private: const CasStoreConfiguration& m_Config; RwLock m_Lock; 
RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines + spdlog::logger& m_Log; + spdlog::logger& Log() { return m_Log; } inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; } static WideStringBuilderBase& MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHash& ChunkHash, size_t& OutShard2len); void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback); + void DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec); + + struct ShardingHelper + { + ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash); + + size_t Shard2len = 0; + ExtendableWideStringBuilder<128> ShardedPath; + }; }; } // namespace zen diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h index bb310b179..ed235bb4b 100644 --- a/zenstore/include/zenstore/CAS.h +++ b/zenstore/include/zenstore/CAS.h @@ -8,6 +8,7 @@ #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <zencore/refcount.h> +#include <zencore/timer.h> #include <atomic> #include <filesystem> #include <memory> @@ -37,12 +38,22 @@ public: private: }; +/** Context object for data scrubbing + * + * Data scrubbing is when we traverse stored data to validate it and + * optionally correct/recover + */ + class ScrubContext { public: - virtual void ReportBadChunks(std::span<IoHash> BadChunks); + virtual void ReportBadChunks(std::span<IoHash> BadChunks); + inline uint64_t ScrubTimestamp() const { return m_ScrubTime; } + inline bool RunRecovery() const { return m_Recover; } private: + uint64_t m_ScrubTime = GetHifreqTimerValue(); + bool m_Recover = true; }; class CasChunkSet @@ -78,6 +89,7 @@ public: protected: CasStoreConfiguration m_Config; + uint64_t m_LastScrubTime = 0; }; ZENCORE_API CasStore* CreateCasStore(); diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h index 49f2bf99a..f4439e083 100644 --- 
a/zenstore/include/zenstore/cidstore.h +++ b/zenstore/include/zenstore/cidstore.h @@ -50,8 +50,8 @@ public: // TODO: add batch filter support private: - struct CidState; - std::unique_ptr<CidState> m_Impl; + struct Impl; + std::unique_ptr<Impl> m_Impl; }; } // namespace zen |