// Copyright Epic Games, Inc. All Rights Reserved. #include "projectstore.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "config.h" #if ZEN_PLATFORM_WINDOWS # include #endif #define USE_ROCKSDB 0 ZEN_THIRD_PARTY_INCLUDES_START #if USE_ROCKSDB # pragma comment(lib, "Rpcrt4.lib") // RocksDB made me do this # include #endif #include #include ZEN_THIRD_PARTY_INCLUDES_END #include #include namespace zen { #if USE_ROCKSDB namespace rocksdb = ROCKSDB_NAMESPACE; #endif ////////////////////////////////////////////////////////////////////////// Oid OpKeyStringAsOId(std::string_view OpKey) { CbObjectWriter Writer; Writer << "key" << OpKey; XXH3_128Stream KeyHasher; Writer.Save()["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); XXH3_128 KeyHash = KeyHasher.GetHash(); Oid OpId; memcpy(OpId.OidBits, &KeyHash, sizeof(OpId.OidBits)); return OpId; } ////////////////////////////////////////////////////////////////////////// struct ProjectStore::OplogStorage : public RefCounted { OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath) { } ~OplogStorage() { ZEN_INFO("closing oplog storage at {}", m_OplogStoragePath); Flush(); #if USE_ROCKSDB if (m_RocksDb) { // Column families must be torn down before database is closed for (const auto& Handle : m_RocksDbColumnHandles) { m_RocksDb->DestroyColumnFamilyHandle(Handle); } rocksdb::Status Status = m_RocksDb->Close(); if (!Status.ok()) { ZEN_WARN("db close error reported for '{}' : '{}'", m_OplogStoragePath, Status.getState()); } } #endif } [[nodiscard]] bool Exists() { return Exists(m_OplogStoragePath); } [[nodiscard]] static bool Exists(std::filesystem::path BasePath) { return std::filesystem::exists(BasePath / "ops.zlog") && std::filesystem::exists(BasePath / "ops.zops"); } static bool 
Delete(std::filesystem::path BasePath) { return DeleteDirectories(BasePath); } void Open(bool IsCreate) { ZEN_INFO("initializing oplog storage at '{}'", m_OplogStoragePath); if (IsCreate) { DeleteDirectories(m_OplogStoragePath); CreateDirectories(m_OplogStoragePath); } m_Oplog.Open(m_OplogStoragePath / "ops.zlog", IsCreate ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); m_Oplog.Initialize(); m_OpBlobs.Open(m_OplogStoragePath / "ops.zops", IsCreate ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); ZEN_ASSERT(IsPow2(m_OpsAlign)); ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1))); #if USE_ROCKSDB { std::string RocksdbPath = PathToUtf8(m_OplogStoragePath / "ops.rdb"); ZEN_DEBUG("opening rocksdb db at '{}'", RocksdbPath); rocksdb::DB* Db; rocksdb::DBOptions Options; Options.create_if_missing = true; std::vector ExistingColumnFamilies; rocksdb::Status Status = rocksdb::DB::ListColumnFamilies(Options, RocksdbPath, &ExistingColumnFamilies); std::vector ColumnDescriptors; if (Status.IsPathNotFound()) { ColumnDescriptors.emplace_back(rocksdb::ColumnFamilyDescriptor{rocksdb::kDefaultColumnFamilyName, {}}); } else if (Status.ok()) { for (const std::string& Column : ExistingColumnFamilies) { rocksdb::ColumnFamilyDescriptor ColumnFamily; ColumnFamily.name = Column; ColumnDescriptors.push_back(ColumnFamily); } } else { throw std::runtime_error( fmt::format("column family iteration failed for '{}': '{}'", RocksdbPath, Status.getState()).c_str()); } Status = rocksdb::DB::Open(Options, RocksdbPath, ColumnDescriptors, &m_RocksDbColumnHandles, &Db); if (!Status.ok()) { throw std::runtime_error(fmt::format("database open failed for '{}': '{}'", RocksdbPath, Status.getState()).c_str()); } m_RocksDb.reset(Db); } #endif } void ReplayLog(std::function&& Handler) { ZEN_TRACE_CPU("ProjectStore::OplogStorage::ReplayLog"); // This could use memory mapping or do something clever but for now it just reads the file sequentially ZEN_INFO("replaying log for '{}'", 
m_OplogStoragePath); Stopwatch Timer; uint64_t InvalidEntries = 0; m_Oplog.Replay( [&](const zen::OplogEntry& LogEntry) { if (LogEntry.OpCoreSize == 0) { ++InvalidEntries; return; } IoBuffer OpBuffer(LogEntry.OpCoreSize); const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); // Verify checksum, ignore op data if incorrect const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), OpBuffer.Size()) & 0xffffFFFF); if (OpCoreHash != LogEntry.OpCoreHash) { ZEN_WARN("skipping oplog entry with bad checksum!"); return; } CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), OpBuffer.Size())); m_NextOpsOffset = Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn); Handler(Op, LogEntry); }, 0); if (InvalidEntries) { ZEN_WARN("ignored {} zero-sized oplog entries", InvalidEntries); } ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()), m_MaxLsn, m_NextOpsOffset); } void ReplayLog(const std::vector& Entries, std::function&& Handler) { for (const OplogEntryAddress& Entry : Entries) { CbObject Op = GetOp(Entry); Handler(Op); } } CbObject GetOp(const OplogEntryAddress& Entry) { IoBuffer OpBuffer(Entry.Size); const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign; m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset); return CbObject(SharedBuffer(std::move(OpBuffer))); } OplogEntry AppendOp(CbObject Op) { ZEN_TRACE_CPU("ProjectStore::OplogStorage::AppendOp"); SharedBuffer Buffer = Op.GetBuffer(); const uint64_t WriteSize = Buffer.GetSize(); const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF); ZEN_ASSERT(WriteSize != 0); XXH3_128Stream KeyHasher; Op["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); XXH3_128 KeyHash = 
KeyHasher.GetHash(); RwLock::ExclusiveLockScope _(m_RwLock); const uint64_t WriteOffset = m_NextOpsOffset; const uint32_t OpLsn = ++m_MaxLsn; m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign); ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign)); OplogEntry Entry = {.OpLsn = OpLsn, .OpCoreOffset = gsl::narrow_cast(WriteOffset / m_OpsAlign), .OpCoreSize = uint32_t(Buffer.GetSize()), .OpCoreHash = OpCoreHash, .OpKeyHash = KeyHash}; m_Oplog.Append(Entry); m_OpBlobs.Write(Buffer.GetData(), WriteSize, WriteOffset); return Entry; } void Flush() { m_Oplog.Flush(); m_OpBlobs.Flush(); } spdlog::logger& Log() { return m_OwnerOplog->Log(); } private: ProjectStore::Oplog* m_OwnerOplog; std::filesystem::path m_OplogStoragePath; RwLock m_RwLock; TCasLogFile m_Oplog; BasicFile m_OpBlobs; std::atomic m_NextOpsOffset{0}; uint64_t m_OpsAlign = 32; std::atomic m_MaxLsn{0}; #if USE_ROCKSDB std::unique_ptr m_RocksDb; std::vector m_RocksDbColumnHandles; #endif }; ////////////////////////////////////////////////////////////////////////// ProjectStore::Oplog::Oplog(std::string_view Id, Project* Project, CidStore& Store, std::filesystem::path BasePath) : m_OuterProject(Project) , m_CidStore(Store) , m_BasePath(BasePath) , m_OplogId(Id) { m_Storage = new OplogStorage(this, m_BasePath); const bool StoreExists = m_Storage->Exists(); m_Storage->Open(/* IsCreate */ !StoreExists); m_TempPath = m_BasePath / "temp"; zen::CleanDirectory(m_TempPath); } ProjectStore::Oplog::~Oplog() { Flush(); } void ProjectStore::Oplog::Flush() { m_Storage->Flush(); } void ProjectStore::Oplog::Scrub(ScrubContext& Ctx) const { ZEN_UNUSED(Ctx); } void ProjectStore::Oplog::GatherReferences(GcContext& GcCtx) { RwLock::SharedLockScope _(m_OplogLock); std::vector Hashes; for (const auto& Kv : m_ChunkMap) { Hashes.push_back(Kv.second); } GcCtx.ContributeCids(Hashes); Hashes.clear(); for (const auto& Kv : m_MetaMap) { Hashes.push_back(Kv.second); } GcCtx.ContributeCids(Hashes); } bool 
ProjectStore::Oplog::ExistsAt(std::filesystem::path BasePath) { return OplogStorage::Exists(BasePath); } void ProjectStore::Oplog::ReplayLog() { m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(Op, OpEntry, kUpdateReplay); }); } IoBuffer ProjectStore::Oplog::FindChunk(Oid ChunkId) { RwLock::SharedLockScope _(m_OplogLock); if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end()) { _.ReleaseNow(); IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkIt->second); Chunk.SetContentType(ZenContentType::kCompressedBinary); return Chunk; } if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) { _.ReleaseNow(); std::filesystem::path FilePath = m_OuterProject->RootDir / FileIt->second.ServerPath; IoBuffer FileChunk = IoBufferBuilder::MakeFromFile(FilePath); FileChunk.SetContentType(ZenContentType::kBinary); return FileChunk; } if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) { _.ReleaseNow(); IoBuffer Chunk = m_CidStore.FindChunkByCid(MetaIt->second); Chunk.SetContentType(ZenContentType::kCompressedBinary); return Chunk; } return {}; } void ProjectStore::Oplog::IterateFileMap( std::function&& Fn) { RwLock::SharedLockScope _(m_OplogLock); for (const auto& Kv : m_FileMap) { Fn(Kv.first, Kv.second.ServerPath, Kv.second.ClientPath); } } void ProjectStore::Oplog::IterateOplog(std::function&& Handler) { RwLock::SharedLockScope _(m_OplogLock); std::vector Entries; Entries.reserve(m_LatestOpMap.size()); for (const auto& Kv : m_LatestOpMap) { const auto AddressEntry = m_OpAddressMap.find(Kv.second); ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); Entries.push_back(AddressEntry->second); } std::sort(Entries.begin(), Entries.end(), [](const OplogEntryAddress& Lhs, const OplogEntryAddress& Rhs) { return Lhs.Offset < Rhs.Offset; }); m_Storage->ReplayLog(Entries, [&](CbObject Op) { Handler(Op); }); } std::optional ProjectStore::Oplog::GetOplog(const Oid& Key) { RwLock::SharedLockScope _(m_OplogLock); 
if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end()) { const auto AddressEntry = m_OpAddressMap.find(LatestOp->second); ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); return m_Storage->GetOp(AddressEntry->second); } return {}; } bool ProjectStore::Oplog::AddFileMapping(Oid FileId, IoHash Hash, std::string_view ServerPath, std::string_view ClientPath) { // NOTE: Caller must hold an exclusive lock on m_OplogLock if (ServerPath.empty() || ClientPath.empty()) { return false; } FileMapEntry Entry; Entry.ServerPath = ServerPath; Entry.ClientPath = ClientPath; m_FileMap[FileId] = std::move(Entry); if (Hash != IoHash::Zero) { m_ChunkMap.insert_or_assign(FileId, Hash); } return true; } void ProjectStore::Oplog::AddChunkMapping(Oid ChunkId, IoHash Hash) { // NOTE: Caller must hold an exclusive lock on m_OplogLock m_ChunkMap.insert_or_assign(ChunkId, Hash); } void ProjectStore::Oplog::AddMetaMapping(Oid ChunkId, IoHash Hash) { // NOTE: Caller must hold an exclusive lock on m_OplogLock m_MetaMap.insert_or_assign(ChunkId, Hash); } uint32_t ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry, UpdateType TypeOfUpdate) { ZEN_TRACE_CPU("ProjectStore::Oplog::RegisterOplogEntry"); ZEN_UNUSED(TypeOfUpdate); // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing // too many problems. 
Longer term we'll probably want to ensure we can do concurrent updates however RwLock::ExclusiveLockScope _(m_OplogLock); using namespace std::literals; // Update chunk id maps if (Core["package"sv]) { CbObjectView PkgObj = Core["package"sv].AsObjectView(); Oid PackageId = PkgObj["id"sv].AsObjectId(); IoHash PackageHash = PkgObj["data"sv].AsBinaryAttachment(); AddChunkMapping(PackageId, PackageHash); ZEN_DEBUG("package data {} -> {}", PackageId, PackageHash); } for (CbFieldView& Entry : Core["bulkdata"sv]) { CbObjectView BulkObj = Entry.AsObjectView(); Oid BulkDataId = BulkObj["id"sv].AsObjectId(); IoHash BulkDataHash = BulkObj["data"sv].AsBinaryAttachment(); AddChunkMapping(BulkDataId, BulkDataHash); ZEN_DEBUG("bulkdata {} -> {}", BulkDataId, BulkDataHash); } if (Core["files"sv]) { Stopwatch Timer; int32_t FileCount = 0; for (CbFieldView& Entry : Core["files"sv]) { CbObjectView FileObj = Entry.AsObjectView(); const Oid FileId = FileObj["id"sv].AsObjectId(); IoHash FileDataHash = FileObj["data"sv].AsBinaryAttachment(); std::string_view ServerPath = FileObj["serverpath"sv].AsString(); std::string_view ClientPath = FileObj["clientpath"sv].AsString(); if (AddFileMapping(FileId, FileDataHash, ServerPath, ClientPath)) { ++FileCount; } else { ZEN_WARN("invalid file"); } } ZEN_DEBUG("added {} file(s) in {}", FileCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } for (CbFieldView& Entry : Core["meta"sv]) { CbObjectView MetaObj = Entry.AsObjectView(); const Oid MetaId = MetaObj["id"sv].AsObjectId(); auto NameString = MetaObj["name"sv].AsString(); IoHash MetaDataHash = MetaObj["data"sv].AsBinaryAttachment(); AddMetaMapping(MetaId, MetaDataHash); ZEN_DEBUG("meta data ({}) {} -> {}", NameString, MetaId, MetaDataHash); } m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); m_LatestOpMap[OpEntry.OpKeyAsOId()] = OpEntry.OpLsn; return OpEntry.OpLsn; } uint32_t ProjectStore::Oplog::AppendNewOplogEntry(CbPackage 
OpPackage) { ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); using namespace std::literals; const CbObject& Core = OpPackage.GetObject(); const OplogEntry OpEntry = m_Storage->AppendOp(Core); // Persist attachments uint64_t AttachmentBytes = 0; uint64_t NewAttachmentBytes = 0; auto Attachments = OpPackage.GetAttachments(); for (const auto& Attach : Attachments) { ZEN_ASSERT(Attach.IsCompressedBinary()); CompressedBuffer AttachmentData = Attach.AsCompressedBinary(); const uint64_t AttachmentSize = AttachmentData.GetRawSize(); CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData); if (InsertResult.New) { NewAttachmentBytes += AttachmentSize; } AttachmentBytes += AttachmentSize; } const uint32_t EntryId = RegisterOplogEntry(Core, OpEntry, kUpdateNewEntry); ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, zen::NiceBytes(NewAttachmentBytes), zen::NiceBytes(AttachmentBytes)); return EntryId; } ////////////////////////////////////////////////////////////////////////// ProjectStore::Project::Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath) : m_ProjectStore(PrjStore) , m_CidStore(Store) , m_OplogStoragePath(BasePath) { } ProjectStore::Project::~Project() { } bool ProjectStore::Project::Exists(std::filesystem::path BasePath) { return std::filesystem::exists(BasePath / "Project.zcb"); } void ProjectStore::Project::Read() { std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"; ZEN_INFO("reading config for project '{}' from {}", Identifier, ProjectStateFilePath); BasicFile Blob; Blob.Open(ProjectStateFilePath, BasicFile::Mode::kRead); IoBuffer Obj = Blob.ReadAll(); CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); if (ValidationError == CbValidateError::None) { CbObject Cfg = LoadCompactBinaryObject(Obj); Identifier = Cfg["id"].AsString(); RootDir = Cfg["root"].AsString(); ProjectRootDir = 
Cfg["project"].AsString(); EngineRootDir = Cfg["engine"].AsString(); } else { ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), ProjectStateFilePath); } } void ProjectStore::Project::Write() { BinaryWriter Mem; CbObjectWriter Cfg; Cfg << "id" << Identifier; Cfg << "root" << PathToUtf8(RootDir); Cfg << "project" << ProjectRootDir; Cfg << "engine" << EngineRootDir; Cfg.Save(Mem); CreateDirectories(m_OplogStoragePath); std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"; ZEN_INFO("persisting config for project '{}' to {}", Identifier, ProjectStateFilePath); BasicFile Blob; Blob.Open(ProjectStateFilePath, BasicFile::Mode::kTruncate); Blob.Write(Mem.Data(), Mem.Size(), 0); Blob.Flush(); } spdlog::logger& ProjectStore::Project::Log() { return m_ProjectStore->Log(); } std::filesystem::path ProjectStore::Project::BasePathForOplog(std::string_view OplogId) { return m_OplogStoragePath / OplogId; } ProjectStore::Oplog* ProjectStore::Project::NewOplog(std::string_view OplogId) { RwLock::ExclusiveLockScope _(m_ProjectLock); std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); try { Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CidStore, OplogBasePath).first->second; return &Log; } catch (std::exception&) { // In case of failure we need to ensure there's no half constructed entry around // // (This is probably already ensured by the try_emplace implementation?) 
m_Oplogs.erase(std::string{OplogId}); return nullptr; } } ProjectStore::Oplog* ProjectStore::Project::OpenOplog(std::string_view OplogId) { { RwLock::SharedLockScope _(m_ProjectLock); auto OplogIt = m_Oplogs.find(std::string(OplogId)); if (OplogIt != m_Oplogs.end()) { return &OplogIt->second; } } RwLock::ExclusiveLockScope _(m_ProjectLock); std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); if (Oplog::ExistsAt(OplogBasePath)) { // Do open of existing oplog try { Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CidStore, OplogBasePath).first->second; Log.ReplayLog(); return &Log; } catch (std::exception& ex) { ZEN_ERROR("failed to open oplog '{}' @ '{}': {}", OplogId, OplogBasePath, ex.what()); m_Oplogs.erase(std::string{OplogId}); } } return nullptr; } void ProjectStore::Project::DeleteOplog(std::string_view OplogId) { { RwLock::ExclusiveLockScope _(m_ProjectLock); auto OplogIt = m_Oplogs.find(std::string(OplogId)); if (OplogIt != m_Oplogs.end()) { m_Oplogs.erase(OplogIt); } } // Actually erase std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); OplogStorage::Delete(OplogBasePath); } void ProjectStore::Project::DiscoverOplogs() { DirectoryContent DirContent; GetDirectoryContent(m_OplogStoragePath, DirectoryContent::IncludeDirsFlag, DirContent); for (const std::filesystem::path& DirPath : DirContent.Directories) { OpenOplog(PathToUtf8(DirPath.stem())); } } void ProjectStore::Project::IterateOplogs(std::function&& Fn) const { // TODO: should also iterate over oplogs which are present on disk but not yet loaded RwLock::SharedLockScope _(m_ProjectLock); for (auto& Kv : m_Oplogs) { Fn(Kv.second); } } void ProjectStore::Project::IterateOplogs(std::function&& Fn) { // TODO: should also iterate over oplogs which are present on disk but not yet loaded RwLock::SharedLockScope _(m_ProjectLock); for (auto& Kv : m_Oplogs) { Fn(Kv.second); } } void ProjectStore::Project::Flush() { // TODO } void 
ProjectStore::Project::Scrub(ScrubContext& Ctx) { IterateOplogs([&](const Oplog& Ops) { Ops.Scrub(Ctx); }); } void ProjectStore::Project::GatherReferences(GcContext& GcCtx) { IterateOplogs([&](Oplog& Ops) { Ops.GatherReferences(GcCtx); }); } ////////////////////////////////////////////////////////////////////////// ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, CasGc& Gc) : GcContributor(Gc) , m_Log(zen::logging::Get("project")) , m_CidStore(Store) , m_ProjectBasePath(BasePath) { ZEN_INFO("initializing project store at '{}'", BasePath); // m_Log.set_level(spdlog::level::debug); } ProjectStore::~ProjectStore() { ZEN_INFO("closing project store ('{}')", m_ProjectBasePath); } std::filesystem::path ProjectStore::BasePathForProject(std::string_view ProjectId) { return m_ProjectBasePath / ProjectId; } void ProjectStore::DiscoverProjects() { if (!std::filesystem::exists(m_ProjectBasePath)) { return; } DirectoryContent DirContent; GetDirectoryContent(m_ProjectBasePath, DirectoryContent::IncludeDirsFlag, DirContent); for (const std::filesystem::path& DirPath : DirContent.Directories) { std::string DirName = PathToUtf8(DirPath.stem()); Project* Project = OpenProject(DirName); if (Project) { Project->DiscoverOplogs(); } } } void ProjectStore::IterateProjects(std::function&& Fn) { RwLock::SharedLockScope _(m_ProjectsLock); for (auto& Kv : m_Projects) { Fn(Kv.second); } } void ProjectStore::Flush() { // TODO RwLock::SharedLockScope _(m_ProjectsLock); for (auto& Kv : m_Projects) { Kv.second.Flush(); } } void ProjectStore::Scrub(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_ProjectsLock); for (auto& Kv : m_Projects) { Kv.second.Scrub(Ctx); } } void ProjectStore::GatherReferences(GcContext& GcCtx) { Stopwatch Timer; const auto Guard = MakeGuard([&] { ZEN_INFO("project store gathered all references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); DiscoverProjects(); RwLock::SharedLockScope _(m_ProjectsLock); for (auto& Kv : m_Projects) { 
Kv.second.GatherReferences(GcCtx); } } ProjectStore::Project* ProjectStore::OpenProject(std::string_view ProjectId) { { RwLock::SharedLockScope _(m_ProjectsLock); auto ProjIt = m_Projects.find(std::string{ProjectId}); if (ProjIt != m_Projects.end()) { return &(ProjIt->second); } } RwLock::ExclusiveLockScope _(m_ProjectsLock); std::filesystem::path ProjectBasePath = BasePathForProject(ProjectId); if (Project::Exists(ProjectBasePath)) { try { ZEN_INFO("opening project {} @ {}", ProjectId, ProjectBasePath); ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CidStore, ProjectBasePath).first->second; Prj.Identifier = ProjectId; Prj.Read(); return &Prj; } catch (std::exception& e) { ZEN_WARN("failed to open {} @ {} ({})", ProjectId, ProjectBasePath, e.what()); m_Projects.erase(std::string{ProjectId}); } } return nullptr; } ProjectStore::Project* ProjectStore::NewProject(std::filesystem::path BasePath, std::string_view ProjectId, std::string_view RootDir, std::string_view EngineRootDir, std::string_view ProjectRootDir) { RwLock::ExclusiveLockScope _(m_ProjectsLock); ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CidStore, BasePath).first->second; Prj.Identifier = ProjectId; Prj.RootDir = RootDir; Prj.EngineRootDir = EngineRootDir; Prj.ProjectRootDir = ProjectRootDir; Prj.Write(); return &Prj; } void ProjectStore::DeleteProject(std::string_view ProjectId) { std::filesystem::path ProjectBasePath = BasePathForProject(ProjectId); ZEN_INFO("deleting project {} @ {}", ProjectId, ProjectBasePath); m_Projects.erase(std::string{ProjectId}); DeleteDirectories(ProjectBasePath); } bool ProjectStore::Exists(std::string_view ProjectId) { return Project::Exists(BasePathForProject(ProjectId)); } ProjectStore::Oplog* ProjectStore::OpenProjectOplog(std::string_view ProjectId, std::string_view OplogId) { if (Project* ProjectIt = OpenProject(ProjectId)) { return ProjectIt->OpenOplog(OplogId); } return nullptr; } 
////////////////////////////////////////////////////////////////////////// HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) : m_Log(logging::Get("project")) , m_CidStore(Store) , m_ProjectStore(Projects) { using namespace std::literals; m_Router.AddPattern("project", "([[:alnum:]_.]+)"); m_Router.AddPattern("log", "([[:alnum:]_.]+)"); m_Router.AddPattern("op", "([[:digit:]]+?)"); m_Router.AddPattern("chunk", "([[:xdigit:]]{24})"); m_Router.AddPattern("hash", "([[:xdigit:]]{40})"); m_Router.RegisterRoute( "list", [this](HttpRouterRequest& Req) { m_ProjectStore->DiscoverProjects(); CbWriter Response; Response.BeginArray(); m_ProjectStore->IterateProjects([&Response](ProjectStore::Project& Prj) { Response.BeginObject(); Response << "Id"sv << Prj.Identifier; Response << "RootDir"sv << Prj.RootDir.string(); Response << "ProjectRootDir"sv << Prj.ProjectRootDir; Response << "EngineRootDir"sv << Prj.EngineRootDir; Response.EndObject(); }); Response.EndArray(); Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save().AsArray()); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}/batch", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); if (FoundLog == nullptr) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } // Parse Request IoBuffer Payload = HttpReq.ReadPayload(); BinaryReader Reader(Payload); struct RequestHeader { enum { kMagic = 0xAAAA'77AC }; uint32_t Magic; uint32_t ChunkCount; uint32_t Reserved1; uint32_t Reserved2; }; struct RequestChunkEntry { Oid ChunkId; uint32_t CorrelationId; uint64_t Offset; uint64_t RequestBytes; }; if (Payload.Size() <= sizeof(RequestHeader)) { HttpReq.WriteResponse(HttpResponseCode::BadRequest); } RequestHeader RequestHdr; Reader.Read(&RequestHdr, sizeof RequestHdr); 
if (RequestHdr.Magic != RequestHeader::kMagic) { HttpReq.WriteResponse(HttpResponseCode::BadRequest); } std::vector RequestedChunks; RequestedChunks.resize(RequestHdr.ChunkCount); Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount); // Make Response struct ResponseHeader { uint32_t Magic = 0xbada'b00f; uint32_t ChunkCount; uint32_t Reserved1 = 0; uint32_t Reserved2 = 0; }; struct ResponseChunkEntry { uint32_t CorrelationId; uint32_t Flags = 0; uint64_t ChunkSize; }; std::vector OutBlobs; OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry)); for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) { const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; IoBuffer FoundChunk = FoundLog->FindChunk(RequestedChunk.ChunkId); if (FoundChunk) { if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1)) { uint64_t Offset = RequestedChunk.Offset; if (Offset > FoundChunk.Size()) { Offset = FoundChunk.Size(); } uint64_t Size = RequestedChunk.RequestBytes; if ((Offset + Size) > FoundChunk.Size()) { Size = FoundChunk.Size() - Offset; } FoundChunk = IoBuffer(FoundChunk, Offset, Size); } } OutBlobs.emplace_back(std::move(FoundChunk)); } uint8_t* ResponsePtr = reinterpret_cast(OutBlobs[0].MutableData()); ResponseHeader ResponseHdr; ResponseHdr.ChunkCount = RequestHdr.ChunkCount; memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr)); ResponsePtr += sizeof(ResponseHdr); for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) { // const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]); ResponseChunkEntry ResponseChunk; ResponseChunk.CorrelationId = ChunkIndex; if (FoundChunk) { ResponseChunk.ChunkSize = FoundChunk.Size(); } else { ResponseChunk.ChunkSize = uint64_t(-1); } memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk)); ResponsePtr += 
sizeof(ResponseChunk); } return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs); }, HttpVerb::kPost); m_Router.RegisterRoute( "{project}/oplog/{log}/files", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); // File manifest fetch, returns the client file list const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); if (FoundLog == nullptr) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Oplog& Log = *FoundLog; HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); const bool FilterClient = Params.GetValue("filter") == "client"; CbObjectWriter Response; Response.BeginArray("files"); Log.IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) { Response.BeginObject(); Response << "id" << Id; Response << "clientpath" << ClientPath; if (!FilterClient) { Response << "serverpath" << ServerPath; } Response.EndObject(); }); Response.EndArray(); return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}/{chunk}/info", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); const auto& ChunkId = Req.GetCapture(3); ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); if (FoundLog == nullptr) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Oplog& Log = *FoundLog; Oid Obj = Oid::FromHexString(ChunkId); IoBuffer Chunk = Log.FindChunk(Obj); if (!Chunk) { m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); return HttpReq.WriteResponse(HttpResponseCode::NotFound); } uint64_t ChunkSize = Chunk.GetSize(); if (Chunk.GetContentType() == 
HttpContentType::kCompressedBinary) { CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); ZEN_ASSERT(!Compressed.IsNull()); ChunkSize = Compressed.GetRawSize(); } CbObjectWriter Response; Response << "size" << ChunkSize; HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}/{chunk}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); const auto& ChunkId = Req.GetCapture(3); bool IsOffset = false; uint64_t Offset = 0; uint64_t Size = ~(0ull); auto QueryParms = Req.ServerRequest().GetQueryParams(); if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false) { if (auto OffsetVal = ParseInt(OffsetParm)) { Offset = OffsetVal.value(); IsOffset = true; } else { return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } } if (auto SizeParm = QueryParms.GetValue("size"); SizeParm.empty() == false) { if (auto SizeVal = ParseInt(SizeParm)) { Size = SizeVal.value(); IsOffset = true; } else { return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } } HttpContentType AcceptType = HttpReq.AcceptContentType(); if (AcceptType == HttpContentType::kUnknownContentType) { AcceptType = HttpContentType::kBinary; } ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); if (FoundLog == nullptr) { m_Log.warn("chunk - '{}/{}/{}' FAILED, missing oplog", ProjectId, OplogId, ChunkId); return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Oplog& Log = *FoundLog; Oid Obj = Oid::FromHexString(ChunkId); IoBuffer Chunk = Log.FindChunk(Obj); if (!Chunk) { m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); return HttpReq.WriteResponse(HttpResponseCode::NotFound); } IoBuffer Value = Chunk; HttpContentType ContentType = Chunk.GetContentType(); if (Chunk.GetContentType() == 
HttpContentType::kCompressedBinary) { CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); ZEN_ASSERT(!Compressed.IsNull()); if (IsOffset) { if ((Offset + Size) > Compressed.GetRawSize()) { Size = Compressed.GetRawSize() - Offset; } if (AcceptType == HttpContentType::kBinary) { Value = Compressed.Decompress(Offset, Size).AsIoBuffer(); ContentType = HttpContentType::kBinary; } else { Value = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer(); ContentType = HttpContentType::kCompressedBinary; } } else { if (AcceptType == HttpContentType::kBinary) { Value = Compressed.Decompress().AsIoBuffer(); ContentType = HttpContentType::kBinary; } else { Value = Compressed.GetCompressed().Flatten().AsIoBuffer(); ContentType = HttpContentType::kCompressedBinary; } } } else if (IsOffset) { if ((Offset + Size) > Chunk.GetSize()) { Size = Chunk.GetSize() - Offset; } Value = IoBuffer(Chunk, Offset, Size); } m_Log.debug("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(ContentType)); return HttpReq.WriteResponse(HttpResponseCode::OK, ContentType, Value); }, HttpVerb::kGet | HttpVerb::kHead); m_Router.RegisterRoute( "{project}/oplog/{log}/{hash}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& HashString = Req.GetCapture(3); IoHash Hash = IoHash::FromHexString(HashString); HttpContentType AcceptType = HttpReq.AcceptContentType(); if (AcceptType == HttpContentType::kUnknownContentType) { AcceptType = HttpContentType::kBinary; } HttpContentType ContentType = HttpContentType::kCompressedBinary; IoBuffer Value = m_CidStore.FindChunkByCid(Hash); if (!Value) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } if (AcceptType == HttpContentType::kBinary) { CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Value)); Value = Compressed.Decompress().AsIoBuffer(); ContentType = HttpContentType::kBinary; } return 
HttpReq.WriteResponse(HttpResponseCode::OK, ContentType, Value); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}/prep", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); if (FoundLog == nullptr) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } // This operation takes a list of referenced hashes and decides which // chunks are not present on this server. This list is then returned in // the "need" list in the response IoBuffer Payload = HttpReq.ReadPayload(); CbObject RequestObject = LoadCompactBinaryObject(Payload); std::vector NeedList; for (auto Entry : RequestObject["have"sv]) { const IoHash FileHash = Entry.AsHash(); if (!m_CidStore.FindChunkByCid(FileHash)) { ZEN_DEBUG("prep - NEED: {}", FileHash); NeedList.push_back(FileHash); } } CbObjectWriter Cbo; Cbo.BeginArray("need"); for (const IoHash& Hash : NeedList) { Cbo << Hash; } Cbo.EndArray(); CbObject Response = Cbo.Save(); return HttpReq.WriteResponse(HttpResponseCode::OK, Response); }, HttpVerb::kPost); m_Router.RegisterRoute( "{project}/oplog/{log}/new", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); bool IsUsingSalt = false; IoHash SaltHash = IoHash::Zero; if (std::string_view SaltParam = Params.GetValue("salt"); SaltParam.empty() == false) { const uint32_t Salt = std::stoi(std::string(SaltParam)); SaltHash = IoHash::HashBuffer(&Salt, sizeof Salt); IsUsingSalt = true; } ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); if (FoundLog == nullptr) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Oplog& Oplog = *FoundLog; IoBuffer 
Payload = HttpReq.ReadPayload(); // This will attempt to open files which may not exist for the case where // the prep step rejected the chunk. This should be fixed since there's // a performance cost associated with any file system activity bool IsValid = true; std::vector MissingChunks; CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer { IoHash AttachmentId; if (IsUsingSalt) { IoHash AttachmentSpec[]{SaltHash, Hash}; AttachmentId = IoHash::HashBuffer(MakeMemoryView(AttachmentSpec)); } else { AttachmentId = Hash; } std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString(); if (IoBuffer CompressedData = m_CidStore.FindChunkByCid(Hash)) { return SharedBuffer(std::move(CompressedData)); } else if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath)) { return SharedBuffer(std::move(Data)); } else { IsValid = false; MissingChunks.push_back(Hash); return {}; } }; CbPackage Package; if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver)) { std::filesystem::path BadPackagePath = Oplog.TempPath() / "bad_packages" / fmt::format("session{}_request{}", HttpReq.SessionId(), HttpReq.RequestId()); ZEN_ERROR("Received malformed package! 
Saving payload to '{}'", BadPackagePath); zen::WriteFile(BadPackagePath, Payload); return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"); } if (!IsValid) { // TODO: emit diagnostics identifying missing chunks return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing chunk reference"); } CbObject Core = Package.GetObject(); if (!Core["key"sv]) { return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified"); } // Write core to oplog const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Package); if (OpLsn == ProjectStore::Oplog::kInvalidOp) { return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } ZEN_INFO("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Payload.Size()), Core["key"sv].AsString()); HttpReq.WriteResponse(HttpResponseCode::Created); }, HttpVerb::kPost); m_Router.RegisterRoute( "{project}/oplog/{log}/{op}", [](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); // TODO: look up op and respond with the payload! 
HttpReq.WriteResponse(HttpResponseCode::Accepted, HttpContentType::kText, u8"yeee"sv); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}", [this](HttpRouterRequest& Req) { const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); ProjectStore::Project* ProjectIt = m_ProjectStore->OpenProject(ProjectId); if (!ProjectIt) { return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("project {} not found", ProjectId)); } ProjectStore::Project& Prj = *ProjectIt; switch (Req.ServerRequest().RequestVerb()) { case HttpVerb::kGet: { ProjectStore::Oplog* OplogIt = Prj.OpenOplog(OplogId); if (!OplogIt) { return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("oplog {} not found in project {}", OplogId, ProjectId)); } ProjectStore::Oplog& Log = *OplogIt; CbObjectWriter Cb; Cb << "id"sv << Log.OplogId() << "project"sv << Prj.Identifier << "tempdir"sv << Log.TempPath().c_str(); Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cb.Save()); } break; case HttpVerb::kPost: { ProjectStore::Oplog* OplogIt = Prj.OpenOplog(OplogId); if (!OplogIt) { if (!Prj.NewOplog(OplogId)) { // TODO: indicate why the operation failed! 
return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError); } ZEN_INFO("established oplog '{}/{}'", ProjectId, OplogId); return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); } // I guess this should ultimately be used to execute RPCs but for now, it // does absolutely nothing return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); } break; case HttpVerb::kDelete: { ZEN_INFO("deleting oplog '{}/{}'", ProjectId, OplogId); ProjectIt->DeleteOplog(OplogId); return Req.ServerRequest().WriteResponse(HttpResponseCode::OK); } break; default: break; } }, HttpVerb::kPost | HttpVerb::kGet | HttpVerb::kDelete); m_Router.RegisterRoute( "{project}/oplog/{log}/entries", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); if (FoundLog == nullptr) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } CbObjectWriter Response; if (FoundLog->OplogCount() > 0) { HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); if (auto OpKey = Params.GetValue("opkey"); !OpKey.empty()) { Oid OpKeyId = OpKeyStringAsOId(OpKey); std::optional Op = FoundLog->GetOplog(OpKeyId); if (Op.has_value()) { Response << "entry" << Op.value(); } else { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } } else { Response.BeginArray("entries"sv); FoundLog->IterateOplog([&Response](CbObject Op) { Response << Op; }); Response.EndArray(); } } return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}", [this](HttpRouterRequest& Req) { const std::string ProjectId = Req.GetCapture(1); switch (Req.ServerRequest().RequestVerb()) { case HttpVerb::kPost: { IoBuffer Payload = Req.ServerRequest().ReadPayload(); CbObject Params = LoadCompactBinaryObject(Payload); std::string_view Id = 
Params["id"sv].AsString();
						// The remaining project parameters are read from the same payload object
						std::string_view Root = Params["root"sv].AsString();
						std::string_view EngineRoot = Params["engine"sv].AsString();
						std::string_view ProjectRoot = Params["project"sv].AsString();

						// The project is created under BasePath()/ProjectId using the id captured from
						// the URI; the "id" field read from the payload is only used in the log message
						const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId;
						m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot);

						ZEN_INFO("established project - {} (id: '{}', roots: '{}', '{}', '{}')", ProjectId, Id, Root, EngineRoot, ProjectRoot);

						Req.ServerRequest().WriteResponse(HttpResponseCode::Created);
					}
					break;

				case HttpVerb::kGet:
					{
						// Responds with a text 404 when the project cannot be opened
						ProjectStore::Project* ProjectIt = m_ProjectStore->OpenProject(ProjectId);

						if (!ProjectIt)
						{
							return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("project {} not found", ProjectId));
						}

						const ProjectStore::Project& Prj = *ProjectIt;

						// Response object: "id", "root" and an "oplogs" array holding one
						// { "id": ... } object per oplog reported by IterateOplogs
						CbObjectWriter Response;
						Response << "id" << Prj.Identifier << "root" << PathToUtf8(Prj.RootDir);
						Response.BeginArray("oplogs"sv);
						Prj.IterateOplogs([&](const ProjectStore::Oplog& I) {
							Response.BeginObject();
							Response << "id"sv << I.OplogId();
							Response.EndObject();
						});
						Response.EndArray(); // oplogs

						Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save());
					}
					break;

				case HttpVerb::kDelete:
					{
						// The project is opened first so that a missing project yields a 404
						// instead of reaching DeleteProject
						ProjectStore::Project* ProjectIt = m_ProjectStore->OpenProject(ProjectId);

						if (!ProjectIt)
						{
							return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("project {} not found", ProjectId));
						}

						m_ProjectStore->DeleteProject(ProjectId);

						return Req.ServerRequest().WriteResponse(HttpResponseCode::NoContent);
					}
					break;

				// NOTE(review): no response is written on this path. The route is registered
				// for GET/POST/DELETE only, so this is presumably unreachable - confirm
				default:
					break;
			}
		},
		HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kDelete);
}

