diff options
| author | Dan Engelbrecht <[email protected]> | 2022-04-04 12:05:25 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-04-04 12:05:25 +0200 |
| commit | 605b4f330eed43b14135f37ffb58c14fa1cd79c2 (patch) | |
| tree | e8c99f26abb0634ecf53afe76cebd6f0c05742ca | |
| parent | logging cleanup (diff) | |
| download | zen-605b4f330eed43b14135f37ffb58c14fa1cd79c2.tar.xz zen-605b4f330eed43b14135f37ffb58c14fa1cd79c2.zip | |
always keep full log but read from index snapshot location if available
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 34 | ||||
| -rw-r--r-- | zenserver/projectstore.cpp | 45 | ||||
| -rw-r--r-- | zenstore/caslog.cpp | 13 | ||||
| -rw-r--r-- | zenstore/cidstore.cpp | 36 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 190 | ||||
| -rw-r--r-- | zenstore/filecas.cpp | 34 | ||||
| -rw-r--r-- | zenstore/include/zenstore/caslog.h | 14 |
7 files changed, 185 insertions, 181 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index e22b06572..738e4c1fd 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -487,22 +487,24 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is m_SobsFile.Open(SobsPath, IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); m_SlogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); - m_SlogFile.Replay([&](const DiskIndexEntry& Entry) { - if (Entry.Key == IoHash::Zero) - { - ++InvalidEntryCount; - } - else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) - { - m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); - } - else - { - m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); - m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); - } - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size()); - }); + m_SlogFile.Replay( + [&](const DiskIndexEntry& Entry) { + if (Entry.Key == IoHash::Zero) + { + ++InvalidEntryCount; + } + else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + { + m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + } + else + { + m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); + } + MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size()); + }, + 0); if (InvalidEntryCount) { diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index bb8adb19f..617f50660 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -181,36 +181,39 @@ struct ProjectStore::OplogStorage : public RefCounted uint64_t InvalidEntries = 0; - m_Oplog.Replay([&](const zen::OplogEntry& LogEntry) { - if (LogEntry.OpCoreSize == 0) - { - ++InvalidEntries; + m_Oplog.Replay( + [&](const zen::OplogEntry& LogEntry) { + if (LogEntry.OpCoreSize == 0) + { + ++InvalidEntries; - return; - } + return; + } - IoBuffer OpBuffer(LogEntry.OpCoreSize); + IoBuffer OpBuffer(LogEntry.OpCoreSize); - const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; + const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; - m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); + m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); - // Verify checksum, ignore op data if incorrect - const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), OpBuffer.Size()) & 0xffffFFFF); + // Verify checksum, ignore op data if incorrect + const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), OpBuffer.Size()) & 0xffffFFFF); - if (OpCoreHash != LogEntry.OpCoreHash) - { - ZEN_WARN("skipping oplog entry with bad checksum!"); - return; - } + if (OpCoreHash != LogEntry.OpCoreHash) + { + ZEN_WARN("skipping oplog entry with bad checksum!"); + return; + } - CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), OpBuffer.Size())); + CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), OpBuffer.Size())); - m_NextOpsOffset = Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); - m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn); + m_NextOpsOffset = + Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); + m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn); - Handler(Op, LogEntry); - }); + Handler(Op, LogEntry); + }, + 0); if (InvalidEntries) { diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp index e080d6771..03a56f010 100644 --- a/zenstore/caslog.cpp +++ b/zenstore/caslog.cpp @@ -123,7 +123,7 @@ CasLogFile::GetLogSize() uint64_t CasLogFile::GetLogCount() { - uint64_t LogFileSize = m_File.FileSize(); + uint64_t LogFileSize = m_AppendOffset.load(std::memory_order_acquire); if (LogFileSize < sizeof(FileHeader)) { return 0; @@ -134,19 +134,22 @@ CasLogFile::GetLogCount() } void -CasLogFile::Replay(std::function<void(const void*)>&& Handler) +CasLogFile::Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntryCount) { uint64_t LogFileSize = m_File.FileSize(); // Ensure we end up on a clean boundary - const uint64_t LogBaseOffset = sizeof(FileHeader); - const size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize; + uint64_t LogBaseOffset = sizeof(FileHeader); + size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize; - if (LogEntryCount == 0) + if (LogEntryCount <= SkipEntryCount) { return; } + LogBaseOffset += SkipEntryCount * m_RecordSize; + LogEntryCount -= SkipEntryCount; + // This should really be streaming the data rather than just // reading it into memory, though we don't tend to get very // large logs so it may not matter diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index 482a89b9d..509d21abe 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -134,28 +134,30 @@ struct CidStore::Impl uint64_t TombstoneCount = 0; uint64_t InvalidCount = 0; - m_LogFile.Replay([&](const IndexEntry& Entry) { - if (Entry.Compressed != IoHash::Zero) - { - // Update - m_CidMap.insert_or_assign(Entry.Uncompressed, Entry.Compressed); - } - else - { - if (Entry.Uncompressed != IoHash::Zero) + m_LogFile.Replay( + [&](const IndexEntry& Entry) { + if (Entry.Compressed != IoHash::Zero) { - // Tombstone - m_CidMap.erase(Entry.Uncompressed); - ++TombstoneCount; + // Update + m_CidMap.insert_or_assign(Entry.Uncompressed, Entry.Compressed); } else { - // Completely uninitialized entry with both hashes set to zero indicates a - // problem. Might be an unwritten page due to BSOD or some other problem - ++InvalidCount; + if (Entry.Uncompressed != IoHash::Zero) + { + // Tombstone + m_CidMap.erase(Entry.Uncompressed); + ++TombstoneCount; + } + else + { + // Completely uninitialized entry with both hashes set to zero indicates a + // problem. Might be an unwritten page due to BSOD or some other problem + ++InvalidCount; + } } - } - }); + }, + 0); ZEN_INFO("CID index initialized: {} entries found ({} tombstones, {} invalid)", m_CidMap.size(), TombstoneCount, InvalidCount); } diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 366ea5534..c5f9ed80a 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -12,6 +12,8 @@ #include <zencore/workthreadpool.h> #include <gsl/gsl-lite.hpp> +#include <xxhash.h> + #if ZEN_WITH_TESTS # include <zencore/compactbinarybuilder.h> # include <zencore/testing.h> @@ -27,15 +29,20 @@ namespace zen { struct CasDiskIndexHeader { - static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; - static constexpr uint32_t CurrentVersion = 1; - uint32_t Magic = ExpectedMagic; - uint32_t Version = CurrentVersion; - uint32_t PayloadAlignment = 0; - uint32_t Reserved0 = 0; - uint64_t EntryCount = 0; - uint32_t Reserved1 = 0; - uint32_t Reserved2 = 0; + static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; + static constexpr uint32_t CurrentVersion = 1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t PayloadAlignment = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header) + { + return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } }; static_assert(sizeof(CasDiskIndexHeader) == 32); @@ -171,7 +178,8 @@ namespace { std::vector<CasDiskIndexEntry> ReadIndexFile(const std::filesystem::path& RootDirectory, const std::string& ContainerBaseName, - uint64_t& InOutPayloadAlignment) + uint64_t& InOutPayloadAlignment, + uint64_t& OutLogPosition) { std::vector<CasDiskIndexEntry> Entries; std::filesystem::path SidxPath = GetIndexPath(RootDirectory, ContainerBaseName); @@ -193,12 +201,18 @@ namespace { uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); CasDiskIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if (Header.Magic == CasDiskIndexHeader::ExpectedMagic && Header.Version == CasDiskIndexHeader::CurrentVersion && - Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount) + if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) && + (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && + (Header.EntryCount <= ExpectedEntryCount)) { Entries.resize(Header.EntryCount); ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); InOutPayloadAlignment = Header.PayloadAlignment; + OutLogPosition = Header.LogPosition; + } + else + { + ZEN_WARN("skipping invalid index file '{}'", SidxPath); } } } @@ -267,7 +281,9 @@ namespace { return true; } - std::vector<CasDiskIndexEntry> ReadLog(const std::filesystem::path& RootDirectory, const std::string& ContainerBaseName) + std::vector<CasDiskIndexEntry> ReadLog(const std::filesystem::path& RootDirectory, + const std::string& ContainerBaseName, + uint64_t SkipEntryCount) { std::vector<CasDiskIndexEntry> Entries; std::filesystem::path SlogPath = GetLogPath(RootDirectory, ContainerBaseName); @@ -285,8 +301,14 @@ namespace { CasLog.Open(SlogPath, CasLogFile::Mode::kRead); if (CasLog.Initialize()) { - Entries.reserve(CasLog.GetLogCount()); - CasLog.Replay([&](const CasDiskIndexEntry& Record) { Entries.push_back(Record); }); + uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", SlogPath); + SkipEntryCount = 0; + } + Entries.reserve(EntryCount - SkipEntryCount); + CasLog.Replay([&](const CasDiskIndexEntry& Record) { Entries.push_back(Record); }, SkipEntryCount); } } return Entries; @@ -362,24 +384,26 @@ namespace { if (LegacyCasLog.Initialize()) { LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount()); - LegacyCasLog.Replay([&](const LegacyCasDiskIndexEntry& Record) { - std::string InvalidEntryReason; - if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone) - { - LegacyDiskIndex.erase(Record.Key); - return; - } - if (!ValidateLegacyEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason); - return; - } - if (ExistingChunks.contains(Record.Key)) - { - return; - } - LegacyDiskIndex[Record.Key] = Record; - }); + LegacyCasLog.Replay( + [&](const LegacyCasDiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone) + { + LegacyDiskIndex.erase(Record.Key); + return; + } + if (!ValidateLegacyEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason); + return; + } + if (ExistingChunks.contains(Record.Key)) + { + return; + } + LegacyDiskIndex[Record.Key] = Record; + }, + 0); } } @@ -1308,7 +1332,7 @@ CasContainerStrategy::MakeIndexSnapshot() uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([this, &EntryCount, &Timer] { - ZEN_INFO("write store snapshot for '{}' containing #{} entries in {}", + ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}", m_Config.RootDirectory / m_ContainerBaseName, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); @@ -1316,13 +1340,10 @@ CasContainerStrategy::MakeIndexSnapshot() namespace fs = std::filesystem; - fs::path SlogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); - fs::path SidxPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName); - fs::path STmplogPath = GetTempLogPath(m_Config.RootDirectory, m_ContainerBaseName); - fs::path STmpSidxPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName); - fs::path SRecoveredlogPath = GetRecoverLogPath(m_Config.RootDirectory, m_ContainerBaseName); + fs::path SidxPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName); + fs::path STmpSidxPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName); - // Index away, we keep it if something goes wrong + // Move index away, we keep it if something goes wrong if (fs::is_regular_file(STmpSidxPath)) { fs::remove(STmpSidxPath); @@ -1332,31 +1353,17 @@ CasContainerStrategy::MakeIndexSnapshot() fs::rename(SidxPath, STmpSidxPath); } - // Move cas away, we keep it if something goes wrong, any new chunks will be added to the new log + try { - RwLock::ExclusiveLockScope __(m_InsertLock); - RwLock::ExclusiveLockScope ___(m_LocationMapLock); m_CasLog.Flush(); - m_CasLog.Close(); - - if (fs::is_regular_file(STmplogPath)) - { - fs::remove(STmplogPath); - } - fs::rename(SlogPath, STmplogPath); - - // Open an new log - m_CasLog.Open(SlogPath, CasLogFile::Mode::kTruncate); - } - - try - { // Write the current state of the location map to a new index state + uint64_t LogCount = 0; std::vector<CasDiskIndexEntry> Entries; { - RwLock::SharedLockScope __(m_LocationMapLock); + RwLock::SharedLockScope __(m_InsertLock); + RwLock::SharedLockScope ___(m_LocationMapLock); Entries.resize(m_LocationMap.size()); uint64_t EntryIndex = 0; @@ -1366,11 +1373,18 @@ CasContainerStrategy::MakeIndexSnapshot() IndexEntry.Key = Entry.first; IndexEntry.Location = Entry.second; } + + LogCount = m_CasLog.GetLogCount(); } BasicFile ObjectIndexFile; ObjectIndexFile.Open(SidxPath, BasicFile::Mode::kTruncate); - CasDiskIndexHeader Header = {.PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment), .EntryCount = Entries.size()}; + CasDiskIndexHeader Header = {.EntryCount = Entries.size(), + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; + + Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header); + ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexEntry), 0); ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexEntry)); ObjectIndexFile.Flush(); @@ -1381,40 +1395,8 @@ CasContainerStrategy::MakeIndexSnapshot() { ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); - // Reconstruct the log from old log and any added log entries - RwLock::ExclusiveLockScope __(m_LocationMapLock); - if (fs::is_regular_file(STmplogPath)) - { - std::vector<CasDiskIndexEntry> Records; - Records.reserve(m_LocationMap.size()); - { - TCasLogFile<CasDiskIndexEntry> OldCasLog; - OldCasLog.Open(STmplogPath, CasLogFile::Mode::kRead); - Records.reserve(OldCasLog.GetLogCount()); - OldCasLog.Replay([&](const CasDiskIndexEntry& Record) { Records.push_back(Record); }); - } - { - Records.reserve(Records.size() + m_CasLog.GetLogCount()); - m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { Records.push_back(Record); }); - } - - TCasLogFile<CasDiskIndexEntry> RecoveredCasLog; - RecoveredCasLog.Open(SRecoveredlogPath, CasLogFile::Mode::kWrite); - RecoveredCasLog.Append(Records); - RecoveredCasLog.Flush(); - RecoveredCasLog.Close(); - - fs::remove(SlogPath); - fs::rename(SRecoveredlogPath, SlogPath); - fs::remove(STmplogPath); - } - - if (fs::is_regular_file(SidxPath)) - { - fs::remove(SidxPath); - } - // Restore any previous snapshot + if (fs::is_regular_file(STmpSidxPath)) { fs::remove(SidxPath); @@ -1425,10 +1407,6 @@ CasContainerStrategy::MakeIndexSnapshot() { fs::remove(STmpSidxPath); } - if (fs::is_regular_file(STmplogPath)) - { - fs::remove(STmplogPath); - } } void @@ -1452,9 +1430,11 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) std::filesystem::remove_all(BasePath); } + uint64_t LogPosition = 0; { - std::vector<CasDiskIndexEntry> IndexEntries = ReadIndexFile(m_Config.RootDirectory, m_ContainerBaseName, m_PayloadAlignment); - std::string InvalidEntryReason; + std::vector<CasDiskIndexEntry> IndexEntries = + ReadIndexFile(m_Config.RootDirectory, m_ContainerBaseName, m_PayloadAlignment, LogPosition); + std::string InvalidEntryReason; for (const CasDiskIndexEntry& Entry : IndexEntries) { if (!ValidateEntry(Entry, InvalidEntryReason)) @@ -1469,7 +1449,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) } { - std::vector<CasDiskIndexEntry> LogEntries = ReadLog(m_Config.RootDirectory, m_ContainerBaseName); + std::vector<CasDiskIndexEntry> LogEntries = ReadLog(m_Config.RootDirectory, m_ContainerBaseName, LogPosition); std::string InvalidEntryReason; for (const CasDiskIndexEntry& Entry : LogEntries) { @@ -1747,6 +1727,16 @@ TEST_CASE("compactcas.compact.totalsize") const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } + + // Re-open again, this time we should have a snapshot + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 65536, 16, false); + + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + } } } @@ -2249,7 +2239,7 @@ TEST_CASE("compactcas.legacyconversion") TCasLogFile<CasDiskIndexEntry> CasLog; CasLog.Open(SlogPath, CasLogFile::Mode::kRead); LogEntries.reserve(CasLog.GetLogCount()); - CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); }); + CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0); } TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog; std::filesystem::path SLegacylogPath = GetLegacyLogPath(CasConfig.RootDirectory, "test"); diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index 6b0082ab0..b53cfaa54 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -98,26 +98,28 @@ FileCasStrategy::Initialize(bool IsNewStore) std::unordered_set<IoHash> FoundEntries; FoundEntries.reserve(10000); - m_CasLog.Replay([&](const FileCasIndexEntry& Entry) { - if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone)) - { - if (!FoundEntries.contains(Entry.Key)) + m_CasLog.Replay( + [&](const FileCasIndexEntry& Entry) { + if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone)) { - return; + if (!FoundEntries.contains(Entry.Key)) + { + return; + } + m_TotalSize.fetch_sub(Entry.Size, std::memory_order_relaxed); + FoundEntries.erase(Entry.Key); } - m_TotalSize.fetch_sub(Entry.Size, std::memory_order_relaxed); - FoundEntries.erase(Entry.Key); - } - else - { - if (FoundEntries.contains(Entry.Key)) + else { - return; + if (FoundEntries.contains(Entry.Key)) + { + return; + } + FoundEntries.insert(Entry.Key); + m_TotalSize.fetch_add(Entry.Size, std::memory_order_relaxed); } - FoundEntries.insert(Entry.Key); - m_TotalSize.fetch_add(Entry.Size, std::memory_order_relaxed); - } - }); + }, + 0); } CasStore::InsertResult diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h index d09f4befb..4b93a708f 100644 --- a/zenstore/include/zenstore/caslog.h +++ b/zenstore/include/zenstore/caslog.h @@ -24,7 +24,7 @@ public: void Open(std::filesystem::path FileName, size_t RecordSize, Mode Mode); void Append(const void* DataPointer, uint64_t DataSize); - void Replay(std::function<void(const void*)>&& Handler); + void Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntryCount); void Flush(); void Close(); uint64_t GetLogSize(); @@ -66,13 +66,15 @@ public: // This should be called before the Replay() is called to do some basic sanity checking bool Initialize() { return true; } - void Replay(Invocable<const T&> auto Handler) + void Replay(Invocable<const T&> auto Handler, uint64_t SkipEntryCount) { - CasLogFile::Replay([&](const void* VoidPtr) { - const T& Record = *reinterpret_cast<const T*>(VoidPtr); + CasLogFile::Replay( + [&](const void* VoidPtr) { + const T& Record = *reinterpret_cast<const T*>(VoidPtr); - Handler(Record); - }); + Handler(Record); + }, + SkipEntryCount); } void Append(const T& Record) |