aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-09-03 19:40:54 +0200
committerGitHub Enterprise <[email protected]>2024-09-03 19:40:54 +0200
commit3dfd38c8837c513cb3a5eff576cf3590046bd31e (patch)
treea3ab8d88738dcbeae5d709ccacfe15e1ed58b9ee
parentmeta info store (#75) (diff)
downloadzen-3dfd38c8837c513cb3a5eff576cf3590046bd31e.tar.xz
zen-3dfd38c8837c513cb3a5eff576cf3590046bd31e.zip
oplog index snapshots (#140)
- Feature: Added project store oplog index snapshots for faster opening of oplog - opening oplogs are roughly 10x faster
-rw-r--r--CHANGELOG.md1
-rw-r--r--src/zenserver/projectstore/projectstore.cpp882
-rw-r--r--src/zenserver/projectstore/projectstore.h53
-rw-r--r--src/zenstore/blockstore.cpp2
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp2
-rw-r--r--src/zenstore/caslog.cpp4
-rw-r--r--src/zenstore/include/zenstore/caslog.h4
7 files changed, 741 insertions, 207 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e0a27a569..e319c02ca 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@
- Improvement: Added detection of zombie processes on Mac/Linux when waiting for process to exit
- Feature: Added option `--gc-cache-attachment-store` which caches referenced attachments in cache records on disk for faster GC - default is `false`
- Feature: Added option `--gc-projectstore-attachment-store` which caches referenced attachments in project store oplogs on disk for faster GC - default is `false`
+- Feature: Added project store oplog index snapshots for faster opening of oplog - opening oplogs are roughly 10x faster
## 5.5.5
- Improvement: Log reasons for failing to read cache bucket sidecar file
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 378423100..1952adcd3 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -285,6 +285,35 @@ namespace {
: fmt::format("{}: {}", Result.Reason, Result.Text)};
}
+#pragma pack(push)
+#pragma pack(1)
+ struct OplogIndexHeader
+ {
+ static constexpr uint32_t ExpectedMagic = 0x7569647a; // 'zidx';
+ static constexpr uint32_t CurrentVersion = 1;
+ static constexpr uint64_t DataAlignment = 8;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ uint64_t LogPosition = 0;
+ uint32_t LSNCount = 0;
+ uint64_t KeyCount = 0;
+ uint32_t OpAddressMapCount = 0;
+ uint32_t LatestOpMapCount = 0;
+ uint64_t ChunkMapCount = 0;
+ uint64_t MetaMapCount = 0;
+ uint64_t FileMapCount = 0;
+ uint32_t Checksum = 0;
+
+ static uint32_t ComputeChecksum(const OplogIndexHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(OplogIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
+ };
+#pragma pack(pop)
+
+ static_assert(sizeof(OplogIndexHeader) == 64);
+
} // namespace
//////////////////////////////////////////////////////////////////////////
@@ -297,7 +326,10 @@ struct ProjectStore::OplogStorage : public RefCounted
~OplogStorage()
{
- ZEN_INFO("closing oplog storage at {}", m_OplogStoragePath);
+ ZEN_INFO("oplog '{}/{}': closing oplog storage at {}",
+ m_OwnerOplog->GetOuterProject()->Identifier,
+ m_OwnerOplog->OplogId(),
+ m_OplogStoragePath);
try
{
Flush();
@@ -306,7 +338,11 @@ struct ProjectStore::OplogStorage : public RefCounted
}
catch (const std::exception& Ex)
{
- ZEN_WARN("Flushing oplog at '{}' failed. Reason: '{}'", m_OplogStoragePath, Ex.what());
+ ZEN_WARN("oplog '{}/{}': flushing oplog at '{}' failed. Reason: '{}'",
+ m_OwnerOplog->GetOuterProject()->Identifier,
+ m_OwnerOplog->OplogId(),
+ m_OplogStoragePath,
+ Ex.what());
}
}
@@ -331,11 +367,21 @@ struct ProjectStore::OplogStorage : public RefCounted
return 0;
}
+ uint32_t MaxLSN() const { return m_MaxLsn; }
+ void SetMaxLSNAndNextWriteAddress(uint32_t MaxLSN, const OplogEntryAddress& NextOpFileOffset)
+ {
+ m_MaxLsn.store(MaxLSN);
+ m_NextOpsOffset = RoundUp((NextOpFileOffset.Offset * m_OpsAlign) + NextOpFileOffset.Size, m_OpsAlign);
+ }
+
void Open(bool IsCreate)
{
ZEN_TRACE_CPU("Store::OplogStorage::Open");
- ZEN_INFO("initializing oplog storage at '{}'", m_OplogStoragePath);
+ ZEN_INFO("oplog '{}/{}': initializing storage at '{}'",
+ m_OwnerOplog->GetOuterProject()->Identifier,
+ m_OwnerOplog->OplogId(),
+ m_OplogStoragePath);
if (IsCreate)
{
@@ -384,9 +430,12 @@ struct ProjectStore::OplogStorage : public RefCounted
bool RetainLSNs,
bool DryRun)
{
- ZEN_TRACE_CPU("Store::OplogStorage::Compact()");
+ ZEN_TRACE_CPU("Store::OplogStorage::Compact");
- ZEN_INFO("compacting log for '{}'", m_OplogStoragePath);
+ ZEN_INFO("oplog '{}/{}': compacting at '{}'",
+ m_OwnerOplog->GetOuterProject()->Identifier,
+ m_OwnerOplog->OplogId(),
+ m_OplogStoragePath);
Stopwatch Timer;
@@ -521,7 +570,9 @@ struct ProjectStore::OplogStorage : public RefCounted
OplogEntryAddress{.Offset = LogEntry.OpCoreOffset, .Size = LogEntry.OpCoreSize});
}
- ZEN_INFO("oplog compact completed in {} - Max LSN# {}, New size: {}, old size {}.",
+ ZEN_INFO("oplog '{}/{}': compact completed in {} - Max LSN# {}, New size: {}, old size {}.",
+ m_OwnerOplog->GetOuterProject()->Identifier,
+ m_OwnerOplog->OplogId(),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
m_MaxLsn.load(),
NiceBytes(m_Oplog.GetLogSize() + m_OpBlobs.FileSize()),
@@ -550,12 +601,10 @@ struct ProjectStore::OplogStorage : public RefCounted
std::filesystem::path GetBlobsPath() const { return GetBlobsPath(m_OplogStoragePath); }
- void ReplayLog(std::function<void(CbObjectView, const OplogEntry&)>&& Handler)
+ void ReplayLog(std::function<void(CbObjectView, const OplogEntry&)>&& Handler, uint64_t SkipEntryCount = 0)
{
ZEN_TRACE_CPU("Store::OplogStorage::ReplayLog");
- ZEN_INFO("replaying log for '{}'", m_OplogStoragePath);
-
uint64_t OpsBlockSize = m_OpBlobs.FileSize();
Stopwatch Timer;
@@ -565,7 +614,6 @@ struct ProjectStore::OplogStorage : public RefCounted
{
tsl::robin_map<Oid, size_t, Oid::Hasher> LatestKeys;
- const uint64_t SkipEntryCount = 0;
m_Oplog.Replay(
[&](const OplogEntry& LogEntry) {
@@ -628,8 +676,8 @@ struct ProjectStore::OplogStorage : public RefCounted
BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
- uint32_t MaxOpLsn = 0;
- uint64_t NextOpFileOffset = 0;
+ uint32_t MaxOpLsn = m_MaxLsn;
+ uint64_t NextOpFileOffset = m_NextOpsOffset;
for (const OplogEntry& LogEntry : OpLogEntries)
{
@@ -648,12 +696,21 @@ struct ProjectStore::OplogStorage : public RefCounted
if (OpCoreHash != ExpectedOpCoreHash)
{
- ZEN_WARN("skipping bad checksum op - {}. Expected: {}, found: {}", LogEntry.OpKeyHash, ExpectedOpCoreHash, OpCoreHash);
+ ZEN_WARN("oplog '{}/{}': skipping bad checksum op - {}. Expected: {}, found: {}",
+ m_OwnerOplog->GetOuterProject()->Identifier,
+ m_OwnerOplog->OplogId(),
+ LogEntry.OpKeyHash,
+ ExpectedOpCoreHash,
+ OpCoreHash);
}
else if (CbValidateError Err = ValidateCompactBinary(OpBuffer.GetView(), CbValidateMode::Default);
Err != CbValidateError::None)
{
- ZEN_WARN("skipping invalid format op - {}. Error: '{}'", LogEntry.OpKeyHash, ToString(Err));
+ ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Error: '{}'",
+ m_OwnerOplog->GetOuterProject()->Identifier,
+ m_OwnerOplog->OplogId(),
+ LogEntry.OpKeyHash,
+ ToString(Err));
}
else
{
@@ -668,7 +725,10 @@ struct ProjectStore::OplogStorage : public RefCounted
m_MaxLsn = MaxOpLsn;
m_NextOpsOffset = NextOpFileOffset;
- ZEN_INFO("oplog replay completed in {} - Max LSN# {}, Next offset: {}, {} tombstones, {} invalid entries",
+ ZEN_INFO("oplog '{}/{}': replay from '{}' completed in {} - Max LSN# {}, Next offset: {}, {} tombstones, {} invalid entries",
+ m_OwnerOplog->GetOuterProject()->Identifier,
+ m_OwnerOplog->OplogId(),
+ m_OplogStoragePath,
NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
m_MaxLsn.load(),
m_NextOpsOffset.load(),
@@ -838,6 +898,7 @@ struct ProjectStore::OplogStorage : public RefCounted
m_Oplog.Flush();
m_OpBlobs.Flush();
}
+ uint64_t LogCount() const { return m_Oplog.GetLogCount(); }
LoggerRef Log() { return m_OwnerOplog->Log(); }
@@ -860,11 +921,11 @@ ProjectStore::Oplog::Oplog(std::string_view Id,
std::filesystem::path BasePath,
const std::filesystem::path& MarkerPath)
: m_OuterProject(Project)
+, m_OplogId(Id)
, m_CidStore(Store)
, m_BasePath(BasePath)
, m_MarkerPath(MarkerPath)
, m_MetaValid(false)
-, m_OplogId(Id)
{
using namespace std::literals;
@@ -889,13 +950,24 @@ ProjectStore::Oplog::~Oplog()
void
ProjectStore::Oplog::Flush()
{
+ ZEN_TRACE_CPU("Oplog::Flush");
+
+ RwLock::SharedLockScope Lock(m_OplogLock);
+
ZEN_ASSERT(m_Storage);
+
m_Storage->Flush();
if (!m_MetaValid)
{
std::error_code DummyEc;
std::filesystem::remove(m_MetaPath, DummyEc);
}
+
+ uint64_t LogCount = m_Storage->LogCount();
+ if (m_LogFlushPosition != LogCount)
+ {
+ WriteIndexSnapshot();
+ }
}
void
@@ -944,7 +1016,11 @@ ProjectStore::Oplog::ScrubStorage(ScrubContext& Ctx)
{
if (Ctx.RunRecovery())
{
- ZEN_WARN("scrubbing found {} bad ops in oplog @ '{}', these will be removed from the index", BadEntryKeys.size(), m_BasePath);
+ ZEN_WARN("oplog '{}/{}': scrubbing found {} bad ops in oplog @ '{}', these will be removed from the index",
+ m_OuterProject->Identifier,
+ m_OplogId,
+ BadEntryKeys.size(),
+ m_BasePath);
// Actually perform some clean-up
RwLock::ExclusiveLockScope _(m_OplogLock);
@@ -960,7 +1036,11 @@ ProjectStore::Oplog::ScrubStorage(ScrubContext& Ctx)
}
else
{
- ZEN_WARN("scrubbing found {} bad ops in oplog @ '{}' but no cleanup will be performed", BadEntryKeys.size(), m_BasePath);
+ ZEN_WARN("oplog '{}/{}': scrubbing found {} bad ops in oplog @ '{}' but no cleanup will be performed",
+ m_OuterProject->Identifier,
+ m_OplogId,
+ BadEntryKeys.size(),
+ m_BasePath);
}
}
}
@@ -1003,6 +1083,11 @@ ProjectStore::Oplog::TotalSize(const std::filesystem::path& BasePath)
{
Size += std::filesystem::file_size(MetaFilePath);
}
+ std::filesystem::path IndexFilePath = BasePath / "ops.zidx"sv;
+ if (std::filesystem::exists(IndexFilePath))
+ {
+ Size += std::filesystem::file_size(IndexFilePath);
+ }
return Size;
}
@@ -1052,14 +1137,16 @@ ProjectStore::Oplog::ExistsAt(const std::filesystem::path& BasePath)
}
void
-ProjectStore::Oplog::Read(bool AllowCompact)
+ProjectStore::Oplog::Read()
{
using namespace std::literals;
+ ZEN_TRACE_CPU("Oplog::Read");
+ ZEN_LOG_SCOPE("Oplog::Read '{}'", m_OplogId);
std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv;
if (std::filesystem::is_regular_file(StateFilePath))
{
- ZEN_INFO("reading config for oplog '{}' in project '{}' from {}", m_OplogId, m_OuterProject->Identifier, StateFilePath);
+ ZEN_INFO("oplog '{}/{}': config read from '{}'", m_OuterProject->Identifier, m_OplogId, StateFilePath);
BasicFile Blob;
Blob.Open(StateFilePath, BasicFile::Mode::kRead);
@@ -1079,12 +1166,56 @@ ProjectStore::Oplog::Read(bool AllowCompact)
}
else
{
- ZEN_INFO("config for oplog '{}' in project '{}' not found at {}. Assuming legacy store",
- m_OplogId,
+ ZEN_INFO("oplog '{}/{}': config read not found at '{}', assuming legacy store",
m_OuterProject->Identifier,
+ m_OplogId,
StateFilePath);
}
- ReplayLog(AllowCompact);
+
+ if (!m_MetaValid)
+ {
+ std::error_code DummyEc;
+ std::filesystem::remove(m_MetaPath, DummyEc);
+ }
+
+ ReadIndexSnapshot();
+
+ m_Storage->ReplayLog(
+ [&](CbObjectView Op, const OplogEntry& OpEntry) {
+ // MaxLSN = Max(OpEntry.OpLsn, MaxLSN);
+
+ const OplogEntryMapping OpMapping = GetMapping(Op);
+ // Update chunk id maps
+ for (const OplogEntryMapping::Mapping& Chunk : OpMapping.Chunks)
+ {
+ m_ChunkMap.insert_or_assign(Chunk.Id, Chunk.Hash);
+ }
+
+ for (const OplogEntryMapping::FileMapping& File : OpMapping.Files)
+ {
+ if (File.Hash != IoHash::Zero)
+ {
+ m_ChunkMap.insert_or_assign(File.Id, File.Hash);
+ }
+ m_FileMap.insert_or_assign(
+ File.Id,
+ FileMapEntry{.ServerPath = File.Hash == IoHash::Zero ? File.ServerPath : std::string(), .ClientPath = File.ClientPath});
+ }
+
+ for (const OplogEntryMapping::Mapping& Meta : OpMapping.Meta)
+ {
+ m_MetaMap.insert_or_assign(Meta.Id, Meta.Hash);
+ }
+
+ m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize});
+ m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn;
+ },
+ m_LogFlushPosition);
+
+ if (m_Storage->LogCount() != m_LogFlushPosition)
+ {
+ WriteIndexSnapshot();
+ }
}
void
@@ -1102,7 +1233,7 @@ ProjectStore::Oplog::Write()
std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv;
- ZEN_INFO("persisting config for oplog '{}' in project '{}' to {}", m_OplogId, m_OuterProject->Identifier, StateFilePath);
+ ZEN_INFO("oplog '{}/{}': persisting config to '{}'", m_OuterProject->Identifier, m_OplogId, StateFilePath);
TemporaryFile::SafeWriteFile(StateFilePath, Mem.GetView());
}
@@ -1153,33 +1284,364 @@ ProjectStore::Oplog::Reset()
}
void
-ProjectStore::Oplog::ReplayLog(bool AllowCompact)
+ProjectStore::Oplog::WriteIndexSnapshot()
{
- ZEN_LOG_SCOPE("ReplayLog '{}'", m_OplogId);
+ ZEN_TRACE_CPU("Oplog::WriteIndexSnapshot");
- RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
- if (!m_Storage)
+ ZEN_DEBUG("oplog '{}/{}': write store snapshot at '{}'", m_OuterProject->Identifier, m_OplogId, m_BasePath);
+ uint64_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("oplog '{}/{}': wrote store snapshot for '{}' containing {} entries in {}",
+ m_OuterProject->Identifier,
+ m_OplogId,
+ m_BasePath,
+ EntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ namespace fs = std::filesystem;
+
+ fs::path IndexPath = m_BasePath / "ops.zidx";
+ fs::path TempIndexPath = m_BasePath / "ops.zidx.tmp";
+
+ // Move index away, we keep it if something goes wrong
+ if (fs::is_regular_file(TempIndexPath))
{
- return;
+ std::error_code Ec;
+ if (!fs::remove(TempIndexPath, Ec) || Ec)
+ {
+ ZEN_WARN("oplog '{}/{}': snapshot failed to clean up temp snapshot at {}, reason: '{}'",
+ GetOuterProject()->Identifier,
+ m_OplogId,
+ TempIndexPath,
+ Ec.message());
+ return;
+ }
}
- uint32_t MaxLSN = 0;
- m_Storage->ReplayLog([&](CbObjectView Op, const OplogEntry& OpEntry) {
- MaxLSN = Max(OpEntry.OpLsn, MaxLSN);
- RegisterOplogEntry(OplogLock, GetMapping(Op), OpEntry);
- });
+ try
+ {
+ if (fs::is_regular_file(IndexPath))
+ {
+ fs::rename(IndexPath, TempIndexPath);
+ }
+
+ // Write the current state of the location map to a new index state
+ std::vector<uint32_t> LSNEntries;
+ std::vector<Oid> Keys;
+ std::vector<OplogEntryAddress> AddressMapEntries;
+ std::vector<uint32_t> LatestOpMapEntries;
+ std::vector<IoHash> ChunkMapEntries;
+ std::vector<IoHash> MetaMapEntries;
+ std::vector<uint32_t> FilePathLengths;
+ std::vector<std::string> FilePaths;
+ uint64_t IndexLogPosition = 0;
+ {
+ IndexLogPosition = m_Storage->LogCount();
+
+ Keys.reserve(m_OpAddressMap.size() + m_LatestOpMap.size() + m_ChunkMap.size() + m_FileMap.size());
+
+ AddressMapEntries.reserve(m_OpAddressMap.size());
+ LSNEntries.reserve(m_OpAddressMap.size());
+ for (const auto& It : m_OpAddressMap)
+ {
+ LSNEntries.push_back(It.first);
+ AddressMapEntries.push_back(It.second);
+ }
+
+ LatestOpMapEntries.reserve(m_LatestOpMap.size());
+ for (const auto& It : m_LatestOpMap)
+ {
+ Keys.push_back(It.first);
+ LatestOpMapEntries.push_back(It.second);
+ }
+
+ ChunkMapEntries.reserve(m_ChunkMap.size());
+ for (const auto& It : m_ChunkMap)
+ {
+ Keys.push_back(It.first);
+ ChunkMapEntries.push_back(It.second);
+ }
+
+ MetaMapEntries.reserve(m_MetaMap.size());
+ for (const auto& It : m_MetaMap)
+ {
+ Keys.push_back(It.first);
+ MetaMapEntries.push_back(It.second);
+ }
+
+ FilePathLengths.reserve(m_FileMap.size() * 2);
+ FilePaths.reserve(m_FileMap.size() * 2);
+ for (const auto& It : m_FileMap)
+ {
+ Keys.push_back(It.first);
+ FilePathLengths.push_back(gsl::narrow<uint32_t>(It.second.ServerPath.length()));
+ FilePathLengths.push_back(gsl::narrow<uint32_t>(It.second.ClientPath.length()));
+ FilePaths.push_back(It.second.ServerPath);
+ FilePaths.push_back(It.second.ClientPath);
+ }
+ }
+
+ TemporaryFile ObjectIndexFile;
+ std::error_code Ec;
+ ObjectIndexFile.CreateTemporary(IndexPath.parent_path(), Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to create temp file for index snapshot at '{}'", IndexPath));
+ }
+
+ {
+ BasicFileWriter IndexFile(ObjectIndexFile, 65536u);
+
+ OplogIndexHeader Header = {.LogPosition = IndexLogPosition,
+ .LSNCount = gsl::narrow<uint32_t>(LSNEntries.size()),
+ .KeyCount = gsl::narrow<uint64_t>(Keys.size()),
+ .OpAddressMapCount = gsl::narrow<uint32_t>(AddressMapEntries.size()),
+ .LatestOpMapCount = gsl::narrow<uint32_t>(LatestOpMapEntries.size()),
+ .ChunkMapCount = gsl::narrow<uint64_t>(ChunkMapEntries.size()),
+ .MetaMapCount = gsl::narrow<uint64_t>(MetaMapEntries.size()),
+ .FileMapCount = gsl::narrow<uint64_t>(FilePathLengths.size() / 2)};
+
+ Header.Checksum = OplogIndexHeader::ComputeChecksum(Header);
+
+ uint64_t Offset = 0;
+ IndexFile.Write(&Header, sizeof(OplogIndexHeader), Offset);
+ Offset += sizeof(OplogIndexHeader);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ IndexFile.Write(LSNEntries.data(), LSNEntries.size() * sizeof(uint32_t), Offset);
+ Offset += LSNEntries.size() * sizeof(uint32_t);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ IndexFile.Write(Keys.data(), Keys.size() * sizeof(Oid), Offset);
+ Offset += Keys.size() * sizeof(Oid);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ IndexFile.Write(AddressMapEntries.data(), AddressMapEntries.size() * sizeof(OplogEntryAddress), Offset);
+ Offset += AddressMapEntries.size() * sizeof(OplogEntryAddress);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ IndexFile.Write(LatestOpMapEntries.data(), LatestOpMapEntries.size() * sizeof(uint32_t), Offset);
+ Offset += LatestOpMapEntries.size() * sizeof(uint32_t);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ IndexFile.Write(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset);
+ Offset += ChunkMapEntries.size() * sizeof(IoHash);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ IndexFile.Write(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset);
+ Offset += MetaMapEntries.size() * sizeof(IoHash);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
- if (AllowCompact)
+ IndexFile.Write(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset);
+ Offset += FilePathLengths.size() * sizeof(uint32_t);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ for (const auto& FilePath : FilePaths)
+ {
+ IndexFile.Write(FilePath.c_str(), FilePath.length(), Offset);
+ Offset += FilePath.length();
+ }
+ }
+ ObjectIndexFile.Flush();
+ ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to move temp file '{}' to '{}'", ObjectIndexFile.GetPath(), IndexPath));
+ }
+ EntryCount = LSNEntries.size();
+ m_LogFlushPosition = IndexLogPosition;
+ }
+ catch (const std::exception& Err)
{
- const uint32_t CompactUnusedThreshold = 50;
- uint32_t UnusedPercent = GetUnusedSpacePercentLocked();
- if (UnusedPercent >= CompactUnusedThreshold)
+ ZEN_WARN("oplog '{}/{}': snapshot FAILED, reason: '{}'", m_OuterProject->Identifier, m_OplogId, Err.what());
+
+ // Restore any previous snapshot
+
+ if (fs::is_regular_file(TempIndexPath))
{
- Compact(OplogLock,
- /*DryRun*/ false,
- /*RetainLSNs*/ MaxLSN <=
- 0xff000000ul, // If we have less than 16 miln entries left of our LSN range, allow renumbering of LSNs
- fmt::format("Compact on initial open of oplog {}/{}: ", m_OuterProject->Identifier, m_OplogId));
+ std::error_code Ec;
+ fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless
+ fs::rename(TempIndexPath, IndexPath, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("oplog '{}/{}': snapshot failed to restore old snapshot from {}, reason: '{}'",
+ m_OuterProject->Identifier,
+ m_OplogId,
+ TempIndexPath,
+ Ec.message());
+ }
+ }
+ }
+ if (fs::is_regular_file(TempIndexPath))
+ {
+ std::error_code Ec;
+ if (!fs::remove(TempIndexPath, Ec) || Ec)
+ {
+ ZEN_WARN("oplog '{}/{}': snapshot failed to remove temporary file {}, reason: '{}'",
+ m_OuterProject->Identifier,
+ m_OplogId,
+ TempIndexPath,
+ Ec.message());
+ }
+ }
+}
+
+void
+ProjectStore::Oplog::ReadIndexSnapshot()
+{
+ ZEN_TRACE_CPU("Oplog::ReadIndexSnapshot");
+
+ std::filesystem::path IndexPath = m_BasePath / "ops.zidx";
+ if (std::filesystem::is_regular_file(IndexPath))
+ {
+ uint64_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("oplog '{}/{}': index read from '{}' containing {} entries in {}",
+ m_OuterProject->Identifier,
+ m_OplogId,
+ IndexPath,
+ EntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ try
+ {
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t Size = ObjectIndexFile.FileSize();
+ if (Size >= sizeof(OplogIndexHeader))
+ {
+ OplogIndexHeader Header;
+ uint64_t Offset = 0;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+ Offset += sizeof(OplogIndexHeader);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ if ((Header.Magic == OplogIndexHeader::ExpectedMagic) && (Header.Version == OplogIndexHeader::CurrentVersion) &&
+ (Header.Checksum == OplogIndexHeader::ComputeChecksum(Header)))
+ {
+ uint32_t MaxLSN = 0;
+ OplogEntryAddress LastOpAddress{.Offset = 0, .Size = 0};
+
+ uint32_t Checksum = OplogIndexHeader::ComputeChecksum(Header);
+ ZEN_ASSERT(Header.Checksum == Checksum);
+
+ std::vector<uint32_t> LSNEntries(Header.LSNCount);
+ ObjectIndexFile.Read(LSNEntries.data(), LSNEntries.size() * sizeof(uint32_t), Offset);
+ Offset += LSNEntries.size() * sizeof(uint32_t);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ size_t LSNOffset = 0;
+
+ std::vector<Oid> Keys(Header.KeyCount);
+ ObjectIndexFile.Read(Keys.data(), Keys.size() * sizeof(Oid), Offset);
+ Offset += Keys.size() * sizeof(Oid);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ size_t KeyOffset = 0;
+
+ {
+ std::vector<OplogEntryAddress> AddressMapEntries(Header.OpAddressMapCount);
+ ObjectIndexFile.Read(AddressMapEntries.data(), AddressMapEntries.size() * sizeof(OplogEntryAddress), Offset);
+ Offset += AddressMapEntries.size() * sizeof(OplogEntryAddress);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ m_OpAddressMap.reserve(AddressMapEntries.size());
+ for (const OplogEntryAddress& Address : AddressMapEntries)
+ {
+ m_OpAddressMap.insert_or_assign(LSNEntries[LSNOffset++], Address);
+ if (Address.Offset > LastOpAddress.Offset)
+ {
+ LastOpAddress = Address;
+ }
+ }
+ }
+ {
+ std::vector<uint32_t> LatestOpMapEntries(Header.LatestOpMapCount);
+ ObjectIndexFile.Read(LatestOpMapEntries.data(), LatestOpMapEntries.size() * sizeof(uint32_t), Offset);
+ Offset += LatestOpMapEntries.size() * sizeof(uint32_t);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ m_LatestOpMap.reserve(LSNEntries.size());
+ for (uint32_t LSN : LSNEntries)
+ {
+ m_LatestOpMap.insert_or_assign(Keys[KeyOffset++], LSN);
+ MaxLSN = Max(MaxLSN, LSN);
+ }
+ }
+ {
+ std::vector<IoHash> ChunkMapEntries(Header.ChunkMapCount);
+ ObjectIndexFile.Read(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset);
+ Offset += ChunkMapEntries.size() * sizeof(IoHash);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ m_ChunkMap.reserve(ChunkMapEntries.size());
+ for (const IoHash& ChunkId : ChunkMapEntries)
+ {
+ m_ChunkMap.insert_or_assign(Keys[KeyOffset++], ChunkId);
+ }
+ }
+ {
+ std::vector<IoHash> MetaMapEntries(Header.MetaMapCount);
+ ObjectIndexFile.Read(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset);
+ Offset += MetaMapEntries.size() * sizeof(IoHash);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ m_MetaMap.reserve(MetaMapEntries.size());
+ for (const IoHash& ChunkId : MetaMapEntries)
+ {
+ m_MetaMap.insert_or_assign(Keys[KeyOffset++], ChunkId);
+ }
+ }
+ {
+ m_FileMap.reserve(Header.FileMapCount);
+
+ std::vector<uint32_t> FilePathLengths(Header.FileMapCount * 2);
+ ObjectIndexFile.Read(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset);
+ Offset += FilePathLengths.size() * sizeof(uint32_t);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ BasicFileBuffer IndexFile(ObjectIndexFile, 65536);
+ auto ReadString([&IndexFile, &Offset](uint32_t Length) -> std::string_view {
+ MemoryView StringData = IndexFile.MakeView(Length, Offset);
+ if (StringData.GetSize() != Length)
+ {
+ throw std::runtime_error(fmt::format("Invalid format. Expected to read %u bytes but got %u",
+ Length,
+ uint32_t(StringData.GetSize())));
+ }
+ Offset += Length;
+ return std::string_view((const char*)StringData.GetData(), Length);
+ });
+ for (uint64_t FileLengthOffset = 0; FileLengthOffset < FilePathLengths.size();)
+ {
+ std::string_view ServerPath = ReadString(FilePathLengths[FileLengthOffset++]);
+ std::string_view ClientPath = ReadString(FilePathLengths[FileLengthOffset++]);
+ m_FileMap.insert_or_assign(
+ Keys[KeyOffset++],
+ FileMapEntry{.ServerPath = std::string(ServerPath), .ClientPath = std::string(ClientPath)});
+ }
+ }
+ m_LogFlushPosition = Header.LogPosition;
+ m_Storage->SetMaxLSNAndNextWriteAddress(MaxLSN, LastOpAddress);
+ EntryCount = Header.LSNCount;
+ }
+ else
+ {
+ ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'", m_OuterProject->Identifier, m_OplogId, IndexPath);
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ m_OpAddressMap.clear();
+ m_LatestOpMap.clear();
+ m_ChunkMap.clear();
+ m_MetaMap.clear();
+ m_FileMap.clear();
+ m_LogFlushPosition = 0;
+ ZEN_ERROR("oplog '{}/{}': failed reading index snapshot from '{}'. Reason: '{}'",
+ m_OuterProject->Identifier,
+ m_OplogId,
+ IndexPath,
+ Ex.what());
}
}
}
@@ -1259,12 +1721,13 @@ ProjectStore::Oplog::Compact(RwLock::ExclusiveLockScope&, bool DryRun, bool Reta
{
m_OpAddressMap.swap(OpAddressMap);
m_LatestOpMap.swap(LatestOpMap);
+ WriteIndexSnapshot();
}
uint64_t PostSize = TotalSize();
uint64_t FreedSize = (PreSize > PostSize) ? (PreSize - PostSize) : 0;
- ZEN_INFO("{} Compacted oplog {}/{} in {}. New size: {}, freeing: {}",
+ ZEN_INFO("{} oplog '{}/{}': Compacted in {}. New size: {}, freeing: {}",
LogPrefix,
m_OuterProject->Identifier,
m_OplogId,
@@ -1356,7 +1819,12 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
}
catch (const std::exception& Ex)
{
- ZEN_WARN("Exception caught when iterating file chunk {}, path '{}'. Reason: '{}'", FileChunkIndex, FilePath, Ex.what());
+ ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'",
+ m_OuterProject->Identifier,
+ m_OplogId,
+ FileChunkIndex,
+ FilePath,
+ Ex.what());
}
});
}
@@ -1503,6 +1971,7 @@ ProjectStore::Oplog::IterateOplog(std::function<void(CbObjectView)>&& Handler)
void
ProjectStore::Oplog::IterateOplogLocked(std::function<void(CbObjectView)>&& Handler)
{
+ ZEN_TRACE_CPU("Store::Oplog::IterateOplogLocked");
if (!m_Storage)
{
return;
@@ -1531,6 +2000,7 @@ static constexpr uint32_t OplogMetaDataExpectedMagic = 0x6f'74'6d'62; // 'omta'
void
ProjectStore::Oplog::GetAttachmentsLocked(std::vector<IoHash>& OutAttachments, bool StoreMetaDataOnDisk)
{
+ ZEN_TRACE_CPU("Store::Oplog::GetAttachmentsLocked");
if (!m_Storage)
{
return;
@@ -1541,6 +2011,7 @@ ProjectStore::Oplog::GetAttachmentsLocked(std::vector<IoHash>& OutAttachments, b
IoBuffer MetadataPayload = IoBufferBuilder::MakeFromFile(m_MetaPath);
if (MetadataPayload)
{
+ ZEN_TRACE_CPU("Store::Oplog::GetAttachmentsFromMetaData");
if (GetAttachmentsFromMetaData<Oid, IoHash>(
MetadataPayload,
OplogMetaDataExpectedMagic,
@@ -1591,7 +2062,11 @@ ProjectStore::Oplog::GetAttachmentsLocked(std::vector<IoHash>& OutAttachments, b
if (Ec)
{
m_MetaValid = false;
- ZEN_WARN("Unable to set meta data for oplog '{}' at meta path: '{}'. Reason: '{}'", OplogId(), MetaPath, Ec.message());
+ ZEN_WARN("oplog '{}/{}': unable to set meta data meta path: '{}'. Reason: '{}'",
+ m_OuterProject->Identifier,
+ m_OplogId,
+ MetaPath,
+ Ec.message());
}
else
{
@@ -1879,7 +2354,7 @@ ProjectStore::Oplog::GetMapping(CbObjectView Core)
Oid Id = PackageObj["id"sv].AsObjectId();
IoHash Hash = PackageObj["data"sv].AsBinaryAttachment();
Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
- ZEN_DEBUG("package data {} -> {}", Id, Hash);
+ ZEN_DEBUG("oplog {}/{}: package data {} -> {}", m_OuterProject->Identifier, m_OplogId, Id, Hash);
continue;
}
if (FieldName == "bulkdata"sv)
@@ -1891,7 +2366,7 @@ ProjectStore::Oplog::GetMapping(CbObjectView Core)
Oid Id = BulkObj["id"sv].AsObjectId();
IoHash Hash = BulkObj["data"sv].AsBinaryAttachment();
Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
- ZEN_DEBUG("bulkdata {} -> {}", Id, Hash);
+ ZEN_DEBUG("oplog {}/{}: bulkdata {} -> {}", m_OuterProject->Identifier, m_OplogId, Id, Hash);
}
continue;
}
@@ -1904,7 +2379,7 @@ ProjectStore::Oplog::GetMapping(CbObjectView Core)
Oid Id = PackageDataObj["id"sv].AsObjectId();
IoHash Hash = PackageDataObj["data"sv].AsBinaryAttachment();
Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
- ZEN_DEBUG("package {} -> {}", Id, Hash);
+ ZEN_DEBUG("oplog {}/{}: package {} -> {}", m_OuterProject->Identifier, m_OplogId, Id, Hash);
}
continue;
}
@@ -1922,17 +2397,29 @@ ProjectStore::Oplog::GetMapping(CbObjectView Core)
IoHash Hash = FileObj["data"sv].AsBinaryAttachment();
if (ServerPath.empty() && Hash == IoHash::Zero)
{
- ZEN_WARN("invalid file for entry '{}', missing both 'serverpath' and 'data' fields", Id);
+ ZEN_WARN("oplog {}/{}: invalid file for entry '{}', missing both 'serverpath' and 'data' fields",
+ m_OuterProject->Identifier,
+ m_OplogId,
+ Id);
continue;
}
if (ClientPath.empty())
{
- ZEN_WARN("invalid file for entry '{}', missing 'clientpath' field", Id);
+ ZEN_WARN("oplog {}/{}: invalid file for entry '{}', missing 'clientpath' field",
+ m_OuterProject->Identifier,
+ m_OplogId,
+ Id);
continue;
}
Result.Files.emplace_back(OplogEntryMapping::FileMapping{Id, Hash, std::string(ServerPath), std::string(ClientPath)});
- ZEN_DEBUG("file {} -> {}, ServerPath: {}, ClientPath: {}", Id, Hash, ServerPath, ClientPath);
+ ZEN_DEBUG("oplog {}/{}: file {} -> {}, ServerPath: {}, ClientPath: {}",
+ m_OuterProject->Identifier,
+ m_OplogId,
+ Id,
+ Hash,
+ ServerPath,
+ ClientPath);
}
continue;
}
@@ -1947,7 +2434,7 @@ ProjectStore::Oplog::GetMapping(CbObjectView Core)
IoHash Hash = MetaObj["data"sv].AsBinaryAttachment();
Result.Meta.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
auto NameString = MetaObj["name"sv].AsString();
- ZEN_DEBUG("meta data ({}) {} -> {}", NameString, Id, Hash);
+ ZEN_DEBUG("oplog {}/{}: meta data ({}) {} -> {}", m_OuterProject->Identifier, m_OplogId, NameString, Id, Hash);
}
continue;
}
@@ -2112,10 +2599,12 @@ ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores)
std::vector<uint32_t> EntryIds;
EntryIds.resize(OpCount);
{
- RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
- for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
{
- EntryIds[OpIndex] = RegisterOplogEntry(OplogLock, Mappings[OpIndex], OpEntries[OpIndex]);
+ RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
+ for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
+ {
+ EntryIds[OpIndex] = RegisterOplogEntry(OplogLock, Mappings[OpIndex], OpEntries[OpIndex]);
+ }
}
CaptureUpdatedLSNs(EntryIds);
m_MetaValid = false;
@@ -2151,13 +2640,13 @@ ProjectStore::Project::Exists(const std::filesystem::path& BasePath)
void
ProjectStore::Project::Read()
{
- ZEN_TRACE_CPU("Store::Project::Read");
+ ZEN_TRACE_CPU("Project::Read");
using namespace std::literals;
std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv;
- ZEN_INFO("reading config for project '{}' from {}", Identifier, ProjectStateFilePath);
+ ZEN_INFO("project '{}': reading config from '{}'", Identifier, ProjectStateFilePath);
BasicFile Blob;
Blob.Open(ProjectStateFilePath, BasicFile::Mode::kRead);
@@ -2186,7 +2675,7 @@ ProjectStore::Project::Read()
void
ProjectStore::Project::Write()
{
- ZEN_TRACE_CPU("Store::Project::Write");
+ ZEN_TRACE_CPU("Project::Write");
using namespace std::literals;
@@ -2205,7 +2694,7 @@ ProjectStore::Project::Write()
std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv;
- ZEN_INFO("persisting config for project '{}' to {}", Identifier, ProjectStateFilePath);
+ ZEN_INFO("project '{}': persisting config to '{}'", Identifier, ProjectStateFilePath);
TemporaryFile::SafeWriteFile(ProjectStateFilePath, Mem.GetView());
}
@@ -2215,15 +2704,13 @@ ProjectStore::Project::ReadAccessTimes()
{
using namespace std::literals;
- RwLock::SharedLockScope _(m_ProjectLock);
-
std::filesystem::path ProjectAccessTimesFilePath = m_OplogStoragePath / "AccessTimes.zcb"sv;
if (!std::filesystem::exists(ProjectAccessTimesFilePath))
{
return;
}
- ZEN_INFO("reading access times for project '{}' from {}", Identifier, ProjectAccessTimesFilePath);
+ ZEN_INFO("project '{}': reading access times '{}'", Identifier, ProjectAccessTimesFilePath);
BasicFile Blob;
Blob.Open(ProjectAccessTimesFilePath, BasicFile::Mode::kRead);
@@ -2247,6 +2734,7 @@ ProjectStore::Project::ReadAccessTimes()
}
CbArrayView IdArray = Reader["ids"sv].AsArrayView();
uint64_t Index = 0;
+
for (CbFieldView& IdView : IdArray)
{
std::string_view Id = IdView.AsString();
@@ -2268,7 +2756,7 @@ ProjectStore::Project::ReadAccessTimes()
}
else
{
- ZEN_WARN("validation error {} hit for '{}'", int(ValidationError), ProjectAccessTimesFilePath);
+ ZEN_WARN("project '{}': validation error {} hit for '{}'", Identifier, int(ValidationError), ProjectAccessTimesFilePath);
}
}
@@ -2279,11 +2767,12 @@ ProjectStore::Project::WriteAccessTimes()
CbObjectWriter Writer;
- Writer.AddInteger("count", gsl::narrow<uint64_t>(m_LastAccessTimes.size()));
- Writer.BeginArray("ids");
-
{
- RwLock::SharedLockScope _(m_ProjectLock);
+ RwLock::SharedLockScope _(m_LastAccessTimesLock);
+
+ Writer.AddInteger("count", gsl::narrow<uint64_t>(m_LastAccessTimes.size()));
+ Writer.BeginArray("ids");
+
for (const auto& It : m_LastAccessTimes)
{
Writer << It.first;
@@ -2305,24 +2794,24 @@ ProjectStore::Project::WriteAccessTimes()
std::filesystem::path ProjectAccessTimesFilePath = m_OplogStoragePath / "AccessTimes.zcb"sv;
- ZEN_INFO("persisting access times for project '{}' to {}", Identifier, ProjectAccessTimesFilePath);
+ ZEN_INFO("project '{}': persisting access times for '{}'", Identifier, ProjectAccessTimesFilePath);
TemporaryFile::SafeWriteFile(ProjectAccessTimesFilePath, Data.GetView());
}
catch (const std::exception& Err)
{
- ZEN_WARN("writing access times FAILED, reason: '{}'", Err.what());
+ ZEN_WARN("project '{}': writing access times FAILED, reason: '{}'", Identifier, Err.what());
}
}
LoggerRef
-ProjectStore::Project::Log()
+ProjectStore::Project::Log() const
{
return m_ProjectStore->Log();
}
std::filesystem::path
-ProjectStore::Project::BasePathForOplog(std::string_view OplogId)
+ProjectStore::Project::BasePathForOplog(std::string_view OplogId) const
{
return m_OplogStoragePath / OplogId;
}
@@ -2368,6 +2857,7 @@ ProjectStore::Oplog*
ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact)
{
ZEN_TRACE_CPU("Store::OpenOplog");
+
{
RwLock::SharedLockScope _(m_ProjectLock);
@@ -2379,14 +2869,15 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact)
}
}
+ std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
RwLock::ExclusiveLockScope Lock(m_ProjectLock);
-
- std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
+ if (auto It = m_Oplogs.find(std::string{OplogId}); It != m_Oplogs.end())
+ {
+ return It->second.get();
+ }
if (Oplog::ExistsAt(OplogBasePath))
{
- // Do open of existing oplog
-
try
{
Oplog* Log =
@@ -2394,14 +2885,21 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact)
.try_emplace(std::string{OplogId},
std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, std::filesystem::path{}))
.first->second.get();
- Log->Read(AllowCompact);
+ Log->Read();
+ Lock.ReleaseNow();
+ if (AllowCompact)
+ {
+ const uint32_t CompactUnusedThreshold = 50;
+ Log->CompactIfUnusedExceeds(/*DryRun*/ false,
+ CompactUnusedThreshold,
+ fmt::format("Compact on initial open of oplog {}/{}: ", Identifier, OplogId));
+ }
return Log;
}
- catch (const std::exception& ex)
+ catch (const std::exception& Ex)
{
- ZEN_WARN("failed to open oplog '{}' @ '{}': {}", OplogId, OplogBasePath, ex.what());
-
+ ZEN_WARN("oplog '{}/{}': failed to open oplog at '{}': {}", Identifier, OplogId, OplogBasePath, Ex.what());
m_Oplogs.erase(std::string{OplogId});
}
}
@@ -2409,34 +2907,55 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact)
return nullptr;
}
+void
+ProjectStore::Oplog::CompactIfUnusedExceeds(bool DryRun, uint32_t CompactUnusedThreshold, std::string_view LogPrefix)
+{
+ RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
+ if (!m_Storage)
+ {
+ return;
+ }
+ uint32_t UnusedPercent = GetUnusedSpacePercentLocked();
+ if (UnusedPercent >= CompactUnusedThreshold)
+ {
+ Compact(OplogLock,
+ DryRun,
+ /*RetainLSNs*/ m_Storage->MaxLSN() <=
+ 0xff000000ul, // If we have less than 16 miln entries left of our LSN range, allow renumbering of LSNs
+ LogPrefix);
+ }
+}
+
bool
ProjectStore::Project::RemoveOplog(std::string_view OplogId, std::filesystem::path& OutDeletePath)
{
- RwLock::ExclusiveLockScope _(m_ProjectLock);
-
- if (auto OplogIt = m_Oplogs.find(std::string(OplogId)); OplogIt == m_Oplogs.end())
{
- std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
+ RwLock::ExclusiveLockScope _(m_ProjectLock);
- if (Oplog::ExistsAt(OplogBasePath))
+ if (auto OplogIt = m_Oplogs.find(std::string(OplogId)); OplogIt == m_Oplogs.end())
{
- if (!PrepareDirectoryDelete(OplogBasePath, OutDeletePath))
+ std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
+
+ if (Oplog::ExistsAt(OplogBasePath))
{
- return false;
+ if (!PrepareDirectoryDelete(OplogBasePath, OutDeletePath))
+ {
+ return false;
+ }
}
}
- }
- else
- {
- std::unique_ptr<Oplog>& Oplog = OplogIt->second;
- if (!Oplog->PrepareForDelete(OutDeletePath))
+ else
{
- return false;
+ std::unique_ptr<Oplog>& Oplog = OplogIt->second;
+ if (!Oplog->PrepareForDelete(OutDeletePath))
+ {
+ return false;
+ }
+ m_DeletedOplogs.emplace_back(std::move(Oplog));
+ m_Oplogs.erase(OplogIt);
}
- m_DeletedOplogs.emplace_back(std::move(Oplog));
- m_Oplogs.erase(OplogIt);
}
- m_LastAccessTimes.erase(std::string(OplogId));
+ m_LastAccessTimesLock.WithExclusiveLock([&]() { m_LastAccessTimes.erase(std::string(OplogId)); });
return true;
}
@@ -2454,7 +2973,7 @@ ProjectStore::Project::DeleteOplog(std::string_view OplogId)
{
if (!OplogStorage::Delete(DeletePath))
{
- ZEN_WARN("Failed to remove old oplog path '{}'", DeletePath);
+ ZEN_WARN("oplog '{}/{}': failed to remove old oplog path '{}'", Identifier, OplogId, DeletePath);
return false;
}
}
@@ -2506,6 +3025,8 @@ ProjectStore::Project::IterateOplogs(std::function<void(const RwLock::SharedLock
void
ProjectStore::Project::Flush()
{
+ ZEN_TRACE_CPU("Project::Flush");
+
// We only need to flush oplogs that we have already loaded
IterateOplogs([&](const RwLock::SharedLockScope&, Oplog& Ops) { Ops.Flush(); });
WriteAccessTimes();
@@ -2520,8 +3041,8 @@ ProjectStore::Project::ScrubStorage(ScrubContext& Ctx)
{
OpenOplog(OpLogId, /*AllowCompact*/ false);
}
- IterateOplogs([&](const RwLock::SharedLockScope& ProjectLock, Oplog& Ops) {
- if (!IsExpired(ProjectLock, GcClock::TimePoint::min(), Ops))
+ IterateOplogs([&](const RwLock::SharedLockScope&, Oplog& Ops) {
+ if (!IsExpired(GcClock::TimePoint::min(), Ops))
{
Ops.ScrubStorage(Ctx);
}
@@ -2547,7 +3068,7 @@ ProjectStore::Project::GatherReferences(GcContext& GcCtx)
{
// Make sure any oplog at least have a last access time so they eventually will be GC:d if not touched
- RwLock::ExclusiveLockScope _(m_ProjectLock);
+ RwLock::ExclusiveLockScope _(m_LastAccessTimesLock);
for (const std::string& OpId : OpLogs)
{
if (auto It = m_LastAccessTimes.find(OpId); It == m_LastAccessTimes.end())
@@ -2557,8 +3078,8 @@ ProjectStore::Project::GatherReferences(GcContext& GcCtx)
}
}
- IterateOplogs([&](const RwLock::SharedLockScope& ProjectLock, Oplog& Ops) {
- if (!IsExpired(ProjectLock, GcCtx.ProjectStoreExpireTime(), Ops))
+ IterateOplogs([&](const RwLock::SharedLockScope&, Oplog& Ops) {
+ if (!IsExpired(GcCtx.ProjectStoreExpireTime(), Ops))
{
Ops.GatherReferences(GcCtx);
}
@@ -2593,7 +3114,7 @@ ProjectStore::Project::TotalSize() const
std::vector<std::string> OpLogs = ScanForOplogs();
for (const std::string& OpLogId : OpLogs)
{
- std::filesystem::path OplogBasePath = m_OplogStoragePath / OpLogId;
+ std::filesystem::path OplogBasePath = BasePathForOplog(OpLogId);
Result += Oplog::TotalSize(OplogBasePath);
}
}
@@ -2678,10 +3199,9 @@ ProjectStore::Project::GetGcReferencerLocks()
}
bool
-ProjectStore::Project::IsExpired(const RwLock::SharedLockScope&,
- const std::string& EntryName,
+ProjectStore::Project::IsExpired(const std::string& EntryName,
const std::filesystem::path& MarkerPath,
- const GcClock::TimePoint ExpireTime)
+ const GcClock::TimePoint ExpireTime) const
{
if (!MarkerPath.empty())
{
@@ -2690,8 +3210,11 @@ ProjectStore::Project::IsExpired(const RwLock::SharedLockScope&,
{
if (Ec)
{
- ZEN_WARN("Failed to check expiry via marker file '{}', assuming {} is not expired",
- EntryName.empty() ? "project" : EntryName,
+ ZEN_WARN("{} '{}{}{}', Failed to check expiry via marker file '{}', assuming not expired",
+ EntryName.empty() ? "project"sv : "oplog"sv,
+ Identifier,
+ EntryName.empty() ? ""sv : "/"sv,
+ EntryName,
MarkerPath.string());
return false;
}
@@ -2701,6 +3224,7 @@ ProjectStore::Project::IsExpired(const RwLock::SharedLockScope&,
const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
+ RwLock::SharedLockScope _(m_LastAccessTimesLock);
if (auto It = m_LastAccessTimes.find(EntryName); It != m_LastAccessTimes.end())
{
if (It->second <= ExpireTicks)
@@ -2712,24 +3236,23 @@ ProjectStore::Project::IsExpired(const RwLock::SharedLockScope&,
}
bool
-ProjectStore::Project::IsExpired(const RwLock::SharedLockScope& ProjectLock, const GcClock::TimePoint ExpireTime)
+ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime) const
{
- return IsExpired(ProjectLock, std::string(), ProjectFilePath, ExpireTime);
+ return IsExpired(std::string(), ProjectFilePath, ExpireTime);
}
bool
-ProjectStore::Project::IsExpired(const RwLock::SharedLockScope& ProjectLock,
- const GcClock::TimePoint ExpireTime,
- const ProjectStore::Oplog& Oplog)
+ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog) const
{
- return IsExpired(ProjectLock, Oplog.OplogId(), Oplog.MarkerPath(), ExpireTime);
+ return IsExpired(Oplog.OplogId(), Oplog.MarkerPath(), ExpireTime);
}
bool
-ProjectStore::Project::IsOplogTouchedSince(const RwLock::SharedLockScope&, const GcClock::TimePoint TouchTime, std::string_view Oplog) const
+ProjectStore::Project::IsOplogTouchedSince(const GcClock::TimePoint TouchTime, std::string_view Oplog) const
{
const GcClock::Tick TouchTicks = TouchTime.time_since_epoch().count();
+ RwLock::ExclusiveLockScope _(m_LastAccessTimesLock);
if (auto It = m_LastAccessTimes.find(std::string(Oplog)); It != m_LastAccessTimes.end())
{
if (It->second > TouchTicks)
@@ -2740,32 +3263,25 @@ ProjectStore::Project::IsOplogTouchedSince(const RwLock::SharedLockScope&, const
return false;
}
-bool
-ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog)
-{
- RwLock::SharedLockScope Lock(m_ProjectLock);
- return IsExpired(Lock, Oplog.OplogId(), Oplog.MarkerPath(), ExpireTime);
-}
-
void
-ProjectStore::Project::TouchProject() const
+ProjectStore::Project::TouchProject()
{
- RwLock::ExclusiveLockScope _(m_ProjectLock);
+ RwLock::ExclusiveLockScope _(m_LastAccessTimesLock);
m_LastAccessTimes.insert_or_assign(std::string(), GcClock::TickCount());
-};
+}
void
-ProjectStore::Project::TouchOplog(std::string_view Oplog) const
+ProjectStore::Project::TouchOplog(std::string_view Oplog)
{
ZEN_ASSERT(!Oplog.empty());
- RwLock::ExclusiveLockScope _(m_ProjectLock);
+ RwLock::ExclusiveLockScope _(m_LastAccessTimesLock);
m_LastAccessTimes.insert_or_assign(std::string(Oplog), GcClock::TickCount());
-};
+}
GcClock::TimePoint
ProjectStore::Project::LastOplogAccessTime(std::string_view Oplog) const
{
- RwLock::SharedLockScope Lock(m_ProjectLock);
+ RwLock::SharedLockScope _(m_LastAccessTimesLock);
if (auto It = m_LastAccessTimes.find(std::string(Oplog)); It != m_LastAccessTimes.end())
{
return GcClock::TimePointFromTick(It->second);
@@ -2843,6 +3359,8 @@ ProjectStore::IterateProjects(std::function<void(Project& Prj)>&& Fn)
void
ProjectStore::Flush()
{
+ ZEN_TRACE_CPU("Store::Flush");
+
ZEN_INFO("flushing project store at '{}'", m_ProjectBasePath);
std::vector<Ref<Project>> Projects;
{
@@ -2854,10 +3372,21 @@ ProjectStore::Flush()
Projects.push_back(Kv.second);
}
}
+
+ WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst);
+ Latch WorkLatch(1);
+
for (const Ref<Project>& Project : Projects)
{
- Project->Flush();
+ WorkLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&WorkLatch, Project]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ Project->Flush();
+ });
}
+
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
}
void
@@ -2874,7 +3403,7 @@ ProjectStore::ScrubStorage(ScrubContext& Ctx)
for (auto& Kv : m_Projects)
{
- if (Kv.second->IsExpired(Lock, GcClock::TimePoint::min()))
+ if (Kv.second->IsExpired(GcClock::TimePoint::min()))
{
continue;
}
@@ -2912,7 +3441,7 @@ ProjectStore::GatherReferences(GcContext& GcCtx)
for (auto& Kv : m_Projects)
{
- if (Kv.second->IsExpired(Lock, GcCtx.ProjectStoreExpireTime()))
+ if (Kv.second->IsExpired(GcCtx.ProjectStoreExpireTime()))
{
ExpiredProjectCount++;
continue;
@@ -2950,7 +3479,7 @@ ProjectStore::CollectGarbage(GcContext& GcCtx)
RwLock::SharedLockScope Lock(m_ProjectsLock);
for (auto& Kv : m_Projects)
{
- if (Kv.second->IsExpired(Lock, GcCtx.ProjectStoreExpireTime()))
+ if (Kv.second->IsExpired(GcCtx.ProjectStoreExpireTime()))
{
ExpiredProjects.push_back(Kv.second);
ExpiredProjectCount++;
@@ -2971,9 +3500,8 @@ ProjectStore::CollectGarbage(GcContext& GcCtx)
{
std::vector<std::string> ExpiredOplogs;
{
- RwLock::ExclusiveLockScope _(m_ProjectsLock);
- Project->IterateOplogs([&GcCtx, &Project, &ExpiredOplogs](const RwLock::SharedLockScope& Lock, ProjectStore::Oplog& Oplog) {
- if (Project->IsExpired(Lock, GcCtx.ProjectStoreExpireTime(), Oplog))
+ Project->IterateOplogs([&GcCtx, &Project, &ExpiredOplogs](const RwLock::SharedLockScope&, ProjectStore::Oplog& Oplog) {
+ if (Project->IsExpired(GcCtx.ProjectStoreExpireTime(), Oplog))
{
ExpiredOplogs.push_back(Oplog.OplogId());
}
@@ -3000,15 +3528,12 @@ ProjectStore::CollectGarbage(GcContext& GcCtx)
std::filesystem::path PathToRemove;
std::string ProjectId;
{
+ if (!Project->IsExpired(GcCtx.ProjectStoreExpireTime()))
{
- RwLock::SharedLockScope Lock(m_ProjectsLock);
- if (!Project->IsExpired(Lock, GcCtx.ProjectStoreExpireTime()))
- {
- ZEN_DEBUG("ProjectStore::CollectGarbage skipped garbage collect of project '{}'. Project no longer expired.",
- ProjectId);
- continue;
- }
+ ZEN_DEBUG("ProjectStore::CollectGarbage skipped garbage collect of project '{}'. Project no longer expired.", ProjectId);
+ continue;
}
+
RwLock::ExclusiveLockScope _(m_ProjectsLock);
bool Success = Project->PrepareForDelete(PathToRemove);
if (!Success)
@@ -3070,16 +3595,17 @@ ProjectStore::OpenProject(std::string_view ProjectId)
{
RwLock::SharedLockScope _(m_ProjectsLock);
-
- auto ProjIt = m_Projects.find(std::string{ProjectId});
-
- if (ProjIt != m_Projects.end())
+ if (auto ProjIt = m_Projects.find(std::string{ProjectId}); ProjIt != m_Projects.end())
{
return ProjIt->second;
}
}
RwLock::ExclusiveLockScope _(m_ProjectsLock);
+ if (auto ProjIt = m_Projects.find(std::string{ProjectId}); ProjIt != m_Projects.end())
+ {
+ return ProjIt->second;
+ }
std::filesystem::path BasePath = BasePathForProject(ProjectId);
@@ -3087,7 +3613,7 @@ ProjectStore::OpenProject(std::string_view ProjectId)
{
try
{
- ZEN_INFO("opening project {} @ {}", ProjectId, BasePath);
+ ZEN_INFO("project '{}': opening project at '{}'", ProjectId, BasePath);
Ref<Project>& Prj =
m_Projects
@@ -3099,7 +3625,7 @@ ProjectStore::OpenProject(std::string_view ProjectId)
}
catch (const std::exception& e)
{
- ZEN_WARN("failed to open {} @ {} ({})", ProjectId, BasePath, e.what());
+ ZEN_WARN("project '{}': failed to open at {} ({})", ProjectId, BasePath, e.what());
m_Projects.erase(std::string{ProjectId});
}
}
@@ -3148,8 +3674,6 @@ ProjectStore::UpdateProject(std::string_view ProjectId,
{
ZEN_TRACE_CPU("Store::UpdateProject");
- ZEN_INFO("updating project {}", ProjectId);
-
RwLock::ExclusiveLockScope ProjectsLock(m_ProjectsLock);
auto ProjIt = m_Projects.find(std::string{ProjectId});
@@ -3166,6 +3690,8 @@ ProjectStore::UpdateProject(std::string_view ProjectId,
Prj->ProjectFilePath = ProjectFilePath;
Prj->Write();
+ ZEN_INFO("project '{}': updated", ProjectId);
+
return true;
}
@@ -3196,7 +3722,7 @@ ProjectStore::DeleteProject(std::string_view ProjectId)
{
ZEN_TRACE_CPU("Store::DeleteProject");
- ZEN_INFO("deleting project {}", ProjectId);
+ ZEN_INFO("project '{}': deleting", ProjectId);
std::filesystem::path DeletePath;
if (!RemoveProject(ProjectId, DeletePath))
@@ -3208,7 +3734,7 @@ ProjectStore::DeleteProject(std::string_view ProjectId)
{
if (!DeleteDirectories(DeletePath))
{
- ZEN_WARN("Failed to remove old project path '{}'", DeletePath);
+ ZEN_WARN("project '{}': failed to remove old project path '{}'", ProjectId, DeletePath);
return false;
}
}
@@ -3335,7 +3861,11 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
}
catch (const std::exception& Ex)
{
- ZEN_WARN("Failed getting project file info for id {}. Reason: '{}'", Ids[Index], Ex.what());
+ ZEN_WARN("oplog '{}/{}': failed getting project file info for id {}. Reason: '{}'",
+ ProjectId,
+ OplogId,
+ Ids[Index],
+ Ex.what());
}
return true;
},
@@ -3459,7 +3989,7 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId,
}
catch (const std::exception& Ex)
{
- ZEN_WARN("Failed getting chunk info for id {}. Reason: '{}'", Ids[Index], Ex.what());
+ ZEN_WARN("oplog '{}/{}': failed getting chunk info for id {}. Reason: '{}'", ProjectId, OplogId, Ids[Index], Ex.what());
}
return true;
},
@@ -4061,7 +4591,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
else
{
- ZEN_WARN("invalid compressed binary in cas store for {}", RawHash);
+ ZEN_WARN("oplog '{}/{}': invalid compressed binary in cas store for {}", ProjectId, OplogId, RawHash);
}
}
}
@@ -4255,7 +4785,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles;
ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles;
- ZEN_INFO("rewrote {} oplog entries (out of {})", NewOps.size(), OpCount);
+ ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount);
HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save());
return true;
@@ -4527,6 +5057,7 @@ public:
m_ProjectPathsToRemove.clear();
m_OplogPathsToRemove.clear();
+ m_OplogsToCompact.clear();
}
virtual std::string GetGcName(GcCtx&) override { return fmt::format("projectstore: '{}'", m_BasePath.string()); }
@@ -4571,7 +5102,7 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
for (auto& Kv : m_Projects)
{
Stats.CheckedCount++;
- if (Kv.second->IsExpired(Lock, Ctx.Settings.ProjectStoreExpireTime))
+ if (Kv.second->IsExpired(Ctx.Settings.ProjectStoreExpireTime))
{
ExpiredProjects.push_back(Kv.second);
continue;
@@ -4605,26 +5136,26 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
std::vector<std::string> ExpiredOplogs;
{
- Project->IterateOplogs([&Ctx, &Stats, &Project, &ExpiredOplogs, &OplogsToCompact](const RwLock::SharedLockScope& Lock,
- ProjectStore::Oplog& Oplog) {
- Stats.CheckedCount++;
- if (Project->IsExpired(Lock, Ctx.Settings.ProjectStoreExpireTime, Oplog))
- {
- ExpiredOplogs.push_back(Oplog.OplogId());
- }
- else
- {
- GcClock::TimePoint CompactExpireTime = GcClock::Now() - std::chrono::minutes(30);
- if (!Project->IsOplogTouchedSince(Lock, CompactExpireTime, Oplog.OplogId()))
+ Project->IterateOplogs(
+ [&Ctx, &Stats, &Project, &ExpiredOplogs, &OplogsToCompact](const RwLock::SharedLockScope&, ProjectStore::Oplog& Oplog) {
+ Stats.CheckedCount++;
+ if (Project->IsExpired(Ctx.Settings.ProjectStoreExpireTime, Oplog))
{
- const uint32_t CompactUnusedThreshold = 25;
- if (Oplog.GetUnusedSpacePercent() >= CompactUnusedThreshold)
+ ExpiredOplogs.push_back(Oplog.OplogId());
+ }
+ else
+ {
+ GcClock::TimePoint CompactExpireTime = GcClock::Now() - std::chrono::minutes(30);
+ if (!Project->IsOplogTouchedSince(CompactExpireTime, Oplog.OplogId()))
{
- OplogsToCompact.push_back(std::make_pair(Project->Identifier, Oplog.OplogId()));
+ const uint32_t CompactUnusedThreshold = 25;
+ if (Oplog.GetUnusedSpacePercent() >= CompactUnusedThreshold)
+ {
+ OplogsToCompact.push_back(std::make_pair(Project->Identifier, Oplog.OplogId()));
+ }
}
}
- }
- });
+ });
}
std::filesystem::path ProjectPath = BasePathForProject(Project->Identifier);
ExpiredOplogCount += ExpiredOplogs.size();
@@ -4660,8 +5191,7 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
std::string ProjectId = Project->Identifier;
{
{
- RwLock::SharedLockScope Lock(m_ProjectsLock);
- if (!Project->IsExpired(Lock, Ctx.Settings.ProjectStoreExpireTime))
+ if (!Project->IsExpired(Ctx.Settings.ProjectStoreExpireTime))
{
ZEN_DEBUG(
"GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. Project no longer "
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index bed3c83b7..55a3b7dee 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -84,7 +84,7 @@ public:
[[nodiscard]] static bool ExistsAt(const std::filesystem::path& BasePath);
- void Read(bool AllowCompact);
+ void Read();
void Write();
void Update(const std::filesystem::path& MarkerPath);
bool Reset();
@@ -169,6 +169,9 @@ public:
void GetAttachmentsLocked(std::vector<IoHash>& OutAttachments, bool StoreMetaDataOnDisk);
+ Project* GetOuterProject() const { return m_OuterProject; }
+ void CompactIfUnusedExceeds(bool DryRun, uint32_t CompactUnusedThreshold, std::string_view LogPrefix);
+
private:
struct FileMapEntry
{
@@ -179,12 +182,13 @@ public:
template<class V>
using OidMap = tsl::robin_map<Oid, V, Oid::Hasher>;
- Project* m_OuterProject = nullptr;
- CidStore& m_CidStore;
- std::filesystem::path m_BasePath;
- std::filesystem::path m_MarkerPath;
- std::filesystem::path m_TempPath;
- std::filesystem::path m_MetaPath;
+ Project* m_OuterProject = nullptr;
+ const std::string m_OplogId;
+ CidStore& m_CidStore;
+ const std::filesystem::path m_BasePath;
+ std::filesystem::path m_MarkerPath;
+ std::filesystem::path m_TempPath;
+ std::filesystem::path m_MetaPath;
mutable RwLock m_OplogLock;
OidMap<IoHash> m_ChunkMap; // output data chunk id -> CAS address
@@ -201,14 +205,15 @@ public:
std::unique_ptr<std::vector<IoHash>> m_CapturedAttachments;
RefPtr<OplogStorage> m_Storage;
- const std::string m_OplogId;
+ uint64_t m_LogFlushPosition = 0;
RefPtr<OplogStorage> GetStorage();
/** Scan oplog and register each entry, thus updating the in-memory tracking tables
*/
- void ReplayLog(bool AllowCompact);
uint32_t GetUnusedSpacePercentLocked() const;
+ void WriteIndexSnapshot();
+ void ReadIndexSnapshot();
struct OplogEntryMapping
{
@@ -269,16 +274,12 @@ public:
void IterateOplogs(std::function<void(const RwLock::SharedLockScope&, const Oplog&)>&& Fn) const;
void IterateOplogs(std::function<void(const RwLock::SharedLockScope&, Oplog&)>&& Fn);
std::vector<std::string> ScanForOplogs() const;
- bool IsExpired(const RwLock::SharedLockScope&, const GcClock::TimePoint ExpireTime);
- bool IsExpired(const RwLock::SharedLockScope&, const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog);
- bool IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog);
- bool IsOplogTouchedSince(const RwLock::SharedLockScope& ProjectLock,
- const GcClock::TimePoint TouchTime,
- std::string_view Oplog) const;
-
- void TouchProject() const;
- void TouchOplog(std::string_view Oplog) const;
- GcClock::TimePoint LastOplogAccessTime(std::string_view Oplog) const;
+ bool IsExpired(const GcClock::TimePoint ExpireTime) const;
+ bool IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog) const;
+ bool IsOplogTouchedSince(const GcClock::TimePoint TouchTime, std::string_view Oplog) const;
+ void TouchProject();
+ void TouchOplog(std::string_view Oplog);
+ GcClock::TimePoint LastOplogAccessTime(std::string_view Oplog) const;
Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath);
virtual ~Project();
@@ -288,7 +289,7 @@ public:
[[nodiscard]] static bool Exists(const std::filesystem::path& BasePath);
void Flush();
void ScrubStorage(ScrubContext& Ctx);
- LoggerRef Log();
+ LoggerRef Log() const;
void GatherReferences(GcContext& GcCtx);
static uint64_t TotalSize(const std::filesystem::path& BasePath);
uint64_t TotalSize() const;
@@ -307,18 +308,16 @@ public:
std::map<std::string, std::unique_ptr<Oplog>> m_Oplogs;
std::vector<std::unique_ptr<Oplog>> m_DeletedOplogs;
std::filesystem::path m_OplogStoragePath;
+ mutable RwLock m_LastAccessTimesLock;
mutable tsl::robin_map<std::string, GcClock::Tick> m_LastAccessTimes;
mutable RwLock m_UpdateCaptureLock;
uint32_t m_UpdateCaptureRefCounter = 0;
std::unique_ptr<std::vector<std::string>> m_CapturedOplogs;
- std::filesystem::path BasePathForOplog(std::string_view OplogId);
- bool IsExpired(const RwLock::SharedLockScope&,
- const std::string& EntryName,
- const std::filesystem::path& MarkerPath,
- const GcClock::TimePoint ExpireTime);
- void WriteAccessTimes();
- void ReadAccessTimes();
+ std::filesystem::path BasePathForOplog(std::string_view OplogId) const;
+ bool IsExpired(const std::string& EntryName, const std::filesystem::path& MarkerPath, const GcClock::TimePoint ExpireTime) const;
+ void WriteAccessTimes();
+ void ReadAccessTimes();
friend class ProjectStoreOplogReferenceChecker;
friend class ProjectStoreReferenceChecker;
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 287a3f7fa..e20888ae4 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -1143,6 +1143,8 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
const IterateChunksLargeSizeCallback& LargeSizeCallback,
uint64_t LargeSizeLimit)
{
+ ZEN_TRACE_CPU("BlockStore::IterateBlock");
+
if (InChunkIndexes.empty())
{
return true;
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index 9981aa1eb..ac1db2a4e 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -1035,6 +1035,8 @@ ZenCacheStore::GetBucketInfo(std::string_view NamespaceName, std::string_view Bu
std::vector<RwLock::SharedLockScope>
ZenCacheStore::LockState(GcCtx&)
{
+ ZEN_TRACE_CPU("CacheStore::LockState");
+
std::vector<RwLock::SharedLockScope> Locks;
Locks.emplace_back(RwLock::SharedLockScope(m_NamespacesLock));
for (auto& NamespaceIt : m_Namespaces)
diff --git a/src/zenstore/caslog.cpp b/src/zenstore/caslog.cpp
index 2c26e522f..139456349 100644
--- a/src/zenstore/caslog.cpp
+++ b/src/zenstore/caslog.cpp
@@ -153,13 +153,13 @@ CasLogFile::Close()
}
uint64_t
-CasLogFile::GetLogSize()
+CasLogFile::GetLogSize() const
{
return m_File.FileSize();
}
uint64_t
-CasLogFile::GetLogCount()
+CasLogFile::GetLogCount() const
{
uint64_t LogFileSize = m_AppendOffset.load(std::memory_order_acquire);
if (LogFileSize < sizeof(FileHeader))
diff --git a/src/zenstore/include/zenstore/caslog.h b/src/zenstore/include/zenstore/caslog.h
index d8c3f22f3..edb4f8d9b 100644
--- a/src/zenstore/include/zenstore/caslog.h
+++ b/src/zenstore/include/zenstore/caslog.h
@@ -26,8 +26,8 @@ public:
void Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntryCount);
void Flush();
void Close();
- uint64_t GetLogSize();
- uint64_t GetLogCount();
+ uint64_t GetLogSize() const;
+ uint64_t GetLogCount() const;
private:
struct FileHeader