aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-04-04 12:05:25 +0200
committerDan Engelbrecht <[email protected]>2022-04-04 12:05:25 +0200
commit605b4f330eed43b14135f37ffb58c14fa1cd79c2 (patch)
treee8c99f26abb0634ecf53afe76cebd6f0c05742ca
parentlogging cleanup (diff)
downloadzen-605b4f330eed43b14135f37ffb58c14fa1cd79c2.tar.xz
zen-605b4f330eed43b14135f37ffb58c14fa1cd79c2.zip
always keep full log but read from index snapshot location if available
-rw-r--r--zenserver/cache/structuredcachestore.cpp34
-rw-r--r--zenserver/projectstore.cpp45
-rw-r--r--zenstore/caslog.cpp13
-rw-r--r--zenstore/cidstore.cpp36
-rw-r--r--zenstore/compactcas.cpp190
-rw-r--r--zenstore/filecas.cpp34
-rw-r--r--zenstore/include/zenstore/caslog.h14
7 files changed, 185 insertions, 181 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index e22b06572..738e4c1fd 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -487,22 +487,24 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is
m_SobsFile.Open(SobsPath, IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite);
m_SlogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
- m_SlogFile.Replay([&](const DiskIndexEntry& Entry) {
- if (Entry.Key == IoHash::Zero)
- {
- ++InvalidEntryCount;
- }
- else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
- {
- m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
- }
- else
- {
- m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
- m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed);
- }
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size());
- });
+ m_SlogFile.Replay(
+ [&](const DiskIndexEntry& Entry) {
+ if (Entry.Key == IoHash::Zero)
+ {
+ ++InvalidEntryCount;
+ }
+ else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
+ {
+ m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ }
+ else
+ {
+ m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
+ m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed);
+ }
+ MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size());
+ },
+ 0);
if (InvalidEntryCount)
{
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index bb8adb19f..617f50660 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -181,36 +181,39 @@ struct ProjectStore::OplogStorage : public RefCounted
uint64_t InvalidEntries = 0;
- m_Oplog.Replay([&](const zen::OplogEntry& LogEntry) {
- if (LogEntry.OpCoreSize == 0)
- {
- ++InvalidEntries;
+ m_Oplog.Replay(
+ [&](const zen::OplogEntry& LogEntry) {
+ if (LogEntry.OpCoreSize == 0)
+ {
+ ++InvalidEntries;
- return;
- }
+ return;
+ }
- IoBuffer OpBuffer(LogEntry.OpCoreSize);
+ IoBuffer OpBuffer(LogEntry.OpCoreSize);
- const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
+ const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
- m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
+ m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
- // Verify checksum, ignore op data if incorrect
- const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), OpBuffer.Size()) & 0xffffFFFF);
+ // Verify checksum, ignore op data if incorrect
+ const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), OpBuffer.Size()) & 0xffffFFFF);
- if (OpCoreHash != LogEntry.OpCoreHash)
- {
- ZEN_WARN("skipping oplog entry with bad checksum!");
- return;
- }
+ if (OpCoreHash != LogEntry.OpCoreHash)
+ {
+ ZEN_WARN("skipping oplog entry with bad checksum!");
+ return;
+ }
- CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), OpBuffer.Size()));
+ CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), OpBuffer.Size()));
- m_NextOpsOffset = Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign));
- m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn);
+ m_NextOpsOffset =
+ Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign));
+ m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn);
- Handler(Op, LogEntry);
- });
+ Handler(Op, LogEntry);
+ },
+ 0);
if (InvalidEntries)
{
diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp
index e080d6771..03a56f010 100644
--- a/zenstore/caslog.cpp
+++ b/zenstore/caslog.cpp
@@ -123,7 +123,7 @@ CasLogFile::GetLogSize()
uint64_t
CasLogFile::GetLogCount()
{
- uint64_t LogFileSize = m_File.FileSize();
+ uint64_t LogFileSize = m_AppendOffset.load(std::memory_order_acquire);
if (LogFileSize < sizeof(FileHeader))
{
return 0;
@@ -134,19 +134,22 @@ CasLogFile::GetLogCount()
}
void
-CasLogFile::Replay(std::function<void(const void*)>&& Handler)
+CasLogFile::Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntryCount)
{
uint64_t LogFileSize = m_File.FileSize();
// Ensure we end up on a clean boundary
- const uint64_t LogBaseOffset = sizeof(FileHeader);
- const size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize;
+ uint64_t LogBaseOffset = sizeof(FileHeader);
+ size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize;
- if (LogEntryCount == 0)
+ if (LogEntryCount <= SkipEntryCount)
{
return;
}
+ LogBaseOffset += SkipEntryCount * m_RecordSize;
+ LogEntryCount -= SkipEntryCount;
+
// This should really be streaming the data rather than just
// reading it into memory, though we don't tend to get very
// large logs so it may not matter
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index 482a89b9d..509d21abe 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -134,28 +134,30 @@ struct CidStore::Impl
uint64_t TombstoneCount = 0;
uint64_t InvalidCount = 0;
- m_LogFile.Replay([&](const IndexEntry& Entry) {
- if (Entry.Compressed != IoHash::Zero)
- {
- // Update
- m_CidMap.insert_or_assign(Entry.Uncompressed, Entry.Compressed);
- }
- else
- {
- if (Entry.Uncompressed != IoHash::Zero)
+ m_LogFile.Replay(
+ [&](const IndexEntry& Entry) {
+ if (Entry.Compressed != IoHash::Zero)
{
- // Tombstone
- m_CidMap.erase(Entry.Uncompressed);
- ++TombstoneCount;
+ // Update
+ m_CidMap.insert_or_assign(Entry.Uncompressed, Entry.Compressed);
}
else
{
- // Completely uninitialized entry with both hashes set to zero indicates a
- // problem. Might be an unwritten page due to BSOD or some other problem
- ++InvalidCount;
+ if (Entry.Uncompressed != IoHash::Zero)
+ {
+ // Tombstone
+ m_CidMap.erase(Entry.Uncompressed);
+ ++TombstoneCount;
+ }
+ else
+ {
+ // Completely uninitialized entry with both hashes set to zero indicates a
+ // problem. Might be an unwritten page due to BSOD or some other problem
+ ++InvalidCount;
+ }
}
- }
- });
+ },
+ 0);
ZEN_INFO("CID index initialized: {} entries found ({} tombstones, {} invalid)", m_CidMap.size(), TombstoneCount, InvalidCount);
}
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 366ea5534..c5f9ed80a 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -12,6 +12,8 @@
#include <zencore/workthreadpool.h>
#include <gsl/gsl-lite.hpp>
+#include <xxhash.h>
+
#if ZEN_WITH_TESTS
# include <zencore/compactbinarybuilder.h>
# include <zencore/testing.h>
@@ -27,15 +29,20 @@ namespace zen {
struct CasDiskIndexHeader
{
- static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
- static constexpr uint32_t CurrentVersion = 1;
- uint32_t Magic = ExpectedMagic;
- uint32_t Version = CurrentVersion;
- uint32_t PayloadAlignment = 0;
- uint32_t Reserved0 = 0;
- uint64_t EntryCount = 0;
- uint32_t Reserved1 = 0;
- uint32_t Reserved2 = 0;
+ static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
+ static constexpr uint32_t CurrentVersion = 1;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ uint64_t EntryCount = 0;
+ uint64_t LogPosition = 0;
+ uint32_t PayloadAlignment = 0;
+ uint32_t Checksum = 0;
+
+ static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
};
static_assert(sizeof(CasDiskIndexHeader) == 32);
@@ -171,7 +178,8 @@ namespace {
std::vector<CasDiskIndexEntry> ReadIndexFile(const std::filesystem::path& RootDirectory,
const std::string& ContainerBaseName,
- uint64_t& InOutPayloadAlignment)
+ uint64_t& InOutPayloadAlignment,
+ uint64_t& OutLogPosition)
{
std::vector<CasDiskIndexEntry> Entries;
std::filesystem::path SidxPath = GetIndexPath(RootDirectory, ContainerBaseName);
@@ -193,12 +201,18 @@ namespace {
uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry);
CasDiskIndexHeader Header;
ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- if (Header.Magic == CasDiskIndexHeader::ExpectedMagic && Header.Version == CasDiskIndexHeader::CurrentVersion &&
- Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount)
+ if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) &&
+ (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
+ (Header.EntryCount <= ExpectedEntryCount))
{
Entries.resize(Header.EntryCount);
ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
InOutPayloadAlignment = Header.PayloadAlignment;
+ OutLogPosition = Header.LogPosition;
+ }
+ else
+ {
+ ZEN_WARN("skipping invalid index file '{}'", SidxPath);
}
}
}
@@ -267,7 +281,9 @@ namespace {
return true;
}
- std::vector<CasDiskIndexEntry> ReadLog(const std::filesystem::path& RootDirectory, const std::string& ContainerBaseName)
+ std::vector<CasDiskIndexEntry> ReadLog(const std::filesystem::path& RootDirectory,
+ const std::string& ContainerBaseName,
+ uint64_t SkipEntryCount)
{
std::vector<CasDiskIndexEntry> Entries;
std::filesystem::path SlogPath = GetLogPath(RootDirectory, ContainerBaseName);
@@ -285,8 +301,14 @@ namespace {
CasLog.Open(SlogPath, CasLogFile::Mode::kRead);
if (CasLog.Initialize())
{
- Entries.reserve(CasLog.GetLogCount());
- CasLog.Replay([&](const CasDiskIndexEntry& Record) { Entries.push_back(Record); });
+ uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", SlogPath);
+ SkipEntryCount = 0;
+ }
+ Entries.reserve(EntryCount - SkipEntryCount);
+ CasLog.Replay([&](const CasDiskIndexEntry& Record) { Entries.push_back(Record); }, SkipEntryCount);
}
}
return Entries;
@@ -362,24 +384,26 @@ namespace {
if (LegacyCasLog.Initialize())
{
LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount());
- LegacyCasLog.Replay([&](const LegacyCasDiskIndexEntry& Record) {
- std::string InvalidEntryReason;
- if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone)
- {
- LegacyDiskIndex.erase(Record.Key);
- return;
- }
- if (!ValidateLegacyEntry(Record, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
- return;
- }
- if (ExistingChunks.contains(Record.Key))
- {
- return;
- }
- LegacyDiskIndex[Record.Key] = Record;
- });
+ LegacyCasLog.Replay(
+ [&](const LegacyCasDiskIndexEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone)
+ {
+ LegacyDiskIndex.erase(Record.Key);
+ return;
+ }
+ if (!ValidateLegacyEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
+ return;
+ }
+ if (ExistingChunks.contains(Record.Key))
+ {
+ return;
+ }
+ LegacyDiskIndex[Record.Key] = Record;
+ },
+ 0);
}
}
@@ -1308,7 +1332,7 @@ CasContainerStrategy::MakeIndexSnapshot()
uint64_t EntryCount = 0;
Stopwatch Timer;
const auto _ = MakeGuard([this, &EntryCount, &Timer] {
- ZEN_INFO("write store snapshot for '{}' containing #{} entries in {}",
+ ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}",
m_Config.RootDirectory / m_ContainerBaseName,
EntryCount,
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
@@ -1316,13 +1340,10 @@ CasContainerStrategy::MakeIndexSnapshot()
namespace fs = std::filesystem;
- fs::path SlogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
- fs::path SidxPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
- fs::path STmplogPath = GetTempLogPath(m_Config.RootDirectory, m_ContainerBaseName);
- fs::path STmpSidxPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
- fs::path SRecoveredlogPath = GetRecoverLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+ fs::path SidxPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
+ fs::path STmpSidxPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
- // Index away, we keep it if something goes wrong
+ // Move index away, we keep it if something goes wrong
if (fs::is_regular_file(STmpSidxPath))
{
fs::remove(STmpSidxPath);
@@ -1332,31 +1353,17 @@ CasContainerStrategy::MakeIndexSnapshot()
fs::rename(SidxPath, STmpSidxPath);
}
- // Move cas away, we keep it if something goes wrong, any new chunks will be added to the new log
+ try
{
- RwLock::ExclusiveLockScope __(m_InsertLock);
- RwLock::ExclusiveLockScope ___(m_LocationMapLock);
m_CasLog.Flush();
- m_CasLog.Close();
-
- if (fs::is_regular_file(STmplogPath))
- {
- fs::remove(STmplogPath);
- }
- fs::rename(SlogPath, STmplogPath);
-
- // Open an new log
- m_CasLog.Open(SlogPath, CasLogFile::Mode::kTruncate);
- }
-
- try
- {
// Write the current state of the location map to a new index state
+ uint64_t LogCount = 0;
std::vector<CasDiskIndexEntry> Entries;
{
- RwLock::SharedLockScope __(m_LocationMapLock);
+ RwLock::SharedLockScope __(m_InsertLock);
+ RwLock::SharedLockScope ___(m_LocationMapLock);
Entries.resize(m_LocationMap.size());
uint64_t EntryIndex = 0;
@@ -1366,11 +1373,18 @@ CasContainerStrategy::MakeIndexSnapshot()
IndexEntry.Key = Entry.first;
IndexEntry.Location = Entry.second;
}
+
+ LogCount = m_CasLog.GetLogCount();
}
BasicFile ObjectIndexFile;
ObjectIndexFile.Open(SidxPath, BasicFile::Mode::kTruncate);
- CasDiskIndexHeader Header = {.PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment), .EntryCount = Entries.size()};
+ CasDiskIndexHeader Header = {.EntryCount = Entries.size(),
+ .LogPosition = LogCount,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
+
+ Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header);
+
ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexEntry), 0);
ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexEntry));
ObjectIndexFile.Flush();
@@ -1381,40 +1395,8 @@ CasContainerStrategy::MakeIndexSnapshot()
{
ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
- // Reconstruct the log from old log and any added log entries
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- if (fs::is_regular_file(STmplogPath))
- {
- std::vector<CasDiskIndexEntry> Records;
- Records.reserve(m_LocationMap.size());
- {
- TCasLogFile<CasDiskIndexEntry> OldCasLog;
- OldCasLog.Open(STmplogPath, CasLogFile::Mode::kRead);
- Records.reserve(OldCasLog.GetLogCount());
- OldCasLog.Replay([&](const CasDiskIndexEntry& Record) { Records.push_back(Record); });
- }
- {
- Records.reserve(Records.size() + m_CasLog.GetLogCount());
- m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { Records.push_back(Record); });
- }
-
- TCasLogFile<CasDiskIndexEntry> RecoveredCasLog;
- RecoveredCasLog.Open(SRecoveredlogPath, CasLogFile::Mode::kWrite);
- RecoveredCasLog.Append(Records);
- RecoveredCasLog.Flush();
- RecoveredCasLog.Close();
-
- fs::remove(SlogPath);
- fs::rename(SRecoveredlogPath, SlogPath);
- fs::remove(STmplogPath);
- }
-
- if (fs::is_regular_file(SidxPath))
- {
- fs::remove(SidxPath);
- }
-
// Restore any previous snapshot
+
if (fs::is_regular_file(STmpSidxPath))
{
fs::remove(SidxPath);
@@ -1425,10 +1407,6 @@ CasContainerStrategy::MakeIndexSnapshot()
{
fs::remove(STmpSidxPath);
}
- if (fs::is_regular_file(STmplogPath))
- {
- fs::remove(STmplogPath);
- }
}
void
@@ -1452,9 +1430,11 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
std::filesystem::remove_all(BasePath);
}
+ uint64_t LogPosition = 0;
{
- std::vector<CasDiskIndexEntry> IndexEntries = ReadIndexFile(m_Config.RootDirectory, m_ContainerBaseName, m_PayloadAlignment);
- std::string InvalidEntryReason;
+ std::vector<CasDiskIndexEntry> IndexEntries =
+ ReadIndexFile(m_Config.RootDirectory, m_ContainerBaseName, m_PayloadAlignment, LogPosition);
+ std::string InvalidEntryReason;
for (const CasDiskIndexEntry& Entry : IndexEntries)
{
if (!ValidateEntry(Entry, InvalidEntryReason))
@@ -1469,7 +1449,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
}
{
- std::vector<CasDiskIndexEntry> LogEntries = ReadLog(m_Config.RootDirectory, m_ContainerBaseName);
+ std::vector<CasDiskIndexEntry> LogEntries = ReadLog(m_Config.RootDirectory, m_ContainerBaseName, LogPosition);
std::string InvalidEntryReason;
for (const CasDiskIndexEntry& Entry : LogEntries)
{
@@ -1747,6 +1727,16 @@ TEST_CASE("compactcas.compact.totalsize")
const uint64_t TotalSize = Cas.StorageSize().DiskSize;
CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
}
+
+ // Re-open again, this time we should have a snapshot
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 65536, 16, false);
+
+ const uint64_t TotalSize = Cas.StorageSize().DiskSize;
+ CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
+ }
}
}
@@ -2249,7 +2239,7 @@ TEST_CASE("compactcas.legacyconversion")
TCasLogFile<CasDiskIndexEntry> CasLog;
CasLog.Open(SlogPath, CasLogFile::Mode::kRead);
LogEntries.reserve(CasLog.GetLogCount());
- CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); });
+ CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0);
}
TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog;
std::filesystem::path SLegacylogPath = GetLegacyLogPath(CasConfig.RootDirectory, "test");
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index 6b0082ab0..b53cfaa54 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -98,26 +98,28 @@ FileCasStrategy::Initialize(bool IsNewStore)
std::unordered_set<IoHash> FoundEntries;
FoundEntries.reserve(10000);
- m_CasLog.Replay([&](const FileCasIndexEntry& Entry) {
- if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone))
- {
- if (!FoundEntries.contains(Entry.Key))
+ m_CasLog.Replay(
+ [&](const FileCasIndexEntry& Entry) {
+ if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone))
{
- return;
+ if (!FoundEntries.contains(Entry.Key))
+ {
+ return;
+ }
+ m_TotalSize.fetch_sub(Entry.Size, std::memory_order_relaxed);
+ FoundEntries.erase(Entry.Key);
}
- m_TotalSize.fetch_sub(Entry.Size, std::memory_order_relaxed);
- FoundEntries.erase(Entry.Key);
- }
- else
- {
- if (FoundEntries.contains(Entry.Key))
+ else
{
- return;
+ if (FoundEntries.contains(Entry.Key))
+ {
+ return;
+ }
+ FoundEntries.insert(Entry.Key);
+ m_TotalSize.fetch_add(Entry.Size, std::memory_order_relaxed);
}
- FoundEntries.insert(Entry.Key);
- m_TotalSize.fetch_add(Entry.Size, std::memory_order_relaxed);
- }
- });
+ },
+ 0);
}
CasStore::InsertResult
diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h
index d09f4befb..4b93a708f 100644
--- a/zenstore/include/zenstore/caslog.h
+++ b/zenstore/include/zenstore/caslog.h
@@ -24,7 +24,7 @@ public:
void Open(std::filesystem::path FileName, size_t RecordSize, Mode Mode);
void Append(const void* DataPointer, uint64_t DataSize);
- void Replay(std::function<void(const void*)>&& Handler);
+ void Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntryCount);
void Flush();
void Close();
uint64_t GetLogSize();
@@ -66,13 +66,15 @@ public:
// This should be called before the Replay() is called to do some basic sanity checking
bool Initialize() { return true; }
- void Replay(Invocable<const T&> auto Handler)
+ void Replay(Invocable<const T&> auto Handler, uint64_t SkipEntryCount)
{
- CasLogFile::Replay([&](const void* VoidPtr) {
- const T& Record = *reinterpret_cast<const T*>(VoidPtr);
+ CasLogFile::Replay(
+ [&](const void* VoidPtr) {
+ const T& Record = *reinterpret_cast<const T*>(VoidPtr);
- Handler(Record);
- });
+ Handler(Record);
+ },
+ SkipEntryCount);
}
void Append(const T& Record)