// The destructor body is empty; nothing is released explicitly here
HttpProjectService::~HttpProjectService()
{
}

// Returns the fixed URI string "/prj/" (presumably the prefix this service is mounted under -
// confirm against the HTTP server registration code)
const char*
HttpProjectService::BaseUri() const
{
	return "/prj/";
}

// Forwards the request to the router and logs a warning when the router reports that it did
// not handle it.
// NOTE(review): no response is written here in the unhandled case - confirm that the HTTP
// server completes such requests itself
void
HttpProjectService::HandleRequest(HttpServerRequest& Request)
{
	if (m_Router.HandleRequest(Request) == false)
	{
		ZEN_WARN("No route found for {0}", Request.RelativeUri());
	}
}

#if ZEN_USE_NAMED_PIPES

//////////////////////////////////////////////////////////////////////////

# if ZEN_PLATFORM_WINDOWS

/** Pairs a SECURITY_ATTRIBUTES structure with the SECURITY_DESCRIPTOR it may refer to.

	Derived classes fill in the members. Copying is disabled because m_Attributes can
	hold a pointer to this object's own m_Sd, which a copy would leave pointing at the
	source object.
 */
class SecurityAttributes
{
public:
	SecurityAttributes() = default;
	SecurityAttributes(const SecurityAttributes&) = delete;
	SecurityAttributes& operator=(const SecurityAttributes&) = delete;

