diff options
| author | Stefan Boberg <[email protected]> | 2021-08-18 13:02:37 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-08-18 13:02:37 +0200 |
| commit | b2c4d971d5e6774178c579c326ee763db7e540ce (patch) | |
| tree | 0a0e1051f18c48abbd9ba88db4ea9e3ca950a0b9 | |
| parent | added 'zen drop' command to drop cache buckets online (diff) | |
| download | zen-b2c4d971d5e6774178c579c326ee763db7e540ce.tar.xz zen-b2c4d971d5e6774178c579c326ee763db7e540ce.zip | |
Removed accidental check-in
| -rw-r--r-- | zenserver/projectstore.cpp-bdcc9566 | 1842 |
1 files changed, 0 insertions, 1842 deletions
diff --git a/zenserver/projectstore.cpp-bdcc9566 b/zenserver/projectstore.cpp-bdcc9566 deleted file mode 100644 index 0700fc35d..000000000 --- a/zenserver/projectstore.cpp-bdcc9566 +++ /dev/null @@ -1,1842 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "projectstore.h" - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/stream.h> -#include <zencore/string.h> -#include <zencore/timer.h> -#include <zencore/windows.h> -#include <zenstore/basicfile.h> -#include <zenstore/cas.h> -#include <zenstore/caslog.h> - -#define USE_ROCKSDB 0 - -#if USE_ROCKSDB -# pragma comment(lib, "Rpcrt4.lib") // RocksDB made me do this -# include <rocksdb/db.h> -#endif - -#include <ppl.h> -#include <spdlog/spdlog.h> -#include <xxh3.h> -#include <asio.hpp> -#include <future> -#include <latch> -#include <string> - -namespace zen { - -using namespace fmt::literals; - -#if USE_ROCKSDB -namespace rocksdb = ROCKSDB_NAMESPACE; -#endif - -////////////////////////////////////////////////////////////////////////// - -struct ProjectStore::OplogStorage : public RefCounted -{ - OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath) - { - } - - ~OplogStorage() - { - Log().info("closing oplog storage at {}", m_OplogStoragePath); - Flush(); - -#if USE_ROCKSDB - if (m_RocksDb) - { - // Column families must be torn down before database is closed - for (const auto& Handle : m_RocksDbColumnHandles) - { - m_RocksDb->DestroyColumnFamilyHandle(Handle); - } - - rocksdb::Status Status = m_RocksDb->Close(); - - if (!Status.ok()) - { - Log().warn("db close error reported for '{}' : '{}'", m_OplogStoragePath, Status.getState()); - } - } -#endif - } - - [[nodiscard]] bool Exists() { return Exists(m_OplogStoragePath); } - [[nodiscard]] static bool Exists(std::filesystem::path BasePath) - { - return std::filesystem::exists(BasePath / "ops.zlog") && std::filesystem::exists(BasePath / "ops.zops"); - } - - static bool Delete(std::filesystem::path BasePath) { return DeleteDirectories(BasePath); } - - void Open(bool IsCreate) - { - Log().info("initializing oplog storage at '{}'", m_OplogStoragePath); - - if (IsCreate) - { - DeleteDirectories(m_OplogStoragePath); - CreateDirectories(m_OplogStoragePath); - } - - m_Oplog.Open(m_OplogStoragePath / "ops.zlog", IsCreate); - m_Oplog.Initialize(); - - m_OpBlobs.Open(m_OplogStoragePath / "ops.zops", IsCreate); - - ZEN_ASSERT(IsPow2(m_OpsAlign)); - ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1))); - -#if USE_ROCKSDB - { - std::string RocksdbPath = WideToUtf8((m_OplogStoragePath / "ops.rdb").native().c_str()); - - Log().debug("opening rocksdb db at '{}'", RocksdbPath); - - rocksdb::DB* Db; - rocksdb::DBOptions Options; - Options.create_if_missing = true; - - std::vector<std::string> ExistingColumnFamilies; - rocksdb::Status Status = rocksdb::DB::ListColumnFamilies(Options, RocksdbPath, &ExistingColumnFamilies); - - std::vector<rocksdb::ColumnFamilyDescriptor> ColumnDescriptors; - - if (Status.IsPathNotFound()) - { - ColumnDescriptors.emplace_back(rocksdb::ColumnFamilyDescriptor{rocksdb::kDefaultColumnFamilyName, {}}); - } - else if (Status.ok()) - { - for (const std::string& Column : ExistingColumnFamilies) - { - rocksdb::ColumnFamilyDescriptor ColumnFamily; - ColumnFamily.name = Column; - ColumnDescriptors.push_back(ColumnFamily); - } - } - else - { - throw std::exception("column family iteration failed for '{}': '{}'"_format(RocksdbPath, Status.getState()).c_str()); - } - - Status = rocksdb::DB::Open(Options, RocksdbPath, ColumnDescriptors, &m_RocksDbColumnHandles, &Db); - - if (!Status.ok()) - { - throw std::exception("database open failed for '{}': '{}'"_format(RocksdbPath, Status.getState()).c_str()); - } - - m_RocksDb.reset(Db); - } -#endif - } - - void ReplayLog(std::function<void(CbObject, const OplogEntry&)>&& Handler) - { - // This could use memory mapping or do something clever but for now it just reads the file sequentially - - spdlog::info("replaying log for '{}'", m_OplogStoragePath); - - Stopwatch Timer; - - m_Oplog.Replay([&](const zen::OplogEntry& LogEntry) { - IoBuffer OpBuffer(LogEntry.OpCoreSize); - - const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; - - m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); - - // Verify checksum, ignore op data if incorrect - const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), OpBuffer.Size()) & 0xffffFFFF); - - if (OpCoreHash != LogEntry.OpCoreHash) - { - Log().warn("skipping oplog entry with bad checksum!"); - return; - } - - CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), OpBuffer.Size())); - - m_NextOpsOffset = - Max(m_NextOpsOffset.load(std::memory_order::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); - m_MaxLsn = Max(m_MaxLsn.load(std::memory_order::memory_order_relaxed), LogEntry.OpLsn); - - Handler(Op, LogEntry); - }); - - spdlog::info("Oplog replay completed in {} - Max LSN# {}, Next offset: {}", - NiceTimeSpanMs(Timer.getElapsedTimeMs()), - m_MaxLsn, - m_NextOpsOffset); - } - - void ReplayLog(const std::vector<OplogEntryAddress>& Entries, std::function<void(CbObject)>&& Handler) - { - for (const OplogEntryAddress& Entry : Entries) - { - IoBuffer OpBuffer(Entry.Size); - - const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign; - m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset); - - CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), OpBuffer.Size())); - - Handler(Op); - } - } - - CbObject GetOp(const OplogEntryAddress& Entry) - { - IoBuffer OpBuffer(Entry.Size); - - const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign; - m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset); - - return CbObject(SharedBuffer::MakeView(OpBuffer.Data(), OpBuffer.Size())); - } - - OplogEntry AppendOp(CbObject Op) - { - SharedBuffer Buffer = Op.GetBuffer(); - const uint64_t WriteSize = Buffer.GetSize(); - const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF); - - XXH3_128Stream KeyHasher; - Op["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); - XXH3_128 KeyHash = KeyHasher.GetHash(); - - RwLock::ExclusiveLockScope _(m_RwLock); - const uint64_t WriteOffset = m_NextOpsOffset; - const uint32_t OpLsn = ++m_MaxLsn; - - m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign); - - ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign)); - - OplogEntry Entry = {.OpLsn = OpLsn, - .OpCoreOffset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign), - .OpCoreSize = uint32_t(Buffer.GetSize()), - .OpCoreHash = OpCoreHash, - .OpKeyHash = KeyHash}; - - m_Oplog.Append(Entry); - - m_OpBlobs.Write(Buffer.GetData(), WriteSize, WriteOffset); - - return Entry; - } - - void Flush() - { - m_Oplog.Flush(); - m_OpBlobs.Flush(); - } - - spdlog::logger& Log() { return m_OwnerOplog->Log(); } - -private: - ProjectStore::Oplog* m_OwnerOplog; - std::filesystem::path m_OplogStoragePath; - RwLock m_RwLock; - TCasLogFile<OplogEntry> m_Oplog; - BasicFile m_OpBlobs; - std::atomic<uint64_t> m_NextOpsOffset{0}; - uint64_t m_OpsAlign = 32; - std::atomic<uint32_t> m_MaxLsn{0}; - -#if USE_ROCKSDB - std::unique_ptr<rocksdb::DB> m_RocksDb; - std::vector<rocksdb::ColumnFamilyHandle*> m_RocksDbColumnHandles; -#endif -}; - -////////////////////////////////////////////////////////////////////////// - -ProjectStore::Oplog::Oplog(std::string_view Id, Project* Outer, CasStore& Store, std::filesystem::path BasePath) -: m_OuterProject(Outer) -, m_CasStore(Store) -, m_OplogId(Id) -, m_BasePath(BasePath) -{ - m_Storage = new OplogStorage(this, m_BasePath); - const bool StoreExists = m_Storage->Exists(); - m_Storage->Open(/* IsCreate */ !StoreExists); - - m_TempPath = m_BasePath / "temp"; - - zen::CleanDirectory(m_TempPath); -} - -ProjectStore::Oplog::~Oplog() = default; - -bool -ProjectStore::Oplog::ExistsAt(std::filesystem::path BasePath) -{ - return OplogStorage::Exists(BasePath); -} - -void -ProjectStore::Oplog::ReplayLog() -{ - m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(Op, OpEntry, kUpdateReplay); }); -} - -IoBuffer -ProjectStore::Oplog::FindChunk(Oid ChunkId) -{ - RwLock::SharedLockScope _(m_OplogLock); - - if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end()) - { - _.ReleaseNow(); - - return m_CasStore.FindChunk(ChunkIt->second); - } - - if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) - { - _.ReleaseNow(); - - std::filesystem::path FilePath = m_OuterProject->RootDir / FileIt->second.ServerPath; - - return IoBufferBuilder::MakeFromFile(FilePath.native().c_str()); - } - - if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) - { - _.ReleaseNow(); - - return m_CasStore.FindChunk(MetaIt->second); - } - - return {}; -} - -void -ProjectStore::Oplog::IterateFileMap( - std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn) -{ - RwLock::SharedLockScope _(m_OplogLock); - - for (const auto& Kv : m_FileMap) - { - Fn(Kv.first, Kv.second.ServerPath, Kv.second.ClientPath); - } -} - -void -ProjectStore::Oplog::IterateChunkMap(std::function<void(const Oid&, const IoHash&)>&& Fn) -{ - RwLock::SharedLockScope _(m_OplogLock); - - for (const auto& Kv : m_ChunkMap) - { - Fn(Kv.first, Kv.second); - } -} - -void -ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler) -{ - std::vector<OplogEntryAddress> Entries; - Entries.reserve(m_LatestOpMap.size()); - - for (const auto& Kv : m_LatestOpMap) - { - const auto AddressEntry = m_OpAddressMap.find(Kv.second); - ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); - - Entries.push_back(AddressEntry->second); - } - - std::sort(Entries.begin(), Entries.end(), [](const OplogEntryAddress& Lhs, const OplogEntryAddress& Rhs) { - return Lhs.Offset < Rhs.Offset; - }); - - m_Storage->ReplayLog(Entries, [&](CbObject Op) { Handler(Op); }); -} - -std::optional<CbObject> -ProjectStore::Oplog::GetOplog(const Oid& Key) -{ - if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end()) - { - const auto AddressEntry = m_OpAddressMap.find(LatestOp->second); - ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); - - return m_Storage->GetOp(AddressEntry->second); - } - - return {}; -} - -bool -ProjectStore::Oplog::AddFileMapping(Oid FileId, IoHash Hash, std::string_view ServerPath, std::string_view ClientPath) -{ - // NOTE: Caller must hold an exclusive lock on m_OplogLock - - if (ServerPath.empty() || ClientPath.empty()) - { - return false; - } - - if (ServerPath[0] == '/' || ClientPath[0] != '/') - { - return false; - } - - FileMapEntry Entry; - Entry.ServerPath = ServerPath; - Entry.ClientPath = ClientPath; - - m_FileMap[FileId] = std::move(Entry); - - if (Hash != IoHash::Zero) - { - m_ChunkMap[FileId] = Hash; - } - - return true; -} - -void -ProjectStore::Oplog::AddChunkMapping(Oid ChunkId, IoHash Hash) -{ - // NOTE: Caller must hold an exclusive lock on m_OplogLock - - m_ChunkMap[ChunkId] = Hash; -} - -void -ProjectStore::Oplog::AddMetaMapping(Oid ChunkId, IoHash Hash) -{ - // NOTE: Caller must hold an exclusive lock on m_OplogLock - - m_MetaMap[ChunkId] = Hash; -} - -uint32_t -ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry, UpdateType TypeOfUpdate) -{ - ZEN_UNUSED(TypeOfUpdate); - - // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing - // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however - - RwLock::ExclusiveLockScope _(m_OplogLock); - - using namespace std::literals; - - // Update chunk id maps - - if (Core["package"sv]) - { - CbObjectView PkgObj = Core["package"sv].AsObjectView(); - Oid PackageId = PkgObj["id"sv].AsObjectId(); - IoHash PackageHash = PkgObj["data"sv].AsBinaryAttachment(); - - AddChunkMapping(PackageId, PackageHash); - - Log().debug("package data {} -> {}", PackageId, PackageHash); - } - - for (CbFieldView& Entry : Core["bulkdata"sv]) - { - CbObjectView BulkObj = Entry.AsObjectView(); - - Oid BulkDataId = BulkObj["id"sv].AsObjectId(); - IoHash BulkDataHash = BulkObj["data"sv].AsBinaryAttachment(); - - AddChunkMapping(BulkDataId, BulkDataHash); - - Log().debug("bulkdata {} -> {}", BulkDataId, BulkDataHash); - } - - if (Core["files"sv]) - { - Stopwatch Timer; - int32_t FileCount = 0; - - for (CbFieldView& Entry : Core["files"sv]) - { - CbObjectView FileObj = Entry.AsObjectView(); - const Oid FileId = FileObj["id"sv].AsObjectId(); - IoHash FileDataHash = FileObj["data"sv].AsBinaryAttachment(); - std::string_view ServerPath = FileObj["serverpath"sv].AsString(); - std::string_view ClientPath = FileObj["clientpath"sv].AsString(); - - if (AddFileMapping(FileId, FileDataHash, ServerPath, ClientPath)) - { - ++FileCount; - } - else - { - Log().warn("invalid file"); - } - } - - Log().debug("added {} file(s) in {}", FileCount, NiceTimeSpanMs(Timer.getElapsedTimeMs())); - } - - for (CbFieldView& Entry : Core["meta"sv]) - { - CbObjectView MetaObj = Entry.AsObjectView(); - const Oid MetaId = MetaObj["id"sv].AsObjectId(); - auto NameString = MetaObj["name"sv].AsString(); - IoHash MetaDataHash = MetaObj["data"sv].AsBinaryAttachment(); - - AddMetaMapping(MetaId, MetaDataHash); - - Log().debug("meta data ({}) {} -> {}", NameString, MetaId, MetaDataHash); - } - - m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); - m_LatestOpMap[OpEntry.OpKeyAsOId()] = OpEntry.OpLsn; - - return OpEntry.OpLsn; -} - -uint32_t -ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) -{ - using namespace std::literals; - - const CbObject& Core = OpPackage.GetObject(); - const OplogEntry OpEntry = m_Storage->AppendOp(Core); - - // Persist attachments - - auto Attachments = OpPackage.GetAttachments(); - - for (const auto& Attach : Attachments) - { - IoBuffer AttachmentData = Attach.AsBinaryView().AsIoBuffer(); - m_CasStore.InsertChunk(AttachmentData, Attach.GetHash()); - } - - return RegisterOplogEntry(Core, OpEntry, kUpdateNewEntry); -} - -////////////////////////////////////////////////////////////////////////// - -ProjectStore::Project::Project(ProjectStore* PrjStore, CasStore& Store, std::filesystem::path BasePath) -: m_ProjectStore(PrjStore) -, m_CasStore(Store) -, m_OplogStoragePath(BasePath) -{ -} - -ProjectStore::Project::~Project() -{ -} - -bool -ProjectStore::Project::Exists(std::filesystem::path BasePath) -{ - return std::filesystem::exists(BasePath / "Project.zcb"); -} - -void -ProjectStore::Project::Read() -{ - std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"; - - spdlog::info("reading config for project '{}' from {}", Identifier, ProjectStateFilePath); - - BasicFile Blob; - Blob.Open(ProjectStateFilePath, false); - - IoBuffer Obj = Blob.ReadAll(); - CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); - - if (ValidationError == CbValidateError::None) - { - CbObject Cfg = LoadCompactBinaryObject(Obj); - - Identifier = Cfg["id"].AsString(); - RootDir = Cfg["root"].AsString(); - ProjectRootDir = Cfg["project"].AsString(); - EngineRootDir = Cfg["engine"].AsString(); - } - else - { - spdlog::error("validation error {} hit for '{}'", int(ValidationError), ProjectStateFilePath); - } -} - -void -ProjectStore::Project::Write() -{ - MemoryOutStream Mem; - BinaryWriter Writer(Mem); - - CbObjectWriter Cfg; - Cfg << "id" << Identifier; - Cfg << "root" << WideToUtf8(RootDir.c_str()); - Cfg << "project" << ProjectRootDir; - Cfg << "engine" << EngineRootDir; - - Cfg.Save(Writer); - - CreateDirectories(m_OplogStoragePath); - - std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"; - - spdlog::info("persisting config for project '{}' to {}", Identifier, ProjectStateFilePath); - - BasicFile Blob; - Blob.Open(ProjectStateFilePath, true); - Blob.Write(Mem.Data(), Mem.Size(), 0); - Blob.Flush(); -} - -spdlog::logger& -ProjectStore::Project::Log() -{ - return m_ProjectStore->Log(); -} - -std::filesystem::path -ProjectStore::Project::BasePathForOplog(std::string_view OplogId) -{ - return m_OplogStoragePath / OplogId; -} - -ProjectStore::Oplog* -ProjectStore::Project::NewOplog(std::string_view OplogId) -{ - RwLock::ExclusiveLockScope _(m_ProjectLock); - - std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); - - try - { - Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CasStore, OplogBasePath).first->second; - - return &Log; - } - catch (std::exception&) - { - // In case of failure we need to ensure there's no half constructed entry around - // - // (This is probably already ensured by the try_emplace implementation?) - - m_Oplogs.erase(std::string{OplogId}); - - return nullptr; - } -} - -ProjectStore::Oplog* -ProjectStore::Project::OpenOplog(std::string_view OplogId) -{ - { - RwLock::SharedLockScope _(m_ProjectLock); - - auto OplogIt = m_Oplogs.find(std::string(OplogId)); - - if (OplogIt != m_Oplogs.end()) - { - return &OplogIt->second; - } - } - - RwLock::ExclusiveLockScope _(m_ProjectLock); - - std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); - - if (Oplog::ExistsAt(OplogBasePath)) - { - // Do open of existing oplog - - try - { - Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CasStore, OplogBasePath).first->second; - - Log.ReplayLog(); - - return &Log; - } - catch (std::exception& ex) - { - spdlog::error("failed to open oplog '{}' @ '{}': {}", OplogId, OplogBasePath, ex.what()); - - m_Oplogs.erase(std::string{OplogId}); - } - } - - return nullptr; -} - -void -ProjectStore::Project::DeleteOplog(std::string_view OplogId) -{ - bool Exists = false; - - { - RwLock::ExclusiveLockScope _(m_ProjectLock); - - auto OplogIt = m_Oplogs.find(std::string(OplogId)); - - if (OplogIt != m_Oplogs.end()) - { - Exists = true; - - m_Oplogs.erase(OplogIt); - } - } - - // Actually erase - - std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); - - OplogStorage::Delete(OplogBasePath); -} - -void -ProjectStore::Project::IterateOplogs(std::function<void(const Oplog&)>&& Fn) const -{ - // TODO: should iterate over oplogs which are present on disk but not yet loaded - - RwLock::SharedLockScope _(m_ProjectLock); - - for (auto& Kv : m_Oplogs) - { - Fn(Kv.second); - } -} - -////////////////////////////////////////////////////////////////////////// - -ProjectStore::ProjectStore(CasStore& Store, std::filesystem::path BasePath) -: m_Log("project", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks())) -, m_ProjectBasePath(BasePath) -, m_CasStore(Store) -{ - m_Log.info("initializing project store at '{}'", BasePath); - m_Log.set_level(spdlog::level::debug); -} - -ProjectStore::~ProjectStore() -{ - m_Log.info("closing project store ('{}')", m_ProjectBasePath); -} - -std::filesystem::path -ProjectStore::BasePathForProject(std::string_view ProjectId) -{ - return m_ProjectBasePath / ProjectId; -} - -ProjectStore::Project* -ProjectStore::OpenProject(std::string_view ProjectId) -{ - { - RwLock::SharedLockScope _(m_ProjectsLock); - - auto ProjIt = m_Projects.find(std::string{ProjectId}); - - if (ProjIt != m_Projects.end()) - { - return &(ProjIt->second); - } - } - - RwLock::ExclusiveLockScope _(m_ProjectsLock); - - std::filesystem::path ProjectBasePath = BasePathForProject(ProjectId); - - if (Project::Exists(ProjectBasePath)) - { - try - { - Log().info("opening project {} @ {}", ProjectId, ProjectBasePath); - - ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CasStore, ProjectBasePath).first->second; - Prj.Identifier = ProjectId; - Prj.Read(); - return &Prj; - } - catch (std::exception& e) - { - Log().warn("failed to open {} @ {} ({})", ProjectId, ProjectBasePath, e.what()); - m_Projects.erase(std::string{ProjectId}); - } - } - - return nullptr; -} - -ProjectStore::Project* -ProjectStore::NewProject(std::filesystem::path BasePath, - std::string_view ProjectId, - std::string_view RootDir, - std::string_view EngineRootDir, - std::string_view ProjectRootDir) -{ - RwLock::ExclusiveLockScope _(m_ProjectsLock); - - ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CasStore, BasePath).first->second; - Prj.Identifier = ProjectId; - Prj.RootDir = RootDir; - Prj.EngineRootDir = EngineRootDir; - Prj.ProjectRootDir = ProjectRootDir; - Prj.Write(); - - return &Prj; -} - -void -ProjectStore::DeleteProject(std::string_view ProjectId) -{ - std::filesystem::path ProjectBasePath = BasePathForProject(ProjectId); - - Log().info("deleting project {} @ {}", ProjectId, ProjectBasePath); - - m_Projects.erase(std::string{ProjectId}); - - DeleteDirectories(ProjectBasePath); -} - -bool -ProjectStore::Exists(std::string_view ProjectId) -{ - return Project::Exists(BasePathForProject(ProjectId)); -} - -ProjectStore::Oplog* -ProjectStore::OpenProjectOplog(std::string_view ProjectId, std::string_view OplogId) -{ - if (Project* ProjectIt = OpenProject(ProjectId)) - { - return ProjectIt->OpenOplog(OplogId); - } - - return nullptr; -} - -////////////////////////////////////////////////////////////////////////// - -HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) -: m_CasStore(Store) -, m_Log("project", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks())) -, m_ProjectStore(Projects) -{ - using namespace std::literals; - - m_Router.AddPattern("project", "([[:alnum:]_.]+)"); - m_Router.AddPattern("log", "([[:alnum:]_.]+)"); - m_Router.AddPattern("op", "([[:digit:]]+?)"); - m_Router.AddPattern("chunk", "([[:xdigit:]]{24})"); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/batch", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); - - if (FoundLog == nullptr) - { - return HttpReq.WriteResponse(HttpResponse::NotFound); - } - - ProjectStore::Oplog& Log = *FoundLog; - - // Parse Request - - IoBuffer Payload = HttpReq.ReadPayload(); - MemoryInStream MemIn(Payload.Data(), Payload.Size()); - BinaryReader Reader(MemIn); - - struct RequestHeader - { - enum - { - kMagic = 0xAAAA'77AC - }; - uint32_t Magic; - uint32_t ChunkCount; - uint32_t Reserved1; - uint32_t Reserved2; - }; - - struct RequestChunkEntry - { - Oid ChunkId; - uint32_t CorrelationId; - uint64_t Offset; - uint64_t RequestBytes; - }; - - if (Payload.Size() <= sizeof(RequestHeader)) - { - HttpReq.WriteResponse(HttpResponse::BadRequest); - } - - RequestHeader RequestHdr; - Reader.Read(&RequestHdr, sizeof RequestHdr); - - if (RequestHdr.Magic != RequestHeader::kMagic) - { - HttpReq.WriteResponse(HttpResponse::BadRequest); - } - - std::vector<RequestChunkEntry> RequestedChunks; - RequestedChunks.resize(RequestHdr.ChunkCount); - Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount); - - // Make Response - - struct ResponseHeader - { - uint32_t Magic = 0xbada'b00f; - uint32_t ChunkCount; - uint32_t Reserved1 = 0; - uint32_t Reserved2 = 0; - }; - - struct ResponseChunkEntry - { - uint32_t CorrelationId; - uint32_t Flags = 0; - uint64_t ChunkSize; - }; - - std::vector<IoBuffer> OutBlobs; - OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry)); - for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) - { - const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; - IoBuffer FoundChunk = FoundLog->FindChunk(RequestedChunk.ChunkId); - if (FoundChunk) - { - if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1)) - { - uint64_t Offset = RequestedChunk.Offset; - if (Offset > FoundChunk.Size()) - { - Offset = FoundChunk.Size(); - } - uint64_t Size = RequestedChunk.RequestBytes; - if ((Offset + Size) > FoundChunk.Size()) - { - Size = FoundChunk.Size() - Offset; - } - FoundChunk = IoBuffer(FoundChunk, Offset, Size); - } - } - OutBlobs.emplace_back(std::move(FoundChunk)); - } - uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData()); - ResponseHeader ResponseHdr; - ResponseHdr.ChunkCount = RequestHdr.ChunkCount; - memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr)); - ResponsePtr += sizeof(ResponseHdr); - for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) - { - const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; - const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]); - ResponseChunkEntry ResponseChunk; - ResponseChunk.CorrelationId = ChunkIndex; - if (FoundChunk) - { - ResponseChunk.ChunkSize = FoundChunk.Size(); - } - else - { - ResponseChunk.ChunkSize = uint64_t(-1); - } - memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk)); - ResponsePtr += sizeof(ResponseChunk); - } - - return HttpReq.WriteResponse(HttpResponse::OK, HttpContentType::kBinary, OutBlobs); - }, - HttpVerb::kPost); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/files", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - // File manifest fetch, returns the client file list - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); - - if (FoundLog == nullptr) - { - return HttpReq.WriteResponse(HttpResponse::NotFound); - } - - ProjectStore::Oplog& Log = *FoundLog; - - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - - const bool FilterClient = Params.GetValue("filter") == "client"; - - CbObjectWriter Response; - Response.BeginArray("files"); - - Log.IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) { - Response.BeginObject(); - Response << "id" << Id; - Response << "clientpath" << ClientPath; - if (!FilterClient) - { - Response << "serverpath" << ServerPath; - } - Response.EndObject(); - }); - - Response.EndArray(); - - return HttpReq.WriteResponse(HttpResponse::OK, Response.Save()); - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/{chunk}/info", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - const auto& ChunkId = Req.GetCapture(3); - - ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); - - if (FoundLog == nullptr) - { - return HttpReq.WriteResponse(HttpResponse::NotFound); - } - - ProjectStore::Oplog& Log = *FoundLog; - - Oid Obj = Oid::FromHexString(ChunkId); - - IoBuffer Value = Log.FindChunk(Obj); - - if (Value) - { - CbObjectWriter Response; - Response << "size" << Value.Size(); - return HttpReq.WriteResponse(HttpResponse::OK, Response.Save()); - } - - return HttpReq.WriteResponse(HttpResponse::NotFound); - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/{chunk}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - const auto& ChunkId = Req.GetCapture(3); - - bool IsOffset = false; - uint64_t Offset = 0; - uint64_t Size = ~(0ull); - - auto QueryParms = Req.ServerRequest().GetQueryParams(); - - if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false) - { - if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm)) - { - Offset = OffsetVal.value(); - IsOffset = true; - } - else - { - return HttpReq.WriteResponse(HttpResponse::BadRequest); - } - } - - if (auto SizeParm = QueryParms.GetValue("size"); SizeParm.empty() == false) - { - if (auto SizeVal = ParseInt<uint64_t>(SizeParm)) - { - Size = SizeVal.value(); - IsOffset = true; - } - else - { - return HttpReq.WriteResponse(HttpResponse::BadRequest); - } - } - - m_Log.debug("chunk - {} / {} / {}", ProjectId, OplogId, ChunkId); - - ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); - - if (FoundLog == nullptr) - { - return HttpReq.WriteResponse(HttpResponse::NotFound); - } - - ProjectStore::Oplog& Log = *FoundLog; - - Oid Obj = Oid::FromHexString(ChunkId); - - IoBuffer Value = Log.FindChunk(Obj); - - switch (HttpVerb Verb = HttpReq.RequestVerb()) - { - case HttpVerb::kHead: - case HttpVerb::kGet: - if (!Value) - { - return HttpReq.WriteResponse(HttpResponse::NotFound); - } - - if (Verb == HttpVerb::kHead) - { - HttpReq.SetSuppressResponseBody(); - } - - if (IsOffset) - { - if (Offset > Value.Size()) - { - Offset = Value.Size(); - } - - if ((Offset + Size) > Value.Size()) - { - Size = Value.Size() - Offset; - } - - // Send only a subset of data - IoBuffer InnerValue(Value, Offset, Size); - - return HttpReq.WriteResponse(HttpResponse::OK, HttpContentType::kBinary, InnerValue); - } - - return HttpReq.WriteResponse(HttpResponse::OK, HttpContentType::kBinary, Value); - } - }, - HttpVerb::kGet | HttpVerb::kHead); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/prep", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); - - if (FoundLog == nullptr) - { - return HttpReq.WriteResponse(HttpResponse::NotFound); - } - - ProjectStore::Oplog& Log = *FoundLog; - - // This operation takes a list of referenced hashes and decides which - // chunks are not present on this server. This list is then returned in - // the "need" list in the response - - IoBuffer Payload = HttpReq.ReadPayload(); - CbObject RequestObject = LoadCompactBinaryObject(Payload); - - std::vector<IoHash> NeedList; - - for (auto Entry : RequestObject["have"sv]) - { - const IoHash FileHash = Entry.AsHash(); - - if (!m_CasStore.FindChunk(FileHash)) - { - spdlog::debug("NEED: {}", FileHash); - - NeedList.push_back(FileHash); - } - } - - CbObjectWriter Cbo; - Cbo.BeginArray("need"); - - for (const IoHash& Hash : NeedList) - { - Cbo << Hash; - } - - Cbo.EndArray(); - CbObject Response = Cbo.Save(); - - return HttpReq.WriteResponse(HttpResponse::OK, Response); - }, - HttpVerb::kPost); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/new", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - - bool IsUsingSalt = false; - IoHash SaltHash = IoHash::Zero; - - if (std::string_view SaltParam = Params.GetValue("salt"); SaltParam.empty() == false) - { - const uint32_t Salt = std::stoi(std::string(SaltParam)); - SaltHash = IoHash::HashMemory(&Salt, sizeof Salt); - IsUsingSalt = true; - } - - ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); - - if (FoundLog == nullptr) - { - return HttpReq.WriteResponse(HttpResponse::NotFound); - } - - ProjectStore::Oplog& Log = *FoundLog; - - IoBuffer Payload = HttpReq.ReadPayload(); - - // This will attempt to open files which may not exist for the case where - // the prep step rejected the chunk. This should be fixed since there's - // a performance cost associated with any file system activity - - bool IsValid = true; - std::vector<IoHash> MissingChunks; - - CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer { - IoHash AttachmentId; - - if (IsUsingSalt) - { - IoHash AttachmentSpec[]{SaltHash, Hash}; - AttachmentId = IoHash::HashMemory(MakeMemoryView(AttachmentSpec)); - } - else - { - AttachmentId = Hash; - } - - std::filesystem::path AttachmentPath = Log.TempPath() / AttachmentId.ToHexString(); - - if (IoBuffer Data = m_CasStore.FindChunk(Hash)) - { - return SharedBuffer(std::move(Data)); - } - else if (Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath.native().c_str())) - { - return SharedBuffer(std::move(Data)); - } - else - { - IsValid = false; - MissingChunks.push_back(Hash); - - return {}; - } - }; - - CbPackage Package; - Package.Load(Payload, &UniqueBuffer::Alloc, &Resolver); - - if (!IsValid) - { - // TODO: emit diagnostics identifying missing chunks - - return HttpReq.WriteResponse(HttpResponse::NotFound, HttpContentType::kText, "Missing chunk reference"); - } - - CbObject Core = Package.GetObject(); - - if (!Core["key"sv]) - { - return HttpReq.WriteResponse(HttpResponse::BadRequest, HttpContentType::kText, "No oplog entry key specified"); - } - - // Write core to oplog - - const uint32_t OpLsn = Log.AppendNewOplogEntry(Package); - - if (OpLsn == ProjectStore::Oplog::kInvalidOp) - { - return HttpReq.WriteResponse(HttpResponse::BadRequest); - } - - m_Log.info("new op #{:4} - {}/{} ({:>6}) {}", OpLsn, ProjectId, OplogId, NiceBytes(Payload.Size()), Core["key"sv].AsString()); - - HttpReq.WriteResponse(HttpResponse::Created); - }, - HttpVerb::kPost); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/{op}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - // TODO: look up op and respond with the payload! - - HttpReq.WriteResponse(HttpResponse::Accepted, HttpContentType::kText, u8"yeee"sv); - }, - HttpVerb::kGet); - - using namespace fmt::literals; - - m_Router.RegisterRoute( - "{project}/oplog/{log}", - [this](HttpRouterRequest& Req) { - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - ProjectStore::Project* ProjectIt = m_ProjectStore->OpenProject(ProjectId); - - if (!ProjectIt) - { - return Req.ServerRequest().WriteResponse(HttpResponse::NotFound, - HttpContentType::kText, - "project {} not found"_format(ProjectId)); - } - - ProjectStore::Project& Prj = *ProjectIt; - - switch (Req.ServerRequest().RequestVerb()) - { - case HttpVerb::kGet: - { - ProjectStore::Oplog* OplogIt = Prj.OpenOplog(OplogId); - - if (!OplogIt) - { - return Req.ServerRequest().WriteResponse(HttpResponse::NotFound, - HttpContentType::kText, - "oplog {} not found in project {}"_format(OplogId, ProjectId)); - } - - ProjectStore::Oplog& Log = *OplogIt; - - CbObjectWriter Cb; - Cb << "id"sv << Log.OplogId() << "project"sv << Prj.Identifier << "tempdir"sv << Log.TempDir(); - - Req.ServerRequest().WriteResponse(HttpResponse::OK, Cb.Save()); - } - break; - - case HttpVerb::kPost: - { - ProjectStore::Oplog* OplogIt = Prj.OpenOplog(OplogId); - - if (!OplogIt) - { - if (!Prj.NewOplog(OplogId)) - { - // TODO: indicate why the operation failed! - return Req.ServerRequest().WriteResponse(HttpResponse::InternalServerError); - } - - m_Log.info("established oplog {} / {}", ProjectId, OplogId); - - return Req.ServerRequest().WriteResponse(HttpResponse::Created); - } - - // I guess this should ultimately be used to execute RPCs but for now, it - // does absolutely nothing - - return Req.ServerRequest().WriteResponse(HttpResponse::BadRequest); - } - break; - - case HttpVerb::kDelete: - { - spdlog::info("deleting oplog {}/{}", ProjectId, OplogId); - - ProjectIt->DeleteOplog(OplogId); - - return Req.ServerRequest().WriteResponse(HttpResponse::OK); - } - break; - } - }, - HttpVerb::kPost | HttpVerb::kGet | HttpVerb::kDelete); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/entries", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); - - if (FoundLog == nullptr) - { - return HttpReq.WriteResponse(HttpResponse::NotFound); - } - - CbObjectWriter Response; - - if (FoundLog->OplogCount() > 0) - { - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - - if (auto OpKey = Params.GetValue("opkey"); !OpKey.empty()) - { - XXH3_128Stream KeyHasher; - KeyHasher.Append(OpKey.data(), OpKey.size()); - XXH3_128 OpKeyHash = KeyHasher.GetHash(); - - Oid OpId; - memcpy(OpId.OidBits, &OpKeyHash, sizeof(OpId.OidBits)); - - std::optional<CbObject> Op = FoundLog->GetOplog(OpId); - - if (Op.has_value()) - { - Response << Op.value(); - } - } - else - { - Response.BeginArray("entries"sv); - - FoundLog->IterateOplog([&Response](CbObject Op) { Response << Op; }); - - Response.EndArray(); - } - } - - return HttpReq.WriteResponse(HttpResponse::OK, Response.Save()); - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "{project}", - [this](HttpRouterRequest& Req) { - const std::string ProjectId = Req.GetCapture(1); - - switch (Req.ServerRequest().RequestVerb()) - { - case HttpVerb::kPost: - { - IoBuffer Payload = Req.ServerRequest().ReadPayload(); - CbObject Params = LoadCompactBinaryObject(Payload); - std::string_view Id = Params["id"sv].AsString(); - std::string_view Root = Params["root"sv].AsString(); - std::string_view EngineRoot = Params["engine"sv].AsString(); - std::string_view ProjectRoot = Params["project"sv].AsString(); - - const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId; - m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot); - - m_Log.info("established project - {} (id: '{}', roots: '{}', '{}', '{}')", - ProjectId, - Id, - Root, - EngineRoot, - ProjectRoot); - - Req.ServerRequest().WriteResponse(HttpResponse::Created); - } - break; - - case HttpVerb::kGet: - { - ProjectStore::Project* ProjectIt = m_ProjectStore->OpenProject(ProjectId); - - if (!ProjectIt) - { - return Req.ServerRequest().WriteResponse(HttpResponse::NotFound, - HttpContentType::kText, - "project {} not found"_format(ProjectId)); - } - - const ProjectStore::Project& Prj = *ProjectIt; - - CbObjectWriter Response; - Response << "id" << Prj.Identifier << "root" << WideToUtf8(Prj.RootDir.c_str()); - - Response.BeginArray("oplogs"sv); - Prj.IterateOplogs([&](const ProjectStore::Oplog& I) { Response << "id"sv << I.OplogId(); }); - Response.EndArray(); // oplogs - - Req.ServerRequest().WriteResponse(HttpResponse::OK, Response.Save()); - } - break; - - case HttpVerb::kDelete: - { - ProjectStore::Project* ProjectIt = m_ProjectStore->OpenProject(ProjectId); - - if (!ProjectIt) - { - return Req.ServerRequest().WriteResponse(HttpResponse::NotFound, - HttpContentType::kText, - "project {} not found"_format(ProjectId)); - } - - m_ProjectStore->DeleteProject(ProjectId); - } - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kDelete); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/chunks", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - auto QueryParams = Req.ServerRequest().GetQueryParams(); - - ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); - - if (FoundLog == nullptr) - { - return HttpReq.WriteResponse(HttpResponse::NotFound); - } - - CbObjectWriter Response; - - Response.BeginArray("chunks"sv); - - FoundLog->IterateChunkMap([&Response](const Oid& Id, const IoHash& Hash) { - Response.BeginObject(); - Response << "id"sv << Id; - Response << "hash"sv << Hash; - Response.EndObject(); - }); - - Response.EndArray(); - - return HttpReq.WriteResponse(HttpResponse::OK, Response.Save()); - }, - HttpVerb::kGet); -} - -HttpProjectService::~HttpProjectService() -{ -} - -const char* -HttpProjectService::BaseUri() const -{ - return "/prj/"; -} - -void -HttpProjectService::HandleRequest(HttpServerRequest& Request) -{ - if (m_Router.HandleRequest(Request) == false) - { - m_Log.warn("No route found for {0}", Request.RelativeUri()); - } -} - -////////////////////////////////////////////////////////////////////////// - -class SecurityAttributes -{ -public: - inline SECURITY_ATTRIBUTES* Attributes() { return &m_Attributes; } - -protected: - SECURITY_ATTRIBUTES m_Attributes{}; - SECURITY_DESCRIPTOR m_Sd{}; -}; - -// Security attributes which allows any user access - -class AnyUserSecurityAttributes : public SecurityAttributes -{ -public: - AnyUserSecurityAttributes() - { - m_Attributes.nLength = sizeof m_Attributes; - m_Attributes.bInheritHandle = false; // Disable inheritance - - const BOOL success = InitializeSecurityDescriptor(&m_Sd, SECURITY_DESCRIPTOR_REVISION); - - if (success) - { - const BOOL bSetOk = SetSecurityDescriptorDacl(&m_Sd, TRUE, (PACL)NULL, FALSE); - if (bSetOk) - { - m_Attributes.lpSecurityDescriptor = &m_Sd; - } - } - } -}; - -////////////////////////////////////////////////////////////////////////// - -struct LocalProjectService::LocalProjectImpl -{ - LocalProjectImpl() : m_WorkerThreadPool(ServiceThreadCount) {} - ~LocalProjectImpl() { Stop(); } - - void Start() - { - ZEN_ASSERT(!m_IsStarted); - - for (int i = 0; i < 32; ++i) - { - PipeConnection* NewPipe = new PipeConnection(this); - m_ServicePipes.push_back(NewPipe); - m_IoContext.post([NewPipe] { NewPipe->Accept(); }); - } - - for (int i = 0; i < ServiceThreadCount; ++i) - { - asio::post(m_WorkerThreadPool, [this] { - try - { - m_IoContext.run(); - } - catch (std::exception& ex) - { - spdlog::error("exception caught in pipe project service loop: {}", ex.what()); - } - - m_ShutdownLatch.count_down(); - }); - } - - m_IsStarted = true; - } - - void Stop() - { - if (!m_IsStarted) - { - return; - } - - for (PipeConnection* Pipe : m_ServicePipes) - { - Pipe->Disconnect(); - } - - m_IoContext.stop(); - m_ShutdownLatch.wait(); - - for (PipeConnection* Pipe : m_ServicePipes) - { - delete Pipe; - } - - m_ServicePipes.clear(); - } - -private: - asio::io_context& IoContext() { return m_IoContext; } - auto PipeSecurityAttributes() { return m_AnyUserSecurityAttributes.Attributes(); } - static const int ServiceThreadCount = 4; - - std::latch m_ShutdownLatch{ServiceThreadCount}; - asio::thread_pool m_WorkerThreadPool; - asio::io_context m_IoContext; - - class PipeConnection - { - enum PipeState - { - kUninitialized, - kConnecting, - kReading, - kWriting, - kDisconnected, - kInvalid - }; - - LocalProjectImpl* m_Outer; - asio::windows::stream_handle m_PipeHandle; - std::atomic<PipeState> m_PipeState{kUninitialized}; - - public: - PipeConnection(LocalProjectImpl* Outer) : m_Outer(Outer), m_PipeHandle{m_Outer->IoContext()} {} - ~PipeConnection() {} - - void Disconnect() - { - m_PipeState = kDisconnected; - DisconnectNamedPipe(m_PipeHandle.native_handle()); - } - - void Accept() - { - StringBuilder<64> PipeName; - PipeName << "\\\\.\\pipe\\zenprj"; // TODO: this should use an instance-specific identifier! - - HANDLE hPipe = CreateNamedPipeA(PipeName.c_str(), - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, - PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, - PIPE_UNLIMITED_INSTANCES, // Max instance count - 65536, // Output buffer size - 65536, // Input buffer size - 10'000, // Default timeout (ms) - m_Outer->PipeSecurityAttributes() // Security attributes - ); - - if (hPipe == INVALID_HANDLE_VALUE) - { - spdlog::warn("failed while creating named pipe {}", PipeName.c_str()); - - // TODO: error - how to best handle? - } - - m_PipeHandle.assign(hPipe); // This now owns the handle and will close it - - m_PipeState = kConnecting; - - asio::windows::overlapped_ptr OverlappedPtr( - m_PipeHandle.get_executor(), - std::bind(&PipeConnection::OnClientConnect, this, std::placeholders::_1, std::placeholders::_2)); - - OVERLAPPED* Overlapped = OverlappedPtr.get(); - BOOL Ok = ConnectNamedPipe(hPipe, Overlapped); - DWORD LastError = GetLastError(); - - if (!Ok && LastError != ERROR_IO_PENDING) - { - m_PipeState = kInvalid; - - // The operation completed immediately, so a completion notification needs - // to be posted. When complete() is called, ownership of the OVERLAPPED- - // derived object passes to the io_service. - std::error_code Ec(LastError, asio::error::get_system_category()); - OverlappedPtr.complete(Ec, 0); - } - else - { - // The operation was successfully initiated, so ownership of the - // OVERLAPPED-derived object has now passed to the io_service. - OverlappedPtr.release(); - } - } - - private: - void OnClientConnect(const std::error_code& Ec, size_t BytesTransferred) - { - ZEN_UNUSED(BytesTransferred); - - if (Ec) - { - if (m_PipeState == kDisconnected) - { - return; - } - - spdlog::warn("pipe connection error: {}", Ec.message()); - - // TODO: should disconnect and issue a new connect - return; - } - - spdlog::debug("pipe connection established"); - - IssueRead(); - } - - void IssueRead() - { - m_PipeState = kReading; - - m_PipeHandle.async_read_some(asio::mutable_buffer(m_MsgBuffer, sizeof m_MsgBuffer), - std::bind(&PipeConnection::OnClientRead, this, std::placeholders::_1, std::placeholders::_2)); - } - - void OnClientRead(const std::error_code& Ec, size_t Bytes) - { - if (Ec) - { - if (m_PipeState == kDisconnected) - { - return; - } - - spdlog::warn("pipe read error: {}", Ec.message()); - - // TODO: should disconnect and issue a new connect - return; - } - - spdlog::debug("received message: {} bytes", Bytes); - - // TODO: Actually process request - - m_PipeState = kWriting; - - asio::async_write(m_PipeHandle, - asio::buffer(m_MsgBuffer, Bytes), - std::bind(&PipeConnection::OnWriteCompletion, this, std::placeholders::_1, std::placeholders::_2)); - } - - void OnWriteCompletion(const std::error_code& Ec, size_t Bytes) - { - ZEN_UNUSED(Bytes); - - if (Ec) - { - if (m_PipeState == kDisconnected) - { - return; - } - - spdlog::warn("pipe write error: {}", Ec.message()); - - // TODO: should disconnect and issue a new connect - return; - } - - // Go back to reading - IssueRead(); - } - - uint8_t m_MsgBuffer[16384]; - }; - - AnyUserSecurityAttributes m_AnyUserSecurityAttributes; - std::vector<PipeConnection*> m_ServicePipes; - bool m_IsStarted = false; -}; - -LocalProjectService::LocalProjectService(CasStore& Store, ProjectStore* Projects) : m_CasStore(Store), m_ProjectStore(Projects) -{ - m_Impl = std::make_unique<LocalProjectImpl>(); - m_Impl->Start(); -} - -LocalProjectService::~LocalProjectService() -{ - m_Impl->Stop(); -} - -////////////////////////////////////////////////////////////////////////// - -} // namespace zen |