aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-02-13 07:39:15 +0000
committerGitHub <[email protected]>2023-02-13 08:39:15 +0100
commit697f03d670a3164a343c97ae301853798ca28ba1 (patch)
treecf3205cecc15f1c22e8a8af925c0dcb3058cebbf
parent0.2.3 (diff)
downloadzen-697f03d670a3164a343c97ae301853798ca28ba1.tar.xz
zen-697f03d670a3164a343c97ae301853798ca28ba1.zip
FileCas (#226)
* maintain snapshot of disk state in file cas * Add folder scanning to establish initial state for filecas and pre-scrubbing * changelog
-rw-r--r--CHANGELOG.md3
-rw-r--r--zenstore/filecas.cpp551
-rw-r--r--zenstore/filecas.h31
3 files changed, 452 insertions, 133 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 592ef19e4..1d9461b3f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
##
+- Improvement: FileCas now keeps an up to date index of all the entries improving performance when getting cache misses on large payloads
+
+## 0.2.3
- Feature: Add support for "packagedata" mapping in oplog entries
- Feature: Zen command line tool `project-create` to create a project store project
- `--project` Project name (id)
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index e67653e8a..bfed99639 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -31,6 +31,7 @@
#include <unordered_map>
ZEN_THIRD_PARTY_INCLUDES_START
+#include <xxhash.h>
#if ZEN_PLATFORM_WINDOWS
# include <atlfile.h>
#endif
@@ -38,6 +39,46 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
+namespace {
+ const char* IndexExtension = ".uidx";
+ const char* LogExtension = ".ulog";
+
+ std::filesystem::path GetIndexPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", IndexExtension); }
+
+ std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootDir)
+ {
+ return RootDir / fmt::format("cas.tmp{}", IndexExtension);
+ }
+
+ std::filesystem::path GetLogPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", LogExtension); }
+
+#pragma pack(push)
+#pragma pack(1)
+
+ struct FileCasIndexHeader
+ {
+ static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
+ static constexpr uint32_t CurrentVersion = 1;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ uint64_t EntryCount = 0;
+ uint64_t LogPosition = 0;
+ uint32_t Reserved = 0;
+ uint32_t Checksum = 0;
+
+ static uint32_t ComputeChecksum(const FileCasIndexHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(FileCasIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
+ };
+
+ static_assert(sizeof(FileCasIndexHeader) == 32);
+
+#pragma pack(pop)
+
+} // namespace
+
FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash)
{
ShardedPath.Append(RootPath.c_str());
@@ -88,34 +129,32 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN
m_RootDirectory = RootDirectory;
- CreateDirectories(m_RootDirectory);
+ m_Index.clear();
- m_CasLog.Open(m_RootDirectory / "cas.ulog", IsNewStore ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
+ std::filesystem::path LogPath = GetLogPath(m_RootDirectory);
+ std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory);
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("read log {} containing {}", m_RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed)));
- });
+ if (IsNewStore)
+ {
+ std::filesystem::remove(LogPath);
+ std::filesystem::remove(IndexPath);
+ std::filesystem::remove_all(RootDirectory);
+ }
- m_KnownEntries.reserve(10000);
- m_CasLog.Replay(
- [&](const FileCasIndexEntry& Entry) {
- if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone))
- {
- if (m_KnownEntries.erase(Entry.Key) == 1u)
- {
- m_TotalSize.fetch_sub(Entry.Size, std::memory_order_relaxed);
- }
- }
- else
- {
- if (m_KnownEntries.insert(Entry.Key).second)
- {
- m_TotalSize.fetch_add(Entry.Size, std::memory_order_relaxed);
- }
- }
- },
- 0);
+ m_LogFlushPosition = ReadIndexFile();
+ uint64_t LogEntryCount = ReadLog(m_LogFlushPosition);
+ for (const auto& Entry : m_Index)
+ {
+ m_TotalSize.fetch_add(Entry.second.Size, std::memory_order::relaxed);
+ }
+
+ CreateDirectories(m_RootDirectory);
+ m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ if (IsNewStore || LogEntryCount > 0)
+ {
+ MakeIndexSnapshot();
+ }
}
#if ZEN_PLATFORM_WINDOWS
@@ -152,7 +191,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
{
{
RwLock::SharedLockScope _(m_Lock);
- if (m_KnownEntries.contains(ChunkHash))
+ if (m_Index.contains(ChunkHash))
{
return CasStore::InsertResult{.New = false};
}
@@ -170,7 +209,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
bool Exists = true;
{
RwLock::SharedLockScope _(m_Lock);
- Exists = m_KnownEntries.contains(ChunkHash);
+ Exists = m_Index.contains(ChunkHash);
}
if (Exists)
{
@@ -218,11 +257,12 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
bool IsNew = false;
{
RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_KnownEntries.insert(ChunkHash).second;
+ IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
}
if (IsNew)
+
{
- m_TotalSize.fetch_add(static_cast<uint64_t>(Chunk.Size()), std::memory_order::relaxed);
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
}
DeletePayloadFileOnClose(ChunkFileHandle);
@@ -330,7 +370,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
bool IsNew = false;
{
RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_KnownEntries.insert(ChunkHash).second;
+ IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second;
}
if (IsNew)
{
@@ -350,7 +390,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
bool IsNew = false;
{
RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_KnownEntries.insert(ChunkHash).second;
+ IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second;
}
if (IsNew)
{
@@ -399,7 +439,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
bool IsNew = false;
{
RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_KnownEntries.insert(ChunkHash).second;
+ IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second;
}
if (IsNew)
{
@@ -418,7 +458,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
bool IsNew = false;
{
RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_KnownEntries.insert(ChunkHash).second;
+ IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second;
}
if (IsNew)
{
@@ -437,6 +477,14 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
{
ZEN_ASSERT(m_IsInitialized);
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ if (m_Index.contains(ChunkHash))
+ {
+ return {.New = false};
+ }
+ }
+
ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
// See if file already exists
@@ -454,7 +502,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
bool IsNew = false;
{
RwLock::ExclusiveLockScope _(m_Lock);
- IsNew = m_KnownEntries.insert(ChunkHash).second;
+ IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
}
if (IsNew)
{
@@ -490,7 +538,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
bool IsNew = false;
{
RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_KnownEntries.insert(ChunkHash).second;
+ IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
}
if (IsNew)
{
@@ -550,7 +598,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
bool IsNew = false;
{
RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_KnownEntries.insert(ChunkHash).second;
+ IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
}
if (IsNew)
{
@@ -616,7 +664,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
bool IsNew = false;
{
RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_KnownEntries.insert(ChunkHash).second;
+ IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
}
if (IsNew)
{
@@ -631,6 +679,14 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash)
{
ZEN_ASSERT(m_IsInitialized);
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ if (!m_Index.contains(ChunkHash))
+ {
+ return {};
+ }
+ }
+
ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
@@ -643,17 +699,8 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
{
ZEN_ASSERT(m_IsInitialized);
- ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
-
- RwLock::SharedLockScope _(LockForHash(ChunkHash));
-
- std::error_code Ec;
- if (std::filesystem::exists(Name.ShardedPath.c_str(), Ec))
- {
- return true;
- }
-
- return false;
+ RwLock::SharedLockScope _(m_Lock);
+ return m_Index.contains(ChunkHash);
}
void
@@ -669,14 +716,17 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
}
ZEN_DEBUG("deleting CAS payload file '{}' {}", Name.ShardedPath.ToUtf8(), NiceBytes(FileSize));
-
std::filesystem::remove(Name.ShardedPath.c_str(), Ec);
- if (!Ec)
+ if (!Ec || !std::filesystem::exists(Name.ShardedPath.c_str()))
{
- if (m_KnownEntries.erase(ChunkHash) == 1u)
{
- m_TotalSize.fetch_sub(FileSize, std::memory_order_relaxed);
+ RwLock::ExclusiveLockScope _(m_Lock);
+ if (auto It = m_Index.find(ChunkHash); It != m_Index.end())
+ {
+ m_TotalSize.fetch_sub(It->second.Size, std::memory_order_relaxed);
+ m_Index.erase(It);
+ }
}
m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = FileSize});
}
@@ -699,64 +749,18 @@ FileCasStrategy::FilterChunks(HashKeySet& InOutChunks)
}
void
-FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback)
+FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback)
{
ZEN_ASSERT(m_IsInitialized);
- struct Visitor : public FileSystemTraversal::TreeVisitor
+ RwLock::SharedLockScope _(m_Lock);
+ for (const auto& It : m_Index)
{
- Visitor(const std::filesystem::path& RootDir) : RootDirectory(RootDir) {}
- virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override
- {
- ZEN_UNUSED(FileSize);
-
- std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory);
-
- std::filesystem::path::string_type PathString = RelPath.native();
-
- if ((PathString.size() == (3 + 2 + 1)) && (File.size() == (40 - 3 - 2)))
- {
- if (PathString.at(3) == std::filesystem::path::preferred_separator)
- {
- PathString.erase(3, 1);
- }
- PathString.append(File);
-
- // TODO: should validate that we're actually dealing with a valid hex string here
-
-#if ZEN_PLATFORM_WINDOWS
- StringBuilder<64> Utf8;
- WideToUtf8(PathString, Utf8);
- IoHash NameHash = IoHash::FromHexString({Utf8.Data(), Utf8.Size()});
-#else
- IoHash NameHash = IoHash::FromHexString(PathString);
-#endif
-
- BasicFile PayloadFile;
- std::error_code Ec;
- PayloadFile.Open(Parent / File, BasicFile::Mode::kWrite, Ec);
-
- if (!Ec)
- {
- Callback(NameHash, PayloadFile);
- }
- }
- }
-
- virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent,
- [[maybe_unused]] const path_view& DirectoryName) override
- {
- return true;
- }
-
- const std::filesystem::path& RootDirectory;
- std::function<void(const IoHash& Hash, BasicFile& PayloadFile)> Callback;
- } CasVisitor{m_RootDirectory};
-
- CasVisitor.Callback = std::move(Callback);
-
- FileSystemTraversal Traversal;
- Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor);
+ const IoHash& NameHash = It.first;
+ ShardingHelper Name(m_RootDirectory.c_str(), NameHash);
+ IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str());
+ Callback(NameHash, std::move(Payload));
+ }
}
void
@@ -782,14 +786,31 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
std::vector<IoHash> BadHashes;
uint64_t ChunkCount{0}, ChunkBytes{0};
- IterateChunks([&](const IoHash& Hash, BasicFile& Payload) {
+ {
+ std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory);
+ RwLock::ExclusiveLockScope _(m_Lock);
+ for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries)
+ {
+ if (m_Index.insert({Entry.Key, {.Size = Entry.Size}}).second)
+ {
+ m_TotalSize.fetch_add(static_cast<uint64_t>(Entry.Size), std::memory_order::relaxed);
+ m_CasLog.Append({.Key = Entry.Key, .Size = Entry.Size});
+ }
+ }
+ }
+
+ IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
+ if (!Payload)
+ {
+ BadHashes.push_back(Hash);
+ return;
+ }
++ChunkCount;
- ChunkBytes += Payload.FileSize();
+ ChunkBytes += Payload.GetSize();
- IoBuffer Buffer(IoBuffer::BorrowedFile, Payload.Handle(), 0, Payload.FileSize());
IoHash RawHash;
uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+ if (CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize))
{
if (RawHash != Hash)
{
@@ -800,9 +821,7 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
return;
}
#if ZEN_WITH_TESTS
- IoHashStream Hasher;
- Payload.StreamByteRange(0, Payload.FileSize(), [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
- IoHash ComputedHash = Hasher.GetHash();
+ IoHash ComputedHash = IoHash::HashBuffer(CompositeBuffer(SharedBuffer(std::move(Payload))));
if (ComputedHash == Hash)
{
return;
@@ -870,7 +889,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
NiceBytes(OldTotalSize));
});
- IterateChunks([&](const IoHash& Hash, BasicFile& Payload) {
+ IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
bool KeepThis = false;
CandidateCas[0] = Hash;
GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) {
@@ -878,15 +897,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
KeepThis = true;
});
- const uint64_t FileSize = Payload.FileSize();
- // Is this a file we did not track previously?
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- if (m_KnownEntries.insert(Hash).second)
- {
- m_TotalSize.fetch_add(FileSize, std::memory_order_relaxed);
- }
- }
+ const uint64_t FileSize = Payload.GetSize();
if (!KeepThis)
{
@@ -898,6 +909,8 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
ChunkBytes.fetch_add(FileSize);
});
+ // TODO, any entires we did not encounter during our IterateChunks should be removed from the index
+
if (ChunksToDelete.empty())
{
ZEN_DEBUG("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory);
@@ -935,7 +948,297 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
GcCtx.AddDeletedCids(ChunksToDelete);
}
-//////////////////////////////////////////////////////////////////////////
+bool
+FileCasStrategy::ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason)
+{
+ if (Entry.Key == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Flags & (~FileCasIndexEntry::kTombStone))
+ {
+ OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone))
+ {
+ return true;
+ }
+ uint64_t Size = Entry.Size;
+ if (Size == 0)
+ {
+ OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+ return false;
+ }
+ return true;
+}
+
+void
+FileCasStrategy::MakeIndexSnapshot()
+{
+ uint64_t LogCount = m_CasLog.GetLogCount();
+ if (m_LogFlushPosition == LogCount)
+ {
+ return;
+ }
+ ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory);
+ uint64_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}",
+ m_RootDirectory,
+ EntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ namespace fs = std::filesystem;
+
+ fs::path IndexPath = GetIndexPath(m_RootDirectory);
+ fs::path STmpIndexPath = GetTempIndexPath(m_RootDirectory);
+
+ // Move index away, we keep it if something goes wrong
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(STmpIndexPath);
+ }
+ if (fs::is_regular_file(IndexPath))
+ {
+ fs::rename(IndexPath, STmpIndexPath);
+ }
+
+ try
+ {
+ // Write the current state of the location map to a new index state
+ std::vector<FileCasIndexEntry> Entries;
+
+ {
+ Entries.resize(m_Index.size());
+
+ uint64_t EntryIndex = 0;
+ for (auto& Entry : m_Index)
+ {
+ FileCasIndexEntry& IndexEntry = Entries[EntryIndex++];
+ IndexEntry.Key = Entry.first;
+ IndexEntry.Size = Entry.second.Size;
+ }
+ }
+
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
+ FileCasIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = LogCount};
+
+ Header.Checksum = FileCasIndexHeader::ComputeChecksum(Header);
+
+ ObjectIndexFile.Write(&Header, sizeof(FileCasIndexHeader), 0);
+ ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(FileCasIndexEntry), sizeof(FileCasIndexHeader));
+ ObjectIndexFile.Flush();
+ ObjectIndexFile.Close();
+ EntryCount = Entries.size();
+ m_LogFlushPosition = LogCount;
+ }
+ catch (std::exception& Err)
+ {
+ ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
+
+ // Restore any previous snapshot
+
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(IndexPath);
+ fs::rename(STmpIndexPath, IndexPath);
+ }
+ }
+ if (fs::is_regular_file(STmpIndexPath))
+ {
+ fs::remove(STmpIndexPath);
+ }
+}
+uint64_t
+FileCasStrategy::ReadIndexFile()
+{
+ std::vector<FileCasIndexEntry> Entries;
+ std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory);
+ if (std::filesystem::is_regular_file(IndexPath))
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' index containing #{} entries in {}",
+ IndexPath,
+ Entries.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t Size = ObjectIndexFile.FileSize();
+ if (Size >= sizeof(FileCasIndexHeader))
+ {
+ uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(FileCasIndexHeader))) / sizeof(FileCasIndexEntry);
+ FileCasIndexHeader Header;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+ if ((Header.Magic == FileCasIndexHeader::ExpectedMagic) && (Header.Version == FileCasIndexHeader::CurrentVersion) &&
+ (Header.Checksum == FileCasIndexHeader::ComputeChecksum(Header)) && (Header.EntryCount <= ExpectedEntryCount))
+ {
+ Entries.resize(Header.EntryCount);
+ ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(FileCasIndexEntry), sizeof(FileCasIndexHeader));
+
+ std::string InvalidEntryReason;
+ for (const FileCasIndexEntry& Entry : Entries)
+ {
+ if (!ValidateEntry(Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+ continue;
+ }
+ m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size});
+ }
+
+ return Header.LogPosition;
+ }
+ else
+ {
+ ZEN_WARN("skipping invalid index file '{}'", IndexPath);
+ }
+ }
+ return 0;
+ }
+
+ if (std::filesystem::is_directory(m_RootDirectory))
+ {
+ ZEN_INFO("missing index for file cas, scanning for cas files in {}", m_RootDirectory);
+ TCasLogFile<FileCasIndexEntry> CasLog;
+ uint64_t TotalSize = 0;
+ Stopwatch TotalTimer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("scanned file cas folder '{}' DONE after {}, found {} files totalling {}",
+ m_RootDirectory,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ CasLog.GetLogCount(),
+ NiceBytes(TotalSize));
+ });
+
+ std::filesystem::path LogPath = GetLogPath(m_RootDirectory);
+
+ std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory);
+ CasLog.Open(LogPath, CasLogFile::Mode::kTruncate);
+ std::string InvalidEntryReason;
+ for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries)
+ {
+ if (!ValidateEntry(Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", m_RootDirectory, InvalidEntryReason);
+ continue;
+ }
+ m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size});
+ }
+
+ CasLog.Close();
+ }
+
+ return 0;
+}
+
+uint64_t
+FileCasStrategy::ReadLog(uint64_t SkipEntryCount)
+{
+ std::filesystem::path LogPath = GetLogPath(m_RootDirectory);
+ if (std::filesystem::is_regular_file(LogPath))
+ {
+ uint64_t LogEntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' log containing #{} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ TCasLogFile<FileCasIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ if (CasLog.Initialize())
+ {
+ uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+ SkipEntryCount = 0;
+ }
+ LogEntryCount = EntryCount - SkipEntryCount;
+ m_Index.reserve(LogEntryCount);
+ uint64_t InvalidEntryCount = 0;
+ CasLog.Replay(
+ [&](const FileCasIndexEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Flags & FileCasIndexEntry::kTombStone)
+ {
+ m_Index.erase(Record.Key);
+ return;
+ }
+ if (!ValidateEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
+ }
+ m_Index.insert_or_assign(Record.Key, IndexEntry{.Size = Record.Size});
+ },
+ SkipEntryCount);
+ if (InvalidEntryCount)
+ {
+ ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, LogPath);
+ }
+ return LogEntryCount;
+ }
+ }
+ return 0;
+}
+
+std::vector<FileCasStrategy::FileCasIndexEntry>
+FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir)
+{
+ std::vector<FileCasIndexEntry> Entries;
+ struct Visitor : public FileSystemTraversal::TreeVisitor
+ {
+ Visitor(const std::filesystem::path& RootDir, std::vector<FileCasIndexEntry>& Entries) : RootDirectory(RootDir), Entries(Entries) {}
+ virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override
+ {
+ std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory);
+
+ std::filesystem::path::string_type PathString = RelPath.native();
+
+ if ((PathString.size() == (3 + 2 + 1)) && (File.size() == (40 - 3 - 2)))
+ {
+ if (PathString.at(3) == std::filesystem::path::preferred_separator)
+ {
+ PathString.erase(3, 1);
+ }
+ PathString.append(File);
+
+ // TODO: should validate that we're actually dealing with a valid hex string here
+#if ZEN_PLATFORM_WINDOWS
+ StringBuilder<64> Utf8;
+ WideToUtf8(PathString, Utf8);
+ IoHash NameHash = IoHash::FromHexString({Utf8.Data(), Utf8.Size()});
+#else
+ IoHash NameHash = IoHash::FromHexString(PathString);
+#endif
+ Entries.emplace_back(FileCasIndexEntry{.Key = NameHash, .Size = FileSize});
+ }
+ }
+
+ virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent,
+ [[maybe_unused]] const path_view& DirectoryName) override
+ {
+ return true;
+ }
+
+ const std::filesystem::path& RootDirectory;
+ std::vector<FileCasIndexEntry>& Entries;
+ } CasVisitor{RootDir, Entries};
+
+ FileSystemTraversal Traversal;
+ Traversal.TraverseFileSystem(RootDir, CasVisitor);
+ return Entries;
+};
+
+ //////////////////////////////////////////////////////////////////////////
#if ZEN_WITH_TESTS
diff --git a/zenstore/filecas.h b/zenstore/filecas.h
index 29c28560e..420b3a634 100644
--- a/zenstore/filecas.h
+++ b/zenstore/filecas.h
@@ -45,16 +45,26 @@ struct FileCasStrategy final : public GcStorage
virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; }
private:
+ void MakeIndexSnapshot();
+ uint64_t ReadIndexFile();
+ uint64_t ReadLog(uint64_t LogPosition);
+
+ struct IndexEntry
+ {
+ uint64_t Size = 0;
+ };
+ using IndexMap = tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher>;
+
CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
- std::filesystem::path m_RootDirectory;
- RwLock m_Lock;
- std::unordered_set<IoHash, IoHash::Hasher> m_KnownEntries;
- RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines
- spdlog::logger& m_Log;
- spdlog::logger& Log() { return m_Log; }
- std::atomic_uint64_t m_TotalSize{};
- bool m_IsInitialized = false;
+ std::filesystem::path m_RootDirectory;
+ RwLock m_Lock;
+ IndexMap m_Index;
+ RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines
+ spdlog::logger& m_Log;
+ spdlog::logger& Log() { return m_Log; }
+ std::atomic_uint64_t m_TotalSize{};
+ bool m_IsInitialized = false;
struct FileCasIndexEntry
{
@@ -66,13 +76,16 @@ private:
uint32_t Flags = 0;
uint64_t Size = 0;
};
+ static bool ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason);
+ static std::vector<FileCasStrategy::FileCasIndexEntry> ScanFolderForCasFiles(const std::filesystem::path& RootDir);
static_assert(sizeof(FileCasIndexEntry) == 32);
TCasLogFile<FileCasIndexEntry> m_CasLog;
+ uint64_t m_LogFlushPosition = 0;
inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; }
- void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback);
+ void IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback);
void DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec);
struct ShardingHelper