	/** Pointer suitable for passing to Win32 object creation functions */
	inline SECURITY_ATTRIBUTES* Attributes() { return &m_Attributes; }

protected:
	SECURITY_ATTRIBUTES m_Attributes{};
	SECURITY_DESCRIPTOR m_Sd{};
};

/** Security attributes which allows any user access

	Installs a present-but-null DACL on the descriptor. If either Win32 call fails the
	descriptor pointer is left null (i.e. default security applies) and a warning is logged.
 */
class AnyUserSecurityAttributes : public SecurityAttributes
{
public:
	AnyUserSecurityAttributes()
	{
		m_Attributes.nLength = sizeof m_Attributes;
		m_Attributes.bInheritHandle = FALSE;  // Disable inheritance

		if (!InitializeSecurityDescriptor(&m_Sd, SECURITY_DESCRIPTOR_REVISION))
		{
			ZEN_WARN("failed to initialize security descriptor, named pipes will use default security");
			return;
		}

		if (!SetSecurityDescriptorDacl(&m_Sd, TRUE, nullptr, FALSE))
		{
			ZEN_WARN("failed to set security descriptor DACL, named pipes will use default security");
			return;
		}

		m_Attributes.lpSecurityDescriptor = &m_Sd;
	}
};

# else

/** Non-Windows stand-in: Attributes() yields the octal mode 0666 */
struct AnyUserSecurityAttributes
{
	int Attributes() { return 0666; }
};

