// Copyright Epic Games, Inc. All Rights Reserved. #include "projectstore.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "config.h" ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS #endif // ZEN_WITH_TESTS namespace zen { namespace { bool PrepareDirectoryDelete(const std::filesystem::path& Dir, std::filesystem::path& OutDeleteDir) { int DropIndex = 0; do { if (!std::filesystem::exists(Dir)) { return true; } std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; if (std::filesystem::exists(DroppedBucketPath)) { DropIndex++; continue; } std::error_code Ec; std::filesystem::rename(Dir, DroppedBucketPath, Ec); if (!Ec) { OutDeleteDir = DroppedBucketPath; return true; } if (Ec && !std::filesystem::exists(DroppedBucketPath)) { // We can't move our folder, probably because it is busy, bail.. 
return false; } Sleep(100); } while (true); } } // namespace ////////////////////////////////////////////////////////////////////////// Oid OpKeyStringAsOId(std::string_view OpKey) { using namespace std::literals; CbObjectWriter Writer; Writer << "key"sv << OpKey; XXH3_128Stream KeyHasher; Writer.Save()["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); XXH3_128 KeyHash = KeyHasher.GetHash(); Oid OpId; memcpy(OpId.OidBits, &KeyHash, sizeof(OpId.OidBits)); return OpId; } ////////////////////////////////////////////////////////////////////////// struct ProjectStore::OplogStorage : public RefCounted { OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath) { } ~OplogStorage() { ZEN_INFO("closing oplog storage at {}", m_OplogStoragePath); Flush(); } [[nodiscard]] bool Exists() { return Exists(m_OplogStoragePath); } [[nodiscard]] static bool Exists(std::filesystem::path BasePath) { return std::filesystem::exists(BasePath / "ops.zlog") && std::filesystem::exists(BasePath / "ops.zops"); } static bool Delete(std::filesystem::path BasePath) { return DeleteDirectories(BasePath); } void Open(bool IsCreate) { using namespace std::literals; ZEN_INFO("initializing oplog storage at '{}'", m_OplogStoragePath); if (IsCreate) { DeleteDirectories(m_OplogStoragePath); CreateDirectories(m_OplogStoragePath); } m_Oplog.Open(m_OplogStoragePath / "ops.zlog"sv, IsCreate ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); m_Oplog.Initialize(); m_OpBlobs.Open(m_OplogStoragePath / "ops.zops"sv, IsCreate ? 
BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); ZEN_ASSERT(IsPow2(m_OpsAlign)); ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1))); } void ReplayLog(std::function&& Handler) { ZEN_TRACE_CPU("ProjectStore::OplogStorage::ReplayLog"); // This could use memory mapping or do something clever but for now it just reads the file sequentially ZEN_INFO("replaying log for '{}'", m_OplogStoragePath); Stopwatch Timer; uint64_t InvalidEntries = 0; m_Oplog.Replay( [&](const OplogEntry& LogEntry) { if (LogEntry.OpCoreSize == 0) { ++InvalidEntries; return; } IoBuffer OpBuffer(LogEntry.OpCoreSize); const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); // Verify checksum, ignore op data if incorrect const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), OpBuffer.Size()) & 0xffffFFFF); if (OpCoreHash != LogEntry.OpCoreHash) { ZEN_WARN("skipping oplog entry with bad checksum!"); return; } CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), OpBuffer.Size())); m_NextOpsOffset = Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn); Handler(Op, LogEntry); }, 0); if (InvalidEntries) { ZEN_WARN("ignored {} zero-sized oplog entries", InvalidEntries); } ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()), m_MaxLsn, m_NextOpsOffset); } void ReplayLog(const std::vector& Entries, std::function&& Handler) { for (const OplogEntryAddress& Entry : Entries) { CbObject Op = GetOp(Entry); Handler(Op); } } CbObject GetOp(const OplogEntryAddress& Entry) { IoBuffer OpBuffer(Entry.Size); const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign; m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset); return CbObject(SharedBuffer(std::move(OpBuffer))); } OplogEntry AppendOp(CbObject Op) { 
ZEN_TRACE_CPU("ProjectStore::OplogStorage::AppendOp"); using namespace std::literals; SharedBuffer Buffer = Op.GetBuffer(); const uint64_t WriteSize = Buffer.GetSize(); const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF); ZEN_ASSERT(WriteSize != 0); XXH3_128Stream KeyHasher; Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); XXH3_128 KeyHash = KeyHasher.GetHash(); RwLock::ExclusiveLockScope _(m_RwLock); const uint64_t WriteOffset = m_NextOpsOffset; const uint32_t OpLsn = ++m_MaxLsn; m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign); ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign)); OplogEntry Entry = {.OpLsn = OpLsn, .OpCoreOffset = gsl::narrow_cast(WriteOffset / m_OpsAlign), .OpCoreSize = uint32_t(Buffer.GetSize()), .OpCoreHash = OpCoreHash, .OpKeyHash = KeyHash}; m_Oplog.Append(Entry); m_OpBlobs.Write(Buffer.GetData(), WriteSize, WriteOffset); return Entry; } void Flush() { m_Oplog.Flush(); m_OpBlobs.Flush(); } spdlog::logger& Log() { return m_OwnerOplog->Log(); } private: ProjectStore::Oplog* m_OwnerOplog; std::filesystem::path m_OplogStoragePath; RwLock m_RwLock; TCasLogFile m_Oplog; BasicFile m_OpBlobs; std::atomic m_NextOpsOffset{0}; uint64_t m_OpsAlign = 32; std::atomic m_MaxLsn{0}; }; ////////////////////////////////////////////////////////////////////////// ProjectStore::Oplog::Oplog(std::string_view Id, Project* Project, CidStore& Store, ZenCacheStore& CacheStore, std::filesystem::path BasePath) : m_OuterProject(Project) , m_CidStore(Store) , m_CacheStore(CacheStore) , m_BasePath(BasePath) , m_OplogId(Id) { using namespace std::literals; m_Storage = new OplogStorage(this, m_BasePath); const bool StoreExists = m_Storage->Exists(); m_Storage->Open(/* IsCreate */ !StoreExists); m_TempPath = m_BasePath / "temp"sv; CleanDirectory(m_TempPath); } ProjectStore::Oplog::~Oplog() { if (m_Storage) { Flush(); } } void ProjectStore::Oplog::Flush() { ZEN_ASSERT(m_Storage); 
	m_Storage->Flush();
}
// Scrub is currently a no-op for oplogs.
void ProjectStore::Oplog::Scrub(ScrubContext& Ctx) const
{
	ZEN_UNUSED(Ctx);
}
// Reports every CID referenced by this oplog (chunk and metadata attachments) to
// the garbage collector so the backing CAS entries are retained.
void ProjectStore::Oplog::GatherReferences(GcContext& GcCtx)
{
	RwLock::SharedLockScope _(m_OplogLock);
	std::vector Hashes;
	// The vector is reused for both batches; reserve for the larger of the two.
	Hashes.reserve(Max(m_ChunkMap.size(), m_MetaMap.size()));
	for (const auto& Kv : m_ChunkMap)
	{
		Hashes.push_back(Kv.second);
	}
	GcCtx.AddRetainedCids(Hashes);
	Hashes.clear();
	for (const auto& Kv : m_MetaMap)
	{
		Hashes.push_back(Kv.second);
	}
	GcCtx.AddRetainedCids(Hashes);
}
// Drops all in-memory state and releases the storage object. When `MoveFolder` is
// set, the on-disk folder is renamed to a "[dropped]" sibling and that path is
// returned so the caller can delete it outside the lock; otherwise (or on rename
// failure) an empty path is returned.
std::filesystem::path ProjectStore::Oplog::PrepareForDelete(bool MoveFolder)
{
	RwLock::ExclusiveLockScope _(m_OplogLock);
	m_ChunkMap.clear();
	m_MetaMap.clear();
	m_FileMap.clear();
	m_OpAddressMap.clear();
	m_LatestOpMap.clear();
	// Releases our reference; the storage destructor flushes and closes the files.
	m_Storage = {};
	if (!MoveFolder)
	{
		return {};
	}
	std::filesystem::path MovedDir;
	if (PrepareDirectoryDelete(m_BasePath, MovedDir))
	{
		return MovedDir;
	}
	return {};
}
bool ProjectStore::Oplog::ExistsAt(std::filesystem::path BasePath)
{
	return OplogStorage::Exists(BasePath);
}
// Rebuilds the in-memory maps by replaying every persisted op from storage.
void ProjectStore::Oplog::ReplayLog()
{
	m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(Op, OpEntry, kUpdateReplay); });
}
// Resolves a chunk id to its payload, consulting in order: the chunk map, the
// derived-data cache map, the file map, and finally the metadata map.
// Returns an empty buffer if the id is unknown.
IoBuffer ProjectStore::Oplog::FindChunk(Oid ChunkId)
{
	RwLock::SharedLockScope OplogLock(m_OplogLock);
	if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end())
	{
		IoHash ChunkHash = ChunkIt->second;
		// Copy what we need, then drop the lock before hitting the CAS.
		OplogLock.ReleaseNow();
		IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash);
		Chunk.SetContentType(ZenContentType::kCompressedBinary);
		return Chunk;
	}
	if (auto EntryIt = m_CacheMap.find(ChunkId); EntryIt != m_CacheMap.end())
	{
		const CacheMapEntry& Entry = EntryIt->second;
		if (Entry.ValueId != Oid::Zero)
		{
			IoHash Cid = Entry.Cid;
			if (Cid == IoHash::Zero)
			{
				// Lazily resolve the cache value to a CID and memoize the result.
				Cid = ResolveCacheMapping(Entry.Namespace, Entry.CacheKey, Entry.ValueId);
				if (Cid != IoHash::Zero)
				{
					CacheMapEntry ResolvedEntry = Entry;
					ResolvedEntry.Cid = Cid;
					// NOTE(review): this mutates m_CacheMap while only the *shared*
					// oplog lock is held -- looks like a data race with concurrent
					// readers if RwLock is a plain reader/writer lock; confirm, or
					// upgrade to the exclusive lock for this memoization.
					m_CacheMap.insert_or_assign(ChunkId, ResolvedEntry);
				}
			}
			IoBuffer Chunk = m_CidStore.FindChunkByCid(Cid);
Chunk.SetContentType(ZenContentType::kCompressedBinary); return Chunk; } else { ZenCacheValue CacheValue; m_CacheStore.Get(Entry.Namespace, Entry.CacheKey.Bucket, Entry.CacheKey.Hash, CacheValue); return CacheValue.Value; } } if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) { std::filesystem::path FilePath = m_OuterProject->RootDir / FileIt->second.ServerPath; OplogLock.ReleaseNow(); IoBuffer FileChunk = IoBufferBuilder::MakeFromFile(FilePath); FileChunk.SetContentType(ZenContentType::kBinary); return FileChunk; } if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) { IoHash ChunkHash = MetaIt->second; OplogLock.ReleaseNow(); IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash); Chunk.SetContentType(ZenContentType::kCompressedBinary); return Chunk; } return {}; } void ProjectStore::Oplog::IterateFileMap( std::function&& Fn) { RwLock::SharedLockScope _(m_OplogLock); for (const auto& Kv : m_FileMap) { Fn(Kv.first, Kv.second.ServerPath, Kv.second.ClientPath); } } void ProjectStore::Oplog::IterateOplog(std::function&& Handler) { RwLock::SharedLockScope _(m_OplogLock); std::vector Entries; Entries.reserve(m_LatestOpMap.size()); for (const auto& Kv : m_LatestOpMap) { const auto AddressEntry = m_OpAddressMap.find(Kv.second); ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); Entries.push_back(AddressEntry->second); } std::sort(Entries.begin(), Entries.end(), [](const OplogEntryAddress& Lhs, const OplogEntryAddress& Rhs) { return Lhs.Offset < Rhs.Offset; }); m_Storage->ReplayLog(Entries, [&](CbObject Op) { Handler(Op); }); } std::optional ProjectStore::Oplog::GetOpByKey(const Oid& Key) { RwLock::SharedLockScope _(m_OplogLock); if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end()) { const auto AddressEntry = m_OpAddressMap.find(LatestOp->second); ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); return m_Storage->GetOp(AddressEntry->second); } return {}; } std::optional ProjectStore::Oplog::GetOpByIndex(int 
Index) { RwLock::SharedLockScope _(m_OplogLock); if (const auto AddressEntryIt = m_OpAddressMap.find(Index); AddressEntryIt != m_OpAddressMap.end()) { return m_Storage->GetOp(AddressEntryIt->second); } return {}; } bool ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, Oid FileId, IoHash Hash, std::string_view ServerPath, std::string_view ClientPath) { if (ServerPath.empty() || ClientPath.empty()) { return false; } if (Hash != IoHash::Zero) { m_ChunkMap.insert_or_assign(FileId, Hash); } FileMapEntry Entry; Entry.ServerPath = ServerPath; Entry.ClientPath = ClientPath; m_FileMap[FileId] = std::move(Entry); return true; } void ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash) { m_ChunkMap.insert_or_assign(ChunkId, Hash); } void ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash) { m_MetaMap.insert_or_assign(ChunkId, Hash); } bool ProjectStore::Oplog::AddCacheMapping(const RwLock::ExclusiveLockScope&, std::string_view Namespace, const CacheKey& CacheKey, const Oid& ValueId, const Oid& ChunkId) { using namespace std::literals; if (Namespace.empty() || CacheKey == CacheKey::Empty || ChunkId == Oid::Zero) { return false; } const IoHash Cid = ValueId != Oid::Zero ? 
ResolveCacheMapping(Namespace, CacheKey, ValueId) : IoHash::Zero; m_CacheMap.insert_or_assign(ChunkId, CacheMapEntry{.Namespace = std::string(Namespace), .CacheKey = CacheKey, .ValueId = ValueId, .Cid = Cid}); return true; } IoHash ProjectStore::Oplog::ResolveCacheMapping(std::string_view Namespace, const CacheKey& CacheKey, const Oid& ValueId) { using namespace std::literals; ZenCacheValue CacheValue; if (m_CacheStore.Get(Namespace, CacheKey.Bucket, CacheKey.Hash, CacheValue)) { if (CacheValue.Value.GetContentType() == ZenContentType::kCbObject) { CbObjectView CacheRecord = CbObjectView(CacheValue.Value.GetData()); CbArrayView Values = CacheRecord["Values"sv].AsArrayView(); for (CbFieldView Value : Values) { CbObjectView ValueObj = Value.AsObjectView(); const Oid Id = ValueObj["Id"sv].AsObjectId(); if (Id == ValueId) { return ValueObj["RawHash"sv].AsHash(); } } } } return IoHash::Zero; } uint32_t ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry, UpdateType TypeOfUpdate) { ZEN_TRACE_CPU("ProjectStore::Oplog::RegisterOplogEntry"); ZEN_UNUSED(TypeOfUpdate); // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing // too many problems. 
Longer term we'll probably want to ensure we can do concurrent updates however RwLock::ExclusiveLockScope OplogLock(m_OplogLock); using namespace std::literals; // Update chunk id maps if (Core["package"sv]) { CbObjectView PkgObj = Core["package"sv].AsObjectView(); Oid PackageId = PkgObj["id"sv].AsObjectId(); IoHash PackageHash = PkgObj["data"sv].AsBinaryAttachment(); AddChunkMapping(OplogLock, PackageId, PackageHash); ZEN_DEBUG("package data {} -> {}", PackageId, PackageHash); if (PkgObj["meta"]) { CbObjectView MetaObj = PkgObj["meta"sv].AsObjectView(); } } for (CbFieldView& Entry : Core["bulkdata"sv]) { CbObjectView BulkObj = Entry.AsObjectView(); Oid BulkDataId = BulkObj["id"sv].AsObjectId(); IoHash BulkDataHash = BulkObj["data"sv].AsBinaryAttachment(); AddChunkMapping(OplogLock, BulkDataId, BulkDataHash); ZEN_DEBUG("bulk data {} -> {}", BulkDataId, BulkDataHash); } if (Core["files"sv]) { Stopwatch Timer; int32_t FileCount = 0; int32_t ChunkCount = 0; for (CbFieldView& Entry : Core["files"sv]) { CbObjectView FileObj = Entry.AsObjectView(); const Oid FileId = FileObj["id"sv].AsObjectId(); IoHash FileDataHash = FileObj["data"sv].AsBinaryAttachment(); std::string_view ServerPath = FileObj["serverpath"sv].AsString(); std::string_view ClientPath = FileObj["clientpath"sv].AsString(); if (AddFileMapping(OplogLock, FileId, FileDataHash, ServerPath, ClientPath)) { if (FileDataHash != IoHash::Zero) { ZEN_DEBUG("file data {} -> '{}'", FileId, ClientPath); ++ChunkCount; } else { ZEN_DEBUG("file mapping '{}' -> '{}'", ServerPath, ClientPath); ++FileCount; } } else { ZEN_WARN("invalid file"); } } ZEN_DEBUG("added {} file(s), {} as files and {} as chunks in {}", FileCount + ChunkCount, FileCount, ChunkCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } for (CbFieldView& Entry : Core["deriveddata"sv]) { CbObjectView RefObj = Entry.AsObjectView(); std::string_view Namespace = RefObj["namespace"sv].AsString(std::string_view("ue.ddc")); CacheKey Key = 
CacheKey::Create(RefObj["bucket"sv].AsString(), RefObj["key"sv].AsHash()); const Oid ValueId = RefObj["valueid"sv].AsObjectId(); const Oid ChunkId = RefObj["chunkid"sv].AsObjectId(); if (AddCacheMapping(OplogLock, Namespace, Key, ValueId, ChunkId)) { ZEN_DEBUG("derived data {} -> '{}/{}/{}'", ChunkId, Namespace, Key.Bucket, Key.Hash); } else { ZEN_WARN("invalid derived data reference"); } } for (CbFieldView& Entry : Core["meta"sv]) { CbObjectView MetaObj = Entry.AsObjectView(); const Oid MetaId = MetaObj["id"sv].AsObjectId(); auto NameString = MetaObj["name"sv].AsString(); IoHash MetaDataHash = MetaObj["data"sv].AsBinaryAttachment(); AddMetaMapping(OplogLock, MetaId, MetaDataHash); ZEN_DEBUG("meta data ({}) {} -> {}", NameString, MetaId, MetaDataHash); } m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); m_LatestOpMap[OpEntry.OpKeyAsOId()] = OpEntry.OpLsn; return OpEntry.OpLsn; } uint32_t ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) { ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); ZEN_ASSERT(m_Storage); using namespace std::literals; // Persist attachments uint64_t AttachmentBytes = 0; uint64_t NewAttachmentBytes = 0; auto Attachments = OpPackage.GetAttachments(); for (const auto& Attach : Attachments) { ZEN_ASSERT(Attach.IsCompressedBinary()); CompressedBuffer AttachmentData = Attach.AsCompressedBinary(); const uint64_t AttachmentSize = AttachmentData.GetRawSize(); CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData); if (InsertResult.New) { NewAttachmentBytes += AttachmentSize; } AttachmentBytes += AttachmentSize; } const CbObject& Core = OpPackage.GetObject(); const uint32_t EntryId = AppendNewOplogEntry(Core); ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes)); return EntryId; } uint32_t ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) { 
ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); ZEN_ASSERT(m_Storage); using namespace std::literals; const OplogEntry OpEntry = m_Storage->AppendOp(Core); const uint32_t EntryId = RegisterOplogEntry(Core, OpEntry, kUpdateNewEntry); return EntryId; } ////////////////////////////////////////////////////////////////////////// ProjectStore::Project::Project(ProjectStore* PrjStore, CidStore& CidStore, ZenCacheStore& CacheStore, std::filesystem::path BasePath) : m_ProjectStore(PrjStore) , m_CidStore(CidStore) , m_CacheStore(CacheStore) , m_OplogStoragePath(BasePath) { } ProjectStore::Project::~Project() { } bool ProjectStore::Project::Exists(std::filesystem::path BasePath) { return std::filesystem::exists(BasePath / "Project.zcb"); } void ProjectStore::Project::Read() { using namespace std::literals; std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv; ZEN_INFO("reading config for project '{}' from {}", Identifier, ProjectStateFilePath); BasicFile Blob; Blob.Open(ProjectStateFilePath, BasicFile::Mode::kRead); IoBuffer Obj = Blob.ReadAll(); CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); if (ValidationError == CbValidateError::None) { CbObject Cfg = LoadCompactBinaryObject(Obj); Identifier = Cfg["id"sv].AsString(); RootDir = Cfg["root"sv].AsString(); ProjectRootDir = Cfg["project"sv].AsString(); EngineRootDir = Cfg["engine"sv].AsString(); ProjectFilePath = Cfg["projectfile"sv].AsString(); } else { ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), ProjectStateFilePath); } } void ProjectStore::Project::Write() { using namespace std::literals; BinaryWriter Mem; CbObjectWriter Cfg; Cfg << "id"sv << Identifier; Cfg << "root"sv << PathToUtf8(RootDir); Cfg << "project"sv << ProjectRootDir; Cfg << "engine"sv << EngineRootDir; Cfg << "projectfile"sv << ProjectFilePath; Cfg.Save(Mem); CreateDirectories(m_OplogStoragePath); std::filesystem::path 
ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv; ZEN_INFO("persisting config for project '{}' to {}", Identifier, ProjectStateFilePath); BasicFile Blob; Blob.Open(ProjectStateFilePath, BasicFile::Mode::kTruncate); Blob.Write(Mem.Data(), Mem.Size(), 0); Blob.Flush(); } spdlog::logger& ProjectStore::Project::Log() { return m_ProjectStore->Log(); } std::filesystem::path ProjectStore::Project::BasePathForOplog(std::string_view OplogId) { return m_OplogStoragePath / OplogId; } ProjectStore::Oplog* ProjectStore::Project::NewOplog(std::string_view OplogId) { RwLock::ExclusiveLockScope _(m_ProjectLock); std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); try { Oplog* Log = m_Oplogs .try_emplace(std::string{OplogId}, std::make_unique(OplogId, this, m_CidStore, m_CacheStore, OplogBasePath)) .first->second.get(); return Log; } catch (std::exception&) { // In case of failure we need to ensure there's no half constructed entry around // // (This is probably already ensured by the try_emplace implementation?) 
m_Oplogs.erase(std::string{OplogId}); return nullptr; } } ProjectStore::Oplog* ProjectStore::Project::OpenOplog(std::string_view OplogId) { { RwLock::SharedLockScope _(m_ProjectLock); auto OplogIt = m_Oplogs.find(std::string(OplogId)); if (OplogIt != m_Oplogs.end()) { return OplogIt->second.get(); } } RwLock::ExclusiveLockScope _(m_ProjectLock); std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); if (Oplog::ExistsAt(OplogBasePath)) { // Do open of existing oplog try { Oplog* Log = m_Oplogs .try_emplace(std::string{OplogId}, std::make_unique(OplogId, this, m_CidStore, m_CacheStore, OplogBasePath)) .first->second.get(); Log->ReplayLog(); return Log; } catch (std::exception& ex) { ZEN_WARN("failed to open oplog '{}' @ '{}': {}", OplogId, OplogBasePath, ex.what()); m_Oplogs.erase(std::string{OplogId}); } } return nullptr; } void ProjectStore::Project::DeleteOplog(std::string_view OplogId) { std::filesystem::path DeletePath; { RwLock::ExclusiveLockScope _(m_ProjectLock); auto OplogIt = m_Oplogs.find(std::string(OplogId)); if (OplogIt != m_Oplogs.end()) { std::unique_ptr& Oplog = OplogIt->second; DeletePath = Oplog->PrepareForDelete(true); m_DeletedOplogs.emplace_back(std::move(Oplog)); m_Oplogs.erase(OplogIt); } } // Erase content on disk if (!DeletePath.empty()) { OplogStorage::Delete(DeletePath); } } std::vector ProjectStore::Project::ScanForOplogs() const { DirectoryContent DirContent; GetDirectoryContent(m_OplogStoragePath, DirectoryContent::IncludeDirsFlag, DirContent); std::vector Oplogs; Oplogs.reserve(DirContent.Directories.size()); for (const std::filesystem::path& DirPath : DirContent.Directories) { Oplogs.push_back(DirPath.filename().string()); } return Oplogs; } void ProjectStore::Project::IterateOplogs(std::function&& Fn) const { RwLock::SharedLockScope _(m_ProjectLock); for (auto& Kv : m_Oplogs) { Fn(*Kv.second); } } void ProjectStore::Project::IterateOplogs(std::function&& Fn) { RwLock::SharedLockScope _(m_ProjectLock); for (auto& Kv : 
m_Oplogs) { Fn(*Kv.second); } } void ProjectStore::Project::Flush() { // We only need to flush oplogs that we have already loaded IterateOplogs([&](Oplog& Ops) { Ops.Flush(); }); } void ProjectStore::Project::Scrub(ScrubContext& Ctx) { // Scrubbing needs to check all existing oplogs std::vector OpLogs = ScanForOplogs(); for (const std::string& OpLogId : OpLogs) { OpenOplog(OpLogId); } IterateOplogs([&](const Oplog& Ops) { Ops.Scrub(Ctx); }); } void ProjectStore::Project::GatherReferences(GcContext& GcCtx) { ZEN_TRACE_CPU("ProjectStore::Project::GatherReferences"); Stopwatch Timer; const auto Guard = MakeGuard( [&] { ZEN_INFO("gathered references from project store project {} in {}", Identifier, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); // GatherReferences needs to check all existing oplogs std::vector OpLogs = ScanForOplogs(); for (const std::string& OpLogId : OpLogs) { OpenOplog(OpLogId); } IterateOplogs([&](Oplog& Ops) { Ops.GatherReferences(GcCtx); }); } bool ProjectStore::Project::PrepareForDelete(std::filesystem::path& OutDeletePath) { RwLock::ExclusiveLockScope _(m_ProjectLock); for (auto& It : m_Oplogs) { // We don't care about the moved folder It.second->PrepareForDelete(false); m_DeletedOplogs.emplace_back(std::move(It.second)); } m_Oplogs.clear(); bool Success = PrepareDirectoryDelete(m_OplogStoragePath, OutDeletePath); if (!Success) { return false; } m_OplogStoragePath.clear(); return true; } bool ProjectStore::Project::IsExpired() const { if (ProjectFilePath.empty()) { return false; } return !std::filesystem::exists(ProjectFilePath); } ////////////////////////////////////////////////////////////////////////// ProjectStore::ProjectStore(CidStore& CidStore, ZenCacheStore& CacheStore, std::filesystem::path BasePath, GcManager& Gc) : GcStorage(Gc) , GcContributor(Gc) , m_Log(logging::Get("project")) , m_CidStore(CidStore) , m_CacheStore(CacheStore) , m_ProjectBasePath(BasePath) { ZEN_INFO("initializing project store at '{}'", BasePath); // 
m_Log.set_level(spdlog::level::debug); } ProjectStore::~ProjectStore() { ZEN_INFO("closing project store ('{}')", m_ProjectBasePath); } std::filesystem::path ProjectStore::BasePathForProject(std::string_view ProjectId) { return m_ProjectBasePath / ProjectId; } void ProjectStore::DiscoverProjects() { if (!std::filesystem::exists(m_ProjectBasePath)) { return; } DirectoryContent DirContent; GetDirectoryContent(m_ProjectBasePath, DirectoryContent::IncludeDirsFlag, DirContent); for (const std::filesystem::path& DirPath : DirContent.Directories) { std::string DirName = PathToUtf8(DirPath.filename()); OpenProject(DirName); } } void ProjectStore::IterateProjects(std::function&& Fn) { RwLock::SharedLockScope _(m_ProjectsLock); for (auto& Kv : m_Projects) { Fn(*Kv.second.Get()); } } void ProjectStore::Flush() { std::vector> Projects; { RwLock::SharedLockScope _(m_ProjectsLock); Projects.reserve(m_Projects.size()); for (auto& Kv : m_Projects) { Projects.push_back(Kv.second); } } for (const Ref& Project : Projects) { Project->Flush(); } } void ProjectStore::Scrub(ScrubContext& Ctx) { DiscoverProjects(); std::vector> Projects; { RwLock::SharedLockScope _(m_ProjectsLock); Projects.reserve(m_Projects.size()); for (auto& Kv : m_Projects) { if (Kv.second->IsExpired()) { continue; } Projects.push_back(Kv.second); } } for (const Ref& Project : Projects) { Project->Scrub(Ctx); } } void ProjectStore::GatherReferences(GcContext& GcCtx) { ZEN_TRACE_CPU("ProjectStore::GatherReferences"); size_t ProjectCount = 0; size_t ExpiredProjectCount = 0; Stopwatch Timer; const auto Guard = MakeGuard([&] { ZEN_INFO("gathered references from '{}' in {}, found {} active projects and {} expired projects", m_ProjectBasePath.string(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()), ProjectCount, ExpiredProjectCount); }); DiscoverProjects(); std::vector> Projects; { RwLock::SharedLockScope _(m_ProjectsLock); Projects.reserve(m_Projects.size()); for (auto& Kv : m_Projects) { if (Kv.second->IsExpired()) { 
ExpiredProjectCount++; continue; } Projects.push_back(Kv.second); } } ProjectCount = Projects.size(); for (const Ref& Project : Projects) { Project->GatherReferences(GcCtx); } } void ProjectStore::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("ProjectStore::CollectGarbage"); size_t ProjectCount = 0; size_t ExpiredProjectCount = 0; Stopwatch Timer; const auto Guard = MakeGuard([&] { ZEN_INFO("garbage collect from '{}' DONE after {}, found {} active projects and {} expired projects", m_ProjectBasePath.string(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()), ProjectCount, ExpiredProjectCount); }); std::vector> ExpiredProjects; { RwLock::SharedLockScope _(m_ProjectsLock); for (auto& Kv : m_Projects) { if (Kv.second->IsExpired()) { ExpiredProjects.push_back(Kv.second); ExpiredProjectCount++; continue; } ProjectCount++; } } if (ExpiredProjects.empty()) { ZEN_INFO("garbage collect SKIPPED, for '{}', no expired projects found", m_ProjectBasePath.string()); return; } if (!GcCtx.IsDeletionMode()) { ZEN_INFO("garbage collect DISABLED, for '{}' ", m_ProjectBasePath.string()); return; } for (const Ref& Project : ExpiredProjects) { std::filesystem::path PathToRemove; std::string ProjectId; { RwLock::ExclusiveLockScope _(m_ProjectsLock); if (!Project->IsExpired()) { ZEN_INFO("ProjectStore::CollectGarbage skipped garbage collect of project '{}'. Project no longer expired.", ProjectId); continue; } bool Success = Project->PrepareForDelete(PathToRemove); if (!Success) { ZEN_INFO("ProjectStore::CollectGarbage skipped garbage collect of project '{}'. Project folder is locked.", ProjectId); continue; } m_Projects.erase(Project->Identifier); ProjectId = Project->Identifier; } ZEN_INFO("ProjectStore::CollectGarbage garbage collected project '{}'. 
Removing storage on disk", ProjectId); if (PathToRemove.empty()) { continue; } DeleteDirectories(PathToRemove); } } GcStorageSize ProjectStore::StorageSize() const { return {0, 0}; } Ref ProjectStore::OpenProject(std::string_view ProjectId) { { RwLock::SharedLockScope _(m_ProjectsLock); auto ProjIt = m_Projects.find(std::string{ProjectId}); if (ProjIt != m_Projects.end()) { return ProjIt->second; } } RwLock::ExclusiveLockScope _(m_ProjectsLock); std::filesystem::path BasePath = BasePathForProject(ProjectId); if (Project::Exists(BasePath)) { try { ZEN_INFO("opening project {} @ {}", ProjectId, BasePath); Ref& Prj = m_Projects .try_emplace(std::string{ProjectId}, Ref(new ProjectStore::Project(this, m_CidStore, m_CacheStore, BasePath))) .first->second; Prj->Identifier = ProjectId; Prj->Read(); return Prj; } catch (std::exception& e) { ZEN_WARN("failed to open {} @ {} ({})", ProjectId, BasePath, e.what()); m_Projects.erase(std::string{ProjectId}); } } return nullptr; } Ref ProjectStore::NewProject(std::filesystem::path BasePath, std::string_view ProjectId, std::string_view RootDir, std::string_view EngineRootDir, std::string_view ProjectRootDir, std::string_view ProjectFilePath) { RwLock::ExclusiveLockScope _(m_ProjectsLock); Ref& Prj = m_Projects .try_emplace(std::string{ProjectId}, Ref(new ProjectStore::Project(this, m_CidStore, m_CacheStore, BasePath))) .first->second; Prj->Identifier = ProjectId; Prj->RootDir = RootDir; Prj->EngineRootDir = EngineRootDir; Prj->ProjectRootDir = ProjectRootDir; Prj->ProjectFilePath = ProjectFilePath; Prj->Write(); return Prj; } bool ProjectStore::DeleteProject(std::string_view ProjectId) { ZEN_INFO("deleting project {}", ProjectId); RwLock::ExclusiveLockScope ProjectsLock(m_ProjectsLock); auto ProjIt = m_Projects.find(std::string{ProjectId}); if (ProjIt == m_Projects.end()) { return true; } std::filesystem::path DeletePath; bool Success = ProjIt->second->PrepareForDelete(DeletePath); if (!Success) { return false; } 
m_Projects.erase(ProjIt); ProjectsLock.ReleaseNow(); if (!DeletePath.empty()) { DeleteDirectories(DeletePath); } return true; } bool ProjectStore::Exists(std::string_view ProjectId) { return Project::Exists(BasePathForProject(ProjectId)); } CbArray ProjectStore::GetProjectsList() { using namespace std::literals; DiscoverProjects(); CbWriter Response; Response.BeginArray(); IterateProjects([&Response](ProjectStore::Project& Prj) { Response.BeginObject(); Response << "Id"sv << Prj.Identifier; Response << "RootDir"sv << Prj.RootDir.string(); Response << "ProjectRootDir"sv << Prj.ProjectRootDir; Response << "EngineRootDir"sv << Prj.EngineRootDir; Response << "ProjectFilePath"sv << Prj.ProjectFilePath; Response.EndObject(); }); Response.EndArray(); return Response.Save().AsArray(); } HttpResponseCode ProjectStore::GetProjectFiles(const std::string_view ProjectId, const std::string_view OplogId, bool FilterClient, CbObject& OutPayload) { using namespace std::literals; Ref Project = OpenProject(ProjectId); if (!Project) { ZEN_INFO("Project file request for unknown project '{}'", ProjectId); return HttpResponseCode::NotFound; } ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); if (!FoundLog) { ZEN_INFO("Project file for unknown oplog '{}/{}'", ProjectId, OplogId); return HttpResponseCode::NotFound; } CbObjectWriter Response; Response.BeginArray("files"sv); FoundLog->IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) { Response.BeginObject(); Response << "id"sv << Id; Response << "clientpath"sv << ClientPath; if (!FilterClient) { Response << "serverpath"sv << ServerPath; } Response.EndObject(); }); Response.EndArray(); OutPayload = Response.Save(); return HttpResponseCode::OK; } HttpResponseCode ProjectStore::GetChunkInfo(const std::string_view ProjectId, const std::string_view OplogId, const std::string_view ChunkId, CbObject& OutPayload) { using namespace std::literals; Ref Project = OpenProject(ProjectId); 
	if (!Project)
	{
		ZEN_INFO("Chunk info request for unknown project '{}'", ProjectId);
		return HttpResponseCode::NotFound;
	}
	ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
	if (!FoundLog)
	{
		ZEN_INFO("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId);
		return HttpResponseCode::NotFound;
	}
	// Chunk ids are hex strings: two characters per byte of the Oid.
	if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
	{
		ZEN_INFO("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId);
		return HttpResponseCode::BadRequest;
	}
	const Oid Obj = Oid::FromHexString(ChunkId);
	IoBuffer  Chunk = FoundLog->FindChunk(Obj);
	if (!Chunk)
	{
		ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
		return HttpResponseCode::NotFound;
	}
	// Report the *raw* (uncompressed) size for compressed chunks.
	uint64_t ChunkSize = Chunk.GetSize();
	if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
	{
		CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
		ZEN_ASSERT(!Compressed.IsNull());
		ChunkSize = Compressed.GetRawSize();
	}
	CbObjectWriter Response;
	Response << "size"sv << ChunkSize;
	OutPayload = Response.Save();
	return HttpResponseCode::OK;
}
// Serves a (possibly ranged) chunk from an oplog. `Offset`/`Size` select a byte
// range of the *raw* data; Size == ~0ull together with Offset == 0 means "whole
// chunk". `AcceptType` selects whether compressed chunks are decompressed
// (kBinary) or returned as compressed block ranges (kCompressedBinary).
HttpResponseCode ProjectStore::GetChunk(const std::string_view ProjectId,
										const std::string_view OplogId,
										const std::string_view ChunkId,
										uint64_t			   Offset,
										uint64_t			   Size,
										ZenContentType		   AcceptType,
										IoBuffer&			   OutChunk)
{
	// Any non-default range triggers the ranged code paths below.
	bool IsOffset = Offset != 0 || Size != ~(0ull);
	Ref	 Project = OpenProject(ProjectId);
	if (!Project)
	{
		ZEN_INFO("Chunk request for unknown project '{}'", ProjectId);
		return HttpResponseCode::NotFound;
	}
	ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
	if (!FoundLog)
	{
		ZEN_INFO("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId);
		return HttpResponseCode::NotFound;
	}
	if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
	{
		ZEN_INFO("Chunk request for invalid chunk id '{}/{}/{}'", ProjectId, OplogId, ChunkId);
		return HttpResponseCode::BadRequest;
	}
	const Oid Obj = Oid::FromHexString(ChunkId);
	IoBuffer  Chunk = FoundLog->FindChunk(Obj);
	if (!Chunk)
	{
		ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
		return HttpResponseCode::NotFound;
	}
	OutChunk = Chunk;
	HttpContentType ContentType = Chunk.GetContentType();
	if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
	{
		CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
		ZEN_ASSERT(!Compressed.IsNull());
		if (IsOffset)
		{
			uint64_t RawSize = Compressed.GetRawSize();
			// NOTE(review): this clamp assumes Offset <= RawSize and that
			// Offset + Size does not wrap uint64 (e.g. Offset != 0 with
			// Size == ~0ull wraps and skips the clamp) -- confirm callers
			// validate the range before this point.
			if ((Offset + Size) > RawSize)
			{
				Size = RawSize - Offset;
			}
			if (AcceptType == HttpContentType::kBinary)
			{
				OutChunk = Compressed.Decompress(Offset, Size).AsIoBuffer();
				OutChunk.SetContentType(HttpContentType::kBinary);
			}
			else
			{
				// Value will be a range of compressed blocks that covers the requested range
				// The client will have to compensate for any offsets that do not land on an even block size multiple
				OutChunk = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer();
				OutChunk.SetContentType(HttpContentType::kCompressedBinary);
			}
		}
		else
		{
			if (AcceptType == HttpContentType::kBinary)
			{
				OutChunk = Compressed.Decompress().AsIoBuffer();
				OutChunk.SetContentType(HttpContentType::kBinary);
			}
			else
			{
				OutChunk = Compressed.GetCompressed().Flatten().AsIoBuffer();
				OutChunk.SetContentType(HttpContentType::kCompressedBinary);
			}
		}
	}
	else if (IsOffset)
	{
		// Uncompressed chunk: slice the buffer directly.
		// NOTE(review): same unchecked Offset/Size assumptions as above.
		if ((Offset + Size) > Chunk.GetSize())
		{
			Size = Chunk.GetSize() - Offset;
		}
		OutChunk = IoBuffer(Chunk, Offset, Size);
		OutChunk.SetContentType(ContentType);
	}
	ZEN_DEBUG("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(ContentType));
	return HttpResponseCode::OK;
}
// Serves a chunk directly from the CAS by content id (hex IoHash string),
// decompressing when the client asks for kBinary.
HttpResponseCode ProjectStore::GetChunk(const std::string_view Cid, ZenContentType AcceptType, IoBuffer& OutChunk)
{
	using namespace std::literals;
	if (Cid.length() != IoHash::StringLength)
	{
		ZEN_INFO("Chunk request for invalid chunk hash '{}'", Cid);
		return HttpResponseCode::BadRequest;
	}
	const IoHash Hash = IoHash::FromHexString(Cid);
	OutChunk = m_CidStore.FindChunkByCid(Hash);
	if (!OutChunk)
	{
		ZEN_DEBUG("chunk - '{}' MISSING", Cid);
return HttpResponseCode::NotFound; } if (AcceptType == HttpContentType::kBinary) { CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(OutChunk)); OutChunk = Compressed.Decompress().AsIoBuffer(); OutChunk.SetContentType(HttpContentType::kBinary); } else { OutChunk.SetContentType(HttpContentType::kCompressedBinary); } return HttpResponseCode::OK; } ////////////////////////////////////////////////////////////////////////// HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) : m_Log(logging::Get("project")) , m_CidStore(Store) , m_ProjectStore(Projects) { using namespace std::literals; m_Router.AddPattern("project", "([[:alnum:]_.]+)"); m_Router.AddPattern("log", "([[:alnum:]_.]+)"); m_Router.AddPattern("op", "([[:digit:]]+?)"); m_Router.AddPattern("chunk", "([[:xdigit:]]{24})"); m_Router.AddPattern("hash", "([[:xdigit:]]{40})"); // This would ideally just be the response for the root /prj endpoint but this is // currently not possible for (arbitrary, external) technical reasons m_Router.RegisterRoute( "list", [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_ProjectStore->GetProjectsList()); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}/batch", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } // Parse Request IoBuffer Payload = HttpReq.ReadPayload(); BinaryReader Reader(Payload); struct RequestHeader { enum { kMagic = 0xAAAA'77AC }; uint32_t Magic; uint32_t ChunkCount; uint32_t Reserved1; uint32_t Reserved2; }; struct RequestChunkEntry { Oid ChunkId; uint32_t CorrelationId; 
uint64_t Offset; uint64_t RequestBytes; }; if (Payload.Size() <= sizeof(RequestHeader)) { HttpReq.WriteResponse(HttpResponseCode::BadRequest); } RequestHeader RequestHdr; Reader.Read(&RequestHdr, sizeof RequestHdr); if (RequestHdr.Magic != RequestHeader::kMagic) { HttpReq.WriteResponse(HttpResponseCode::BadRequest); } std::vector RequestedChunks; RequestedChunks.resize(RequestHdr.ChunkCount); Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount); // Make Response struct ResponseHeader { uint32_t Magic = 0xbada'b00f; uint32_t ChunkCount; uint32_t Reserved1 = 0; uint32_t Reserved2 = 0; }; struct ResponseChunkEntry { uint32_t CorrelationId; uint32_t Flags = 0; uint64_t ChunkSize; }; std::vector OutBlobs; OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry)); for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) { const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; IoBuffer FoundChunk = FoundLog->FindChunk(RequestedChunk.ChunkId); if (FoundChunk) { if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1)) { uint64_t Offset = RequestedChunk.Offset; if (Offset > FoundChunk.Size()) { Offset = FoundChunk.Size(); } uint64_t Size = RequestedChunk.RequestBytes; if ((Offset + Size) > FoundChunk.Size()) { Size = FoundChunk.Size() - Offset; } FoundChunk = IoBuffer(FoundChunk, Offset, Size); } } OutBlobs.emplace_back(std::move(FoundChunk)); } uint8_t* ResponsePtr = reinterpret_cast(OutBlobs[0].MutableData()); ResponseHeader ResponseHdr; ResponseHdr.ChunkCount = RequestHdr.ChunkCount; memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr)); ResponsePtr += sizeof(ResponseHdr); for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) { // const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]); ResponseChunkEntry ResponseChunk; ResponseChunk.CorrelationId 
= ChunkIndex; if (FoundChunk) { ResponseChunk.ChunkSize = FoundChunk.Size(); } else { ResponseChunk.ChunkSize = uint64_t(-1); } memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk)); ResponsePtr += sizeof(ResponseChunk); } return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs); }, HttpVerb::kPost); m_Router.RegisterRoute( "{project}/oplog/{log}/files", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); // File manifest fetch, returns the client file list const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); const bool FilterClient = Params.GetValue("filter"sv) == "client"sv; CbObject ResponsePayload; HttpResponseCode Response = m_ProjectStore->GetProjectFiles(ProjectId, OplogId, FilterClient, ResponsePayload); if (Response != HttpResponseCode::OK) { return HttpReq.WriteResponse(Response); } return HttpReq.WriteResponse(Response, ResponsePayload); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}/{chunk}/info", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); const auto& ChunkId = Req.GetCapture(3); CbObject ResponsePayload; HttpResponseCode Response = m_ProjectStore->GetChunkInfo(ProjectId, OplogId, ChunkId, ResponsePayload); if (Response != HttpResponseCode::OK) { return HttpReq.WriteResponse(Response); } HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}/{chunk}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); const auto& ChunkId = Req.GetCapture(3); uint64_t Offset = 0; uint64_t Size = ~(0ull); auto QueryParms = Req.ServerRequest().GetQueryParams(); if (auto OffsetParm = 
QueryParms.GetValue("offset"); OffsetParm.empty() == false) { if (auto OffsetVal = ParseInt(OffsetParm)) { Offset = OffsetVal.value(); } else { return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } } if (auto SizeParm = QueryParms.GetValue("size"); SizeParm.empty() == false) { if (auto SizeVal = ParseInt(SizeParm)) { Size = SizeVal.value(); } else { return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } } HttpContentType AcceptType = HttpReq.AcceptContentType(); IoBuffer Chunk; HttpResponseCode Response = m_ProjectStore->GetChunk(ProjectId, OplogId, ChunkId, Offset, Size, AcceptType, Chunk); if (Response != HttpResponseCode::OK) { return HttpReq.WriteResponse(Response); } m_Log.debug("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(Chunk.GetContentType())); return HttpReq.WriteResponse(HttpResponseCode::OK, Chunk.GetContentType(), Chunk); }, HttpVerb::kGet | HttpVerb::kHead); m_Router.RegisterRoute( "{project}/oplog/{log}/{hash}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& HashString = Req.GetCapture(3); IoHash Hash = IoHash::FromHexString(HashString); HttpContentType AcceptType = HttpReq.AcceptContentType(); if (AcceptType == HttpContentType::kUnknownContentType) { AcceptType = HttpContentType::kBinary; } HttpContentType ContentType = HttpContentType::kCompressedBinary; IoBuffer Value = m_CidStore.FindChunkByCid(Hash); if (!Value) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } if (AcceptType == HttpContentType::kBinary) { CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Value)); Value = Compressed.Decompress().AsIoBuffer(); ContentType = HttpContentType::kBinary; } return HttpReq.WriteResponse(HttpResponseCode::OK, ContentType, Value); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}/prep", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const 
auto& OplogId = Req.GetCapture(2); Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } // This operation takes a list of referenced hashes and decides which // chunks are not present on this server. This list is then returned in // the "need" list in the response IoBuffer Payload = HttpReq.ReadPayload(); CbObject RequestObject = LoadCompactBinaryObject(Payload); std::vector NeedList; for (auto Entry : RequestObject["have"sv]) { const IoHash FileHash = Entry.AsHash(); if (!m_CidStore.ContainsChunk(FileHash)) { ZEN_DEBUG("prep - NEED: {}", FileHash); NeedList.push_back(FileHash); } } CbObjectWriter Cbo; Cbo.BeginArray("need"); for (const IoHash& Hash : NeedList) { Cbo << Hash; } Cbo.EndArray(); CbObject Response = Cbo.Save(); return HttpReq.WriteResponse(HttpResponseCode::OK, Response); }, HttpVerb::kPost); m_Router.RegisterRoute( "{project}/oplog/{log}/new", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); bool IsUsingSalt = false; IoHash SaltHash = IoHash::Zero; if (std::string_view SaltParam = Params.GetValue("salt"); SaltParam.empty() == false) { const uint32_t Salt = std::stoi(std::string(SaltParam)); SaltHash = IoHash::HashBuffer(&Salt, sizeof Salt); IsUsingSalt = true; } Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Oplog& Oplog = *FoundLog; IoBuffer Payload = HttpReq.ReadPayload(); // This will attempt to open files which may not exist 
for the case where // the prep step rejected the chunk. This should be fixed since there's // a performance cost associated with any file system activity bool IsValid = true; std::vector MissingChunks; CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer { if (m_CidStore.ContainsChunk(Hash)) { // Return null attachment as we already have it, no point in reading it and storing it again return {}; } IoHash AttachmentId; if (IsUsingSalt) { IoHash AttachmentSpec[]{SaltHash, Hash}; AttachmentId = IoHash::HashBuffer(MakeMemoryView(AttachmentSpec)); } else { AttachmentId = Hash; } std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString(); if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath)) { return SharedBuffer(std::move(Data)); } else { IsValid = false; MissingChunks.push_back(Hash); return {}; } }; CbPackage Package; if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver)) { std::filesystem::path BadPackagePath = Oplog.TempPath() / "bad_packages"sv / fmt::format("session{}_request{}"sv, HttpReq.SessionId(), HttpReq.RequestId()); ZEN_WARN("Received malformed package! 
Saving payload to '{}'", BadPackagePath); WriteFile(BadPackagePath, Payload); return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"); } if (!IsValid) { // TODO: emit diagnostics identifying missing chunks return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing chunk reference"); } CbObject Core = Package.GetObject(); if (!Core["key"sv]) { return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified"); } // Write core to oplog const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Package); if (OpLsn == ProjectStore::Oplog::kInvalidOp) { return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Payload.Size()), Core["key"sv].AsString()); HttpReq.WriteResponse(HttpResponseCode::Created); }, HttpVerb::kPost); m_Router.RegisterRoute( "{project}/oplog/{log}/{op}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const std::string& ProjectId = Req.GetCapture(1); const std::string& OplogId = Req.GetCapture(2); const std::string& OpIdString = Req.GetCapture(3); Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Oplog& Oplog = *FoundLog; if (const std::optional OpId = ParseInt(OpIdString)) { if (std::optional MaybeOp = Oplog.GetOpByIndex(OpId.value())) { CbObject& Op = MaybeOp.value(); if (Req.ServerRequest().AcceptContentType() == ZenContentType::kCbPackage) { CbPackage Package; Package.SetObject(Op); Op.IterateAttachments([&](CbFieldView FieldView) { const IoHash AttachmentHash = FieldView.AsAttachment(); IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash); // We force this for now as content type 
is not consistently tracked (will // be fixed in CidStore refactor) Payload.SetContentType(ZenContentType::kCompressedBinary); if (Payload) { switch (Payload.GetContentType()) { case ZenContentType::kCbObject: if (CbObject Object = LoadCompactBinaryObject(Payload)) { Package.AddAttachment(CbAttachment(Object)); } else { // Error - malformed object ZEN_WARN("malformed object returned for {}", AttachmentHash); } break; case ZenContentType::kCompressedBinary: if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload))) { Package.AddAttachment(CbAttachment(Compressed, AttachmentHash)); } else { // Error - not compressed! ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash); } break; default: Package.AddAttachment(CbAttachment(SharedBuffer(Payload))); break; } } }); return HttpReq.WriteResponse(HttpResponseCode::Accepted, Package); } else { // Client cannot accept a package, so we only send the core object return HttpReq.WriteResponse(HttpResponseCode::Accepted, Op); } } } return HttpReq.WriteResponse(HttpResponseCode::NotFound); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}/archive", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } switch (Req.ServerRequest().RequestVerb()) { case HttpVerb::kGet: { CbObjectWriter Response; Response.BeginArray("entries"sv); std::unordered_set AttachementHashes; size_t OpCount = 0; IoHashStream Hasher; FoundLog->IterateOplog([this, &Hasher, &Response, &AttachementHashes, &OpCount](CbObject Op) { SharedBuffer Buffer = Op.GetBuffer(); Hasher.Append(Buffer.GetView()); Response << Op; 
Op.IterateAttachments([this, &AttachementHashes, &OpCount](CbFieldView FieldView) { const IoHash AttachmentHash = FieldView.AsAttachment(); AttachementHashes.emplace(AttachmentHash); }); OpCount++; }); Response.EndArray(); IoHash Checksum = Hasher.GetHash(); Response.AddHash("checksum"sv, Checksum); ZEN_INFO("Exporting {} ops and {} chunks from '{}/{}' with checksum '{}'", OpCount, AttachementHashes.size(), ProjectId, OplogId, Checksum); CbPackage ResponsePackage; ResponsePackage.SetObject(Response.Save()); std::vector Attachments; Attachments.reserve(AttachementHashes.size()); for (const IoHash& AttachmentHash : AttachementHashes) { IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash); if (Payload) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload)); ZEN_ASSERT(Compressed); Attachments.emplace_back(CbAttachment(Compressed, AttachmentHash)); } } ResponsePackage.AddAttachments(Attachments); std::vector ResponsePayload = FormatPackageMessage(ResponsePackage, FormatFlags::kAllowLocalReferences); const ZenContentType AcceptType = HttpReq.AcceptContentType(); if (AcceptType == ZenContentType::kCompressedBinary) { std::vector Parts; Parts.reserve(ResponsePayload.size()); for (const auto& I : ResponsePayload) { Parts.emplace_back(SharedBuffer(I)); } CompositeBuffer Cmp(std::move(Parts)); CompressedBuffer CompressedResponse = CompressedBuffer::Compress(Cmp); HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCompressedBinary, CompressedResponse.GetCompressed().Flatten().AsIoBuffer()); } else if (AcceptType == ZenContentType::kCbPackage) { HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponsePayload); } else { return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } } break; case HttpVerb::kPost: { ZEN_INFO("Importing oplog '{}/{}'", ProjectId, OplogId); IoBuffer CompressedPayload = HttpReq.ReadPayload(); IoBuffer Payload = 
CompressedBuffer::FromCompressed(SharedBuffer(CompressedPayload)).Decompress().AsIoBuffer(); CbPackage RequestPackage = ParsePackageMessage(Payload); CbObject Request = RequestPackage.GetObject(); IoHash Checksum = Request["checksum"sv].AsHash(); std::span Attachments = RequestPackage.GetAttachments(); zen ::CbArrayView Entries = Request["entries"sv].AsArrayView(); ZEN_INFO("Importing oplog with {} ops and {} attachments with checksum '{}' to '{}/{}'", Entries.Num(), Attachments.size(), Checksum, ProjectId, OplogId); std::vector Ops; Ops.reserve(Entries.Num()); IoHashStream Hasher; for (auto& OpEntry : Entries) { CbObjectView Core = OpEntry.AsObjectView(); if (!Core["key"sv]) { return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified"); } BinaryWriter Writer; Core.CopyTo(Writer); MemoryView OpView = Writer.GetView(); Hasher.Append(OpView); IoBuffer OpBuffer(IoBuffer::Clone, OpView.GetData(), OpView.GetSize()); CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); Ops.emplace_back(Op); } IoHash CalculatedChecksum = Hasher.GetHash(); if (CalculatedChecksum != Checksum) { ZEN_WARN("Checksum for oplog does not match. 
Expected '{}' but got '{}'", Checksum, CalculatedChecksum); return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); } ZEN_INFO("Writing {} attachments for '{}/{}'", Attachments.size(), ProjectId, OplogId); WorkerThreadPool WorkerPool(Min(std::thread::hardware_concurrency(), 16u)); std::atomic_int64_t JobCount = 0; for (const CbAttachment& Attachment : Attachments) { JobCount.fetch_add(1); WorkerPool.ScheduleWork([this, &Attachment, &JobCount]() { CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary(); m_CidStore.AddChunk(AttachmentBody, CidStore::InsertMode::kCopyOnly); JobCount.fetch_add(-1); }); } while (JobCount.load()) { Sleep(1); } ZEN_INFO("Writing {} ops for '{}/{}'", Ops.size(), ProjectId, OplogId); for (const CbObject& Op : Ops) { const uint32_t OpLsn = FoundLog->AppendNewOplogEntry(Op); ZEN_DEBUG("oplog entry #{}", OpLsn); if (OpLsn == ProjectStore::Oplog::kInvalidOp) { return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Op.GetSize()), Op["key"sv].AsString()); } ZEN_INFO("Imported {} ops and {} attachments to '{}/{}'", Entries.Num(), Attachments.size(), ProjectId, OplogId); return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); } break; default: break; } }, HttpVerb::kPost | HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}", [this](HttpRouterRequest& Req) { const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("project {} not found", ProjectId)); } switch (Req.ServerRequest().RequestVerb()) { case HttpVerb::kGet: { ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId); if (!OplogIt) { return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("oplog {} not found 
in project {}", OplogId, ProjectId)); } ProjectStore::Oplog& Log = *OplogIt; CbObjectWriter Cb; Cb << "id"sv << Log.OplogId() << "project"sv << Project->Identifier << "tempdir"sv << Log.TempPath().c_str(); Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cb.Save()); } break; case HttpVerb::kPost: { ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId); if (!OplogIt) { if (!Project->NewOplog(OplogId)) { // TODO: indicate why the operation failed! return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError); } ZEN_INFO("established oplog '{}/{}'", ProjectId, OplogId); return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); } // I guess this should ultimately be used to execute RPCs but for now, it // does absolutely nothing return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); } break; case HttpVerb::kDelete: { ZEN_INFO("deleting oplog '{}/{}'", ProjectId, OplogId); Project->DeleteOplog(OplogId); return Req.ServerRequest().WriteResponse(HttpResponseCode::OK); } break; default: break; } }, HttpVerb::kPost | HttpVerb::kGet | HttpVerb::kDelete); m_Router.RegisterRoute( "{project}/oplog/{log}/entries", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } CbObjectWriter Response; if (FoundLog->OplogCount() > 0) { HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); if (auto OpKey = Params.GetValue("opkey"); !OpKey.empty()) { Oid OpKeyId = OpKeyStringAsOId(OpKey); std::optional Op = FoundLog->GetOpByKey(OpKeyId); if (Op.has_value()) { Response << "entry"sv << Op.value(); } else { return 
HttpReq.WriteResponse(HttpResponseCode::NotFound); } } else { Response.BeginArray("entries"sv); FoundLog->IterateOplog([&Response](CbObject Op) { Response << Op; }); Response.EndArray(); } } return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}", [this](HttpRouterRequest& Req) { const std::string ProjectId = Req.GetCapture(1); switch (Req.ServerRequest().RequestVerb()) { case HttpVerb::kPost: { IoBuffer Payload = Req.ServerRequest().ReadPayload(); CbObject Params = LoadCompactBinaryObject(Payload); std::string_view Id = Params["id"sv].AsString(); std::string_view Root = Params["root"sv].AsString(); std::string_view EngineRoot = Params["engine"sv].AsString(); std::string_view ProjectRoot = Params["project"sv].AsString(); std::string_view ProjectFilePath = Params["projectfile"sv].AsString(); const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId; m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath); ZEN_INFO("established project - {} (id: '{}', roots: '{}', '{}', '{}', '{}'{})", ProjectId, Id, Root, EngineRoot, ProjectRoot, ProjectFilePath, ProjectFilePath.empty() ? 
", project will not be GCd due to empty project file path" : ""); Req.ServerRequest().WriteResponse(HttpResponseCode::Created); } break; case HttpVerb::kGet: { Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("project {} not found", ProjectId)); } std::vector OpLogs = Project->ScanForOplogs(); CbObjectWriter Response; Response << "id"sv << Project->Identifier; Response << "root"sv << PathToUtf8(Project->RootDir); Response << "engine"sv << PathToUtf8(Project->EngineRootDir); Response << "project"sv << PathToUtf8(Project->ProjectRootDir); Response << "projectfile"sv << PathToUtf8(Project->ProjectFilePath); Response.BeginArray("oplogs"sv); for (const std::string& OplogId : OpLogs) { Response.BeginObject(); Response << "id"sv << OplogId; Response.EndObject(); } Response.EndArray(); // oplogs Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save()); } break; case HttpVerb::kDelete: { Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("project {} not found", ProjectId)); } if (!m_ProjectStore->DeleteProject(ProjectId)) { return Req.ServerRequest().WriteResponse(HttpResponseCode::Locked, HttpContentType::kText, fmt::format("project {} is in use", ProjectId)); } return Req.ServerRequest().WriteResponse(HttpResponseCode::NoContent); } break; default: break; } }, HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kDelete); } HttpProjectService::~HttpProjectService() { } const char* HttpProjectService::BaseUri() const { return "/prj/"; } void HttpProjectService::HandleRequest(HttpServerRequest& Request) { if (m_Router.HandleRequest(Request) == false) { ZEN_WARN("No route found for {0}", Request.RelativeUri()); } } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS namespace 
testutils {

using namespace std::literals;

// Renders an Oid as its string form via the project's StringBuilder helper
std::string OidAsString(const Oid& Id)
{
	StringBuilder<25> OidStringBuilder;
	Id.ToString(OidStringBuilder);
	return OidStringBuilder.ToString();
}

// Builds an oplog entry package keyed by `Id`. Each pre-compressed attachment
// is recorded twice: as an object in the "bulkdata" array (id/type/data) and
// as a package attachment, hashed with BLAKE3 over its raw contents.
CbPackage CreateOplogPackage(const Oid& Id, const std::span>& Attachments)
{
	CbPackage	   Package;
	CbObjectWriter Object;
	Object << "key"sv << OidAsString(Id);
	if (!Attachments.empty())
	{
		Object.BeginArray("bulkdata");
		for (const auto& Attachment : Attachments)
		{
			CbAttachment Attach(Attachment.second, IoHash::FromBLAKE3(Attachment.second.GetRawHash()));
			Object.BeginObject();
			Object << "id"sv << Attachment.first;
			Object << "type"sv << "Standard"sv;
			Object << "data"sv << Attach;
			Object.EndObject();
			Package.AddAttachment(Attach);
		}
		Object.EndArray();
	}
	Package.SetObject(Object.Save());
	return Package;
};

// Creates one compressed attachment per requested size, each filled with a
// repeating 0..254 byte pattern (note: modulo 255, not 256) and paired with
// a freshly generated Oid
std::vector> CreateAttachments(const std::span& Sizes)
{
	std::vector> Result;
	Result.reserve(Sizes.size());
	for (size_t Size : Sizes)
	{
		std::vector Data;
		Data.resize(Size);
		for (size_t Idx = 0; Idx < Size; ++Idx)
		{
			Data[Idx] = Idx % 255;
		}
		CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()));
		Result.emplace_back(std::pair(Oid::NewOid(), Compressed));
	}
	return Result;
}

