diff options
| author | Dan Engelbrecht <[email protected]> | 2023-02-13 07:39:15 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-02-13 08:39:15 +0100 |
| commit | 697f03d670a3164a343c97ae301853798ca28ba1 (patch) | |
| tree | cf3205cecc15f1c22e8a8af925c0dcb3058cebbf | |
| parent | 0.2.3 (diff) | |
| download | zen-697f03d670a3164a343c97ae301853798ca28ba1.tar.xz zen-697f03d670a3164a343c97ae301853798ca28ba1.zip | |
FileCas (#226)
* maintain snapshot of disk state in file cas
* Add folder scanning to establish initial state for filecas and pre-scrubbing
* changelog
| -rw-r--r-- | CHANGELOG.md | 3 | ||||
| -rw-r--r-- | zenstore/filecas.cpp | 551 | ||||
| -rw-r--r-- | zenstore/filecas.h | 31 |
3 files changed, 452 insertions, 133 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 592ef19e4..1d9461b3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ ## +- Improvement: FileCas now keeps an up to date index of all the entries improving performance when getting cache misses on large payloads + +## 0.2.3 - Feature: Add support for "packagedata" mapping in oplog entries - Feature: Zen command line tool `project-create` to create a project store project - `--project` Project name (id) diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index e67653e8a..bfed99639 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -31,6 +31,7 @@ #include <unordered_map> ZEN_THIRD_PARTY_INCLUDES_START +#include <xxhash.h> #if ZEN_PLATFORM_WINDOWS # include <atlfile.h> #endif @@ -38,6 +39,46 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { +namespace { + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".ulog"; + + std::filesystem::path GetIndexPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", IndexExtension); } + + std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootDir) + { + return RootDir / fmt::format("cas.tmp{}", IndexExtension); + } + + std::filesystem::path GetLogPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", LogExtension); } + +#pragma pack(push) +#pragma pack(1) + + struct FileCasIndexHeader + { + static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; + static constexpr uint32_t CurrentVersion = 1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t Reserved = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const FileCasIndexHeader& Header) + { + return XXH32(&Header.Magic, sizeof(FileCasIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } + }; + + static_assert(sizeof(FileCasIndexHeader) == 32); + +#pragma pack(pop) + +} // namespace + FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash) { ShardedPath.Append(RootPath.c_str()); @@ -88,34 +129,32 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN m_RootDirectory = RootDirectory; - CreateDirectories(m_RootDirectory); + m_Index.clear(); - m_CasLog.Open(m_RootDirectory / "cas.ulog", IsNewStore ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); + std::filesystem::path LogPath = GetLogPath(m_RootDirectory); + std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory); - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read log {} containing {}", m_RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed))); - }); + if (IsNewStore) + { + std::filesystem::remove(LogPath); + std::filesystem::remove(IndexPath); + std::filesystem::remove_all(RootDirectory); + } - m_KnownEntries.reserve(10000); - m_CasLog.Replay( - [&](const FileCasIndexEntry& Entry) { - if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone)) - { - if (m_KnownEntries.erase(Entry.Key) == 1u) - { - m_TotalSize.fetch_sub(Entry.Size, std::memory_order_relaxed); - } - } - else - { - if (m_KnownEntries.insert(Entry.Key).second) - { - m_TotalSize.fetch_add(Entry.Size, std::memory_order_relaxed); - } - } - }, - 0); + m_LogFlushPosition = ReadIndexFile(); + uint64_t LogEntryCount = ReadLog(m_LogFlushPosition); + for (const auto& Entry : m_Index) + { + m_TotalSize.fetch_add(Entry.second.Size, std::memory_order::relaxed); + } + + CreateDirectories(m_RootDirectory); + m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); + + if (IsNewStore || LogEntryCount > 0) + { + MakeIndexSnapshot(); + } } #if ZEN_PLATFORM_WINDOWS @@ -152,7 +191,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: { { RwLock::SharedLockScope _(m_Lock); - if (m_KnownEntries.contains(ChunkHash)) + if (m_Index.contains(ChunkHash)) { return CasStore::InsertResult{.New = false}; } @@ -170,7 +209,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: bool Exists = true; { RwLock::SharedLockScope _(m_Lock); - Exists = m_KnownEntries.contains(ChunkHash); + Exists = m_Index.contains(ChunkHash); } if (Exists) { @@ -218,11 +257,12 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: bool IsNew = false; { RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_KnownEntries.insert(ChunkHash).second; + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; } if (IsNew) + { - m_TotalSize.fetch_add(static_cast<uint64_t>(Chunk.Size()), std::memory_order::relaxed); + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); } DeletePayloadFileOnClose(ChunkFileHandle); @@ -330,7 +370,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: bool IsNew = false; { RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_KnownEntries.insert(ChunkHash).second; + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; } if (IsNew) { @@ -350,7 +390,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: bool IsNew = false; { RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_KnownEntries.insert(ChunkHash).second; + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; } if (IsNew) { @@ -399,7 +439,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: bool IsNew = false; { RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_KnownEntries.insert(ChunkHash).second; + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; } if (IsNew) { @@ -418,7 +458,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: bool IsNew = false; { RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_KnownEntries.insert(ChunkHash).second; + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; } if (IsNew) { @@ -437,6 +477,14 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize { ZEN_ASSERT(m_IsInitialized); + { + RwLock::SharedLockScope _(m_Lock); + if (m_Index.contains(ChunkHash)) + { + return {.New = false}; + } + } + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); // See if file already exists @@ -454,7 +502,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize bool IsNew = false; { RwLock::ExclusiveLockScope _(m_Lock); - IsNew = m_KnownEntries.insert(ChunkHash).second; + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; } if (IsNew) { @@ -490,7 +538,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize bool IsNew = false; { RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_KnownEntries.insert(ChunkHash).second; + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; } if (IsNew) { @@ -550,7 +598,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize bool IsNew = false; { RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_KnownEntries.insert(ChunkHash).second; + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; } if (IsNew) { @@ -616,7 +664,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize bool IsNew = false; { RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_KnownEntries.insert(ChunkHash).second; + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; } if (IsNew) { @@ -631,6 +679,14 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash) { ZEN_ASSERT(m_IsInitialized); + { + RwLock::SharedLockScope _(m_Lock); + if (!m_Index.contains(ChunkHash)) + { + return {}; + } + } + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); @@ -643,17 +699,8 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash) { ZEN_ASSERT(m_IsInitialized); - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - - RwLock::SharedLockScope _(LockForHash(ChunkHash)); - - std::error_code Ec; - if (std::filesystem::exists(Name.ShardedPath.c_str(), Ec)) - { - return true; - } - - return false; + RwLock::SharedLockScope _(m_Lock); + return m_Index.contains(ChunkHash); } void @@ -669,14 +716,17 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) } ZEN_DEBUG("deleting CAS payload file '{}' {}", Name.ShardedPath.ToUtf8(), NiceBytes(FileSize)); - std::filesystem::remove(Name.ShardedPath.c_str(), Ec); - if (!Ec) + if (!Ec || !std::filesystem::exists(Name.ShardedPath.c_str())) { - if (m_KnownEntries.erase(ChunkHash) == 1u) { - m_TotalSize.fetch_sub(FileSize, std::memory_order_relaxed); + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) + { + m_TotalSize.fetch_sub(It->second.Size, std::memory_order_relaxed); + m_Index.erase(It); + } } m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = FileSize}); } @@ -699,64 +749,18 @@ FileCasStrategy::FilterChunks(HashKeySet& InOutChunks) } void -FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback) +FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback) { ZEN_ASSERT(m_IsInitialized); - struct Visitor : public FileSystemTraversal::TreeVisitor + RwLock::SharedLockScope _(m_Lock); + for (const auto& It : m_Index) { - Visitor(const std::filesystem::path& RootDir) : RootDirectory(RootDir) {} - virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override - { - ZEN_UNUSED(FileSize); - - std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory); - - std::filesystem::path::string_type PathString = RelPath.native(); - - if ((PathString.size() == (3 + 2 + 1)) && (File.size() == (40 - 3 - 2))) - { - if (PathString.at(3) == std::filesystem::path::preferred_separator) - { - PathString.erase(3, 1); - } - PathString.append(File); - - // TODO: should validate that we're actually dealing with a valid hex string here - -#if ZEN_PLATFORM_WINDOWS - StringBuilder<64> Utf8; - WideToUtf8(PathString, Utf8); - IoHash NameHash = IoHash::FromHexString({Utf8.Data(), Utf8.Size()}); -#else - IoHash NameHash = IoHash::FromHexString(PathString); -#endif - - BasicFile PayloadFile; - std::error_code Ec; - PayloadFile.Open(Parent / File, BasicFile::Mode::kWrite, Ec); - - if (!Ec) - { - Callback(NameHash, PayloadFile); - } - } - } - - virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, - [[maybe_unused]] const path_view& DirectoryName) override - { - return true; - } - - const std::filesystem::path& RootDirectory; - std::function<void(const IoHash& Hash, BasicFile& PayloadFile)> Callback; - } CasVisitor{m_RootDirectory}; - - CasVisitor.Callback = std::move(Callback); - - FileSystemTraversal Traversal; - Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor); + const IoHash& NameHash = It.first; + ShardingHelper Name(m_RootDirectory.c_str(), NameHash); + IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); + Callback(NameHash, std::move(Payload)); + } } void @@ -782,14 +786,31 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) std::vector<IoHash> BadHashes; uint64_t ChunkCount{0}, ChunkBytes{0}; - IterateChunks([&](const IoHash& Hash, BasicFile& Payload) { + { + std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory); + RwLock::ExclusiveLockScope _(m_Lock); + for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries) + { + if (m_Index.insert({Entry.Key, {.Size = Entry.Size}}).second) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(Entry.Size), std::memory_order::relaxed); + m_CasLog.Append({.Key = Entry.Key, .Size = Entry.Size}); + } + } + } + + IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { + if (!Payload) + { + BadHashes.push_back(Hash); + return; + } ++ChunkCount; - ChunkBytes += Payload.FileSize(); + ChunkBytes += Payload.GetSize(); - IoBuffer Buffer(IoBuffer::BorrowedFile, Payload.Handle(), 0, Payload.FileSize()); IoHash RawHash; uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + if (CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize)) { if (RawHash != Hash) { @@ -800,9 +821,7 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) return; } #if ZEN_WITH_TESTS - IoHashStream Hasher; - Payload.StreamByteRange(0, Payload.FileSize(), [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); - IoHash ComputedHash = Hasher.GetHash(); + IoHash ComputedHash = IoHash::HashBuffer(CompositeBuffer(SharedBuffer(std::move(Payload)))); if (ComputedHash == Hash) { return; @@ -870,7 +889,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) NiceBytes(OldTotalSize)); }); - IterateChunks([&](const IoHash& Hash, BasicFile& Payload) { + IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { bool KeepThis = false; CandidateCas[0] = Hash; GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) { @@ -878,15 +897,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) KeepThis = true; }); - const uint64_t FileSize = Payload.FileSize(); - // Is this a file we did not track previously? - { - RwLock::ExclusiveLockScope _(m_Lock); - if (m_KnownEntries.insert(Hash).second) - { - m_TotalSize.fetch_add(FileSize, std::memory_order_relaxed); - } - } + const uint64_t FileSize = Payload.GetSize(); if (!KeepThis) { @@ -898,6 +909,8 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) ChunkBytes.fetch_add(FileSize); }); + // TODO, any entires we did not encounter during our IterateChunks should be removed from the index + if (ChunksToDelete.empty()) { ZEN_DEBUG("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory); @@ -935,7 +948,297 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) GcCtx.AddDeletedCids(ChunksToDelete); } -////////////////////////////////////////////////////////////////////////// +bool +FileCasStrategy::ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason) +{ + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if (Entry.Flags & (~FileCasIndexEntry::kTombStone)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); + return false; + } + if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone)) + { + return true; + } + uint64_t Size = Entry.Size; + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; +} + +void +FileCasStrategy::MakeIndexSnapshot() +{ + uint64_t LogCount = m_CasLog.GetLogCount(); + if (m_LogFlushPosition == LogCount) + { + return; + } + ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory); + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}", + m_RootDirectory, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + namespace fs = std::filesystem; + + fs::path IndexPath = GetIndexPath(m_RootDirectory); + fs::path STmpIndexPath = GetTempIndexPath(m_RootDirectory); + + // Move index away, we keep it if something goes wrong + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } + if (fs::is_regular_file(IndexPath)) + { + fs::rename(IndexPath, STmpIndexPath); + } + + try + { + // Write the current state of the location map to a new index state + std::vector<FileCasIndexEntry> Entries; + + { + Entries.resize(m_Index.size()); + + uint64_t EntryIndex = 0; + for (auto& Entry : m_Index) + { + FileCasIndexEntry& IndexEntry = Entries[EntryIndex++]; + IndexEntry.Key = Entry.first; + IndexEntry.Size = Entry.second.Size; + } + } + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); + FileCasIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = LogCount}; + + Header.Checksum = FileCasIndexHeader::ComputeChecksum(Header); + + ObjectIndexFile.Write(&Header, sizeof(FileCasIndexHeader), 0); + ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(FileCasIndexEntry), sizeof(FileCasIndexHeader)); + ObjectIndexFile.Flush(); + ObjectIndexFile.Close(); + EntryCount = Entries.size(); + m_LogFlushPosition = LogCount; + } + catch (std::exception& Err) + { + ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); + + // Restore any previous snapshot + + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(IndexPath); + fs::rename(STmpIndexPath, IndexPath); + } + } + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } +} +uint64_t +FileCasStrategy::ReadIndexFile() +{ + std::vector<FileCasIndexEntry> Entries; + std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory); + if (std::filesystem::is_regular_file(IndexPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' index containing #{} entries in {}", + IndexPath, + Entries.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(FileCasIndexHeader)) + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(FileCasIndexHeader))) / sizeof(FileCasIndexEntry); + FileCasIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if ((Header.Magic == FileCasIndexHeader::ExpectedMagic) && (Header.Version == FileCasIndexHeader::CurrentVersion) && + (Header.Checksum == FileCasIndexHeader::ComputeChecksum(Header)) && (Header.EntryCount <= ExpectedEntryCount)) + { + Entries.resize(Header.EntryCount); + ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(FileCasIndexEntry), sizeof(FileCasIndexHeader)); + + std::string InvalidEntryReason; + for (const FileCasIndexEntry& Entry : Entries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}); + } + + return Header.LogPosition; + } + else + { + ZEN_WARN("skipping invalid index file '{}'", IndexPath); + } + } + return 0; + } + + if (std::filesystem::is_directory(m_RootDirectory)) + { + ZEN_INFO("missing index for file cas, scanning for cas files in {}", m_RootDirectory); + TCasLogFile<FileCasIndexEntry> CasLog; + uint64_t TotalSize = 0; + Stopwatch TotalTimer; + const auto _ = MakeGuard([&] { + ZEN_INFO("scanned file cas folder '{}' DONE after {}, found {} files totalling {}", + m_RootDirectory, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + CasLog.GetLogCount(), + NiceBytes(TotalSize)); + }); + + std::filesystem::path LogPath = GetLogPath(m_RootDirectory); + + std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory); + CasLog.Open(LogPath, CasLogFile::Mode::kTruncate); + std::string InvalidEntryReason; + for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", m_RootDirectory, InvalidEntryReason); + continue; + } + m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}); + } + + CasLog.Close(); + } + + return 0; +} + +uint64_t +FileCasStrategy::ReadLog(uint64_t SkipEntryCount) +{ + std::filesystem::path LogPath = GetLogPath(m_RootDirectory); + if (std::filesystem::is_regular_file(LogPath)) + { + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' log containing #{} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + TCasLogFile<FileCasIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (CasLog.Initialize()) + { + uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + LogEntryCount = EntryCount - SkipEntryCount; + m_Index.reserve(LogEntryCount); + uint64_t InvalidEntryCount = 0; + CasLog.Replay( + [&](const FileCasIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Flags & FileCasIndexEntry::kTombStone) + { + m_Index.erase(Record.Key); + return; + } + if (!ValidateEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + m_Index.insert_or_assign(Record.Key, IndexEntry{.Size = Record.Size}); + }, + SkipEntryCount); + if (InvalidEntryCount) + { + ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, LogPath); + } + return LogEntryCount; + } + } + return 0; +} + +std::vector<FileCasStrategy::FileCasIndexEntry> +FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir) +{ + std::vector<FileCasIndexEntry> Entries; + struct Visitor : public FileSystemTraversal::TreeVisitor + { + Visitor(const std::filesystem::path& RootDir, std::vector<FileCasIndexEntry>& Entries) : RootDirectory(RootDir), Entries(Entries) {} + virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override + { + std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory); + + std::filesystem::path::string_type PathString = RelPath.native(); + + if ((PathString.size() == (3 + 2 + 1)) && (File.size() == (40 - 3 - 2))) + { + if (PathString.at(3) == std::filesystem::path::preferred_separator) + { + PathString.erase(3, 1); + } + PathString.append(File); + + // TODO: should validate that we're actually dealing with a valid hex string here +#if ZEN_PLATFORM_WINDOWS + StringBuilder<64> Utf8; + WideToUtf8(PathString, Utf8); + IoHash NameHash = IoHash::FromHexString({Utf8.Data(), Utf8.Size()}); +#else + IoHash NameHash = IoHash::FromHexString(PathString); +#endif + Entries.emplace_back(FileCasIndexEntry{.Key = NameHash, .Size = FileSize}); + } + } + + virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, + [[maybe_unused]] const path_view& DirectoryName) override + { + return true; + } + + const std::filesystem::path& RootDirectory; + std::vector<FileCasIndexEntry>& Entries; + } CasVisitor{RootDir, Entries}; + + FileSystemTraversal Traversal; + Traversal.TraverseFileSystem(RootDir, CasVisitor); + return Entries; +}; + + ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS diff --git a/zenstore/filecas.h b/zenstore/filecas.h index 29c28560e..420b3a634 100644 --- a/zenstore/filecas.h +++ b/zenstore/filecas.h @@ -45,16 +45,26 @@ struct FileCasStrategy final : public GcStorage virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; } private: + void MakeIndexSnapshot(); + uint64_t ReadIndexFile(); + uint64_t ReadLog(uint64_t LogPosition); + + struct IndexEntry + { + uint64_t Size = 0; + }; + using IndexMap = tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher>; + CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); - std::filesystem::path m_RootDirectory; - RwLock m_Lock; - std::unordered_set<IoHash, IoHash::Hasher> m_KnownEntries; - RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines - spdlog::logger& m_Log; - spdlog::logger& Log() { return m_Log; } - std::atomic_uint64_t m_TotalSize{}; - bool m_IsInitialized = false; + std::filesystem::path m_RootDirectory; + RwLock m_Lock; + IndexMap m_Index; + RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines + spdlog::logger& m_Log; + spdlog::logger& Log() { return m_Log; } + std::atomic_uint64_t m_TotalSize{}; + bool m_IsInitialized = false; struct FileCasIndexEntry { @@ -66,13 +76,16 @@ private: uint32_t Flags = 0; uint64_t Size = 0; }; + static bool ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason); + static std::vector<FileCasStrategy::FileCasIndexEntry> ScanFolderForCasFiles(const std::filesystem::path& RootDir); static_assert(sizeof(FileCasIndexEntry) == 32); TCasLogFile<FileCasIndexEntry> m_CasLog; + uint64_t m_LogFlushPosition = 0; inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; } - void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback); + void IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback); void DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec); struct ShardingHelper |