# endif	 // ZEN_PLATFORM_WINDOWS

//////////////////////////////////////////////////////////////////////////

/** Implementation of the named pipe based local project service.

	Owns a fixed set of pipe connections and a small pool of worker threads which all
	run the same io_context. Start() may be called once; Stop() may be called any
	number of times (it is invoked both by the owning service and by the destructor).
 */
struct LocalProjectService::LocalProjectImpl
{
	LocalProjectImpl() : m_WorkerThreadPool(ServiceThreadCount) {}
	~LocalProjectImpl() { Stop(); }

	/** Creates the pipe connections, queues an Accept() for each and starts the workers */
	void Start()
	{
		ZEN_ASSERT(!m_IsStarted);

		m_ServicePipes.reserve(ServicePipeCount);

		for (int i = 0; i < ServicePipeCount; ++i)
		{
			// The vector owns the connection; the queued handler only borrows it. Stop()
			// waits for all workers to exit before the connections are destroyed.
			PipeConnection* NewPipe = m_ServicePipes.emplace_back(std::make_unique<PipeConnection>(this)).get();

			m_IoContext.post([NewPipe] { NewPipe->Accept(); });
		}

		for (int i = 0; i < ServiceThreadCount; ++i)
		{
			asio::post(m_WorkerThreadPool, [this] {
				try
				{
					m_IoContext.run();
				}
				catch (const std::exception& ex)
				{
					ZEN_ERROR("exception caught in pipe project service loop: {}", ex.what());
				}

				// Always signal, even after an exception, so Stop() cannot wait forever
				m_ShutdownLatch.count_down();
			});
		}

		m_IsStarted = true;
	}

