diff options
Diffstat (limited to 'zenserver/projectstore.cpp')
| -rw-r--r-- | zenserver/projectstore.cpp | 186 |
1 files changed, 181 insertions, 5 deletions
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 73d61c124..8f8b6e163 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -10,14 +10,20 @@ #include <zencore/logging.h> #include <zencore/stream.h> #include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> #include <zencore/timer.h> #include <zencore/windows.h> #include <zenstore/basicfile.h> #include <zenstore/cas.h> #include <zenstore/caslog.h> +#include "config.h" + #define USE_ROCKSDB 0 +ZEN_THIRD_PARTY_INCLUDES_START + #if USE_ROCKSDB # pragma comment(lib, "Rpcrt4.lib") // RocksDB made me do this # include <rocksdb/db.h> @@ -25,6 +31,8 @@ #include <xxh3.h> #include <asio.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + #include <latch> #include <string> @@ -165,7 +173,16 @@ struct ProjectStore::OplogStorage : public RefCounted Stopwatch Timer; + uint64_t InvalidEntries = 0; + m_Oplog.Replay([&](const zen::OplogEntry& LogEntry) { + if (LogEntry.OpCoreSize == 0) + { + ++InvalidEntries; + + return; + } + IoBuffer OpBuffer(LogEntry.OpCoreSize); const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; @@ -190,6 +207,11 @@ struct ProjectStore::OplogStorage : public RefCounted Handler(Op, LogEntry); }); + if (InvalidEntries) + { + ZEN_WARN("ignored {} zero-sized oplog entries", InvalidEntries); + } + ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()), m_MaxLsn, @@ -221,6 +243,8 @@ struct ProjectStore::OplogStorage : public RefCounted const uint64_t WriteSize = Buffer.GetSize(); const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF); + ZEN_ASSERT(WriteSize != 0); + XXH3_128Stream KeyHasher; Op["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); XXH3_128 KeyHash = KeyHasher.GetHash(); @@ -298,6 +322,36 @@ ProjectStore::Oplog::Flush() m_Storage->Flush(); } +void +ProjectStore::Oplog::Scrub(ScrubContext& Ctx) const +{ + ZEN_UNUSED(Ctx); +} + +void +ProjectStore::Oplog::GatherReferences(GcContext& GcCtx) +{ + RwLock::SharedLockScope _(m_OplogLock); + + std::vector<IoHash> Hashes; + + for (const auto& Kv : m_ChunkMap) + { + Hashes.push_back(Kv.second); + } + + GcCtx.ContributeCids(Hashes); + + Hashes.clear(); + + for (const auto& Kv : m_MetaMap) + { + Hashes.push_back(Kv.second); + } + + GcCtx.ContributeCids(Hashes); +} + bool ProjectStore::Oplog::ExistsAt(std::filesystem::path BasePath) { @@ -748,9 +802,51 @@ ProjectStore::Project::DeleteOplog(std::string_view OplogId) } void +ProjectStore::Project::DiscoverOplogs() +{ + FileSystemTraversal Traversal; + struct Visitor : public FileSystemTraversal::TreeVisitor + { + virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent, + [[maybe_unused]] const path_view& File, + [[maybe_unused]] uint64_t FileSize) override + { + } + + virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override + { + Dirs.push_back(WideToUtf8(DirectoryName)); + return false; + } + + std::vector<std::string> Dirs; + } Visit; + + Traversal.TraverseFileSystem(m_OplogStoragePath, Visit); + + for (const std::string& Dir : Visit.Dirs) + { + OpenOplog(Dir); + } +} + +void ProjectStore::Project::IterateOplogs(std::function<void(const Oplog&)>&& Fn) const { - // TODO: should iterate over oplogs which are present on disk but not yet loaded + // TODO: should also iterate over oplogs which are present on disk but not yet loaded + + RwLock::SharedLockScope _(m_ProjectLock); + + for (auto& Kv : m_Oplogs) + { + Fn(Kv.second); + } +} + +void +ProjectStore::Project::IterateOplogs(std::function<void(Oplog&)>&& Fn) +{ + // TODO: should also iterate over oplogs which are present on disk but not yet loaded RwLock::SharedLockScope _(m_ProjectLock); @@ -769,13 +865,20 @@ ProjectStore::Project::Flush() void ProjectStore::Project::Scrub(ScrubContext& Ctx) { - ZEN_UNUSED(Ctx); + IterateOplogs([&](const Oplog& Ops) { Ops.Scrub(Ctx); }); +} + +void +ProjectStore::Project::GatherReferences(GcContext& GcCtx) +{ + IterateOplogs([&](Oplog& Ops) { Ops.GatherReferences(GcCtx); }); } ////////////////////////////////////////////////////////////////////////// -ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath) -: m_Log(zen::logging::Get("project")) +ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, CasGc& Gc) +: GcContributor(Gc) +, m_Log(zen::logging::Get("project")) , m_ProjectBasePath(BasePath) , m_CidStore(Store) { @@ -795,6 +898,45 @@ ProjectStore::BasePathForProject(std::string_view ProjectId) } void +ProjectStore::DiscoverProjects() +{ + FileSystemTraversal Traversal; + struct Visitor : public FileSystemTraversal::TreeVisitor + { + virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent, + [[maybe_unused]] const path_view& File, + [[maybe_unused]] uint64_t FileSize) override + { + } + + virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override + { + Dirs.push_back(WideToUtf8(DirectoryName)); + return false; + } + + std::vector<std::string> Dirs; + } Visit; + + if (!std::filesystem::exists(m_ProjectBasePath)) + { + return; + } + + Traversal.TraverseFileSystem(m_ProjectBasePath, Visit); + + for (const auto& Dir : Visit.Dirs) + { + Project* Project = OpenProject(Dir); + + if (Project) + { + Project->DiscoverOplogs(); + } + } +} + +void ProjectStore::Flush() { // TODO @@ -818,6 +960,19 @@ ProjectStore::Scrub(ScrubContext& Ctx) } } +void +ProjectStore::GatherReferences(GcContext& GcCtx) +{ + DiscoverProjects(); + + RwLock::SharedLockScope _(m_ProjectsLock); + + for (auto& Kv : m_Projects) + { + Kv.second.GatherReferences(GcCtx); + } +} + ProjectStore::Project* ProjectStore::OpenProject(std::string_view ProjectId) { @@ -1440,7 +1595,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } - ZEN_INFO("op #{} - '{}' {} '{}/{}' ", OpLsn, Core["key"sv].AsString(), NiceBytes(Payload.Size()), ProjectId, OplogId); + ZEN_INFO("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Payload.Size()), Core["key"sv].AsString()); HttpReq.WriteResponse(HttpResponseCode::Created); }, @@ -1678,6 +1833,8 @@ HttpProjectService::HandleRequest(HttpServerRequest& Request) } } +#if ZEN_USE_NAMED_PIPES + ////////////////////////////////////////////////////////////////////////// class SecurityAttributes @@ -1957,6 +2114,25 @@ LocalProjectService::~LocalProjectService() m_Impl->Stop(); } +#endif + ////////////////////////////////////////////////////////////////////////// +#if ZEN_WITH_TESTS + +TEST_CASE("prj.store") +{ + using namespace fmt::literals; + using namespace std::literals; + + ScopedTemporaryDirectory TempDir; +} + +#endif + +void +prj_forcelink() +{ +} + } // namespace zen |