aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-08-12 19:35:18 +0200
committerGitHub Enterprise <[email protected]>2024-08-12 19:35:18 +0200
commita812b426c3047c490a5da92cd8c3c46bf65b69c1 (patch)
treefef13131f55f5d0e215a17a8340e56bee8fbee52 /src
parentSkip chunk in block stores when iterating a block if the location is out of r... (diff)
downloadzen-a812b426c3047c490a5da92cd8c3c46bf65b69c1.tar.xz
zen-a812b426c3047c490a5da92cd8c3c46bf65b69c1.zip
add compacting of oplogs as part of GC (#106)
* add compacting of oplogs as part of GC * force retain of LSN unless we have less than 16 miln entries left
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/projectstore/httpprojectstore.cpp24
-rw-r--r--src/zenserver/projectstore/projectstore.cpp560
-rw-r--r--src/zenserver/projectstore/projectstore.h19
-rw-r--r--src/zenserver/vfs/vfsimpl.cpp2
4 files changed, 482 insertions, 123 deletions
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp
index 6de91cf94..28506145b 100644
--- a/src/zenserver/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/projectstore/httpprojectstore.cpp
@@ -210,7 +210,7 @@ namespace {
{
for (const std::string& OpLogId : OpLogs)
{
- ProjectStore::Oplog* Oplog = Project.OpenOplog(OpLogId);
+ ProjectStore::Oplog* Oplog = Project.OpenOplog(OpLogId, /*AllowCompact*/ false);
if (Oplog != nullptr)
{
CbWriteOplog(CidStore, *Oplog, Details, OpDetails, AttachmentDetails, Cbo);
@@ -487,7 +487,7 @@ HttpProjectService::HandleChunkBatchRequest(HttpRouterRequest& Req)
}
Project->TouchProject();
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true);
if (!FoundLog)
{
return HttpReq.WriteResponse(HttpResponseCode::NotFound);
@@ -964,7 +964,7 @@ HttpProjectService::HandleOplogOpPrepRequest(HttpRouterRequest& Req)
}
Project->TouchProject();
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true);
if (!FoundLog)
{
return HttpReq.WriteResponse(HttpResponseCode::NotFound);
@@ -1042,7 +1042,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req)
}
Project->TouchProject();
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true);
if (!FoundLog)
{
return HttpReq.WriteResponse(HttpResponseCode::NotFound);
@@ -1184,7 +1184,7 @@ HttpProjectService::HandleOpLogOpRequest(HttpRouterRequest& Req)
}
Project->TouchProject();
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true);
if (!FoundLog)
{
return HttpReq.WriteResponse(HttpResponseCode::NotFound);
@@ -1281,13 +1281,14 @@ HttpProjectService::HandleOpLogRequest(HttpRouterRequest& Req)
{
case HttpVerb::kGet:
{
- ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId, /*AllowCompact*/ true);
if (!OplogIt)
{
return HttpReq.WriteResponse(HttpResponseCode::NotFound,
HttpContentType::kText,
fmt::format("oplog {} not found in project {}", OplogId, ProjectId));
}
+
Project->TouchOplog(OplogId);
ProjectStore::Oplog& Log = *OplogIt;
@@ -1296,7 +1297,6 @@ HttpProjectService::HandleOpLogRequest(HttpRouterRequest& Req)
Cb << "id"sv << Log.OplogId() << "project"sv << Project->Identifier << "tempdir"sv << Log.TempPath().c_str()
<< "markerpath"sv << Log.MarkerPath().c_str() << "totalsize"sv << Log.TotalSize() << "opcount" << Log.OplogCount()
<< "expired"sv << Project->IsExpired(GcClock::TimePoint::min(), Log);
-
HttpReq.WriteResponse(HttpResponseCode::OK, Cb.Save());
m_ProjectStats.OpLogReadCount++;
@@ -1315,7 +1315,7 @@ HttpProjectService::HandleOpLogRequest(HttpRouterRequest& Req)
OplogMarkerPath = Params["gcpath"sv].AsString();
}
- ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId, /*AllowCompact*/ false);
if (!OplogIt)
{
if (!Project->NewOplog(OplogId, OplogMarkerPath))
@@ -1352,7 +1352,7 @@ HttpProjectService::HandleOpLogRequest(HttpRouterRequest& Req)
OplogMarkerPath = Params["gcpath"sv].AsString();
}
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false);
if (!FoundLog)
{
if (!Project->NewOplog(OplogId, OplogMarkerPath))
@@ -1420,7 +1420,7 @@ HttpProjectService::HandleOpLogEntriesRequest(HttpRouterRequest& Req)
}
Project->TouchProject();
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true);
if (!FoundLog)
{
return HttpReq.WriteResponse(HttpResponseCode::NotFound);
@@ -1880,7 +1880,7 @@ HttpProjectService::HandleOplogDetailsRequest(HttpRouterRequest& Req)
return HttpReq.WriteResponse(HttpResponseCode::NotFound);
}
- ProjectStore::Oplog* FoundLog = FoundProject->OpenOplog(OplogId);
+ ProjectStore::Oplog* FoundLog = FoundProject->OpenOplog(OplogId, /*AllowCompact*/ false);
if (!FoundLog)
{
return HttpReq.WriteResponse(HttpResponseCode::NotFound);
@@ -1935,7 +1935,7 @@ HttpProjectService::HandleOplogOpDetailsRequest(HttpRouterRequest& Req)
return HttpReq.WriteResponse(HttpResponseCode::NotFound);
}
- ProjectStore::Oplog* FoundLog = FoundProject->OpenOplog(OplogId);
+ ProjectStore::Oplog* FoundLog = FoundProject->OpenOplog(OplogId, /*AllowCompact*/ false);
if (!FoundLog)
{
return HttpReq.WriteResponse(HttpResponseCode::NotFound);
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index a6f058cf4..d41498a61 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -285,18 +285,19 @@ struct ProjectStore::OplogStorage : public RefCounted
[[nodiscard]] bool Exists() const { return Exists(m_OplogStoragePath); }
[[nodiscard]] static bool Exists(const std::filesystem::path& BasePath)
{
- return std::filesystem::exists(BasePath / "ops.zlog") && std::filesystem::exists(BasePath / "ops.zops");
+ return std::filesystem::exists(GetLogPath(BasePath)) && std::filesystem::exists(GetBlobsPath(BasePath));
}
static bool Delete(const std::filesystem::path& BasePath) { return DeleteDirectories(BasePath); }
- uint64_t OpBlobsSize() const { return OpBlobsSize(m_OplogStoragePath); }
- static uint64_t OpBlobsSize(const std::filesystem::path& BasePath)
+ uint64_t OpBlobsSize() const { return std::filesystem::file_size(GetBlobsPath()); }
+
+ uint64_t OpsSize() const { return OpsSize(m_OplogStoragePath); }
+ static uint64_t OpsSize(const std::filesystem::path& BasePath)
{
- using namespace std::literals;
if (Exists(BasePath))
{
- return std::filesystem::file_size(BasePath / "ops.zlog"sv) + std::filesystem::file_size(BasePath / "ops.zops"sv);
+ return std::filesystem::file_size(GetLogPath(BasePath)) + std::filesystem::file_size(GetBlobsPath(BasePath));
}
return 0;
}
@@ -305,8 +306,6 @@ struct ProjectStore::OplogStorage : public RefCounted
{
ZEN_TRACE_CPU("Store::OplogStorage::Open");
- using namespace std::literals;
-
ZEN_INFO("initializing oplog storage at '{}'", m_OplogStoragePath);
if (IsCreate)
@@ -315,31 +314,226 @@ struct ProjectStore::OplogStorage : public RefCounted
CreateDirectories(m_OplogStoragePath);
}
- m_Oplog.Open(m_OplogStoragePath / "ops.zlog"sv, IsCreate ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
+ m_Oplog.Open(GetLogPath(m_OplogStoragePath), IsCreate ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
m_Oplog.Initialize();
- m_OpBlobs.Open(m_OplogStoragePath / "ops.zops"sv, IsCreate ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite);
+ m_OpBlobs.Open(GetBlobsPath(m_OplogStoragePath), IsCreate ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite);
ZEN_ASSERT(IsPow2(m_OpsAlign));
ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1)));
}
+ IoBuffer GetOpBuffer(BasicFileBuffer& OpBlobsBuffer, const OplogEntry& LogEntry) const
+ {
+ const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
+ const MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreSize, OpFileOffset);
+ if (OpBufferView.GetSize() == LogEntry.OpCoreSize)
+ {
+ return IoBuffer(IoBuffer::Wrap, OpBufferView.GetData(), OpBufferView.GetSize());
+ }
+ else
+ {
+ IoBuffer OpBuffer(LogEntry.OpCoreSize);
+ OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
+ return OpBuffer;
+ }
+ }
+
+ uint64_t GetEffectiveBlobsSize(std::span<const OplogEntryAddress> Addresses) const
+ {
+ uint64_t EffectiveSize = 0;
+ for (const OplogEntryAddress& Address : Addresses)
+ {
+ EffectiveSize += RoundUp(Address.Size, m_OpsAlign);
+ }
+ return EffectiveSize;
+ }
+
+ void Compact(
+ std::span<const uint32_t> LSNs,
+ std::function<void(const Oid& OpKeyHash, uint32_t OldLSN, uint32_t NewLSN, const OplogEntryAddress& NewAddress)>&& Callback,
+ bool RetainLSNs,
+ bool DryRun)
+ {
+ ZEN_TRACE_CPU("Store::OplogStorage::Compact()");
+
+ ZEN_INFO("compacting log for '{}'", m_OplogStoragePath);
+
+ Stopwatch Timer;
+
+ StringBuilder<64> OplogName;
+ Oid::NewOid().ToString(OplogName);
+
+ std::filesystem::path OplogPath = m_OplogStoragePath / OplogName.c_str();
+
+ std::error_code Ec;
+ TCasLogFile<OplogEntry> Oplog;
+ Oplog.Open(OplogPath, CasLogFile::Mode::kTruncate);
+ (void)Oplog.Initialize();
+
+ TemporaryFile OpBlobs;
+ OpBlobs.CreateTemporary(m_OplogStoragePath, Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to create temp file for op blob at '{}'", m_OplogStoragePath));
+ }
+
+ try
+ {
+ std::vector<OplogEntry> Ops;
+ Ops.reserve(LSNs.size());
+
+ tsl::robin_map<uint32_t, size_t> LSNToIndex;
+ LSNToIndex.reserve(LSNs.size());
+ for (uint32_t LSN : LSNs)
+ {
+ LSNToIndex[LSN] = (size_t)-1;
+ }
+
+ RwLock::ExclusiveLockScope Lock(m_RwLock);
+
+ const uint64_t SkipEntryCount = 0;
+ m_Oplog.Replay(
+ [&](const OplogEntry& LogEntry) {
+ if (auto It = LSNToIndex.find(LogEntry.OpLsn); It != LSNToIndex.end())
+ {
+ if (It->second != (size_t)-1)
+ {
+ Ops[It->second] = LogEntry;
+ }
+ else
+ {
+ LSNToIndex[LogEntry.OpLsn] = Ops.size();
+ Ops.push_back(LogEntry);
+ }
+ }
+ },
+ SkipEntryCount);
+
+ std::sort(Ops.begin(), Ops.end(), [&](const OplogEntry& Lhs, const OplogEntry& Rhs) {
+ return Lhs.OpCoreOffset < Rhs.OpCoreOffset;
+ });
+
+ std::vector<uint32_t> OldLSNs;
+ OldLSNs.reserve(Ops.size());
+
+ uint64_t OpWriteOffset = 0;
+ uint32_t MaxLSN = 0;
+ {
+ BasicFileBuffer OldBlobsBuffer(m_OpBlobs, 65536);
+ BasicFileWriter NewOpBlobsBuffer(OpBlobs, 65536);
+
+ for (OplogEntry& LogEntry : Ops)
+ {
+ OldLSNs.push_back(LogEntry.OpLsn);
+
+ IoBuffer OpBuffer = GetOpBuffer(OldBlobsBuffer, LogEntry);
+ if (RetainLSNs)
+ {
+ MaxLSN = Max(MaxLSN, LogEntry.OpLsn);
+ }
+ else
+ {
+ LogEntry.OpLsn = ++MaxLSN;
+ }
+ LogEntry.OpCoreOffset = gsl::narrow<uint32_t>(OpWriteOffset / m_OpsAlign);
+ NewOpBlobsBuffer.Write(OpBuffer.GetData(), LogEntry.OpCoreSize, OpWriteOffset);
+ OpWriteOffset = RoundUp((LogEntry.OpCoreOffset * m_OpsAlign) + LogEntry.OpCoreSize, m_OpsAlign);
+ }
+ Oplog.Append(Ops);
+ }
+
+ uint64_t OldOpLogSize = m_Oplog.GetLogSize();
+ uint64_t OldOpBlobsSize = m_OpBlobs.FileSize();
+
+ if (!DryRun)
+ {
+ m_Oplog.Close();
+ m_OpBlobs.Close();
+
+ Oplog.Close();
+ std::filesystem::rename(OplogPath, GetLogPath(), Ec);
+ if (Ec)
+ {
+ throw std::system_error(
+ Ec,
+ fmt::format("Oplog::Compact failed to rename temporary oplog blob storage file from '{}' to '{}'",
+ OplogPath,
+ GetLogPath()));
+ }
+ OpBlobs.MoveTemporaryIntoPlace(GetBlobsPath(), Ec);
+ if (Ec)
+ {
+ // We failed late - clean everything up as best we can
+ std::filesystem::remove(OpBlobs.GetPath(), Ec);
+ std::filesystem::remove(GetLogPath(), Ec);
+ std::filesystem::remove(GetBlobsPath(), Ec);
+ throw std::system_error(Ec,
+ fmt::format("Oplog::Compact failed to rename temporary oplog file from '{}' to '{}'",
+ OpBlobs.GetPath(),
+ GetBlobsPath()));
+ }
+
+ m_Oplog.Open(GetLogPath(), CasLogFile::Mode::kWrite);
+ m_Oplog.Initialize();
+
+ m_OpBlobs.Open(GetBlobsPath(), BasicFile::Mode::kWrite);
+
+ m_MaxLsn.store(MaxLSN);
+ m_NextOpsOffset.store(OpWriteOffset);
+ }
+
+ for (size_t Index = 0; Index < Ops.size(); Index++)
+ {
+ const OplogEntry& LogEntry = Ops[Index];
+ Callback(LogEntry.OpKeyHash,
+ OldLSNs[Index],
+ LogEntry.OpLsn,
+ OplogEntryAddress{.Offset = LogEntry.OpCoreOffset, .Size = LogEntry.OpCoreSize});
+ }
+
+ ZEN_INFO("oplog compact completed in {} - Max LSN# {}, New size: {}, old size {}.",
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ m_MaxLsn.load(),
+ NiceBytes(m_Oplog.GetLogSize() + m_OpBlobs.FileSize()),
+ NiceBytes(OldOpLogSize + OldOpBlobsSize));
+ }
+ catch (std::exception& /*Ex*/)
+ {
+ std::filesystem::remove(OpBlobs.GetPath(), Ec);
+ throw;
+ }
+ }
+
+ static std::filesystem::path GetLogPath(const std::filesystem::path& OplogStoragePath)
+ {
+ using namespace std::literals;
+ return OplogStoragePath / "ops.zlog"sv;
+ }
+
+ static std::filesystem::path GetBlobsPath(const std::filesystem::path& OplogStoragePath)
+ {
+ using namespace std::literals;
+ return OplogStoragePath / "ops.zops"sv;
+ }
+
+ std::filesystem::path GetLogPath() const { return GetLogPath(m_OplogStoragePath); }
+
+ std::filesystem::path GetBlobsPath() const { return GetBlobsPath(m_OplogStoragePath); }
+
void ReplayLog(std::function<void(CbObjectView, const OplogEntry&)>&& Handler)
{
ZEN_TRACE_CPU("Store::OplogStorage::ReplayLog");
- // This could use memory mapping or do something clever but for now it just reads the file sequentially
-
ZEN_INFO("replaying log for '{}'", m_OplogStoragePath);
uint64_t OpsBlockSize = m_OpBlobs.FileSize();
Stopwatch Timer;
- uint64_t InvalidEntries = 0;
-
std::vector<OplogEntry> OpLogEntries;
- std::vector<size_t> OplogOrder;
+ uint64_t InvalidEntries = 0;
+
{
tsl::robin_map<Oid, size_t, Oid::Hasher> LatestKeys;
const uint64_t SkipEntryCount = 0;
@@ -391,82 +585,59 @@ struct ProjectStore::OplogStorage : public RefCounted
{
const size_t OpIndex = OpLogEntries.size();
LatestKeys[LogEntry.OpKeyHash] = OpIndex;
- OplogOrder.push_back(OpIndex);
OpLogEntries.push_back(LogEntry);
}
},
SkipEntryCount);
}
- std::sort(OplogOrder.begin(), OplogOrder.end(), [&](size_t Lhs, size_t Rhs) {
- const OplogEntry& LhsEntry = OpLogEntries[Lhs];
- const OplogEntry& RhsEntry = OpLogEntries[Rhs];
- return LhsEntry.OpCoreOffset < RhsEntry.OpCoreOffset;
+ std::sort(OpLogEntries.begin(), OpLogEntries.end(), [&](const OplogEntry& Lhs, const OplogEntry& Rhs) {
+ return Lhs.OpCoreOffset < Rhs.OpCoreOffset;
});
uint64_t TombstoneEntries = 0;
BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
- uint32_t MaxOpLsn = m_MaxLsn.load(std::memory_order_relaxed);
- uint64_t NextOpFileOffset = m_NextOpsOffset.load(std::memory_order_relaxed);
+ uint32_t MaxOpLsn = 0;
+ uint64_t NextOpFileOffset = 0;
- for (size_t OplogOrderIndex : OplogOrder)
+ for (const OplogEntry& LogEntry : OpLogEntries)
{
- const OplogEntry& LogEntry = OpLogEntries[OplogOrderIndex];
-
if (LogEntry.IsTombstone())
{
TombstoneEntries++;
}
else
{
- // Verify checksum, ignore op data if incorrect
+ IoBuffer OpBuffer = GetOpBuffer(OpBlobsBuffer, LogEntry);
- auto VerifyAndHandleOp = [&](MemoryView OpBufferView) {
- const uint32_t ExpectedOpCoreHash = LogEntry.OpCoreHash;
- const uint32_t OpCoreHash = uint32_t(XXH3_64bits(OpBufferView.GetData(), LogEntry.OpCoreSize) & 0xffffFFFF);
+ // Verify checksum, ignore op data if incorrect
- if (OpCoreHash != ExpectedOpCoreHash)
- {
- ZEN_WARN("skipping bad checksum op - {}. Expected: {}, found: {}",
- LogEntry.OpKeyHash,
- ExpectedOpCoreHash,
- OpCoreHash);
- }
- else if (CbValidateError Err = ValidateCompactBinary(OpBufferView, CbValidateMode::Default);
- Err != CbValidateError::None)
- {
- ZEN_WARN("skipping invalid format op - {}. Error: '{}'", LogEntry.OpKeyHash, ToString(Err));
- }
- else
- {
- Handler(CbObjectView(OpBufferView.GetData()), LogEntry);
- MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn);
- const uint64_t EntryNextOpFileOffset =
- RoundUp((LogEntry.OpCoreOffset * m_OpsAlign) + LogEntry.OpCoreSize, m_OpsAlign);
- NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset);
- }
- };
+ const uint32_t ExpectedOpCoreHash = LogEntry.OpCoreHash;
+ const uint32_t OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.GetData(), LogEntry.OpCoreSize) & 0xffffFFFF);
- const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
- const MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreSize, OpFileOffset);
- if (OpBufferView.GetSize() == LogEntry.OpCoreSize)
+ if (OpCoreHash != ExpectedOpCoreHash)
{
- VerifyAndHandleOp(OpBufferView);
+ ZEN_WARN("skipping bad checksum op - {}. Expected: {}, found: {}", LogEntry.OpKeyHash, ExpectedOpCoreHash, OpCoreHash);
+ }
+ else if (CbValidateError Err = ValidateCompactBinary(OpBuffer.GetView(), CbValidateMode::Default);
+ Err != CbValidateError::None)
+ {
+ ZEN_WARN("skipping invalid format op - {}. Error: '{}'", LogEntry.OpKeyHash, ToString(Err));
}
else
{
- IoBuffer OpBuffer(LogEntry.OpCoreSize);
- OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
-
- VerifyAndHandleOp(OpBuffer);
+ Handler(CbObjectView(OpBuffer.GetData()), LogEntry);
+ MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn);
+ const uint64_t EntryNextOpFileOffset = RoundUp((LogEntry.OpCoreOffset * m_OpsAlign) + LogEntry.OpCoreSize, m_OpsAlign);
+ NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset);
}
}
}
- m_NextOpsOffset = NextOpFileOffset;
m_MaxLsn = MaxOpLsn;
+ m_NextOpsOffset = NextOpFileOffset;
ZEN_INFO("oplog replay completed in {} - Max LSN# {}, Next offset: {}, {} tombstones, {} invalid entries",
NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
@@ -540,14 +711,17 @@ struct ProjectStore::OplogStorage : public RefCounted
{
ZEN_TRACE_CPU("Store::OplogStorage::AppendOp");
- using namespace std::literals;
-
uint64_t WriteSize = OpData.Buffer.GetSize();
RwLock::ExclusiveLockScope Lock(m_RwLock);
const uint64_t WriteOffset = m_NextOpsOffset;
const uint32_t OpLsn = ++m_MaxLsn;
- m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign);
+ if (OpLsn == std::numeric_limits<uint32_t>::max())
+ {
+ ZEN_ERROR("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId());
+ throw std::runtime_error(fmt::format("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId()));
+ }
+ m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign);
Lock.ReleaseNow();
ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign));
@@ -568,8 +742,6 @@ struct ProjectStore::OplogStorage : public RefCounted
{
ZEN_TRACE_CPU("Store::OplogStorage::AppendOps");
- using namespace std::literals;
-
size_t OpCount = Ops.size();
std::vector<std::pair<uint64_t, uint64_t>> OffsetAndSizes;
std::vector<uint32_t> OpLsns;
@@ -592,7 +764,12 @@ struct ProjectStore::OplogStorage : public RefCounted
{
OffsetAndSizes[OpIndex].first = WriteOffset - WriteStart;
OpLsns[OpIndex] = ++m_MaxLsn;
- WriteOffset = RoundUp(WriteOffset + OffsetAndSizes[OpIndex].second, m_OpsAlign);
+ if (OpLsns[OpIndex] == std::numeric_limits<uint32_t>::max())
+ {
+ ZEN_ERROR("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId());
+ throw std::runtime_error(fmt::format("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId()));
+ }
+ WriteOffset = RoundUp(WriteOffset + OffsetAndSizes[OpIndex].second, m_OpsAlign);
}
WriteLength = WriteOffset - WriteStart;
m_NextOpsOffset = RoundUp(WriteOffset, m_OpsAlign);
@@ -633,8 +810,6 @@ struct ProjectStore::OplogStorage : public RefCounted
m_OpBlobs.Flush();
}
- uint32_t GetMaxLsn() const { return m_MaxLsn.load(); }
-
LoggerRef Log() { return m_OwnerOplog->Log(); }
private:
@@ -778,7 +953,7 @@ ProjectStore::Oplog::TotalSize(const std::filesystem::path& BasePath)
{
using namespace std::literals;
- uint64_t Size = OplogStorage::OpBlobsSize(BasePath);
+ uint64_t Size = OplogStorage::OpsSize(BasePath);
std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv;
if (std::filesystem::exists(StateFilePath))
{
@@ -833,7 +1008,7 @@ ProjectStore::Oplog::ExistsAt(const std::filesystem::path& BasePath)
}
void
-ProjectStore::Oplog::Read()
+ProjectStore::Oplog::Read(bool AllowCompact)
{
using namespace std::literals;
@@ -865,7 +1040,7 @@ ProjectStore::Oplog::Read()
m_OuterProject->Identifier,
StateFilePath);
}
- ReplayLog();
+ ReplayLog(AllowCompact);
}
void
@@ -932,7 +1107,7 @@ ProjectStore::Oplog::Reset()
}
void
-ProjectStore::Oplog::ReplayLog()
+ProjectStore::Oplog::ReplayLog(bool AllowCompact)
{
ZEN_LOG_SCOPE("ReplayLog '{}'", m_OplogId);
@@ -941,7 +1116,115 @@ ProjectStore::Oplog::ReplayLog()
{
return;
}
- m_Storage->ReplayLog([&](CbObjectView Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, GetMapping(Op), OpEntry); });
+
+ uint32_t MaxLSN = 0;
+ m_Storage->ReplayLog([&](CbObjectView Op, const OplogEntry& OpEntry) {
+ MaxLSN = Max(OpEntry.OpLsn, MaxLSN);
+ RegisterOplogEntry(OplogLock, GetMapping(Op), OpEntry);
+ });
+
+ if (AllowCompact)
+ {
+ const uint32_t CompactUnusedThreshold = 50;
+ uint32_t UnusedPercent = GetUnusedSpacePercentLocked();
+ if (UnusedPercent >= CompactUnusedThreshold)
+ {
+ Compact(OplogLock,
+ /*DryRun*/ false,
+ /*RetainLSNs*/ MaxLSN <=
+ 0xff000000ul, // If we have less than 16 miln entries left of our LSN range, allow renumbering of LSNs
+ fmt::format("Compact on initial open of oplog {}/{}: ", m_OuterProject->Identifier, m_OplogId));
+ }
+ }
+}
+
+uint32_t
+ProjectStore::Oplog::GetUnusedSpacePercent() const
+{
+ RwLock::SharedLockScope OplogLock(m_OplogLock);
+ return GetUnusedSpacePercentLocked();
+}
+
+uint32_t
+ProjectStore::Oplog::GetUnusedSpacePercentLocked() const
+{
+ const uint64_t ActualBlobsSize = m_Storage->OpBlobsSize();
+ if (ActualBlobsSize == 0)
+ {
+ return 0;
+ }
+
+ std::vector<OplogEntryAddress> Addresses;
+ {
+ Addresses.reserve(m_LatestOpMap.size());
+ for (auto It : m_LatestOpMap)
+ {
+ if (auto AddressIt = m_OpAddressMap.find(It.second); AddressIt != m_OpAddressMap.end())
+ {
+ Addresses.push_back(AddressIt->second);
+ }
+ }
+ }
+
+ const uint64_t EffectiveBlobsSize = m_Storage->GetEffectiveBlobsSize(std::move(Addresses));
+
+ if (EffectiveBlobsSize < ActualBlobsSize)
+ {
+ return gsl::narrow<uint32_t>((100 * (ActualBlobsSize - EffectiveBlobsSize)) / ActualBlobsSize);
+ }
+
+ return 0;
+}
+
+void
+ProjectStore::Oplog::Compact(bool DryRun, bool RetainLSNs, std::string_view LogPrefix)
+{
+ RwLock::ExclusiveLockScope Lock(m_OplogLock);
+ Compact(Lock, DryRun, RetainLSNs, LogPrefix);
+}
+
+void
+ProjectStore::Oplog::Compact(RwLock::ExclusiveLockScope&, bool DryRun, bool RetainLSNs, std::string_view LogPrefix)
+{
+ Stopwatch Timer;
+
+ std::vector<uint32_t> LSNs;
+ LSNs.reserve(m_LatestOpMap.size());
+ for (auto It : m_LatestOpMap)
+ {
+ LSNs.push_back(It.second);
+ }
+
+ tsl::robin_map<uint32_t, OplogEntryAddress> OpAddressMap; // Index LSN -> op data in ops blob file
+ OidMap<uint32_t> LatestOpMap; // op key -> latest op LSN for key
+
+ uint64_t PreSize = TotalSize();
+
+ m_Storage->Compact(
+ LSNs,
+ [&](const Oid& OpKeyHash, uint32_t, uint32_t NewLSN, const OplogEntryAddress& NewAddress) {
+ LatestOpMap.insert_or_assign(OpKeyHash, NewLSN);
+ OpAddressMap.insert_or_assign(NewLSN, NewAddress);
+ },
+ RetainLSNs,
+ /*DryRun*/ DryRun);
+
+ if (!DryRun)
+ {
+ m_OpAddressMap.swap(OpAddressMap);
+ m_LatestOpMap.swap(LatestOpMap);
+ }
+
+ uint64_t PostSize = TotalSize();
+ uint64_t FreedSize = (PreSize > PostSize) ? (PreSize - PostSize) : 0;
+
+ ZEN_INFO("{} Compacted oplog {}/{} in {}. New size: {}, freeing: {}",
+ LogPrefix,
+ m_OuterProject->Identifier,
+ m_OplogId,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ NiceBytes(PreSize),
+ NiceBytes(FreedSize));
}
IoBuffer
@@ -1960,7 +2243,7 @@ ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem:
}
ProjectStore::Oplog*
-ProjectStore::Project::OpenOplog(std::string_view OplogId)
+ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact)
{
ZEN_TRACE_CPU("Store::OpenOplog");
{
@@ -1974,7 +2257,7 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId)
}
}
- RwLock::ExclusiveLockScope _(m_ProjectLock);
+ RwLock::ExclusiveLockScope Lock(m_ProjectLock);
std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
@@ -1989,7 +2272,7 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId)
.try_emplace(std::string{OplogId},
std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, std::filesystem::path{}))
.first->second.get();
- Log->Read();
+ Log->Read(AllowCompact);
return Log;
}
@@ -2103,7 +2386,7 @@ ProjectStore::Project::ScrubStorage(ScrubContext& Ctx)
std::vector<std::string> OpLogs = ScanForOplogs();
for (const std::string& OpLogId : OpLogs)
{
- OpenOplog(OpLogId);
+ OpenOplog(OpLogId, /*AllowCompact*/ false);
}
IterateOplogs([&](const RwLock::SharedLockScope& ProjectLock, Oplog& Ops) {
if (!IsExpired(ProjectLock, GcClock::TimePoint::min(), Ops))
@@ -2127,7 +2410,7 @@ ProjectStore::Project::GatherReferences(GcContext& GcCtx)
std::vector<std::string> OpLogs = ScanForOplogs();
for (const std::string& OpLogId : OpLogs)
{
- OpenOplog(OpLogId);
+ OpenOplog(OpLogId, /*AllowCompact*/ false);
}
{
@@ -2311,6 +2594,21 @@ ProjectStore::Project::IsExpired(const RwLock::SharedLockScope& ProjectLock,
}
bool
+ProjectStore::Project::IsOplogTouchedSince(const RwLock::SharedLockScope&, const GcClock::TimePoint TouchTime, std::string_view Oplog) const
+{
+ const GcClock::Tick TouchTicks = TouchTime.time_since_epoch().count();
+
+ if (auto It = m_LastAccessTimes.find(std::string(Oplog)); It != m_LastAccessTimes.end())
+ {
+ if (It->second > TouchTicks)
+ {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool
ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog)
{
RwLock::SharedLockScope Lock(m_ProjectLock);
@@ -2829,7 +3127,7 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
}
Project->TouchProject();
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true);
if (!FoundLog)
{
return {HttpResponseCode::NotFound, fmt::format("Project files for unknown oplog '{}/{}'", ProjectId, OplogId)};
@@ -2957,7 +3255,7 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId,
}
Project->TouchProject();
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true);
if (!FoundLog)
{
return {HttpResponseCode::NotFound, fmt::format("unknown oplog '{}/{}'", ProjectId, OplogId)};
@@ -3076,7 +3374,7 @@ ProjectStore::GetChunkInfo(const std::string_view ProjectId,
}
Project->TouchProject();
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true);
if (!FoundLog)
{
return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)};
@@ -3156,7 +3454,7 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId,
}
Project->TouchProject();
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true);
if (!FoundLog)
{
return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)};
@@ -3266,7 +3564,7 @@ ProjectStore::GetChunk(const std::string_view ProjectId,
}
Project->TouchProject();
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true);
if (!FoundLog)
{
return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)};
@@ -3313,7 +3611,7 @@ ProjectStore::PutChunk(const std::string_view ProjectId,
}
Project->TouchProject();
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true);
if (!FoundLog)
{
return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown oplog '{}/{}'", ProjectId, OplogId)};
@@ -3355,7 +3653,7 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie
}
Project->TouchProject();
- ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ true);
if (!Oplog)
{
return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)};
@@ -3440,7 +3738,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
}
Project->TouchProject();
- ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ true);
if (!Oplog)
{
return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)};
@@ -3570,7 +3868,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
Project->TouchProject();
- ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ true);
if (!Oplog)
{
HttpReq.WriteResponse(HttpResponseCode::NotFound,
@@ -3990,12 +4288,16 @@ ProjectStore::GetGcName(GcCtx&)
class ProjectStoreGcStoreCompactor : public GcStoreCompactor
{
public:
- ProjectStoreGcStoreCompactor(const std::filesystem::path& BasePath,
- std::vector<std::filesystem::path>&& OplogPathsToRemove,
- std::vector<std::filesystem::path>&& ProjectPathsToRemove)
- : m_BasePath(BasePath)
+ ProjectStoreGcStoreCompactor(ProjectStore& ProjectStore,
+ const std::filesystem::path& BasePath,
+ std::vector<std::filesystem::path>&& OplogPathsToRemove,
+ std::vector<std::filesystem::path>&& ProjectPathsToRemove,
+ std::vector<std::pair<std::string, std::string>>&& OplogsToCompact)
+ : m_ProjectStore(ProjectStore)
+ , m_BasePath(BasePath)
, m_OplogPathsToRemove(std::move(OplogPathsToRemove))
, m_ProjectPathsToRemove(std::move(ProjectPathsToRemove))
+ , m_OplogsToCompact(std::move(OplogsToCompact))
{
}
@@ -4053,12 +4355,39 @@ public:
}
else
{
- ZEN_DEBUG("GCV2: projectstore [COMPACT] '{}': Skipped deleting of {} oplogs and {} projects",
+ ZEN_DEBUG("GCV2: projectstore [COMPACT] '{}': Skipped deleting and compacting of {} oplogs and {} projects",
m_BasePath,
m_OplogPathsToRemove.size(),
m_ProjectPathsToRemove.size());
}
+ {
+ for (auto It : m_OplogsToCompact)
+ {
+ const std::string& ProjectId = It.first;
+ const std::string& OplogId = It.second;
+ Ref<ProjectStore::Project> Project = m_ProjectStore.OpenProject(ProjectId);
+ if (Project)
+ {
+ ProjectStore::Oplog* OpLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false);
+ if (OpLog)
+ {
+ const uint64_t PreSize = OpLog->TotalSize();
+
+ OpLog->Compact(!Ctx.Settings.IsDeleteMode,
+ /*RetainLSNs*/ true,
+ fmt::format("GCV2: projectstore [COMPACT] '{}': ", m_BasePath));
+
+ const uint64_t PostSize = OpLog->TotalSize();
+
+ const uint64_t FreedSize = (PreSize > PostSize) ? (PreSize - PostSize) : 0;
+
+ Stats.RemovedDisk += FreedSize;
+ }
+ }
+ }
+ }
+
m_ProjectPathsToRemove.clear();
m_OplogPathsToRemove.clear();
}
@@ -4066,9 +4395,11 @@ public:
virtual std::string GetGcName(GcCtx&) override { return fmt::format("projectstore: '{}'", m_BasePath.string()); }
private:
- std::filesystem::path m_BasePath;
- std::vector<std::filesystem::path> m_OplogPathsToRemove;
- std::vector<std::filesystem::path> m_ProjectPathsToRemove;
+ ProjectStore& m_ProjectStore;
+ std::filesystem::path m_BasePath;
+ std::vector<std::filesystem::path> m_OplogPathsToRemove;
+ std::vector<std::filesystem::path> m_ProjectPathsToRemove;
+ std::vector<std::pair<std::string, std::string>> m_OplogsToCompact;
};
GcStoreCompactor*
@@ -4117,7 +4448,7 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
std::vector<std::string> OpLogs = Project->ScanForOplogs();
for (const std::string& OpLogId : OpLogs)
{
- Project->OpenOplog(OpLogId);
+ Project->OpenOplog(OpLogId, /*AllowCompact*/ false);
if (Ctx.IsCancelledFlag)
{
return nullptr;
@@ -4125,6 +4456,8 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
}
}
+ std::vector<std::pair<std::string, std::string>> OplogsToCompact;
+
size_t ExpiredOplogCount = 0;
for (const Ref<Project>& Project : Projects)
{
@@ -4135,14 +4468,26 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
std::vector<std::string> ExpiredOplogs;
{
- Project->IterateOplogs(
- [&Ctx, &Stats, &Project, &ExpiredOplogs](const RwLock::SharedLockScope& Lock, ProjectStore::Oplog& Oplog) {
- Stats.CheckedCount++;
- if (Project->IsExpired(Lock, Ctx.Settings.ProjectStoreExpireTime, Oplog))
+ Project->IterateOplogs([&Ctx, &Stats, &Project, &ExpiredOplogs, &OplogsToCompact](const RwLock::SharedLockScope& Lock,
+ ProjectStore::Oplog& Oplog) {
+ Stats.CheckedCount++;
+ if (Project->IsExpired(Lock, Ctx.Settings.ProjectStoreExpireTime, Oplog))
+ {
+ ExpiredOplogs.push_back(Oplog.OplogId());
+ }
+ else
+ {
+ GcClock::TimePoint CompactExpireTime = GcClock::Now() - std::chrono::minutes(30);
+ if (!Project->IsOplogTouchedSince(Lock, CompactExpireTime, Oplog.OplogId()))
{
- ExpiredOplogs.push_back(Oplog.OplogId());
+ const uint32_t CompactUnusedThreshold = 25;
+ if (Oplog.GetUnusedSpacePercent() >= CompactUnusedThreshold)
+ {
+ OplogsToCompact.push_back(std::make_pair(Project->Identifier, Oplog.OplogId()));
+ }
}
- });
+ }
+ });
}
std::filesystem::path ProjectPath = BasePathForProject(Project->Identifier);
ExpiredOplogCount += ExpiredOplogs.size();
@@ -4164,9 +4509,10 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
}
}
- if (ExpiredProjects.empty() && ExpiredOplogCount == 0)
+ if (ExpiredProjects.empty() && ExpiredOplogCount == 0 && OplogsToCompact.empty())
{
- ZEN_DEBUG("GCV2: projectstore [REMOVE EXPIRED] '{}': no expired projects found", m_ProjectBasePath);
+ ZEN_DEBUG("GCV2: projectstore [REMOVE EXPIRED] '{}': no expired projects, expired oplogs or oplogs to compact found",
+ m_ProjectBasePath);
return nullptr;
}
@@ -4209,9 +4555,13 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
size_t ExpiredProjectCount = ExpiredProjects.size();
Stats.FoundCount += ExpiredOplogCount + ExpiredProjectCount;
- if (!OplogPathsToRemove.empty() || !ProjectPathsToRemove.empty())
+ if (!OplogPathsToRemove.empty() || !ProjectPathsToRemove.empty() || !OplogsToCompact.empty())
{
- return new ProjectStoreGcStoreCompactor(m_ProjectBasePath, std::move(OplogPathsToRemove), std::move(ProjectPathsToRemove));
+ return new ProjectStoreGcStoreCompactor(*this,
+ m_ProjectBasePath,
+ std::move(OplogPathsToRemove),
+ std::move(ProjectPathsToRemove),
+ std::move(OplogsToCompact));
}
return nullptr;
}
@@ -4693,7 +5043,7 @@ TEST_CASE("project.store.lifetimes")
std::filesystem::path DeletePath;
CHECK(Project->PrepareForDelete(DeletePath));
CHECK(!DeletePath.empty());
- CHECK(Project->OpenOplog("oplog1") == nullptr);
+ CHECK(Project->OpenOplog("oplog1", /*AllowCompact*/ false) == nullptr);
// Oplog is now invalid, but pointer can still be accessed since we store old oplog pointers
CHECK(Oplog->OplogCount() == 0);
// Project is still valid since we have a Ref to it
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 036c17106..bb042e331 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -79,7 +79,7 @@ public:
[[nodiscard]] static bool ExistsAt(const std::filesystem::path& BasePath);
- void Read();
+ void Read(bool AllowCompact);
void Write();
void Update(const std::filesystem::path& MarkerPath);
bool Reset();
@@ -159,6 +159,9 @@ public:
std::vector<IoHash> GetCapturedAttachments();
RwLock::SharedLockScope GetGcReferencerLock() { return RwLock::SharedLockScope(m_OplogLock); }
+ uint32_t GetUnusedSpacePercent() const;
+ void Compact(bool DryRun, bool RetainLSNs, std::string_view LogPrefix);
+
private:
struct FileMapEntry
{
@@ -195,7 +198,8 @@ public:
/** Scan oplog and register each entry, thus updating the in-memory tracking tables
*/
- void ReplayLog();
+ void ReplayLog(bool AllowCompact);
+ uint32_t GetUnusedSpacePercentLocked() const;
struct OplogEntryMapping
{
@@ -235,6 +239,7 @@ public:
std::string_view ClientPath);
void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash);
void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash);
+ void Compact(RwLock::ExclusiveLockScope& Lock, bool DryRun, bool RetainLSNs, std::string_view LogPrefix);
friend class ProjectStoreOplogReferenceChecker;
friend class ProjectStoreReferenceChecker;
@@ -249,15 +254,19 @@ public:
std::filesystem::path ProjectFilePath;
Oplog* NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath);
- Oplog* OpenOplog(std::string_view OplogId);
+ Oplog* OpenOplog(std::string_view OplogId, bool AllowCompact);
bool DeleteOplog(std::string_view OplogId);
bool RemoveOplog(std::string_view OplogId, std::filesystem::path& OutDeletePath);
void IterateOplogs(std::function<void(const RwLock::SharedLockScope&, const Oplog&)>&& Fn) const;
void IterateOplogs(std::function<void(const RwLock::SharedLockScope&, Oplog&)>&& Fn);
std::vector<std::string> ScanForOplogs() const;
bool IsExpired(const RwLock::SharedLockScope&, const GcClock::TimePoint ExpireTime);
- bool IsExpired(const RwLock::SharedLockScope&, const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog);
- bool IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog);
+ bool IsExpired(const RwLock::SharedLockScope&, const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog);
+ bool IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog);
+ bool IsOplogTouchedSince(const RwLock::SharedLockScope& ProjectLock,
+ const GcClock::TimePoint TouchTime,
+ std::string_view Oplog) const;
+
void TouchProject() const;
void TouchOplog(std::string_view Oplog) const;
GcClock::TimePoint LastOplogAccessTime(std::string_view Oplog) const;
diff --git a/src/zenserver/vfs/vfsimpl.cpp b/src/zenserver/vfs/vfsimpl.cpp
index 5c9f32c69..292fce9eb 100644
--- a/src/zenserver/vfs/vfsimpl.cpp
+++ b/src/zenserver/vfs/vfsimpl.cpp
@@ -362,7 +362,7 @@ VfsServiceDataSource::PopulateDirectory(std::string NodePath, VfsTreeNode& DirNo
// Oplog contents enumeration
- if (ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId))
+ if (ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId, false))
{
Ref<VfsOplogDataSource> DataSource = GetOplogDataSource(ProjectId, OplogId);