	/** Disconnects all pipes, stops the io_context, waits for the workers and frees the pipes.

		Does nothing if the service was never started or has already been stopped.
	 */
	void Stop()
	{
		if (!m_IsStarted || m_IsStopped)
		{
			return;
		}

		m_IsStopped = true;

		for (const auto& Pipe : m_ServicePipes)
		{
			Pipe->Disconnect();
		}

		m_IoContext.stop();
		m_ShutdownLatch.wait();

		// All workers have left io_context::run() at this point, so no handler can
		// still be executing against a connection
		m_ServicePipes.clear();
	}

private:
	asio::io_context& IoContext() { return m_IoContext; }
	auto			  PipeSecurityAttributes() { return m_AnyUserSecurityAttributes.Attributes(); }

	static const int ServiceThreadCount = 4;   // Number of threads running the io_context
	static const int ServicePipeCount	= 32;  // Number of pipe instances created by Start()

	std::latch		  m_ShutdownLatch{ServiceThreadCount};
	asio::thread_pool m_WorkerThreadPool;
	asio::io_context  m_IoContext;

# if ZEN_PLATFORM_WINDOWS
	/** One server-side named pipe instance.

		Moves through connect -> read -> write -> read ... Incoming messages are
		currently echoed straight back to the client.
	 */
	class PipeConnection
	{
		enum PipeState
		{
			kUninitialized,
			kConnecting,
			kReading,
			kWriting,
			kDisconnected,
			kInvalid
		};