// Maps a raw (uncompressed) offset to its offset within the containing
// compressed block. Returns 0 when the raw offset is 0, when the buffer's
// compression parameters cannot be retrieved, or when the block size is 0.
uint64 GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset)
{
	if (RawOffset > 0)
	{
		uint64				  BlockSize = 0;
		OodleCompressor		  Compressor;
		OodleCompressionLevel CompressionLevel;
		if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
		{
			return 0;
		}
		return BlockSize > 0 ? RawOffset % BlockSize : 0;
	}
	return 0;
}

}  // namespace testutils

// Creating a project and then deleting it removes its on-disk state
TEST_CASE("project.store.create")
{
	using namespace std::literals;
	ScopedTemporaryDirectory TempDir;
	GcManager				 Gc;
	CidStore				 CidStore(Gc);
	CidStoreConfiguration	 CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
	CidStore.Initialize(CidConfig);
	ZenCacheStore CacheStore(Gc, {.BasePath = TempDir.Path() / "cache"});

	std::string_view	  ProjectName("proj1"sv);
	std::filesystem::path BasePath = TempDir.Path() / "projectstore";
	ProjectStore		  ProjectStore(CidStore, CacheStore, BasePath, Gc);
	std::filesystem::path RootDir		  = TempDir.Path() / "root";
	std::filesystem::path EngineRootDir	  = TempDir.Path() / "engine";
	std::filesystem::path ProjectRootDir  = TempDir.Path() / "game";
	std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
	Ref Project(ProjectStore.NewProject(BasePath / ProjectName, ProjectName, RootDir.string(), EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string()));
	CHECK(ProjectStore.DeleteProject(ProjectName));
	CHECK(!Project->Exists(BasePath));
}

