aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-09-21 14:17:23 +0200
committerStefan Boberg <[email protected]>2021-09-21 14:17:23 +0200
commitc35c36bf81cae52dacf8e3f8dc858bb376ca424b (patch)
tree891475f41d4c8be86cbb3f2bd6c269f596ff668d
parentRemoved scrubbing from CasImpl::Initialize since this is triggered by higher ... (diff)
downloadzen-c35c36bf81cae52dacf8e3f8dc858bb376ca424b.tar.xz
zen-c35c36bf81cae52dacf8e3f8dc858bb376ca424b.zip
Wired up scrubbing to more higher-level services
Also moved sharding logic for filecas into a function to reduce cut/pasta
-rw-r--r--zenserver/cache/structuredcache.cpp11
-rw-r--r--zenserver/cache/structuredcache.h1
-rw-r--r--zenserver/cache/structuredcachestore.cpp7
-rw-r--r--zenserver/cache/structuredcachestore.h1
-rw-r--r--zenstore/cidstore.cpp51
-rw-r--r--zenstore/filecas.cpp83
-rw-r--r--zenstore/filecas.h15
-rw-r--r--zenstore/include/zenstore/CAS.h14
-rw-r--r--zenstore/include/zenstore/cidstore.h4
9 files changed, 142 insertions, 45 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 7f1fe7b44..533fea498 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -176,7 +176,16 @@ HttpStructuredCacheService::Flush()
void
HttpStructuredCacheService::Scrub(ScrubContext& Ctx)
{
- ZEN_UNUSED(Ctx);
+ if (m_LastScrubTime == Ctx.ScrubTimestamp())
+ {
+ return;
+ }
+
+ m_LastScrubTime = Ctx.ScrubTimestamp();
+
+ m_CasStore.Scrub(Ctx);
+ m_CidStore.Scrub(Ctx);
+ m_CacheStore.Scrub(Ctx);
}
void
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index bd163dd1d..c673ea1f5 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -81,6 +81,7 @@ private:
zen::CasStore& m_CasStore;
zen::CidStore& m_CidStore;
std::unique_ptr<UpstreamCache> m_UpstreamCache;
+ uint64_t m_LastScrubTime = 0;
};
} // namespace zen
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 502ca6605..3d80bb14c 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -108,6 +108,13 @@ ZenCacheStore::Flush()
void
ZenCacheStore::Scrub(ScrubContext& Ctx)
{
+ if (m_LastScrubTime == Ctx.ScrubTimestamp())
+ {
+ return;
+ }
+
+ m_LastScrubTime = Ctx.ScrubTimestamp();
+
m_DiskLayer.Scrub(Ctx);
m_MemLayer.Scrub(Ctx);
}
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index fdf4a8cfe..2cc3abb53 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -112,6 +112,7 @@ private:
ZenCacheMemoryLayer m_MemLayer;
ZenCacheDiskLayer m_DiskLayer;
uint64_t m_DiskLayerSizeThreshold = 4 * 1024;
+ uint64_t m_LastScrubTime = 0;
};
/** Tracks cache entry access, stats and orchestrates cleanup activities
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index 5e266f9d3..9c8d4742c 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -12,9 +12,9 @@
namespace zen {
-struct CidStore::CidState
+struct CidStore::Impl
{
- CidState(CasStore& InCasStore) : m_CasStore(InCasStore) {}
+ Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {}
struct IndexEntry
{
@@ -42,18 +42,26 @@ struct CidStore::CidState
void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed)
{
+ ZEN_ASSERT(Compressed != IoHash::Zero);
+
RwLock::ExclusiveLockScope _(m_Lock);
m_CidMap.insert_or_assign(DecompressedId, Compressed);
// TODO: it's pretty wasteful to log even idempotent updates
// however we can't simply use the boolean returned by insert_or_assign
// since there's not a 1:1 mapping between compressed and uncompressed
// so if we want a last-write-wins policy then we have to log each update
+ LogMapping(DecompressedId, Compressed);
+ }
+
+ void LogMapping(const IoHash& DecompressedId, const IoHash& Compressed)
+ {
m_LogFile.Append({.Uncompressed = DecompressedId, .Compressed = Compressed});
}
IoBuffer FindChunkByCid(const IoHash& DecompressedId)
{
IoHash CompressedHash;
+
{
RwLock::SharedLockScope _(m_Lock);
if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end())
@@ -62,12 +70,9 @@ struct CidStore::CidState
}
}
- if (CompressedHash != IoHash::Zero)
- {
- return m_CasStore.FindChunk(CompressedHash);
- }
+ ZEN_ASSERT(CompressedHash != IoHash::Zero);
- return IoBuffer();
+ return m_CasStore.FindChunk(CompressedHash);
}
bool ContainsChunk(const IoHash& DecompressedId)
@@ -75,7 +80,17 @@ struct CidStore::CidState
RwLock::SharedLockScope _(m_Lock);
// Note that we do not check CAS here. This is optimistic but usually
// what we want.
- return m_CidMap.find(DecompressedId) != m_CidMap.end();
+ auto It = m_CidMap.find(DecompressedId);
+
+ if (It == m_CidMap.end())
+ {
+ // Not in map, or tombstone
+ return false;
+ }
+
+ ZEN_ASSERT(It->second != IoHash::Zero);
+
+ return true;
}
void InitializeIndex(const std::filesystem::path& RootDir)
@@ -87,6 +102,8 @@ struct CidStore::CidState
m_LogFile.Open(SlogPath, IsNew);
+ uint64_t TombstoneCount = 0;
+
m_LogFile.Replay([&](const IndexEntry& Ie) {
if (Ie.Compressed != IoHash::Zero)
{
@@ -97,16 +114,24 @@ struct CidStore::CidState
{
// Tombstone
m_CidMap.erase(Ie.Uncompressed);
+ ++TombstoneCount;
}
});
- ZEN_DEBUG("CID index initialized: {} entries found", m_CidMap.size());
+ ZEN_INFO("CID index initialized: {} entries found ({} tombstones)", m_CidMap.size(), TombstoneCount);
}
void Flush() { m_LogFile.Flush(); }
void Scrub(ScrubContext& Ctx)
{
+ if (Ctx.ScrubTimestamp() == m_LastScrubTime)
+ {
+ return;
+ }
+
+ m_LastScrubTime = Ctx.ScrubTimestamp();
+
CasChunkSet ChunkSet;
{
@@ -126,7 +151,9 @@ struct CidStore::CidState
return;
}
- ZEN_ERROR("Scrubbing found that {} cid mappings mapped to non-existent CAS chunks", ChunkSet.GetChunkSet().size());
+ ZEN_ERROR("Scrubbing found that {} cid mappings (out of {}) mapped to non-existent CAS chunks. These mappings will be removed",
+ ChunkSet.GetChunkSet().size(),
+ m_CidMap.size());
// Erase all mappings to chunks which are not present in the underlying CAS store
// we do this by removing mappings from the in-memory lookup structure and also
@@ -163,11 +190,13 @@ struct CidStore::CidState
Ctx.ReportBadChunks(BadChunks);
}
+
+ uint64_t m_LastScrubTime = 0;
};
//////////////////////////////////////////////////////////////////////////
-CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique<CidState>(InCasStore))
+CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique<Impl>(InCasStore))
{
m_Impl->InitializeIndex(RootDir);
}
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index 968c9f3a0..1fcae6d02 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -31,7 +31,16 @@ namespace zen {
using namespace fmt::literals;
-FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config) : m_Config(Config)
+FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash)
+{
+ ShardedPath.Append(RootPath.c_str());
+ ShardedPath.Append(std::filesystem::path::preferred_separator);
+ MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config) : m_Config(Config), m_Log(logging::Get("filecas"))
{
}
@@ -78,11 +87,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
IoBufferFileReference FileRef;
if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef))
{
- size_t Shard2len = 0;
- ExtendableWideStringBuilder<128> ShardedPath;
- ShardedPath.Append(m_Config.RootDirectory.c_str());
- ShardedPath.Append(std::filesystem::path::preferred_separator);
- MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len);
+ ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
auto DeletePayloadFileOnClose = [&] {
// This will cause the file to be deleted when the last handle to it is closed
@@ -105,7 +110,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
{
CAtlFile PayloadFile;
- if (HRESULT hRes = PayloadFile.Create(ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes))
+ if (HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes))
{
// If we succeeded in opening the target file then we don't need to do anything else because it already exists
// and should contain the content we were about to insert
@@ -118,7 +123,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
}
}
- std::filesystem::path FullPath(ShardedPath.c_str());
+ std::filesystem::path FullPath(Name.ShardedPath.c_str());
std::filesystem::path FilePath = FullPath.parent_path();
std::wstring FileName = FullPath.native();
@@ -194,11 +199,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
CasStore::InsertResult
FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash)
{
- size_t Shard2len = 0;
- ExtendableWideStringBuilder<128> ShardedPath;
- ShardedPath.Append(m_Config.RootDirectory.c_str());
- ShardedPath.Append(std::filesystem::path::preferred_separator);
- MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len);
+ ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
// See if file already exists
//
@@ -206,7 +207,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
CAtlFile PayloadFile;
- HRESULT hRes = PayloadFile.Create(ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
+ HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
if (SUCCEEDED(hRes))
{
@@ -221,7 +222,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
// For now, use double-checked locking to see if someone else was first
- hRes = PayloadFile.Create(ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
+ hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
if (SUCCEEDED(hRes))
{
@@ -235,7 +236,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
ZEN_WARN("Unexpected error code when opening shard file for read: {:#x}", uint32_t(hRes));
}
- auto InternalCreateFile = [&] { return PayloadFile.Create(ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); };
+ auto InternalCreateFile = [&] { return PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); };
hRes = InternalCreateFile();
@@ -243,14 +244,14 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
{
// Ensure parent directories exist and retry file creation
- std::filesystem::create_directories(std::wstring_view(ShardedPath.c_str(), Shard2len));
+ std::filesystem::create_directories(std::wstring_view(Name.ShardedPath.c_str(), Name.Shard2len));
hRes = InternalCreateFile();
}
if (FAILED(hRes))
{
- ThrowSystemException(hRes, "Failed to open shard file '{}'"_format(WideToUtf8(ShardedPath)));
+ ThrowSystemException(hRes, "Failed to open shard file '{}'"_format(WideToUtf8(Name.ShardedPath)));
}
size_t ChunkRemain = ChunkSize;
@@ -276,36 +277,37 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
IoBuffer
FileCasStrategy::FindChunk(const IoHash& ChunkHash)
{
- size_t Shard2len = 0;
- ExtendableWideStringBuilder<128> ShardedPath;
- ShardedPath.Append(m_Config.RootDirectory.c_str());
- ShardedPath.Append(std::filesystem::path::preferred_separator);
- MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len);
+ ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
- return IoBufferBuilder::MakeFromFile(ShardedPath.c_str());
+ return IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str());
}
bool
FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
{
- size_t Shard2len = 0;
- ExtendableWideStringBuilder<128> ShardedPath;
- ShardedPath.Append(m_Config.RootDirectory.c_str());
- ShardedPath.Append(std::filesystem::path::preferred_separator);
- MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len);
+ ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
std::error_code Ec;
- if (std::filesystem::exists(ShardedPath.c_str(), Ec))
+ if (std::filesystem::exists(Name.ShardedPath.c_str(), Ec))
{
return true;
}
return false;
}
+void
+FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
+{
+ ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+
+ ZEN_DEBUG("deleting CAS payload file '{}'", WideToUtf8(Name.ShardedPath));
+
+ std::filesystem::remove(Name.ShardedPath.c_str(), Ec);
+}
void
FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks)
@@ -421,6 +423,27 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
}
});
+ if (!BadHashes.empty())
+ {
+ ZEN_ERROR("file CAS scrubbing: {} bad chunks found", BadHashes.size());
+
+ if (Ctx.RunRecovery())
+ {
+ ZEN_WARN("recovery: deleting backing files for {} bad chunks which were found to be bad", BadHashes.size());
+
+ for (const IoHash& Hash : BadHashes)
+ {
+ std::error_code Ec;
+ DeleteChunk(Hash, Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("failed to delete file for chunk {}", Hash);
+ }
+ }
+ }
+ }
+
Ctx.ReportBadChunks(BadHashes);
}
diff --git a/zenstore/filecas.h b/zenstore/filecas.h
index 18102968a..2e09367df 100644
--- a/zenstore/filecas.h
+++ b/zenstore/filecas.h
@@ -12,6 +12,10 @@
#include <functional>
+namespace spdlog {
+class logger;
+}
+
namespace zen {
class BasicFile;
@@ -37,10 +41,21 @@ private:
const CasStoreConfiguration& m_Config;
RwLock m_Lock;
RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines
+ spdlog::logger& m_Log;
+ spdlog::logger& Log() { return m_Log; }
inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; }
static WideStringBuilderBase& MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHash& ChunkHash, size_t& OutShard2len);
void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback);
+ void DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec);
+
+ struct ShardingHelper
+ {
+ ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash);
+
+ size_t Shard2len = 0;
+ ExtendableWideStringBuilder<128> ShardedPath;
+ };
};
} // namespace zen
diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h
index bb310b179..ed235bb4b 100644
--- a/zenstore/include/zenstore/CAS.h
+++ b/zenstore/include/zenstore/CAS.h
@@ -8,6 +8,7 @@
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/refcount.h>
+#include <zencore/timer.h>
#include <atomic>
#include <filesystem>
#include <memory>
@@ -37,12 +38,22 @@ public:
private:
};
+/** Context object for data scrubbing
+ *
+ * Data scrubbing is when we traverse stored data to validate it and
+ * optionally correct/recover
+ */
+
class ScrubContext
{
public:
- virtual void ReportBadChunks(std::span<IoHash> BadChunks);
+ virtual void ReportBadChunks(std::span<IoHash> BadChunks);
+ inline uint64_t ScrubTimestamp() const { return m_ScrubTime; }
+ inline bool RunRecovery() const { return m_Recover; }
private:
+ uint64_t m_ScrubTime = GetHifreqTimerValue();
+ bool m_Recover = true;
};
class CasChunkSet
@@ -78,6 +89,7 @@ public:
protected:
CasStoreConfiguration m_Config;
+ uint64_t m_LastScrubTime = 0;
};
ZENCORE_API CasStore* CreateCasStore();
diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h
index 49f2bf99a..f4439e083 100644
--- a/zenstore/include/zenstore/cidstore.h
+++ b/zenstore/include/zenstore/cidstore.h
@@ -50,8 +50,8 @@ public:
// TODO: add batch filter support
private:
- struct CidState;
- std::unique_ptr<CidState> m_Impl;
+ struct Impl;
+ std::unique_ptr<Impl> m_Impl;
};
} // namespace zen