		LocalProjectImpl*			 m_Outer;
		asio::windows::stream_handle m_PipeHandle;
		std::atomic<PipeState>		 m_PipeState{kUninitialized};

	public:
		explicit PipeConnection(LocalProjectImpl* Outer) : m_Outer(Outer), m_PipeHandle{m_Outer->IoContext()} {}
		~PipeConnection() {}

		/** Marks the connection as disconnected and disconnects the pipe, if one was created */
		void Disconnect()
		{
			m_PipeState = kDisconnected;

			// No handle is assigned if Accept() has not run yet or pipe creation failed
			if (m_PipeHandle.is_open())
			{
				DisconnectNamedPipe(m_PipeHandle.native_handle());
			}
		}

		/** Creates the named pipe instance and starts an overlapped connect on it */
		void Accept()
		{
			StringBuilder<64> PipeName;
			PipeName << "\\\\.\\pipe\\zenprj";	// TODO: this should use an instance-specific identifier!

			HANDLE hPipe = CreateNamedPipeA(PipeName.c_str(),
											PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
											PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,
											PIPE_UNLIMITED_INSTANCES,			// Max instance count
											65536,								// Output buffer size
											65536,								// Input buffer size
											10'000,								// Default timeout (ms)
											m_Outer->PipeSecurityAttributes()	// Security attributes
			);

			if (hPipe == INVALID_HANDLE_VALUE)
			{
				ZEN_WARN("failed while creating named pipe {}", PipeName.c_str());

				// There is no handle to hand over to asio or to connect on, so this
				// connection simply stays unusable
				// TODO: error - how to best handle?
				m_PipeState = kInvalid;
				return;
			}

			m_PipeHandle.assign(hPipe);	 // This now owns the handle and will close it

			m_PipeState = kConnecting;

			asio::windows::overlapped_ptr OverlappedPtr(
				m_PipeHandle.get_executor(),
				std::bind(&PipeConnection::OnClientConnect, this, std::placeholders::_1, std::placeholders::_2));

			OVERLAPPED* Overlapped = OverlappedPtr.get();
			BOOL		Ok		   = ConnectNamedPipe(hPipe, Overlapped);
			DWORD		LastError  = GetLastError();

			if (!Ok && LastError == ERROR_PIPE_CONNECTED)
			{
				// A client connected between CreateNamedPipe and ConnectNamedPipe. This is
				// a good connection even though the call returned zero, and the call did not
				// go pending, so the completion is posted here with a success code.
				OverlappedPtr.complete(std::error_code{}, 0);
			}
			else if (!Ok && LastError != ERROR_IO_PENDING)
			{
				m_PipeState = kInvalid;

				// The operation completed immediately, so a completion notification needs
				// to be posted. When complete() is called, ownership of the OVERLAPPED-
				// derived object passes to the io_service.
				std::error_code Ec(LastError, asio::error::get_system_category());
				OverlappedPtr.complete(Ec, 0);
			}
			else
			{
				// The operation was successfully initiated, so ownership of the
				// OVERLAPPED-derived object has now passed to the io_service.
				OverlappedPtr.release();
			}
		}