// After PrepareForDelete, the oplog is no longer reachable through the
// project, but a previously handed-out oplog pointer remains accessible and
// the Ref keeps the Project object itself alive
TEST_CASE("project.store.lifetimes")
{
	using namespace std::literals;
	ScopedTemporaryDirectory TempDir;
	GcManager				 Gc;
	CidStore				 CidStore(Gc);
	CidStoreConfiguration	 CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
	CidStore.Initialize(CidConfig);
	ZenCacheStore CacheStore(Gc, {.BasePath = TempDir.Path() / "cache"});

	std::filesystem::path BasePath = TempDir.Path() / "projectstore";
	ProjectStore		  ProjectStore(CidStore, CacheStore, BasePath, Gc);
	std::filesystem::path RootDir		  = TempDir.Path() / "root";
	std::filesystem::path EngineRootDir	  = TempDir.Path() / "engine";
	std::filesystem::path ProjectRootDir  = TempDir.Path() / "game";
	std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
	Ref Project(ProjectStore.NewProject(BasePath / "proj1"sv, "proj1"sv, RootDir.string(),
EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string())); ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1"); CHECK(Oplog != nullptr); std::filesystem::path DeletePath; CHECK(Project->PrepareForDelete(DeletePath)); CHECK(!DeletePath.empty()); CHECK(Project->OpenOplog("oplog1") == nullptr); // Oplog is now invalid, but pointer can still be accessed since we store old oplog pointers CHECK(Oplog->OplogCount() == 0); // Project is still valid since we have a Ref to it CHECK(Project->Identifier == "proj1"sv); } TEST_CASE("project.store.gc") { using namespace std::literals; using namespace testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); ZenCacheStore CacheStore(Gc, {.BasePath = TempDir.Path() / "cache"}); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; ProjectStore ProjectStore(CidStore, CacheStore, BasePath, Gc); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path Project1RootDir = TempDir.Path() / "game1"; std::filesystem::path Project1FilePath = TempDir.Path() / "game1" / "game.uproject"; { CreateDirectories(Project1FilePath.parent_path()); BasicFile ProjectFile; ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); } std::filesystem::path Project2RootDir = TempDir.Path() / "game2"; std::filesystem::path Project2FilePath = TempDir.Path() / "game2" / "game.uproject"; { CreateDirectories(Project2FilePath.parent_path()); BasicFile ProjectFile; ProjectFile.Open(Project2FilePath, BasicFile::Mode::kTruncate); } { Ref Project1(ProjectStore.NewProject(BasePath / "proj1"sv, "proj1"sv, RootDir.string(), EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"); 
CHECK(Oplog != nullptr); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{77}))); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{7123, 583, 690, 99}))); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{55, 122}))); } { Ref Project2(ProjectStore.NewProject(BasePath / "proj2"sv, "proj2"sv, RootDir.string(), EngineRootDir.string(), Project2RootDir.string(), Project2FilePath.string())); ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog1"); CHECK(Oplog != nullptr); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{177}))); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{9123, 383, 590, 96}))); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{535, 221}))); } { GcContext GcCtx; ProjectStore.GatherReferences(GcCtx); size_t RefCount = 0; GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; }); CHECK(RefCount == 14); ProjectStore.CollectGarbage(GcCtx); CHECK(ProjectStore.OpenProject("proj1"sv)); CHECK(ProjectStore.OpenProject("proj2"sv)); } std::filesystem::remove(Project1FilePath); { GcContext GcCtx; ProjectStore.GatherReferences(GcCtx); size_t RefCount = 0; GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; }); CHECK(RefCount == 7); ProjectStore.CollectGarbage(GcCtx); CHECK(!ProjectStore.OpenProject("proj1"sv)); CHECK(ProjectStore.OpenProject("proj2"sv)); } } TEST_CASE("project.store.partial.read") { using namespace std::literals; using namespace testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, 
.TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); ZenCacheStore CacheStore(Gc, {.BasePath = TempDir.Path() / "cache"}); std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; ProjectStore ProjectStore(CidStore, CacheStore, BasePath, Gc); std::filesystem::path RootDir = TempDir.Path() / "root"sv; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv; std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv; { CreateDirectories(Project1FilePath.parent_path()); BasicFile ProjectFile; ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); } std::vector OpIds; OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()}); std::unordered_map>, Oid::Hasher> Attachments; { Ref Project1(ProjectStore.NewProject(BasePath / "proj1"sv, "proj1"sv, RootDir.string(), EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv); CHECK(Oplog != nullptr); Attachments[OpIds[0]] = {}; Attachments[OpIds[1]] = CreateAttachments(std::initializer_list{77}); Attachments[OpIds[2]] = CreateAttachments(std::initializer_list{7123, 9583, 690, 99}); Attachments[OpIds[3]] = CreateAttachments(std::initializer_list{55, 122}); for (auto It : Attachments) { Oplog->AppendNewOplogEntry(CreateOplogPackage(It.first, It.second)); } } { IoBuffer Chunk; CHECK(ProjectStore.GetChunk(IoHash::FromBLAKE3(Attachments[OpIds[1]][0].second.GetRawHash()).ToHexString(), HttpContentType::kCompressedBinary, Chunk) == HttpResponseCode::OK); CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); CHECK(Attachment.GetRawSize() == Attachments[OpIds[1]][0].second.GetRawSize()); } IoBuffer ChunkResult; CHECK(ProjectStore.GetChunk("proj1"sv, "oplog1"sv, OidAsString(Attachments[OpIds[2]][1].first), 0, ~0ull, 
HttpContentType::kCompressedBinary, ChunkResult) == HttpResponseCode::OK); CHECK(ChunkResult); CHECK(CompressedBuffer::FromCompressed(SharedBuffer(ChunkResult)).GetRawSize() == Attachments[OpIds[2]][1].second.GetRawSize()); IoBuffer PartialChunkResult; CHECK(ProjectStore.GetChunk("proj1"sv, "oplog1"sv, OidAsString(Attachments[OpIds[2]][1].first), 5, 1773, HttpContentType::kCompressedBinary, PartialChunkResult) == HttpResponseCode::OK); CHECK(PartialChunkResult); CompressedBuffer PartialCompressedResult = CompressedBuffer::FromCompressed(SharedBuffer(PartialChunkResult)); CHECK(PartialCompressedResult.GetRawSize() >= 1773); uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5); SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed); SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress(); const uint8_t* FullDataPtr = &(reinterpret_cast(FullDecompressed.GetView().GetData())[5]); const uint8_t* PartialDataPtr = reinterpret_cast(PartialDecompressed.GetView().GetData()); CHECK(FullDataPtr[0] == PartialDataPtr[0]); } #endif void prj_forcelink() { } } // namespace zen