diff options
| author | Dan Engelbrecht <[email protected]> | 2022-04-04 12:05:25 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-04-04 12:05:25 +0200 |
| commit | 605b4f330eed43b14135f37ffb58c14fa1cd79c2 (patch) | |
| tree | e8c99f26abb0634ecf53afe76cebd6f0c05742ca /zenstore/compactcas.cpp | |
| parent | logging cleanup (diff) | |
| download | zen-605b4f330eed43b14135f37ffb58c14fa1cd79c2.tar.xz zen-605b4f330eed43b14135f37ffb58c14fa1cd79c2.zip | |
always keep full log but read from index snapshot location if available
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 190 |
1 files changed, 90 insertions, 100 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 366ea5534..c5f9ed80a 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -12,6 +12,8 @@ #include <zencore/workthreadpool.h> #include <gsl/gsl-lite.hpp> +#include <xxhash.h> + #if ZEN_WITH_TESTS # include <zencore/compactbinarybuilder.h> # include <zencore/testing.h> @@ -27,15 +29,20 @@ namespace zen { struct CasDiskIndexHeader { - static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; - static constexpr uint32_t CurrentVersion = 1; - uint32_t Magic = ExpectedMagic; - uint32_t Version = CurrentVersion; - uint32_t PayloadAlignment = 0; - uint32_t Reserved0 = 0; - uint64_t EntryCount = 0; - uint32_t Reserved1 = 0; - uint32_t Reserved2 = 0; + static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; + static constexpr uint32_t CurrentVersion = 1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t PayloadAlignment = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header) + { + return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } }; static_assert(sizeof(CasDiskIndexHeader) == 32); @@ -171,7 +178,8 @@ namespace { std::vector<CasDiskIndexEntry> ReadIndexFile(const std::filesystem::path& RootDirectory, const std::string& ContainerBaseName, - uint64_t& InOutPayloadAlignment) + uint64_t& InOutPayloadAlignment, + uint64_t& OutLogPosition) { std::vector<CasDiskIndexEntry> Entries; std::filesystem::path SidxPath = GetIndexPath(RootDirectory, ContainerBaseName); @@ -193,12 +201,18 @@ namespace { uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); CasDiskIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if (Header.Magic == CasDiskIndexHeader::ExpectedMagic && Header.Version == CasDiskIndexHeader::CurrentVersion && - Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount) + if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) && + (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && + (Header.EntryCount <= ExpectedEntryCount)) { Entries.resize(Header.EntryCount); ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); InOutPayloadAlignment = Header.PayloadAlignment; + OutLogPosition = Header.LogPosition; + } + else + { + ZEN_WARN("skipping invalid index file '{}'", SidxPath); } } } @@ -267,7 +281,9 @@ namespace { return true; } - std::vector<CasDiskIndexEntry> ReadLog(const std::filesystem::path& RootDirectory, const std::string& ContainerBaseName) + std::vector<CasDiskIndexEntry> ReadLog(const std::filesystem::path& RootDirectory, + const std::string& ContainerBaseName, + uint64_t SkipEntryCount) { std::vector<CasDiskIndexEntry> Entries; std::filesystem::path SlogPath = GetLogPath(RootDirectory, ContainerBaseName); @@ -285,8 +301,14 @@ namespace { CasLog.Open(SlogPath, CasLogFile::Mode::kRead); if (CasLog.Initialize()) { - Entries.reserve(CasLog.GetLogCount()); - CasLog.Replay([&](const CasDiskIndexEntry& Record) { Entries.push_back(Record); }); + uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", SlogPath); + SkipEntryCount = 0; + } + Entries.reserve(EntryCount - SkipEntryCount); + CasLog.Replay([&](const CasDiskIndexEntry& Record) { Entries.push_back(Record); }, SkipEntryCount); } } return Entries; @@ -362,24 +384,26 @@ namespace { if (LegacyCasLog.Initialize()) { LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount()); - LegacyCasLog.Replay([&](const LegacyCasDiskIndexEntry& Record) { - std::string InvalidEntryReason; - if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone) - { - LegacyDiskIndex.erase(Record.Key); - return; - } - if (!ValidateLegacyEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason); - return; - } - if (ExistingChunks.contains(Record.Key)) - { - return; - } - LegacyDiskIndex[Record.Key] = Record; - }); + LegacyCasLog.Replay( + [&](const LegacyCasDiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone) + { + LegacyDiskIndex.erase(Record.Key); + return; + } + if (!ValidateLegacyEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason); + return; + } + if (ExistingChunks.contains(Record.Key)) + { + return; + } + LegacyDiskIndex[Record.Key] = Record; + }, + 0); } } @@ -1308,7 +1332,7 @@ CasContainerStrategy::MakeIndexSnapshot() uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([this, &EntryCount, &Timer] { - ZEN_INFO("write store snapshot for '{}' containing #{} entries in {}", + ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}", m_Config.RootDirectory / m_ContainerBaseName, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); @@ -1316,13 +1340,10 @@ CasContainerStrategy::MakeIndexSnapshot() namespace fs = std::filesystem; - fs::path SlogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); - fs::path SidxPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName); - fs::path STmplogPath = GetTempLogPath(m_Config.RootDirectory, m_ContainerBaseName); - fs::path STmpSidxPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName); - fs::path SRecoveredlogPath = GetRecoverLogPath(m_Config.RootDirectory, m_ContainerBaseName); + fs::path SidxPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName); + fs::path STmpSidxPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName); - // Index away, we keep it if something goes wrong + // Move index away, we keep it if something goes wrong if (fs::is_regular_file(STmpSidxPath)) { fs::remove(STmpSidxPath); @@ -1332,31 +1353,17 @@ CasContainerStrategy::MakeIndexSnapshot() fs::rename(SidxPath, STmpSidxPath); } - // Move cas away, we keep it if something goes wrong, any new chunks will be added to the new log + try { - RwLock::ExclusiveLockScope __(m_InsertLock); - RwLock::ExclusiveLockScope ___(m_LocationMapLock); m_CasLog.Flush(); - m_CasLog.Close(); - - if (fs::is_regular_file(STmplogPath)) - { - fs::remove(STmplogPath); - } - fs::rename(SlogPath, STmplogPath); - - // Open an new log - m_CasLog.Open(SlogPath, CasLogFile::Mode::kTruncate); - } - - try - { // Write the current state of the location map to a new index state + uint64_t LogCount = 0; std::vector<CasDiskIndexEntry> Entries; { - RwLock::SharedLockScope __(m_LocationMapLock); + RwLock::SharedLockScope __(m_InsertLock); + RwLock::SharedLockScope ___(m_LocationMapLock); Entries.resize(m_LocationMap.size()); uint64_t EntryIndex = 0; @@ -1366,11 +1373,18 @@ CasContainerStrategy::MakeIndexSnapshot() IndexEntry.Key = Entry.first; IndexEntry.Location = Entry.second; } + + LogCount = m_CasLog.GetLogCount(); } BasicFile ObjectIndexFile; ObjectIndexFile.Open(SidxPath, BasicFile::Mode::kTruncate); - CasDiskIndexHeader Header = {.PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment), .EntryCount = Entries.size()}; + CasDiskIndexHeader Header = {.EntryCount = Entries.size(), + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; + + Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header); + ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexEntry), 0); ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexEntry)); ObjectIndexFile.Flush(); @@ -1381,40 +1395,8 @@ CasContainerStrategy::MakeIndexSnapshot() { ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); - // Reconstruct the log from old log and any added log entries - RwLock::ExclusiveLockScope __(m_LocationMapLock); - if (fs::is_regular_file(STmplogPath)) - { - std::vector<CasDiskIndexEntry> Records; - Records.reserve(m_LocationMap.size()); - { - TCasLogFile<CasDiskIndexEntry> OldCasLog; - OldCasLog.Open(STmplogPath, CasLogFile::Mode::kRead); - Records.reserve(OldCasLog.GetLogCount()); - OldCasLog.Replay([&](const CasDiskIndexEntry& Record) { Records.push_back(Record); }); - } - { - Records.reserve(Records.size() + m_CasLog.GetLogCount()); - m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { Records.push_back(Record); }); - } - - TCasLogFile<CasDiskIndexEntry> RecoveredCasLog; - RecoveredCasLog.Open(SRecoveredlogPath, CasLogFile::Mode::kWrite); - RecoveredCasLog.Append(Records); - RecoveredCasLog.Flush(); - RecoveredCasLog.Close(); - - fs::remove(SlogPath); - fs::rename(SRecoveredlogPath, SlogPath); - fs::remove(STmplogPath); - } - - if (fs::is_regular_file(SidxPath)) - { - fs::remove(SidxPath); - } - // Restore any previous snapshot + if (fs::is_regular_file(STmpSidxPath)) { fs::remove(SidxPath); @@ -1425,10 +1407,6 @@ CasContainerStrategy::MakeIndexSnapshot() { fs::remove(STmpSidxPath); } - if (fs::is_regular_file(STmplogPath)) - { - fs::remove(STmplogPath); - } } void @@ -1452,9 +1430,11 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) std::filesystem::remove_all(BasePath); } + uint64_t LogPosition = 0; { - std::vector<CasDiskIndexEntry> IndexEntries = ReadIndexFile(m_Config.RootDirectory, m_ContainerBaseName, m_PayloadAlignment); - std::string InvalidEntryReason; + std::vector<CasDiskIndexEntry> IndexEntries = + ReadIndexFile(m_Config.RootDirectory, m_ContainerBaseName, m_PayloadAlignment, LogPosition); + std::string InvalidEntryReason; for (const CasDiskIndexEntry& Entry : IndexEntries) { if (!ValidateEntry(Entry, InvalidEntryReason)) @@ -1469,7 +1449,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) } { - std::vector<CasDiskIndexEntry> LogEntries = ReadLog(m_Config.RootDirectory, m_ContainerBaseName); + std::vector<CasDiskIndexEntry> LogEntries = ReadLog(m_Config.RootDirectory, m_ContainerBaseName, LogPosition); std::string InvalidEntryReason; for (const CasDiskIndexEntry& Entry : LogEntries) { @@ -1747,6 +1727,16 @@ TEST_CASE("compactcas.compact.totalsize") const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } + + // Re-open again, this time we should have a snapshot + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 65536, 16, false); + + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + } } } @@ -2249,7 +2239,7 @@ TEST_CASE("compactcas.legacyconversion") TCasLogFile<CasDiskIndexEntry> CasLog; CasLog.Open(SlogPath, CasLogFile::Mode::kRead); LogEntries.reserve(CasLog.GetLogCount()); - CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); }); + CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0); } TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog; std::filesystem::path SLegacylogPath = GetLegacyLogPath(CasConfig.RootDirectory, "test"); |