aboutsummaryrefslogtreecommitdiff
path: root/zenserver/projectstore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/projectstore.cpp')
-rw-r--r--zenserver/projectstore.cpp212
1 files changed, 183 insertions, 29 deletions
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 3e242653a..5c7de2a43 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -274,10 +274,15 @@ private:
//////////////////////////////////////////////////////////////////////////
-ProjectStore::Oplog::Oplog(std::string_view Id, Project* Project, CidStore& Store, std::filesystem::path BasePath)
+ProjectStore::Oplog::Oplog(std::string_view Id,
+ Project* Project,
+ CidStore& Store,
+ std::filesystem::path BasePath,
+ const std::filesystem::path& MarkerPath)
: m_OuterProject(Project)
, m_CidStore(Store)
, m_BasePath(BasePath)
+, m_MarkerPath(MarkerPath)
, m_OplogId(Id)
{
using namespace std::literals;
@@ -348,6 +353,16 @@ ProjectStore::Oplog::TotalSize() const
return 0;
}
+bool
+ProjectStore::Oplog::IsExpired() const
+{
+ if (m_MarkerPath.empty())
+ {
+ return false;
+ }
+ return !std::filesystem::exists(m_MarkerPath);
+}
+
std::filesystem::path
ProjectStore::Oplog::PrepareForDelete(bool MoveFolder)
{
@@ -373,19 +388,90 @@ ProjectStore::Oplog::PrepareForDelete(bool MoveFolder)
bool
ProjectStore::Oplog::ExistsAt(std::filesystem::path BasePath)
{
- return OplogStorage::Exists(BasePath);
+ using namespace std::literals;
+
+ std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv;
+ return std::filesystem::is_regular_file(StateFilePath);
+}
+
+void
+ProjectStore::Oplog::Read()
+{
+ using namespace std::literals;
+
+ std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv;
+ if (std::filesystem::is_regular_file(StateFilePath))
+ {
+ ZEN_INFO("reading config for oplog '{}' in project '{}' from {}", m_OplogId, m_OuterProject->Identifier, StateFilePath);
+
+ BasicFile Blob;
+ Blob.Open(StateFilePath, BasicFile::Mode::kRead);
+
+ IoBuffer Obj = Blob.ReadAll();
+ CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All);
+
+ if (ValidationError != CbValidateError::None)
+ {
+ ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), StateFilePath);
+ return;
+ }
+
+ CbObject Cfg = LoadCompactBinaryObject(Obj);
+
+ m_MarkerPath = Cfg["gcpath"sv].AsString();
+ }
+ else
+ {
+ ZEN_INFO("config for oplog '{}' in project '{}' not found at {}. Assuming legacy store",
+ m_OplogId,
+ m_OuterProject->Identifier,
+ StateFilePath);
+ }
+ ReplayLog();
+}
+
+void
+ProjectStore::Oplog::Write()
+{
+ using namespace std::literals;
+
+ BinaryWriter Mem;
+
+ CbObjectWriter Cfg;
+
+ Cfg << "gcpath"sv << PathToUtf8(m_MarkerPath);
+
+ Cfg.Save(Mem);
+
+ std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv;
+
+ ZEN_INFO("persisting config for oplog '{}' in project '{}' to {}", m_OplogId, m_OuterProject->Identifier, StateFilePath);
+
+ BasicFile Blob;
+ Blob.Open(StateFilePath, BasicFile::Mode::kTruncate);
+ Blob.Write(Mem.Data(), Mem.Size(), 0);
+ Blob.Flush();
}
void
ProjectStore::Oplog::ReplayLog()
{
- m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(Op, OpEntry, kUpdateReplay); });
+ RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
+ if (!m_Storage)
+ {
+ return;
+ }
+ m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, Op, OpEntry, kUpdateReplay); });
}
IoBuffer
ProjectStore::Oplog::FindChunk(Oid ChunkId)
{
RwLock::SharedLockScope OplogLock(m_OplogLock);
+ if (!m_Storage)
+ {
+ return IoBuffer{};
+ }
if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end())
{
@@ -429,6 +515,10 @@ ProjectStore::Oplog::IterateFileMap(
std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn)
{
RwLock::SharedLockScope _(m_OplogLock);
+ if (!m_Storage)
+ {
+ return;
+ }
for (const auto& Kv : m_FileMap)
{
@@ -440,6 +530,10 @@ void
ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler)
{
RwLock::SharedLockScope _(m_OplogLock);
+ if (!m_Storage)
+ {
+ return;
+ }
std::vector<OplogEntryAddress> Entries;
Entries.reserve(m_LatestOpMap.size());
@@ -463,6 +557,10 @@ std::optional<CbObject>
ProjectStore::Oplog::GetOpByKey(const Oid& Key)
{
RwLock::SharedLockScope _(m_OplogLock);
+ if (!m_Storage)
+ {
+ return {};
+ }
if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end())
{
@@ -479,6 +577,10 @@ std::optional<CbObject>
ProjectStore::Oplog::GetOpByIndex(int Index)
{
RwLock::SharedLockScope _(m_OplogLock);
+ if (!m_Storage)
+ {
+ return {};
+ }
if (const auto AddressEntryIt = m_OpAddressMap.find(Index); AddressEntryIt != m_OpAddressMap.end())
{
@@ -532,7 +634,10 @@ ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid Chunk
}
uint32_t
-ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry, UpdateType TypeOfUpdate)
+ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
+ CbObject Core,
+ const OplogEntry& OpEntry,
+ UpdateType TypeOfUpdate)
{
ZEN_TRACE_CPU("ProjectStore::Oplog::RegisterOplogEntry");
@@ -541,8 +646,6 @@ ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry
// For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing
// too many problems. Longer term we'll probably want to ensure we can do concurrent updates however
- RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
-
using namespace std::literals;
// Update chunk id maps
@@ -624,12 +727,15 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
{
ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry");
- ZEN_ASSERT(m_Storage);
-
using namespace std::literals;
const CbObject& Core = OpPackage.GetObject();
const uint32_t EntryId = AppendNewOplogEntry(Core);
+ if (EntryId == 0xffffffffu)
+ {
+ // The oplog has been deleted so just drop this
+ return EntryId;
+ }
// Persist attachments after oplog entry so GC won't find attachments without references
@@ -663,12 +769,16 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core)
{
ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry");
- ZEN_ASSERT(m_Storage);
+ RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
+ if (!m_Storage)
+ {
+ return 0xffffffffu;
+ }
using namespace std::literals;
const OplogEntry OpEntry = m_Storage->AppendOp(Core);
- const uint32_t EntryId = RegisterOplogEntry(Core, OpEntry, kUpdateNewEntry);
+ const uint32_t EntryId = RegisterOplogEntry(OplogLock, Core, OpEntry, kUpdateNewEntry);
return EntryId;
}
@@ -764,7 +874,7 @@ ProjectStore::Project::BasePathForOplog(std::string_view OplogId)
}
ProjectStore::Oplog*
-ProjectStore::Project::NewOplog(std::string_view OplogId)
+ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath)
{
RwLock::ExclusiveLockScope _(m_ProjectLock);
@@ -772,10 +882,12 @@ ProjectStore::Project::NewOplog(std::string_view OplogId)
try
{
- Oplog* Log =
- m_Oplogs.try_emplace(std::string{OplogId}, std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath))
- .first->second.get();
+ Oplog* Log = m_Oplogs
+ .try_emplace(std::string{OplogId},
+ std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, MarkerPath))
+ .first->second.get();
+ Log->Write();
return Log;
}
catch (std::exception&)
@@ -815,10 +927,11 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId)
try
{
Oplog* Log =
- m_Oplogs.try_emplace(std::string{OplogId}, std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath))
+ m_Oplogs
+ .try_emplace(std::string{OplogId},
+ std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, std::filesystem::path{}))
.first->second.get();
-
- Log->ReplayLog();
+ Log->Read();
return Log;
}
@@ -910,7 +1023,12 @@ ProjectStore::Project::Scrub(ScrubContext& Ctx)
{
OpenOplog(OpLogId);
}
- IterateOplogs([&](const Oplog& Ops) { Ops.Scrub(Ctx); });
+ IterateOplogs([&](const Oplog& Ops) {
+ if (!Ops.IsExpired())
+ {
+ Ops.Scrub(Ctx);
+ }
+ });
}
void
@@ -929,7 +1047,12 @@ ProjectStore::Project::GatherReferences(GcContext& GcCtx)
{
OpenOplog(OpLogId);
}
- IterateOplogs([&](Oplog& Ops) { Ops.GatherReferences(GcCtx); });
+ IterateOplogs([&](Oplog& Ops) {
+ if (!Ops.IsExpired())
+ {
+ Ops.GatherReferences(GcCtx);
+ }
+ });
}
uint64_t
@@ -1133,6 +1256,7 @@ ProjectStore::CollectGarbage(GcContext& GcCtx)
ExpiredProjectCount);
});
std::vector<Ref<Project>> ExpiredProjects;
+ std::vector<Ref<Project>> Projects;
{
RwLock::SharedLockScope _(m_ProjectsLock);
@@ -1144,19 +1268,41 @@ ProjectStore::CollectGarbage(GcContext& GcCtx)
ExpiredProjectCount++;
continue;
}
+ Projects.push_back(Kv.second);
ProjectCount++;
}
}
- if (ExpiredProjects.empty())
+ if (!GcCtx.IsDeletionMode())
{
- ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired projects found", m_ProjectBasePath.string());
+ ZEN_DEBUG("garbage collect DISABLED, for '{}' ", m_ProjectBasePath.string());
return;
}
- if (!GcCtx.IsDeletionMode())
+ for (const Ref<Project>& Project : Projects)
{
- ZEN_DEBUG("garbage collect DISABLED, for '{}' ", m_ProjectBasePath.string());
+ std::vector<std::string> ExpiredOplogs;
+ {
+ RwLock::ExclusiveLockScope _(m_ProjectsLock);
+ Project->IterateOplogs([&ExpiredOplogs](ProjectStore::Oplog& Oplog) {
+ if (Oplog.IsExpired())
+ {
+ ExpiredOplogs.push_back(Oplog.OplogId());
+ }
+ });
+ }
+ for (const std::string& OplogId : ExpiredOplogs)
+ {
+ ZEN_DEBUG("ProjectStore::CollectGarbage garbage collected oplog '{}' in project '{}'. Removing storage on disk",
+ OplogId,
+ Project->Identifier);
+ Project->DeleteOplog(OplogId);
+ }
+ }
+
+ if (ExpiredProjects.empty())
+ {
+ ZEN_DEBUG("garbage collect for '{}', no expired projects found", m_ProjectBasePath.string());
return;
}
@@ -2347,17 +2493,25 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
case HttpVerb::kPost:
{
+ std::filesystem::path OplogMarkerPath;
+ IoBuffer Payload = Req.ServerRequest().ReadPayload();
+ if (Payload.GetSize() > 0)
+ {
+ CbObject Params = LoadCompactBinaryObject(Payload);
+ OplogMarkerPath = Params["gcpath"sv].AsString();
+ }
+
ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId);
if (!OplogIt)
{
- if (!Project->NewOplog(OplogId))
+ if (!Project->NewOplog(OplogId, OplogMarkerPath))
{
// TODO: indicate why the operation failed!
return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError);
}
- ZEN_INFO("established oplog '{}/{}'", ProjectId, OplogId);
+ ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath);
return Req.ServerRequest().WriteResponse(HttpResponseCode::Created);
}
@@ -2683,7 +2837,7 @@ TEST_CASE("project.store.lifetimes")
EngineRootDir.string(),
ProjectRootDir.string(),
ProjectFilePath.string()));
- ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1");
+ ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {});
CHECK(Oplog != nullptr);
std::filesystem::path DeletePath;
@@ -2736,7 +2890,7 @@ TEST_CASE("project.store.gc")
EngineRootDir.string(),
Project1RootDir.string(),
Project1FilePath.string()));
- ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1");
+ ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1", {});
CHECK(Oplog != nullptr);
Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {}));
@@ -2752,7 +2906,7 @@ TEST_CASE("project.store.gc")
EngineRootDir.string(),
Project2RootDir.string(),
Project2FilePath.string()));
- ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog1");
+ ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog1", {});
CHECK(Oplog != nullptr);
Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {}));
@@ -2821,7 +2975,7 @@ TEST_CASE("project.store.partial.read")
EngineRootDir.string(),
Project1RootDir.string(),
Project1FilePath.string()));
- ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv);
+ ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, {});
CHECK(Oplog != nullptr);
Attachments[OpIds[0]] = {};
Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77});