	private:
		/** Completion handler for the connect; starts the first read on success */
		void OnClientConnect(const std::error_code& Ec, size_t BytesTransferred)
		{
			ZEN_UNUSED(BytesTransferred);

			if (Ec)
			{
				// Errors caused by our own Disconnect() are expected and not reported
				if (m_PipeState == kDisconnected)
				{
					return;
				}

				ZEN_WARN("pipe connection error: {}", Ec.message());

				// TODO: should disconnect and issue a new connect
				return;
			}

			ZEN_DEBUG("pipe connection established");

			IssueRead();
		}

		/** Queues an asynchronous read into the message buffer */
		void IssueRead()
		{
			m_PipeState = kReading;

			m_PipeHandle.async_read_some(asio::mutable_buffer(m_MsgBuffer, sizeof m_MsgBuffer),
										 std::bind(&PipeConnection::OnClientRead, this, std::placeholders::_1, std::placeholders::_2));
		}

		/** Completion handler for a read; writes the received bytes back to the client */
		void OnClientRead(const std::error_code& Ec, size_t Bytes)
		{
			if (Ec)
			{
				if (m_PipeState == kDisconnected)
				{
					return;
				}

				ZEN_WARN("pipe read error: {}", Ec.message());

				// TODO: should disconnect and issue a new connect
				return;
			}

			ZEN_DEBUG("received message: {} bytes", Bytes);

			// TODO: Actually process request

			m_PipeState = kWriting;

			asio::async_write(m_PipeHandle,
							  asio::buffer(m_MsgBuffer, Bytes),
							  std::bind(&PipeConnection::OnWriteCompletion, this, std::placeholders::_1, std::placeholders::_2));
		}

