about summary refs log tree commit diff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
author Dan Engelbrecht <[email protected]> 2022-04-04 12:05:25 +0200
committer Dan Engelbrecht <[email protected]> 2022-04-04 12:05:25 +0200
commit 605b4f330eed43b14135f37ffb58c14fa1cd79c2 (patch)
tree e8c99f26abb0634ecf53afe76cebd6f0c05742ca /zenstore/compactcas.cpp
parent logging cleanup (diff)
download zen-605b4f330eed43b14135f37ffb58c14fa1cd79c2.tar.xz
zen-605b4f330eed43b14135f37ffb58c14fa1cd79c2.zip
always keep full log but read from index snapshot location if available
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r-- zenstore/compactcas.cpp 190
1 files changed, 90 insertions, 100 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 366ea5534..c5f9ed80a 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -12,6 +12,8 @@
#include <zencore/workthreadpool.h>
#include <gsl/gsl-lite.hpp>
+#include <xxhash.h>
+
#if ZEN_WITH_TESTS
# include <zencore/compactbinarybuilder.h>
# include <zencore/testing.h>
@@ -27,15 +29,20 @@ namespace zen {
struct CasDiskIndexHeader
{
- static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
- static constexpr uint32_t CurrentVersion = 1;
- uint32_t Magic = ExpectedMagic;
- uint32_t Version = CurrentVersion;
- uint32_t PayloadAlignment = 0;
- uint32_t Reserved0 = 0;
- uint64_t EntryCount = 0;
- uint32_t Reserved1 = 0;
- uint32_t Reserved2 = 0;
+ static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
+ static constexpr uint32_t CurrentVersion = 1;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ uint64_t EntryCount = 0;
+ uint64_t LogPosition = 0;
+ uint32_t PayloadAlignment = 0;
+ uint32_t Checksum = 0;
+
+ static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
};
static_assert(sizeof(CasDiskIndexHeader) == 32);
@@ -171,7 +178,8 @@ namespace {
std::vector<CasDiskIndexEntry> ReadIndexFile(const std::filesystem::path& RootDirectory,
const std::string& ContainerBaseName,
- uint64_t& InOutPayloadAlignment)
+ uint64_t& InOutPayloadAlignment,
+ uint64_t& OutLogPosition)
{
std::vector<CasDiskIndexEntry> Entries;
std::filesystem::path SidxPath = GetIndexPath(RootDirectory, ContainerBaseName);
@@ -193,12 +201,18 @@ namespace {
uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry);
CasDiskIndexHeader Header;
ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- if (Header.Magic == CasDiskIndexHeader::ExpectedMagic && Header.Version == CasDiskIndexHeader::CurrentVersion &&
- Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount)
+ if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) &&
+ (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
+ (Header.EntryCount <= ExpectedEntryCount))
{
Entries.resize(Header.EntryCount);
ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
InOutPayloadAlignment = Header.PayloadAlignment;
+ OutLogPosition = Header.LogPosition;
+ }
+ else
+ {
+ ZEN_WARN("skipping invalid index file '{}'", SidxPath);
}
}
}
@@ -267,7 +281,9 @@ namespace {
return true;
}
- std::vector<CasDiskIndexEntry> ReadLog(const std::filesystem::path& RootDirectory, const std::string& ContainerBaseName)
+ std::vector<CasDiskIndexEntry> ReadLog(const std::filesystem::path& RootDirectory,
+ const std::string& ContainerBaseName,
+ uint64_t SkipEntryCount)
{
std::vector<CasDiskIndexEntry> Entries;
std::filesystem::path SlogPath = GetLogPath(RootDirectory, ContainerBaseName);
@@ -285,8 +301,14 @@ namespace {
CasLog.Open(SlogPath, CasLogFile::Mode::kRead);
if (CasLog.Initialize())
{
- Entries.reserve(CasLog.GetLogCount());
- CasLog.Replay([&](const CasDiskIndexEntry& Record) { Entries.push_back(Record); });
+ uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", SlogPath);
+ SkipEntryCount = 0;
+ }
+ Entries.reserve(EntryCount - SkipEntryCount);
+ CasLog.Replay([&](const CasDiskIndexEntry& Record) { Entries.push_back(Record); }, SkipEntryCount);
}
}
return Entries;
@@ -362,24 +384,26 @@ namespace {
if (LegacyCasLog.Initialize())
{
LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount());
- LegacyCasLog.Replay([&](const LegacyCasDiskIndexEntry& Record) {
- std::string InvalidEntryReason;
- if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone)
- {
- LegacyDiskIndex.erase(Record.Key);
- return;
- }
- if (!ValidateLegacyEntry(Record, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
- return;
- }
- if (ExistingChunks.contains(Record.Key))
- {
- return;
- }
- LegacyDiskIndex[Record.Key] = Record;
- });
+ LegacyCasLog.Replay(
+ [&](const LegacyCasDiskIndexEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone)
+ {
+ LegacyDiskIndex.erase(Record.Key);
+ return;
+ }
+ if (!ValidateLegacyEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
+ return;
+ }
+ if (ExistingChunks.contains(Record.Key))
+ {
+ return;
+ }
+ LegacyDiskIndex[Record.Key] = Record;
+ },
+ 0);
}
}
@@ -1308,7 +1332,7 @@ CasContainerStrategy::MakeIndexSnapshot()
uint64_t EntryCount = 0;
Stopwatch Timer;
const auto _ = MakeGuard([this, &EntryCount, &Timer] {
- ZEN_INFO("write store snapshot for '{}' containing #{} entries in {}",
+ ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}",
m_Config.RootDirectory / m_ContainerBaseName,
EntryCount,
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
@@ -1316,13 +1340,10 @@ CasContainerStrategy::MakeIndexSnapshot()
namespace fs = std::filesystem;
- fs::path SlogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
- fs::path SidxPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
- fs::path STmplogPath = GetTempLogPath(m_Config.RootDirectory, m_ContainerBaseName);
- fs::path STmpSidxPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
- fs::path SRecoveredlogPath = GetRecoverLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+ fs::path SidxPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
+ fs::path STmpSidxPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
- // Index away, we keep it if something goes wrong
+ // Move index away, we keep it if something goes wrong
if (fs::is_regular_file(STmpSidxPath))
{
fs::remove(STmpSidxPath);
@@ -1332,31 +1353,17 @@ CasContainerStrategy::MakeIndexSnapshot()
fs::rename(SidxPath, STmpSidxPath);
}
- // Move cas away, we keep it if something goes wrong, any new chunks will be added to the new log
+ try
{
- RwLock::ExclusiveLockScope __(m_InsertLock);
- RwLock::ExclusiveLockScope ___(m_LocationMapLock);
m_CasLog.Flush();
- m_CasLog.Close();
-
- if (fs::is_regular_file(STmplogPath))
- {
- fs::remove(STmplogPath);
- }
- fs::rename(SlogPath, STmplogPath);
-
- // Open an new log
- m_CasLog.Open(SlogPath, CasLogFile::Mode::kTruncate);
- }
-
- try
- {
// Write the current state of the location map to a new index state
+ uint64_t LogCount = 0;
std::vector<CasDiskIndexEntry> Entries;
{
- RwLock::SharedLockScope __(m_LocationMapLock);
+ RwLock::SharedLockScope __(m_InsertLock);
+ RwLock::SharedLockScope ___(m_LocationMapLock);
Entries.resize(m_LocationMap.size());
uint64_t EntryIndex = 0;
@@ -1366,11 +1373,18 @@ CasContainerStrategy::MakeIndexSnapshot()
IndexEntry.Key = Entry.first;
IndexEntry.Location = Entry.second;
}
+
+ LogCount = m_CasLog.GetLogCount();
}
BasicFile ObjectIndexFile;
ObjectIndexFile.Open(SidxPath, BasicFile::Mode::kTruncate);
- CasDiskIndexHeader Header = {.PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment), .EntryCount = Entries.size()};
+ CasDiskIndexHeader Header = {.EntryCount = Entries.size(),
+ .LogPosition = LogCount,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
+
+ Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header);
+
ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexEntry), 0);
ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexEntry));
ObjectIndexFile.Flush();
@@ -1381,40 +1395,8 @@ CasContainerStrategy::MakeIndexSnapshot()
{
ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
- // Reconstruct the log from old log and any added log entries
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- if (fs::is_regular_file(STmplogPath))
- {
- std::vector<CasDiskIndexEntry> Records;
- Records.reserve(m_LocationMap.size());
- {
- TCasLogFile<CasDiskIndexEntry> OldCasLog;
- OldCasLog.Open(STmplogPath, CasLogFile::Mode::kRead);
- Records.reserve(OldCasLog.GetLogCount());
- OldCasLog.Replay([&](const CasDiskIndexEntry& Record) { Records.push_back(Record); });
- }
- {
- Records.reserve(Records.size() + m_CasLog.GetLogCount());
- m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { Records.push_back(Record); });
- }
-
- TCasLogFile<CasDiskIndexEntry> RecoveredCasLog;
- RecoveredCasLog.Open(SRecoveredlogPath, CasLogFile::Mode::kWrite);
- RecoveredCasLog.Append(Records);
- RecoveredCasLog.Flush();
- RecoveredCasLog.Close();
-
- fs::remove(SlogPath);
- fs::rename(SRecoveredlogPath, SlogPath);
- fs::remove(STmplogPath);
- }
-
- if (fs::is_regular_file(SidxPath))
- {
- fs::remove(SidxPath);
- }
-
// Restore any previous snapshot
+
if (fs::is_regular_file(STmpSidxPath))
{
fs::remove(SidxPath);
@@ -1425,10 +1407,6 @@ CasContainerStrategy::MakeIndexSnapshot()
{
fs::remove(STmpSidxPath);
}
- if (fs::is_regular_file(STmplogPath))
- {
- fs::remove(STmplogPath);
- }
}
void
@@ -1452,9 +1430,11 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
std::filesystem::remove_all(BasePath);
}
+ uint64_t LogPosition = 0;
{
- std::vector<CasDiskIndexEntry> IndexEntries = ReadIndexFile(m_Config.RootDirectory, m_ContainerBaseName, m_PayloadAlignment);
- std::string InvalidEntryReason;
+ std::vector<CasDiskIndexEntry> IndexEntries =
+ ReadIndexFile(m_Config.RootDirectory, m_ContainerBaseName, m_PayloadAlignment, LogPosition);
+ std::string InvalidEntryReason;
for (const CasDiskIndexEntry& Entry : IndexEntries)
{
if (!ValidateEntry(Entry, InvalidEntryReason))
@@ -1469,7 +1449,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
}
{
- std::vector<CasDiskIndexEntry> LogEntries = ReadLog(m_Config.RootDirectory, m_ContainerBaseName);
+ std::vector<CasDiskIndexEntry> LogEntries = ReadLog(m_Config.RootDirectory, m_ContainerBaseName, LogPosition);
std::string InvalidEntryReason;
for (const CasDiskIndexEntry& Entry : LogEntries)
{
@@ -1747,6 +1727,16 @@ TEST_CASE("compactcas.compact.totalsize")
const uint64_t TotalSize = Cas.StorageSize().DiskSize;
CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
}
+
+ // Re-open again, this time we should have a snapshot
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 65536, 16, false);
+
+ const uint64_t TotalSize = Cas.StorageSize().DiskSize;
+ CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
+ }
}
}
@@ -2249,7 +2239,7 @@ TEST_CASE("compactcas.legacyconversion")
TCasLogFile<CasDiskIndexEntry> CasLog;
CasLog.Open(SlogPath, CasLogFile::Mode::kRead);
LogEntries.reserve(CasLog.GetLogCount());
- CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); });
+ CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0);
}
TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog;
std::filesystem::path SLegacylogPath = GetLegacyLogPath(CasConfig.RootDirectory, "test");