aboutsummaryrefslogtreecommitdiff
path: root/zenserver/projectstore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/projectstore.cpp')
-rw-r--r--zenserver/projectstore.cpp186
1 files changed, 181 insertions, 5 deletions
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 73d61c124..8f8b6e163 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -10,14 +10,20 @@
#include <zencore/logging.h>
#include <zencore/stream.h>
#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
#include <zencore/timer.h>
#include <zencore/windows.h>
#include <zenstore/basicfile.h>
#include <zenstore/cas.h>
#include <zenstore/caslog.h>
+#include "config.h"
+
#define USE_ROCKSDB 0
+ZEN_THIRD_PARTY_INCLUDES_START
+
#if USE_ROCKSDB
# pragma comment(lib, "Rpcrt4.lib") // RocksDB made me do this
# include <rocksdb/db.h>
@@ -25,6 +31,8 @@
#include <xxh3.h>
#include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
#include <latch>
#include <string>
@@ -165,7 +173,16 @@ struct ProjectStore::OplogStorage : public RefCounted
Stopwatch Timer;
+ uint64_t InvalidEntries = 0;
+
m_Oplog.Replay([&](const zen::OplogEntry& LogEntry) {
+ if (LogEntry.OpCoreSize == 0)
+ {
+ ++InvalidEntries;
+
+ return;
+ }
+
IoBuffer OpBuffer(LogEntry.OpCoreSize);
const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
@@ -190,6 +207,11 @@ struct ProjectStore::OplogStorage : public RefCounted
Handler(Op, LogEntry);
});
+ if (InvalidEntries)
+ {
+ ZEN_WARN("ignored {} zero-sized oplog entries", InvalidEntries);
+ }
+
ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}",
NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
m_MaxLsn,
@@ -221,6 +243,8 @@ struct ProjectStore::OplogStorage : public RefCounted
const uint64_t WriteSize = Buffer.GetSize();
const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF);
+ ZEN_ASSERT(WriteSize != 0);
+
XXH3_128Stream KeyHasher;
Op["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
XXH3_128 KeyHash = KeyHasher.GetHash();
@@ -298,6 +322,36 @@ ProjectStore::Oplog::Flush()
m_Storage->Flush();
}
+void
+ProjectStore::Oplog::Scrub(ScrubContext& Ctx) const
+{
+ ZEN_UNUSED(Ctx);
+}
+
+void
+ProjectStore::Oplog::GatherReferences(GcContext& GcCtx)
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+
+ std::vector<IoHash> Hashes;
+
+ for (const auto& Kv : m_ChunkMap)
+ {
+ Hashes.push_back(Kv.second);
+ }
+
+ GcCtx.ContributeCids(Hashes);
+
+ Hashes.clear();
+
+ for (const auto& Kv : m_MetaMap)
+ {
+ Hashes.push_back(Kv.second);
+ }
+
+ GcCtx.ContributeCids(Hashes);
+}
+
bool
ProjectStore::Oplog::ExistsAt(std::filesystem::path BasePath)
{
@@ -748,9 +802,51 @@ ProjectStore::Project::DeleteOplog(std::string_view OplogId)
}
void
+ProjectStore::Project::DiscoverOplogs()
+{
+ FileSystemTraversal Traversal;
+ struct Visitor : public FileSystemTraversal::TreeVisitor
+ {
+ virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent,
+ [[maybe_unused]] const path_view& File,
+ [[maybe_unused]] uint64_t FileSize) override
+ {
+ }
+
+ virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override
+ {
+ Dirs.push_back(WideToUtf8(DirectoryName));
+ return false;
+ }
+
+ std::vector<std::string> Dirs;
+ } Visit;
+
+ Traversal.TraverseFileSystem(m_OplogStoragePath, Visit);
+
+ for (const std::string& Dir : Visit.Dirs)
+ {
+ OpenOplog(Dir);
+ }
+}
+
+void
ProjectStore::Project::IterateOplogs(std::function<void(const Oplog&)>&& Fn) const
{
- // TODO: should iterate over oplogs which are present on disk but not yet loaded
+ // TODO: should also iterate over oplogs which are present on disk but not yet loaded
+
+ RwLock::SharedLockScope _(m_ProjectLock);
+
+ for (auto& Kv : m_Oplogs)
+ {
+ Fn(Kv.second);
+ }
+}
+
+void
+ProjectStore::Project::IterateOplogs(std::function<void(Oplog&)>&& Fn)
+{
+ // TODO: should also iterate over oplogs which are present on disk but not yet loaded
RwLock::SharedLockScope _(m_ProjectLock);
@@ -769,13 +865,20 @@ ProjectStore::Project::Flush()
void
ProjectStore::Project::Scrub(ScrubContext& Ctx)
{
- ZEN_UNUSED(Ctx);
+ IterateOplogs([&](const Oplog& Ops) { Ops.Scrub(Ctx); });
+}
+
+void
+ProjectStore::Project::GatherReferences(GcContext& GcCtx)
+{
+ IterateOplogs([&](Oplog& Ops) { Ops.GatherReferences(GcCtx); });
}
//////////////////////////////////////////////////////////////////////////
-ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath)
-: m_Log(zen::logging::Get("project"))
+ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, CasGc& Gc)
+: GcContributor(Gc)
+, m_Log(zen::logging::Get("project"))
, m_ProjectBasePath(BasePath)
, m_CidStore(Store)
{
@@ -795,6 +898,45 @@ ProjectStore::BasePathForProject(std::string_view ProjectId)
}
void
+ProjectStore::DiscoverProjects()
+{
+ FileSystemTraversal Traversal;
+ struct Visitor : public FileSystemTraversal::TreeVisitor
+ {
+ virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent,
+ [[maybe_unused]] const path_view& File,
+ [[maybe_unused]] uint64_t FileSize) override
+ {
+ }
+
+ virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override
+ {
+ Dirs.push_back(WideToUtf8(DirectoryName));
+ return false;
+ }
+
+ std::vector<std::string> Dirs;
+ } Visit;
+
+ if (!std::filesystem::exists(m_ProjectBasePath))
+ {
+ return;
+ }
+
+ Traversal.TraverseFileSystem(m_ProjectBasePath, Visit);
+
+ for (const auto& Dir : Visit.Dirs)
+ {
+ Project* Project = OpenProject(Dir);
+
+ if (Project)
+ {
+ Project->DiscoverOplogs();
+ }
+ }
+}
+
+void
ProjectStore::Flush()
{
// TODO
@@ -818,6 +960,19 @@ ProjectStore::Scrub(ScrubContext& Ctx)
}
}
+void
+ProjectStore::GatherReferences(GcContext& GcCtx)
+{
+ DiscoverProjects();
+
+ RwLock::SharedLockScope _(m_ProjectsLock);
+
+ for (auto& Kv : m_Projects)
+ {
+ Kv.second.GatherReferences(GcCtx);
+ }
+}
+
ProjectStore::Project*
ProjectStore::OpenProject(std::string_view ProjectId)
{
@@ -1440,7 +1595,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
}
- ZEN_INFO("op #{} - '{}' {} '{}/{}' ", OpLsn, Core["key"sv].AsString(), NiceBytes(Payload.Size()), ProjectId, OplogId);
+ ZEN_INFO("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Payload.Size()), Core["key"sv].AsString());
HttpReq.WriteResponse(HttpResponseCode::Created);
},
@@ -1678,6 +1833,8 @@ HttpProjectService::HandleRequest(HttpServerRequest& Request)
}
}
+#if ZEN_USE_NAMED_PIPES
+
//////////////////////////////////////////////////////////////////////////
class SecurityAttributes
@@ -1957,6 +2114,25 @@ LocalProjectService::~LocalProjectService()
m_Impl->Stop();
}
+#endif
+
//////////////////////////////////////////////////////////////////////////
+#if ZEN_WITH_TESTS
+
+TEST_CASE("prj.store")
+{
+ using namespace fmt::literals;
+ using namespace std::literals;
+
+ ScopedTemporaryDirectory TempDir;
+}
+
+#endif
+
+void
+prj_forcelink()
+{
+}
+
} // namespace zen