		/** Completion handler for a write; goes back to reading on success */
		void OnWriteCompletion(const std::error_code& Ec, size_t Bytes)
		{
			ZEN_UNUSED(Bytes);

			if (Ec)
			{
				if (m_PipeState == kDisconnected)
				{
					return;
				}

				ZEN_WARN("pipe write error: {}", Ec.message());

				// TODO: should disconnect and issue a new connect
				return;
			}

			// Go back to reading
			IssueRead();
		}

		uint8_t m_MsgBuffer[16384];
	};
# else
	/** Non-Windows stand-in with no behaviour */
	class PipeConnection
	{
	public:
		explicit PipeConnection(LocalProjectImpl*) {}

		void Accept() {}
		void Disconnect() {}
	};
# endif

	AnyUserSecurityAttributes					 m_AnyUserSecurityAttributes;
	std::vector<std::unique_ptr<PipeConnection>> m_ServicePipes;
	bool										 m_IsStarted = false;
	bool										 m_IsStopped = false;  // Set by the first effective Stop()
};

/** Creates the pipe service implementation and starts it immediately */
LocalProjectService::LocalProjectService(CasStore& Store, ProjectStore* Projects) : m_CasStore(Store), m_ProjectStore(Projects)
{
	m_Impl = std::make_unique<LocalProjectImpl>();
	m_Impl->Start();
}

/** Stops the pipe service before the implementation object is destroyed */
LocalProjectService::~LocalProjectService()
{
	m_Impl->Stop();
}

#endif

//////////////////////////////////////////////////////////////////////////

#if ZEN_WITH_TESTS

// Placeholder test: only creates a scoped temporary directory
TEST_CASE("prj.store")
{
	using namespace std::literals;

	ScopedTemporaryDirectory TempDir;
}

#endif

// Intentionally empty (presumably referenced from elsewhere so that this translation
// unit gets linked in - confirm)
void
prj_forcelink()
{
}

}  // namespace zen