aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-12-16 10:37:43 +0100
committerGitHub <[email protected]>2022-12-16 01:37:43 -0800
commitc5a974fa97bc23861464a48f89c93851f9f0ab63 (patch)
treeb8d9d6d51873d6c4421faf9179cc53c82f5d3eb8
parentoplog level GC (#209) (diff)
downloadzen-c5a974fa97bc23861464a48f89c93851f9f0ab63.tar.xz
zen-c5a974fa97bc23861464a48f89c93851f9f0ab63.zip
Fix log index snapshot (#210)
* Fix log reading for structured cache store. Make sure cache is flushed at exit * don't flush index to disk unless new entries have been written * changelog
-rw-r--r--CHANGELOG.md3
-rw-r--r--zenserver/cache/structuredcache.cpp1
-rw-r--r--zenserver/cache/structuredcachestore.cpp29
-rw-r--r--zenserver/cache/structuredcachestore.h1
-rw-r--r--zenstore/compactcas.cpp31
-rw-r--r--zenstore/compactcas.h1
6 files changed, 40 insertions, 26 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 37c73fc7a..0211df5c3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
##
- Feature: Oplog level GC in project store. If gc marker file path is given by UE, oplogs will be GC'd when marker file is deleted (and GC is triggered)
+- Bugfix: Index handling for cache large object store was broken resulting in log always being played back
+- Bugfix: Make sure to flush cache store on call to flush on service and exit
+- Improvement: Don't write index snapshots if no new entries have been added to log
## 0.2.0
- Feature: Recording and playback of cache request with full data - both get and put operations can be replayed. Invoke via web request
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index a7571b4c4..d273bc88c 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -527,6 +527,7 @@ HttpStructuredCacheService::BaseUri() const
void
HttpStructuredCacheService::Flush()
{
+ m_CacheStore.Flush();
}
void
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 9f6707a4a..de1243ddd 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -50,7 +50,7 @@ namespace {
struct CacheBucketIndexHeader
{
static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
- static constexpr uint32_t CurrentVersion = 1;
+ static constexpr uint32_t CurrentVersion = 2;
uint32_t Magic = ExpectedMagic;
uint32_t Version = CurrentVersion;
@@ -649,6 +649,12 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
void
ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
{
+ uint64_t LogCount = m_SlogFile.GetLogCount();
+ if (m_LogFlushPosition == LogCount)
+ {
+ return;
+ }
+
ZEN_DEBUG("write store snapshot for '{}'", m_BucketDir / m_BucketName);
uint64_t EntryCount = 0;
Stopwatch Timer;
@@ -676,10 +682,7 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
try
{
- m_SlogFile.Flush();
-
// Write the current state of the location map to a new index state
- uint64_t LogCount = 0;
std::vector<DiskIndexEntry> Entries;
{
@@ -692,8 +695,6 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
IndexEntry.Key = Entry.first;
IndexEntry.Location = Entry.second.Location;
}
-
- LogCount = m_SlogFile.GetLogCount();
}
BasicFile ObjectIndexFile;
@@ -708,7 +709,8 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
ObjectIndexFile.Flush();
ObjectIndexFile.Close();
- EntryCount = Entries.size();
+ EntryCount = Entries.size();
+ m_LogFlushPosition = LogCount;
}
catch (std::exception& Err)
{
@@ -787,9 +789,10 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount)
std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
if (std::filesystem::is_regular_file(LogPath))
{
+ uint64_t LogEntryCount = 0;
Stopwatch Timer;
const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' log containing #{} entries in {}", LogPath, m_Index.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ ZEN_INFO("read store '{}' log containing #{} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
TCasLogFile<DiskIndexEntry> CasLog;
CasLog.Open(LogPath, CasLogFile::Mode::kRead);
@@ -801,8 +804,8 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount)
ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
SkipEntryCount = 0;
}
- uint64_t ReadCount = EntryCount - SkipEntryCount;
- m_Index.reserve(ReadCount);
+ LogEntryCount = EntryCount - SkipEntryCount;
+ m_Index.reserve(LogEntryCount);
uint64_t InvalidEntryCount = 0;
CasLog.Replay(
[&](const DiskIndexEntry& Record) {
@@ -825,6 +828,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount)
{
ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
}
+ return LogEntryCount;
}
}
return 0;
@@ -849,8 +853,8 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is
fs::remove_all(m_BlocksBasePath);
}
- uint64_t LogPosition = ReadIndexFile();
- uint64_t LogEntryCount = ReadLog(LogPosition);
+ m_LogFlushPosition = ReadIndexFile();
+ uint64_t LogEntryCount = ReadLog(m_LogFlushPosition);
CreateDirectories(m_BucketDir);
@@ -987,6 +991,7 @@ ZenCacheDiskLayer::CacheBucket::Flush()
m_BlockStore.Flush();
RwLock::SharedLockScope _(m_IndexLock);
+ m_SlogFile.Flush();
MakeIndexSnapshot();
SaveManifest();
}
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index d5cb536e5..e6b849c0a 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -261,6 +261,7 @@ private:
// These files are used to manage storage of small objects for this bucket
TCasLogFile<DiskIndexEntry> m_SlogFile;
+ uint64_t m_LogFlushPosition = 0;
struct IndexEntry
{
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 1c692b609..70a88ecad 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -228,6 +228,7 @@ void
CasContainerStrategy::Flush()
{
m_BlockStore.Flush();
+ m_CasLog.Flush();
MakeIndexSnapshot();
}
@@ -502,6 +503,12 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
void
CasContainerStrategy::MakeIndexSnapshot()
{
+ uint64_t LogCount = m_CasLog.GetLogCount();
+ if (m_LogFlushPosition == LogCount)
+ {
+ return;
+ }
+
ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory / m_ContainerBaseName);
uint64_t EntryCount = 0;
Stopwatch Timer;
@@ -529,10 +536,7 @@ CasContainerStrategy::MakeIndexSnapshot()
try
{
- m_CasLog.Flush();
-
// Write the current state of the location map to a new index state
- uint64_t LogCount = 0;
std::vector<CasDiskIndexEntry> Entries;
{
@@ -546,8 +550,6 @@ CasContainerStrategy::MakeIndexSnapshot()
IndexEntry.Key = Entry.first;
IndexEntry.Location = Entry.second;
}
-
- LogCount = m_CasLog.GetLogCount();
}
BasicFile ObjectIndexFile;
@@ -562,7 +564,8 @@ CasContainerStrategy::MakeIndexSnapshot()
ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexEntry));
ObjectIndexFile.Flush();
ObjectIndexFile.Close();
- EntryCount = Entries.size();
+ EntryCount = Entries.size();
+ m_LogFlushPosition = LogCount;
}
catch (std::exception& Err)
{
@@ -638,15 +641,15 @@ CasContainerStrategy::ReadIndexFile()
uint64_t
CasContainerStrategy::ReadLog(uint64_t SkipEntryCount)
{
- std::vector<CasDiskIndexEntry> Entries;
- std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName);
+ std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName);
if (std::filesystem::is_regular_file(LogPath))
{
+ size_t LogEntryCount = 0;
Stopwatch Timer;
const auto _ = MakeGuard([&] {
ZEN_INFO("read store '{}' log containing #{} entries in {}",
m_RootDirectory / m_ContainerBaseName,
- Entries.size(),
+ LogEntryCount,
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
@@ -660,10 +663,10 @@ CasContainerStrategy::ReadLog(uint64_t SkipEntryCount)
ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
SkipEntryCount = 0;
}
- uint64_t ReadCount = EntryCount - SkipEntryCount;
- Entries.reserve(ReadCount);
+ LogEntryCount = EntryCount - SkipEntryCount;
CasLog.Replay(
[&](const CasDiskIndexEntry& Record) {
+ LogEntryCount++;
std::string InvalidEntryReason;
if (Record.Flags & CasDiskIndexEntry::kTombstone)
{
@@ -678,7 +681,7 @@ CasContainerStrategy::ReadLog(uint64_t SkipEntryCount)
m_LocationMap[Record.Key] = Record.Location;
},
SkipEntryCount);
- return ReadCount;
+ return LogEntryCount;
}
}
return 0;
@@ -698,8 +701,8 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
std::filesystem::remove_all(BasePath);
}
- uint64_t LogPosition = ReadIndexFile();
- uint64_t LogEntryCount = ReadLog(LogPosition);
+ m_LogFlushPosition = ReadIndexFile();
+ uint64_t LogEntryCount = ReadLog(m_LogFlushPosition);
CreateDirectories(BasePath);
diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h
index 3d9c42c1b..b0c6699eb 100644
--- a/zenstore/compactcas.h
+++ b/zenstore/compactcas.h
@@ -80,6 +80,7 @@ private:
uint64_t m_MaxBlockSize = 1u << 28;
bool m_IsInitialized = false;
TCasLogFile<CasDiskIndexEntry> m_CasLog;
+ uint64_t m_LogFlushPosition = 0;
std::string m_ContainerBaseName;
std::filesystem::path m_BlocksBasePath;
BlockStore m_BlockStore;