// Copyright Epic Games, Inc. All Rights Reserved. #include "projectstore.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "fileremoteprojectstore.h" #include "jupiterremoteprojectstore.h" #include "remoteprojectstore.h" #include "zenremoteprojectstore.h" ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # include # include #endif // ZEN_WITH_TESTS namespace zen { namespace { bool PrepareDirectoryDelete(const std::filesystem::path& Dir, std::filesystem::path& OutDeleteDir) { int DropIndex = 0; do { if (!std::filesystem::exists(Dir)) { return true; } std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; if (std::filesystem::exists(DroppedBucketPath)) { DropIndex++; continue; } std::error_code Ec; std::filesystem::rename(Dir, DroppedBucketPath, Ec); if (!Ec) { OutDeleteDir = DroppedBucketPath; return true; } if (Ec && !std::filesystem::exists(DroppedBucketPath)) { // We can't move our folder, probably because it is busy, bail.. return false; } Sleep(100); } while (true); } std::pair, std::string> CreateRemoteStore(CbObjectView Params, AuthMgr& AuthManager, size_t MaxBlockSize, size_t MaxChunkEmbedSize) { using namespace std::literals; std::unique_ptr RemoteStore; if (CbObjectView File = Params["file"sv].AsObjectView(); File) { std::filesystem::path FolderPath(File["path"sv].AsString()); if (FolderPath.empty()) { return {nullptr, "Missing file path"}; } std::string_view Name(File["name"sv].AsString()); if (Name.empty()) { return {nullptr, "Missing file name"}; } bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false); bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false); FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, FolderPath, std::string(Name), ForceDisableBlocks, ForceEnableTempBlocks}; RemoteStore = CreateFileRemoteStore(Options); } if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud) { std::string_view CloudServiceUrl = Cloud["url"sv].AsString(); if (CloudServiceUrl.empty()) { return {nullptr, "Missing service url"}; } std::string Url = cpr::util::urlDecode(std::string(CloudServiceUrl)); std::string_view Namespace = Cloud["namespace"sv].AsString(); if (Namespace.empty()) { return {nullptr, "Missing namespace"}; } std::string_view Bucket = Cloud["bucket"sv].AsString(); if (Bucket.empty()) { return {nullptr, "Missing bucket"}; } std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString(); std::string AccessToken = std::string(Cloud["access-token"sv].AsString()); if (AccessToken.empty()) { std::string_view AccessTokenEnvVariable = Cloud["access-token-env"].AsString(); if (!AccessTokenEnvVariable.empty()) { AccessToken = GetEnvVariable(AccessTokenEnvVariable); } } std::string_view KeyParam = Cloud["key"sv].AsString(); if (KeyParam.empty()) { return {nullptr, "Missing key"}; } if (KeyParam.length() != IoHash::StringLength) { return {nullptr, "Invalid key"}; } IoHash Key = IoHash::FromHexString(KeyParam); if (Key == IoHash::Zero) { return {nullptr, "Invalid key string"}; } bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false); bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false); JupiterRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, Url, std::string(Namespace), std::string(Bucket), Key, std::string(OpenIdProvider), AccessToken, AuthManager, ForceDisableBlocks, ForceDisableTempBlocks}; RemoteStore = CreateJupiterRemoteStore(Options); } if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen) { std::string_view Url = Zen["url"sv].AsString(); std::string_view Project = Zen["project"sv].AsString(); if (Project.empty()) { return {nullptr, "Missing project"}; } std::string_view Oplog = Zen["oplog"sv].AsString(); if (Oplog.empty()) { return {nullptr, "Missing oplog"}; } ZenRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, std::string(Url), std::string(Project), std::string(Oplog)}; RemoteStore = CreateZenRemoteStore(Options); } if (!RemoteStore) { return {nullptr, "Unknown remote store type"}; } return {std::move(RemoteStore), ""}; } std::pair ConvertResult(const RemoteProjectStore::Result& Result) { if (Result.ErrorCode == 0) { return {HttpResponseCode::OK, Result.Text}; } return {static_cast(Result.ErrorCode), Result.Reason.empty() ? Result.Text : Result.Text.empty() ? Result.Reason : fmt::format("{}. Reason: '{}'", Result.Text, Result.Reason)}; } } // namespace ////////////////////////////////////////////////////////////////////////// struct ProjectStore::OplogStorage : public RefCounted { OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath) { } ~OplogStorage() { ZEN_INFO("closing oplog storage at {}", m_OplogStoragePath); Flush(); } [[nodiscard]] bool Exists() const { return Exists(m_OplogStoragePath); } [[nodiscard]] static bool Exists(const std::filesystem::path& BasePath) { return std::filesystem::exists(BasePath / "ops.zlog") && std::filesystem::exists(BasePath / "ops.zops"); } static bool Delete(const std::filesystem::path& BasePath) { return DeleteDirectories(BasePath); } uint64_t OpBlobsSize() const { return OpBlobsSize(m_OplogStoragePath); } static uint64_t OpBlobsSize(const std::filesystem::path& BasePath) { using namespace std::literals; if (Exists(BasePath)) { return std::filesystem::file_size(BasePath / "ops.zlog"sv) + std::filesystem::file_size(BasePath / "ops.zops"sv); } return 0; } void Open(bool IsCreate) { using namespace std::literals; ZEN_INFO("initializing oplog storage at '{}'", m_OplogStoragePath); if (IsCreate) { DeleteDirectories(m_OplogStoragePath); CreateDirectories(m_OplogStoragePath); } m_Oplog.Open(m_OplogStoragePath / "ops.zlog"sv, IsCreate ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); m_Oplog.Initialize(); m_OpBlobs.Open(m_OplogStoragePath / "ops.zops"sv, IsCreate ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); ZEN_ASSERT(IsPow2(m_OpsAlign)); ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1))); } void ReplayLog(std::function&& Handler) { ZEN_TRACE_CPU("ProjectStore::OplogStorage::ReplayLog"); // This could use memory mapping or do something clever but for now it just reads the file sequentially ZEN_INFO("replaying log for '{}'", m_OplogStoragePath); Stopwatch Timer; uint64_t InvalidEntries = 0; IoBuffer OpBuffer; m_Oplog.Replay( [&](const OplogEntry& LogEntry) { if (LogEntry.OpCoreSize == 0) { ++InvalidEntries; return; } if (OpBuffer.GetSize() < LogEntry.OpCoreSize) { OpBuffer = IoBuffer(LogEntry.OpCoreSize); } const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); // Verify checksum, ignore op data if incorrect const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), LogEntry.OpCoreSize) & 0xffffFFFF); if (OpCoreHash != LogEntry.OpCoreHash) { ZEN_WARN("skipping oplog entry with bad checksum!"); return; } CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), LogEntry.OpCoreSize)); m_NextOpsOffset = Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn); Handler(Op, LogEntry); }, 0); if (InvalidEntries) { ZEN_WARN("ignored {} zero-sized oplog entries", InvalidEntries); } ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()), m_MaxLsn, m_NextOpsOffset); } void ReplayLog(const std::span Entries, std::function&& Handler) { for (const OplogEntryAddress& Entry : Entries) { CbObject Op = GetOp(Entry); Handler(Op); } } CbObject GetOp(const OplogEntryAddress& Entry) { IoBuffer OpBuffer(Entry.Size); const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign; m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset); return CbObject(SharedBuffer(std::move(OpBuffer))); } OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, XXH3_128 KeyHash) { ZEN_TRACE_CPU("ProjectStore::OplogStorage::AppendOp"); using namespace std::literals; uint64_t WriteSize = Buffer.GetSize(); RwLock::ExclusiveLockScope Lock(m_RwLock); const uint64_t WriteOffset = m_NextOpsOffset; const uint32_t OpLsn = ++m_MaxLsn; m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign); Lock.ReleaseNow(); ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign)); OplogEntry Entry = {.OpLsn = OpLsn, .OpCoreOffset = gsl::narrow_cast(WriteOffset / m_OpsAlign), .OpCoreSize = uint32_t(Buffer.GetSize()), .OpCoreHash = OpCoreHash, .OpKeyHash = KeyHash}; m_Oplog.Append(Entry); m_OpBlobs.Write(Buffer.GetData(), WriteSize, WriteOffset); return Entry; } void Flush() { m_Oplog.Flush(); m_OpBlobs.Flush(); } spdlog::logger& Log() { return m_OwnerOplog->Log(); } private: ProjectStore::Oplog* m_OwnerOplog; std::filesystem::path m_OplogStoragePath; mutable RwLock m_RwLock; TCasLogFile m_Oplog; BasicFile m_OpBlobs; std::atomic m_NextOpsOffset{0}; uint64_t m_OpsAlign = 32; std::atomic m_MaxLsn{0}; }; ////////////////////////////////////////////////////////////////////////// ProjectStore::Oplog::Oplog(std::string_view Id, Project* Project, CidStore& Store, std::filesystem::path BasePath, const std::filesystem::path& MarkerPath) : m_OuterProject(Project) , m_CidStore(Store) , m_BasePath(BasePath) , m_MarkerPath(MarkerPath) , m_OplogId(Id) { using namespace std::literals; m_Storage = new OplogStorage(this, m_BasePath); const bool StoreExists = m_Storage->Exists(); m_Storage->Open(/* IsCreate */ !StoreExists); m_TempPath = m_BasePath / "temp"sv; CleanDirectory(m_TempPath); } ProjectStore::Oplog::~Oplog() { if (m_Storage) { Flush(); } } void ProjectStore::Oplog::Flush() { ZEN_ASSERT(m_Storage); m_Storage->Flush(); } void ProjectStore::Oplog::ScrubStorage(ScrubContext& Ctx) const { ZEN_UNUSED(Ctx); } void ProjectStore::Oplog::GatherReferences(GcContext& GcCtx) { RwLock::SharedLockScope _(m_OplogLock); std::vector Hashes; Hashes.reserve(Max(m_ChunkMap.size(), m_MetaMap.size())); for (const auto& Kv : m_ChunkMap) { Hashes.push_back(Kv.second); } GcCtx.AddRetainedCids(Hashes); Hashes.clear(); for (const auto& Kv : m_MetaMap) { Hashes.push_back(Kv.second); } GcCtx.AddRetainedCids(Hashes); } uint64_t ProjectStore::Oplog::TotalSize(const std::filesystem::path& BasePath) { using namespace std::literals; uint64_t Size = OplogStorage::OpBlobsSize(BasePath); std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; if (std::filesystem::exists(StateFilePath)) { Size += std::filesystem::file_size(StateFilePath); } return Size; } uint64_t ProjectStore::Oplog::TotalSize() const { return TotalSize(m_BasePath); } std::filesystem::path ProjectStore::Oplog::PrepareForDelete(bool MoveFolder) { RwLock::ExclusiveLockScope _(m_OplogLock); m_ChunkMap.clear(); m_MetaMap.clear(); m_FileMap.clear(); m_OpAddressMap.clear(); m_LatestOpMap.clear(); m_Storage = {}; if (!MoveFolder) { return {}; } std::filesystem::path MovedDir; if (PrepareDirectoryDelete(m_BasePath, MovedDir)) { return MovedDir; } return {}; } bool ProjectStore::Oplog::ExistsAt(const std::filesystem::path& BasePath) { using namespace std::literals; std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; return std::filesystem::is_regular_file(StateFilePath); } void ProjectStore::Oplog::Read() { using namespace std::literals; std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; if (std::filesystem::is_regular_file(StateFilePath)) { ZEN_INFO("reading config for oplog '{}' in project '{}' from {}", m_OplogId, m_OuterProject->Identifier, StateFilePath); BasicFile Blob; Blob.Open(StateFilePath, BasicFile::Mode::kRead); IoBuffer Obj = Blob.ReadAll(); CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); if (ValidationError != CbValidateError::None) { ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), StateFilePath); return; } CbObject Cfg = LoadCompactBinaryObject(Obj); m_MarkerPath = Cfg["gcpath"sv].AsString(); } else { ZEN_INFO("config for oplog '{}' in project '{}' not found at {}. Assuming legacy store", m_OplogId, m_OuterProject->Identifier, StateFilePath); } ReplayLog(); } void ProjectStore::Oplog::Write() { using namespace std::literals; BinaryWriter Mem; CbObjectWriter Cfg; Cfg << "gcpath"sv << PathToUtf8(m_MarkerPath); Cfg.Save(Mem); std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; ZEN_INFO("persisting config for oplog '{}' in project '{}' to {}", m_OplogId, m_OuterProject->Identifier, StateFilePath); BasicFile Blob; Blob.Open(StateFilePath, BasicFile::Mode::kTruncate); Blob.Write(Mem.Data(), Mem.Size(), 0); Blob.Flush(); } void ProjectStore::Oplog::ReplayLog() { RwLock::ExclusiveLockScope OplogLock(m_OplogLock); if (!m_Storage) { return; } m_Storage->ReplayLog( [&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, GetMapping(Op), OpEntry, kUpdateReplay); }); } IoBuffer ProjectStore::Oplog::FindChunk(Oid ChunkId) { RwLock::SharedLockScope OplogLock(m_OplogLock); if (!m_Storage) { return IoBuffer{}; } if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end()) { IoHash ChunkHash = ChunkIt->second; OplogLock.ReleaseNow(); IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash); Chunk.SetContentType(ZenContentType::kCompressedBinary); return Chunk; } if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) { std::filesystem::path FilePath = m_OuterProject->RootDir / FileIt->second.ServerPath; OplogLock.ReleaseNow(); IoBuffer FileChunk = IoBufferBuilder::MakeFromFile(FilePath); FileChunk.SetContentType(ZenContentType::kBinary); return FileChunk; } if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) { IoHash ChunkHash = MetaIt->second; OplogLock.ReleaseNow(); IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash); Chunk.SetContentType(ZenContentType::kCompressedBinary); return Chunk; } return {}; } void ProjectStore::Oplog::IterateFileMap( std::function&& Fn) { RwLock::SharedLockScope _(m_OplogLock); if (!m_Storage) { return; } for (const auto& Kv : m_FileMap) { Fn(Kv.first, Kv.second.ServerPath, Kv.second.ClientPath); } } void ProjectStore::Oplog::IterateOplog(std::function&& Handler) { RwLock::SharedLockScope _(m_OplogLock); if (!m_Storage) { return; } std::vector Entries; Entries.reserve(m_LatestOpMap.size()); for (const auto& Kv : m_LatestOpMap) { const auto AddressEntry = m_OpAddressMap.find(Kv.second); ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); Entries.push_back(AddressEntry->second); } std::sort(Entries.begin(), Entries.end(), [](const OplogEntryAddress& Lhs, const OplogEntryAddress& Rhs) { return Lhs.Offset < Rhs.Offset; }); m_Storage->ReplayLog(Entries, [&](CbObject Op) { Handler(Op); }); } void ProjectStore::Oplog::IterateOplogWithKey(std::function&& Handler) { RwLock::SharedLockScope _(m_OplogLock); if (!m_Storage) { return; } std::vector EntryIndexes; std::vector Entries; std::vector Keys; std::vector LSNs; Entries.reserve(m_LatestOpMap.size()); EntryIndexes.reserve(m_LatestOpMap.size()); Keys.reserve(m_LatestOpMap.size()); LSNs.reserve(m_LatestOpMap.size()); for (const auto& Kv : m_LatestOpMap) { const auto AddressEntry = m_OpAddressMap.find(Kv.second); ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); Entries.push_back(AddressEntry->second); Keys.push_back(Kv.first); LSNs.push_back(Kv.second); EntryIndexes.push_back(EntryIndexes.size()); } std::sort(EntryIndexes.begin(), EntryIndexes.end(), [&Entries](const size_t& Lhs, const size_t& Rhs) { const OplogEntryAddress& LhsEntry = Entries[Lhs]; const OplogEntryAddress& RhsEntry = Entries[Rhs]; return LhsEntry.Offset < RhsEntry.Offset; }); std::vector SortedEntries; SortedEntries.reserve(EntryIndexes.size()); for (size_t Index : EntryIndexes) { SortedEntries.push_back(Entries[Index]); } size_t EntryIndex = 0; m_Storage->ReplayLog(SortedEntries, [&](CbObject Op) { Handler(LSNs[EntryIndex], Keys[EntryIndex], Op); EntryIndex++; }); } int ProjectStore::Oplog::GetOpIndexByKey(const Oid& Key) { RwLock::SharedLockScope _(m_OplogLock); if (!m_Storage) { return {}; } if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end()) { return LatestOp->second; } return -1; } std::optional ProjectStore::Oplog::GetOpByKey(const Oid& Key) { RwLock::SharedLockScope _(m_OplogLock); if (!m_Storage) { return {}; } if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end()) { const auto AddressEntry = m_OpAddressMap.find(LatestOp->second); ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); return m_Storage->GetOp(AddressEntry->second); } return {}; } std::optional ProjectStore::Oplog::GetOpByIndex(int Index) { RwLock::SharedLockScope _(m_OplogLock); if (!m_Storage) { return {}; } if (const auto AddressEntryIt = m_OpAddressMap.find(Index); AddressEntryIt != m_OpAddressMap.end()) { return m_Storage->GetOp(AddressEntryIt->second); } return {}; } void ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, Oid FileId, IoHash Hash, std::string_view ServerPath, std::string_view ClientPath) { if (Hash != IoHash::Zero) { m_ChunkMap.insert_or_assign(FileId, Hash); } FileMapEntry Entry; Entry.ServerPath = ServerPath; Entry.ClientPath = ClientPath; m_FileMap[FileId] = std::move(Entry); if (Hash != IoHash::Zero) { m_ChunkMap.insert_or_assign(FileId, Hash); } } void ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash) { m_ChunkMap.insert_or_assign(ChunkId, Hash); } void ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash) { m_MetaMap.insert_or_assign(ChunkId, Hash); } ProjectStore::Oplog::OplogEntryMapping ProjectStore::Oplog::GetMapping(CbObject Core) { using namespace std::literals; OplogEntryMapping Result; // Update chunk id maps CbObjectView PackageObj = Core["package"sv].AsObjectView(); CbArrayView BulkDataArray = Core["bulkdata"sv].AsArrayView(); CbArrayView PackageDataArray = Core["packagedata"sv].AsArrayView(); Result.Chunks.reserve(PackageObj ? 1 : 0 + BulkDataArray.Num() + PackageDataArray.Num()); if (PackageObj) { Oid Id = PackageObj["id"sv].AsObjectId(); IoHash Hash = PackageObj["data"sv].AsBinaryAttachment(); Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); ZEN_DEBUG("package data {} -> {}", Id, Hash); } for (CbFieldView& Entry : PackageDataArray) { CbObjectView PackageDataObj = Entry.AsObjectView(); Oid Id = PackageDataObj["id"sv].AsObjectId(); IoHash Hash = PackageDataObj["data"sv].AsBinaryAttachment(); Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); ZEN_DEBUG("package {} -> {}", Id, Hash); } for (CbFieldView& Entry : BulkDataArray) { CbObjectView BulkObj = Entry.AsObjectView(); Oid Id = BulkObj["id"sv].AsObjectId(); IoHash Hash = BulkObj["data"sv].AsBinaryAttachment(); Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); ZEN_DEBUG("bulkdata {} -> {}", Id, Hash); } CbArrayView FilesArray = Core["files"sv].AsArrayView(); Result.Files.reserve(FilesArray.Num()); for (CbFieldView& Entry : FilesArray) { CbObjectView FileObj = Entry.AsObjectView(); std::string_view ServerPath = FileObj["serverpath"sv].AsString(); std::string_view ClientPath = FileObj["clientpath"sv].AsString(); if (ServerPath.empty() || ClientPath.empty()) { ZEN_WARN("invalid file"); continue; } Oid Id = FileObj["id"sv].AsObjectId(); IoHash Hash = FileObj["data"sv].AsBinaryAttachment(); Result.Files.emplace_back( OplogEntryMapping::FileMapping{OplogEntryMapping::Mapping{Id, Hash}, std::string(ServerPath), std::string(ClientPath)}); ZEN_DEBUG("file {} -> {}, ServerPath: {}, ClientPath: {}", Id, Hash, ServerPath, ClientPath); } CbArrayView MetaArray = Core["meta"sv].AsArrayView(); Result.Meta.reserve(MetaArray.Num()); for (CbFieldView& Entry : MetaArray) { CbObjectView MetaObj = Entry.AsObjectView(); Oid Id = MetaObj["id"sv].AsObjectId(); IoHash Hash = MetaObj["data"sv].AsBinaryAttachment(); Result.Meta.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); auto NameString = MetaObj["name"sv].AsString(); ZEN_DEBUG("meta data ({}) {} -> {}", NameString, Id, Hash); } return Result; } uint32_t ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, const OplogEntryMapping& OpMapping, const OplogEntry& OpEntry, UpdateType TypeOfUpdate) { ZEN_TRACE_CPU("ProjectStore::Oplog::RegisterOplogEntry"); ZEN_UNUSED(TypeOfUpdate); // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however using namespace std::literals; // Update chunk id maps for (const OplogEntryMapping::Mapping& Chunk : OpMapping.Chunks) { AddChunkMapping(OplogLock, Chunk.Id, Chunk.Hash); } for (const OplogEntryMapping::FileMapping& File : OpMapping.Files) { AddFileMapping(OplogLock, File.Id, File.Hash, File.ServerPath, File.ClientPath); } for (const OplogEntryMapping::Mapping& Meta : OpMapping.Meta) { AddMetaMapping(OplogLock, Meta.Id, Meta.Hash); } m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); m_LatestOpMap[OpEntry.OpKeyAsOId()] = OpEntry.OpLsn; return OpEntry.OpLsn; } uint32_t ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) { ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); const CbObject& Core = OpPackage.GetObject(); const uint32_t EntryId = AppendNewOplogEntry(Core); if (EntryId == 0xffffffffu) { // The oplog has been deleted so just drop this return EntryId; } // Persist attachments after oplog entry so GC won't find attachments without references uint64_t AttachmentBytes = 0; uint64_t NewAttachmentBytes = 0; auto Attachments = OpPackage.GetAttachments(); for (const auto& Attach : Attachments) { ZEN_ASSERT(Attach.IsCompressedBinary()); CompressedBuffer AttachmentData = Attach.AsCompressedBinary(); const uint64_t AttachmentSize = AttachmentData.DecodeRawSize(); CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData.GetCompressed().Flatten().AsIoBuffer(), Attach.GetHash()); if (InsertResult.New) { NewAttachmentBytes += AttachmentSize; } AttachmentBytes += AttachmentSize; } ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes)); return EntryId; } uint32_t ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) { ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); using namespace std::literals; OplogEntryMapping Mapping = GetMapping(Core); SharedBuffer Buffer = Core.GetBuffer(); const uint64_t WriteSize = Buffer.GetSize(); const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF); ZEN_ASSERT(WriteSize != 0); XXH3_128Stream KeyHasher; Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); XXH3_128 KeyHash = KeyHasher.GetHash(); RefPtr Storage; { RwLock::SharedLockScope _(m_OplogLock); Storage = m_Storage; } if (!m_Storage) { return 0xffffffffu; } const OplogEntry OpEntry = m_Storage->AppendOp(Buffer, OpCoreHash, KeyHash); RwLock::ExclusiveLockScope OplogLock(m_OplogLock); const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry, kUpdateNewEntry); return EntryId; } ////////////////////////////////////////////////////////////////////////// ProjectStore::Project::Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath) : m_ProjectStore(PrjStore) , m_CidStore(Store) , m_OplogStoragePath(BasePath) , m_LastAccessTimes({std::make_pair(std::string(), GcClock::TickCount())}) { } ProjectStore::Project::~Project() { // Only write access times if we have not been explicitly deleted if (!m_OplogStoragePath.empty()) { WriteAccessTimes(); } } bool ProjectStore::Project::Exists(const std::filesystem::path& BasePath) { return std::filesystem::exists(BasePath / "Project.zcb"); } void ProjectStore::Project::Read() { ZEN_TRACE_CPU("ProjectStore::Project::Read"); using namespace std::literals; std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv; ZEN_INFO("reading config for project '{}' from {}", Identifier, ProjectStateFilePath); BasicFile Blob; Blob.Open(ProjectStateFilePath, BasicFile::Mode::kRead); IoBuffer Obj = Blob.ReadAll(); CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); if (ValidationError == CbValidateError::None) { CbObject Cfg = LoadCompactBinaryObject(Obj); Identifier = Cfg["id"sv].AsString(); RootDir = Cfg["root"sv].AsString(); ProjectRootDir = Cfg["project"sv].AsString(); EngineRootDir = Cfg["engine"sv].AsString(); ProjectFilePath = Cfg["projectfile"sv].AsString(); } else { ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), ProjectStateFilePath); } ReadAccessTimes(); } void ProjectStore::Project::Write() { using namespace std::literals; BinaryWriter Mem; CbObjectWriter Cfg; Cfg << "id"sv << Identifier; Cfg << "root"sv << PathToUtf8(RootDir); Cfg << "project"sv << ProjectRootDir; Cfg << "engine"sv << EngineRootDir; Cfg << "projectfile"sv << ProjectFilePath; Cfg.Save(Mem); CreateDirectories(m_OplogStoragePath); std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv; ZEN_INFO("persisting config for project '{}' to {}", Identifier, ProjectStateFilePath); BasicFile Blob; Blob.Open(ProjectStateFilePath, BasicFile::Mode::kTruncate); Blob.Write(Mem.Data(), Mem.Size(), 0); Blob.Flush(); } void ProjectStore::Project::ReadAccessTimes() { using namespace std::literals; RwLock::SharedLockScope _(m_ProjectLock); std::filesystem::path ProjectAccessTimesFilePath = m_OplogStoragePath / "AccessTimes.zcb"sv; if (!std::filesystem::exists(ProjectAccessTimesFilePath)) { return; } ZEN_INFO("reading access times for project '{}' from {}", Identifier, ProjectAccessTimesFilePath); BasicFile Blob; Blob.Open(ProjectAccessTimesFilePath, BasicFile::Mode::kRead); IoBuffer Obj = Blob.ReadAll(); CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); if (ValidationError == CbValidateError::None) { CbObject Reader = LoadCompactBinaryObject(Obj); CbArrayView LastAccessTimes = Reader["lastaccess"sv].AsArrayView(); for (CbFieldView& Entry : LastAccessTimes) { CbObjectView AccessTime = Entry.AsObjectView(); std::string_view Id = AccessTime["id"sv].AsString(); GcClock::Tick AccessTick = AccessTime["tick"sv].AsUInt64(); m_LastAccessTimes.insert_or_assign(std::string(Id), AccessTick); } } else { ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), ProjectAccessTimesFilePath); } } void ProjectStore::Project::WriteAccessTimes() { using namespace std::literals; RwLock::ExclusiveLockScope _(m_ProjectLock); BinaryWriter Mem; CbObjectWriter Writer; Writer.BeginArray("lastaccess"); { for (const auto& It : m_LastAccessTimes) { Writer.BeginObject(); { Writer << "id"sv << It.first; Writer << "tick"sv << gsl::narrow(It.second); } Writer.EndObject(); } } Writer.EndArray(); Writer.Save(Mem); try { CreateDirectories(m_OplogStoragePath); std::filesystem::path ProjectAccessTimesFilePath = m_OplogStoragePath / "AccessTimes.zcb"sv; ZEN_INFO("persisting access times for project '{}' to {}", Identifier, ProjectAccessTimesFilePath); BasicFile Blob; Blob.Open(ProjectAccessTimesFilePath, BasicFile::Mode::kTruncate); Blob.Write(Mem.Data(), Mem.Size(), 0); Blob.Flush(); } catch (std::exception& Err) { ZEN_WARN("writing access times FAILED, reason: '{}'", Err.what()); } } spdlog::logger& ProjectStore::Project::Log() { return m_ProjectStore->Log(); } std::filesystem::path ProjectStore::Project::BasePathForOplog(std::string_view OplogId) { return m_OplogStoragePath / OplogId; } ProjectStore::Oplog* ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath) { RwLock::ExclusiveLockScope _(m_ProjectLock); std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); try { Oplog* Log = m_Oplogs .try_emplace(std::string{OplogId}, std::make_unique(OplogId, this, m_CidStore, OplogBasePath, MarkerPath)) .first->second.get(); Log->Write(); return Log; } catch (std::exception&) { // In case of failure we need to ensure there's no half constructed entry around // // (This is probably already ensured by the try_emplace implementation?) m_Oplogs.erase(std::string{OplogId}); return nullptr; } } ProjectStore::Oplog* ProjectStore::Project::OpenOplog(std::string_view OplogId) { { RwLock::SharedLockScope _(m_ProjectLock); auto OplogIt = m_Oplogs.find(std::string(OplogId)); if (OplogIt != m_Oplogs.end()) { return OplogIt->second.get(); } } RwLock::ExclusiveLockScope _(m_ProjectLock); std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); if (Oplog::ExistsAt(OplogBasePath)) { // Do open of existing oplog try { Oplog* Log = m_Oplogs .try_emplace(std::string{OplogId}, std::make_unique(OplogId, this, m_CidStore, OplogBasePath, std::filesystem::path{})) .first->second.get(); Log->Read(); return Log; } catch (std::exception& ex) { ZEN_WARN("failed to open oplog '{}' @ '{}': {}", OplogId, OplogBasePath, ex.what()); m_Oplogs.erase(std::string{OplogId}); } } return nullptr; } void ProjectStore::Project::DeleteOplog(std::string_view OplogId) { std::filesystem::path DeletePath; { RwLock::ExclusiveLockScope _(m_ProjectLock); if (auto OplogIt = m_Oplogs.find(std::string(OplogId)); OplogIt == m_Oplogs.end()) { std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); if (Oplog::ExistsAt(OplogBasePath)) { DeletePath = OplogBasePath; } } else { std::unique_ptr& Oplog = OplogIt->second; DeletePath = Oplog->PrepareForDelete(true); m_DeletedOplogs.emplace_back(std::move(Oplog)); m_Oplogs.erase(OplogIt); } m_LastAccessTimes.erase(std::string(OplogId)); } // Erase content on disk if (!DeletePath.empty()) { OplogStorage::Delete(DeletePath); } } std::vector ProjectStore::Project::ScanForOplogs() const { DirectoryContent DirContent; GetDirectoryContent(m_OplogStoragePath, DirectoryContent::IncludeDirsFlag, DirContent); std::vector Oplogs; Oplogs.reserve(DirContent.Directories.size()); for (const std::filesystem::path& DirPath : DirContent.Directories) { Oplogs.push_back(DirPath.filename().string()); } return Oplogs; } void ProjectStore::Project::IterateOplogs(std::function&& Fn) const { RwLock::SharedLockScope _(m_ProjectLock); for (auto& Kv : m_Oplogs) { Fn(*Kv.second); } } void ProjectStore::Project::IterateOplogs(std::function&& Fn) { RwLock::SharedLockScope _(m_ProjectLock); for (auto& Kv : m_Oplogs) { Fn(*Kv.second); } } void ProjectStore::Project::Flush() { // We only need to flush oplogs that we have already loaded IterateOplogs([&](Oplog& Ops) { Ops.Flush(); }); WriteAccessTimes(); } void ProjectStore::Project::ScrubStorage(ScrubContext& Ctx) { // Scrubbing needs to check all existing oplogs std::vector OpLogs = ScanForOplogs(); for (const std::string& OpLogId : OpLogs) { OpenOplog(OpLogId); } IterateOplogs([&](const Oplog& Ops) { if (!IsExpired(GcClock::TimePoint::max(), Ops)) { Ops.ScrubStorage(Ctx); } }); } void ProjectStore::Project::GatherReferences(GcContext& GcCtx) { ZEN_TRACE_CPU("ProjectStore::Project::GatherReferences"); Stopwatch Timer; const auto Guard = MakeGuard([&] { ZEN_DEBUG("gathered references from project store project {} in {}", Identifier, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); // GatherReferences needs to check all existing oplogs std::vector OpLogs = ScanForOplogs(); for (const std::string& OpLogId : OpLogs) { OpenOplog(OpLogId); } { // Make sure any oplog at least have a last access time so they eventually will be GC:d if not touched RwLock::ExclusiveLockScope _(m_ProjectLock); for (const std::string& OpId : OpLogs) { if (auto It = m_LastAccessTimes.find(OpId); It == m_LastAccessTimes.end()) { m_LastAccessTimes[OpId] = GcClock::TickCount(); } } } IterateOplogs([&](Oplog& Ops) { if (!IsExpired(GcCtx.ProjectStoreExpireTime(), Ops)) { Ops.GatherReferences(GcCtx); } }); } uint64_t ProjectStore::Project::TotalSize() const { uint64_t Result = 0; { std::vector OpLogs = ScanForOplogs(); for (const std::string& OpLogId : OpLogs) { std::filesystem::path OplogBasePath = m_OplogStoragePath / OpLogId; Result += Oplog::TotalSize(OplogBasePath); } } return Result; } bool ProjectStore::Project::PrepareForDelete(std::filesystem::path& OutDeletePath) { RwLock::ExclusiveLockScope _(m_ProjectLock); for (auto& It : m_Oplogs) { // We don't care about the moved folder It.second->PrepareForDelete(false); m_DeletedOplogs.emplace_back(std::move(It.second)); } m_Oplogs.clear(); bool Success = PrepareDirectoryDelete(m_OplogStoragePath, OutDeletePath); if (!Success) { return false; } m_OplogStoragePath.clear(); return true; } bool ProjectStore::Project::IsExpired(const std::string& EntryName, const std::filesystem::path& MarkerPath, const GcClock::TimePoint ExpireTime) const { if (!MarkerPath.empty()) { if (std::filesystem::exists(MarkerPath)) { return false; } } const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); RwLock::SharedLockScope _(m_ProjectLock); if (auto It = m_LastAccessTimes.find(EntryName); It != m_LastAccessTimes.end()) { if (It->second <= ExpireTicks) { return true; } } return false; } bool ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime) const { return IsExpired(std::string(), ProjectFilePath, ExpireTime); } bool ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog) const { return IsExpired(Oplog.OplogId(), Oplog.MarkerPath(), ExpireTime); } void ProjectStore::Project::TouchProject() const { RwLock::ExclusiveLockScope _(m_ProjectLock); m_LastAccessTimes.insert_or_assign(std::string(), GcClock::TickCount()); }; void ProjectStore::Project::TouchOplog(std::string_view Oplog) const { ZEN_ASSERT(!Oplog.empty()); RwLock::ExclusiveLockScope _(m_ProjectLock); m_LastAccessTimes.insert_or_assign(std::string(Oplog), GcClock::TickCount()); }; ////////////////////////////////////////////////////////////////////////// ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc) : GcStorage(Gc) , GcContributor(Gc) , m_Log(logging::Get("project")) , m_CidStore(Store) , m_ProjectBasePath(BasePath) , m_DiskWriteBlocker(Gc.GetDiskWriteBlocker()) { ZEN_INFO("initializing project store at '{}'", BasePath); // m_Log.set_level(spdlog::level::debug); } ProjectStore::~ProjectStore() { ZEN_INFO("closing project store ('{}')", m_ProjectBasePath); } std::filesystem::path ProjectStore::BasePathForProject(std::string_view ProjectId) { return m_ProjectBasePath / ProjectId; } void ProjectStore::DiscoverProjects() { if (!std::filesystem::exists(m_ProjectBasePath)) { return; } DirectoryContent DirContent; GetDirectoryContent(m_ProjectBasePath, DirectoryContent::IncludeDirsFlag, DirContent); for (const std::filesystem::path& DirPath : DirContent.Directories) { std::string DirName = PathToUtf8(DirPath.filename()); OpenProject(DirName); } } void ProjectStore::IterateProjects(std::function&& Fn) { RwLock::SharedLockScope _(m_ProjectsLock); for (auto& Kv : m_Projects) { Fn(*Kv.second.Get()); } } void ProjectStore::Flush() { std::vector> Projects; { RwLock::SharedLockScope _(m_ProjectsLock); Projects.reserve(m_Projects.size()); for (auto& Kv : m_Projects) { Projects.push_back(Kv.second); } } for (const Ref& Project : Projects) { Project->Flush(); } } void ProjectStore::ScrubStorage(ScrubContext& Ctx) { ZEN_INFO("scrubbing '{}'", m_ProjectBasePath); DiscoverProjects(); std::vector> Projects; { RwLock::SharedLockScope _(m_ProjectsLock); Projects.reserve(m_Projects.size()); for (auto& Kv : m_Projects) { if (Kv.second->IsExpired(GcClock::TimePoint::max())) { continue; } Projects.push_back(Kv.second); } } for (const Ref& Project : Projects) { Project->ScrubStorage(Ctx); } } void ProjectStore::GatherReferences(GcContext& GcCtx) { ZEN_TRACE_CPU("ProjectStore::GatherReferences"); size_t ProjectCount = 0; size_t ExpiredProjectCount = 0; Stopwatch Timer; const auto Guard = MakeGuard([&] { ZEN_DEBUG("gathered references from '{}' in {}, found {} active projects and {} expired projects", m_ProjectBasePath.string(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()), ProjectCount, ExpiredProjectCount); }); DiscoverProjects(); std::vector> Projects; { RwLock::SharedLockScope _(m_ProjectsLock); Projects.reserve(m_Projects.size()); for (auto& Kv : m_Projects) { if (Kv.second->IsExpired(GcCtx.ProjectStoreExpireTime())) { ExpiredProjectCount++; continue; } Projects.push_back(Kv.second); } } ProjectCount = Projects.size(); for (const Ref& Project : Projects) { Project->GatherReferences(GcCtx); } } void ProjectStore::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("ProjectStore::CollectGarbage"); size_t ProjectCount = 0; size_t ExpiredProjectCount = 0; Stopwatch Timer; const auto Guard = MakeGuard([&] { ZEN_DEBUG("garbage collect from '{}' DONE after {}, found {} active projects and {} expired projects", m_ProjectBasePath.string(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()), ProjectCount, ExpiredProjectCount); }); std::vector> ExpiredProjects; std::vector> Projects; { RwLock::SharedLockScope _(m_ProjectsLock); for (auto& Kv : m_Projects) { if (Kv.second->IsExpired(GcCtx.ProjectStoreExpireTime())) { ExpiredProjects.push_back(Kv.second); ExpiredProjectCount++; continue; } Projects.push_back(Kv.second); ProjectCount++; } } if (!GcCtx.IsDeletionMode()) { ZEN_DEBUG("garbage collect DISABLED, for '{}' ", m_ProjectBasePath.string()); return; } for (const Ref& Project : Projects) { std::vector ExpiredOplogs; { RwLock::ExclusiveLockScope _(m_ProjectsLock); Project->IterateOplogs([&GcCtx, &Project, &ExpiredOplogs](ProjectStore::Oplog& Oplog) { if (Project->IsExpired(GcCtx.ProjectStoreExpireTime(), Oplog)) { ExpiredOplogs.push_back(Oplog.OplogId()); } }); } for (const std::string& OplogId : ExpiredOplogs) { ZEN_DEBUG("ProjectStore::CollectGarbage garbage collected oplog '{}' in project '{}'. Removing storage on disk", OplogId, Project->Identifier); Project->DeleteOplog(OplogId); } Project->Flush(); } if (ExpiredProjects.empty()) { ZEN_DEBUG("garbage collect for '{}', no expired projects found", m_ProjectBasePath.string()); return; } for (const Ref& Project : ExpiredProjects) { std::filesystem::path PathToRemove; std::string ProjectId; { RwLock::ExclusiveLockScope _(m_ProjectsLock); if (!Project->IsExpired(GcCtx.ProjectStoreExpireTime())) { ZEN_DEBUG("ProjectStore::CollectGarbage skipped garbage collect of project '{}'. Project no longer expired.", ProjectId); continue; } bool Success = Project->PrepareForDelete(PathToRemove); if (!Success) { ZEN_DEBUG("ProjectStore::CollectGarbage skipped garbage collect of project '{}'. Project folder is locked.", ProjectId); continue; } m_Projects.erase(Project->Identifier); ProjectId = Project->Identifier; } ZEN_DEBUG("ProjectStore::CollectGarbage garbage collected project '{}'. Removing storage on disk", ProjectId); if (PathToRemove.empty()) { continue; } DeleteDirectories(PathToRemove); } } GcStorageSize ProjectStore::StorageSize() const { ZEN_TRACE_CPU("ProjectStore::StorageSize"); using namespace std::literals; GcStorageSize Result; { if (std::filesystem::exists(m_ProjectBasePath)) { DirectoryContent ProjectsFolderContent; GetDirectoryContent(m_ProjectBasePath, DirectoryContent::IncludeDirsFlag, ProjectsFolderContent); for (const std::filesystem::path& ProjectBasePath : ProjectsFolderContent.Directories) { std::filesystem::path ProjectStateFilePath = ProjectBasePath / "Project.zcb"sv; if (std::filesystem::exists(ProjectStateFilePath)) { Result.DiskSize += std::filesystem::file_size(ProjectStateFilePath); DirectoryContent DirContent; GetDirectoryContent(ProjectBasePath, DirectoryContent::IncludeDirsFlag, DirContent); for (const std::filesystem::path& OplogBasePath : DirContent.Directories) { Result.DiskSize += Oplog::TotalSize(OplogBasePath); } } } } } return Result; } Ref ProjectStore::OpenProject(std::string_view ProjectId) { ZEN_TRACE_CPU("ProjectStore::OpenProject"); { RwLock::SharedLockScope _(m_ProjectsLock); auto ProjIt = m_Projects.find(std::string{ProjectId}); if (ProjIt != m_Projects.end()) { return ProjIt->second; } } RwLock::ExclusiveLockScope _(m_ProjectsLock); std::filesystem::path BasePath = BasePathForProject(ProjectId); if (Project::Exists(BasePath)) { try { ZEN_INFO("opening project {} @ {}", ProjectId, BasePath); Ref& Prj = m_Projects .try_emplace(std::string{ProjectId}, Ref(new ProjectStore::Project(this, m_CidStore, BasePath))) .first->second; Prj->Identifier = ProjectId; Prj->Read(); return Prj; } catch (std::exception& e) { ZEN_WARN("failed to open {} @ {} ({})", ProjectId, BasePath, e.what()); m_Projects.erase(std::string{ProjectId}); } } return {}; } Ref ProjectStore::NewProject(const std::filesystem::path& BasePath, std::string_view ProjectId, std::string_view RootDir, std::string_view EngineRootDir, std::string_view ProjectRootDir, std::string_view ProjectFilePath) { ZEN_TRACE_CPU("ProjectStore::NewProject"); RwLock::ExclusiveLockScope _(m_ProjectsLock); Ref& Prj = m_Projects.try_emplace(std::string{ProjectId}, Ref(new ProjectStore::Project(this, m_CidStore, BasePath))) .first->second; Prj->Identifier = ProjectId; Prj->RootDir = RootDir; Prj->EngineRootDir = EngineRootDir; Prj->ProjectRootDir = ProjectRootDir; Prj->ProjectFilePath = ProjectFilePath; Prj->Write(); return Prj; } bool ProjectStore::DeleteProject(std::string_view ProjectId) { ZEN_TRACE_CPU("ProjectStore::DeleteProject"); ZEN_INFO("deleting project {}", ProjectId); RwLock::ExclusiveLockScope ProjectsLock(m_ProjectsLock); auto ProjIt = m_Projects.find(std::string{ProjectId}); if (ProjIt == m_Projects.end()) { return true; } std::filesystem::path DeletePath; bool Success = ProjIt->second->PrepareForDelete(DeletePath); if (!Success) { return false; } m_Projects.erase(ProjIt); ProjectsLock.ReleaseNow(); if (!DeletePath.empty()) { DeleteDirectories(DeletePath); } return true; } bool ProjectStore::Exists(std::string_view ProjectId) { return Project::Exists(BasePathForProject(ProjectId)); } CbArray ProjectStore::GetProjectsList() { ZEN_TRACE_CPU("ProjectStore::GetProjectsList"); using namespace std::literals; DiscoverProjects(); CbWriter Response; Response.BeginArray(); IterateProjects([&Response](ProjectStore::Project& Prj) { Response.BeginObject(); Response << "Id"sv << Prj.Identifier; Response << "RootDir"sv << Prj.RootDir.string(); Response << "ProjectRootDir"sv << Prj.ProjectRootDir; Response << "EngineRootDir"sv << Prj.EngineRootDir; Response << "ProjectFilePath"sv << Prj.ProjectFilePath; Response.EndObject(); }); Response.EndArray(); return Response.Save().AsArray(); } std::pair ProjectStore::GetProjectFiles(const std::string_view ProjectId, const std::string_view OplogId, bool FilterClient, CbObject& OutPayload) { ZEN_TRACE_CPU("ProjectStore::GetProjectFiles"); using namespace std::literals; Ref Project = OpenProject(ProjectId); if (!Project) { return {HttpResponseCode::NotFound, fmt::format("Project files request for unknown project '{}'", ProjectId)}; } Project->TouchProject(); ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); if (!FoundLog) { return {HttpResponseCode::NotFound, fmt::format("Project files for unknown oplog '{}/{}'", ProjectId, OplogId)}; } Project->TouchOplog(OplogId); CbObjectWriter Response; Response.BeginArray("files"sv); FoundLog->IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) { Response.BeginObject(); Response << "id"sv << Id; Response << "clientpath"sv << ClientPath; if (!FilterClient) { Response << "serverpath"sv << ServerPath; } Response.EndObject(); }); Response.EndArray(); OutPayload = Response.Save(); return {HttpResponseCode::OK, {}}; } std::pair ProjectStore::GetChunkInfo(const std::string_view ProjectId, const std::string_view OplogId, const std::string_view ChunkId, CbObject& OutPayload) { using namespace std::literals; Ref Project = OpenProject(ProjectId); if (!Project) { return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown project '{}'", ProjectId)}; } Project->TouchProject(); ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); if (!FoundLog) { return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)}; } Project->TouchOplog(OplogId); if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) { return {HttpResponseCode::BadRequest, fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)}; } const Oid Obj = Oid::FromHexString(ChunkId); IoBuffer Chunk = FoundLog->FindChunk(Obj); if (!Chunk) { return {HttpResponseCode::NotFound, {}}; } uint64_t ChunkSize = Chunk.GetSize(); if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) { IoHash RawHash; uint64_t RawSize; bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize); ZEN_ASSERT(IsCompressed); ChunkSize = RawSize; } CbObjectWriter Response; Response << "size"sv << ChunkSize; OutPayload = Response.Save(); return {HttpResponseCode::OK, {}}; } std::pair ProjectStore::GetChunkRange(const std::string_view ProjectId, const std::string_view OplogId, const std::string_view ChunkId, uint64_t Offset, uint64_t Size, ZenContentType AcceptType, IoBuffer& OutChunk) { bool IsOffset = Offset != 0 || Size != ~(0ull); Ref Project = OpenProject(ProjectId); if (!Project) { return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)}; } Project->TouchProject(); ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); if (!FoundLog) { return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; } Project->TouchOplog(OplogId); if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) { return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)}; } const Oid Obj = Oid::FromHexString(ChunkId); IoBuffer Chunk = FoundLog->FindChunk(Obj); if (!Chunk) { return {HttpResponseCode::NotFound, {}}; } OutChunk = Chunk; HttpContentType ContentType = Chunk.GetContentType(); if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) { IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize); ZEN_ASSERT(!Compressed.IsNull()); if (IsOffset) { if ((Offset + Size) > RawSize) { Size = RawSize - Offset; } if (AcceptType == HttpContentType::kBinary) { OutChunk = Compressed.Decompress(Offset, Size).AsIoBuffer(); OutChunk.SetContentType(HttpContentType::kBinary); } else { // Value will be a range of compressed blocks that covers the requested range // The client will have to compensate for any offsets that do not land on an even block size multiple OutChunk = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer(); OutChunk.SetContentType(HttpContentType::kCompressedBinary); } } else { if (AcceptType == HttpContentType::kBinary) { OutChunk = Compressed.Decompress().AsIoBuffer(); OutChunk.SetContentType(HttpContentType::kBinary); } else { OutChunk = Compressed.GetCompressed().Flatten().AsIoBuffer(); OutChunk.SetContentType(HttpContentType::kCompressedBinary); } } } else if (IsOffset) { if ((Offset + Size) > Chunk.GetSize()) { Size = Chunk.GetSize() - Offset; } OutChunk = IoBuffer(std::move(Chunk), Offset, Size); OutChunk.SetContentType(ContentType); } return {HttpResponseCode::OK, {}}; } std::pair ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const std::string_view Cid, ZenContentType AcceptType, IoBuffer& OutChunk) { Ref Project = OpenProject(ProjectId); if (!Project) { return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)}; } Project->TouchProject(); ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); if (!FoundLog) { return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; } Project->TouchOplog(OplogId); if (Cid.length() != IoHash::StringLength) { return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, Cid)}; } const IoHash Hash = IoHash::FromHexString(Cid); OutChunk = m_CidStore.FindChunkByCid(Hash); if (!OutChunk) { return {HttpResponseCode::NotFound, fmt::format("chunk - '{}' MISSING", Cid)}; } if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(OutChunk)); OutChunk = Compressed.Decompress().AsIoBuffer(); OutChunk.SetContentType(ZenContentType::kBinary); } else { OutChunk.SetContentType(ZenContentType::kCompressedBinary); } return {HttpResponseCode::OK, {}}; } std::pair ProjectStore::PutChunk(const std::string_view ProjectId, const std::string_view OplogId, const std::string_view Cid, ZenContentType ContentType, IoBuffer&& Chunk) { Ref Project = OpenProject(ProjectId); if (!Project) { return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown project '{}'", ProjectId)}; } Project->TouchProject(); ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); if (!FoundLog) { return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown oplog '{}/{}'", ProjectId, OplogId)}; } Project->TouchOplog(OplogId); if (Cid.length() != IoHash::StringLength) { return {HttpResponseCode::BadRequest, fmt::format("Chunk put request for invalid chunk hash '{}'", Cid)}; } const IoHash Hash = IoHash::FromHexString(Cid); if (ContentType != HttpContentType::kCompressedBinary) { return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid content type for chunk '{}'", Cid)}; } IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); if (RawHash != Hash) { return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid payload format for chunk '{}'", Cid)}; } CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk, Hash); return {Result.New ? HttpResponseCode::Created : HttpResponseCode::OK, {}}; } std::pair ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload, CbObject& OutResponse) { ZEN_TRACE_CPU("ProjectStore::WriteOplog"); Ref Project = OpenProject(ProjectId); if (!Project) { return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown project '{}'", ProjectId)}; } Project->TouchProject(); ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); if (!Oplog) { return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)}; } Project->TouchOplog(OplogId); CbObject ContainerObject = LoadCompactBinaryObject(Payload); if (!ContainerObject) { return {HttpResponseCode::BadRequest, "Invalid payload format"}; } CidStore& ChunkStore = m_CidStore; RwLock AttachmentsLock; std::unordered_set Attachments; auto HasAttachment = [&ChunkStore](const IoHash& RawHash) { return ChunkStore.ContainsChunk(RawHash); }; auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector&& ChunkHashes) { RwLock::ExclusiveLockScope _(AttachmentsLock); if (BlockHash != IoHash::Zero) { Attachments.insert(BlockHash); } else { Attachments.insert(ChunkHashes.begin(), ChunkHashes.end()); } }; auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) { RwLock::ExclusiveLockScope _(AttachmentsLock); Attachments.insert(RawHash); }; RemoteProjectStore::Result RemoteResult = SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); if (RemoteResult.ErrorCode) { return ConvertResult(RemoteResult); } CbObjectWriter Cbo; Cbo.BeginArray("need"); { for (const IoHash& Hash : Attachments) { ZEN_DEBUG("Need attachment {}", Hash); Cbo << Hash; } } Cbo.EndArray(); // "need" OutResponse = Cbo.Save(); return {HttpResponseCode::OK, {}}; } std::pair ProjectStore::ReadOplog(const std::string_view ProjectId, const std::string_view OplogId, const HttpServerRequest::QueryParams& Params, CbObject& OutResponse) { ZEN_TRACE_CPU("ProjectStore::ReadOplog"); Ref Project = OpenProject(ProjectId); if (!Project) { return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown project '{}'", ProjectId)}; } Project->TouchProject(); ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); if (!Oplog) { return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)}; } Project->TouchOplog(OplogId); size_t MaxBlockSize = 128u * 1024u * 1024u; if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false) { if (auto Value = ParseInt(Param)) { MaxBlockSize = Value.value(); } } size_t MaxChunkEmbedSize = 1024u * 1024u; if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false) { if (auto Value = ParseInt(Param)) { MaxChunkEmbedSize = Value.value(); } } CidStore& ChunkStore = m_CidStore; RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer( ChunkStore, *Oplog, MaxBlockSize, MaxChunkEmbedSize, false, [](CompressedBuffer&&, const IoHash) {}, [](const IoHash&) {}, [](const std::unordered_set) {}); OutResponse = std::move(ContainerResult.ContainerObject); return ConvertResult(ContainerResult); } std::pair ProjectStore::WriteBlock(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload) { ZEN_TRACE_CPU("ProjectStore::WriteBlock"); Ref Project = OpenProject(ProjectId); if (!Project) { return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown project '{}'", ProjectId)}; } Project->TouchProject(); ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); if (!Oplog) { return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown oplog '{}/{}'", ProjectId, OplogId)}; } Project->TouchOplog(OplogId); if (!IterateBlock(std::move(Payload), [this](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { IoBuffer Compressed = Chunk.GetCompressed().Flatten().AsIoBuffer(); m_CidStore.AddChunk(Compressed, AttachmentRawHash); ZEN_DEBUG("Saved attachment {} from block, size {}", AttachmentRawHash, Compressed.GetSize()); })) { return {HttpResponseCode::BadRequest, "Invalid chunk in block"}; } return {HttpResponseCode::OK, {}}; } void ProjectStore::Rpc(HttpServerRequest& HttpReq, const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload, AuthMgr& AuthManager) { ZEN_TRACE_CPU("ProjectStore::Rpc"); using namespace std::literals; HttpContentType PayloadContentType = HttpReq.RequestContentType(); CbPackage Package; CbObject Cb; switch (PayloadContentType) { case HttpContentType::kJSON: case HttpContentType::kUnknownContentType: case HttpContentType::kText: { std::string JsonText(reinterpret_cast(Payload.GetData()), Payload.GetSize()); Cb = LoadCompactBinaryFromJson(JsonText).AsObject(); if (!Cb) { return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content format not supported, expected JSON format"); } } break; case HttpContentType::kCbObject: Cb = LoadCompactBinaryObject(Payload); if (!Cb) { return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content format not supported, expected compact binary format"); } break; case HttpContentType::kCbPackage: Package = ParsePackageMessage(Payload); Cb = Package.GetObject(); if (!Cb) { return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content format not supported, expected package message format"); } break; default: return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type"); } Ref Project = OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Rpc oplog request for unknown project '{}'", ProjectId)); } Project->TouchProject(); ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); if (!Oplog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)); } Project->TouchOplog(OplogId); std::string_view Method = Cb["method"sv].AsString(); if (Method == "import"sv) { if (!AreDiskWritesAllowed()) { return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); } std::pair Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); if (Result.second.empty()) { return HttpReq.WriteResponse(Result.first); } return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); } else if (Method == "export"sv) { std::pair Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); if (Result.second.empty()) { return HttpReq.WriteResponse(Result.first); } return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); } else if (Method == "getchunks"sv) { CbPackage ResponsePackage; { CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); CbObjectWriter ResponseWriter; ResponseWriter.BeginArray("chunks"sv); for (CbFieldView FieldView : ChunksArray) { IoHash RawHash = FieldView.AsHash(); IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash); if (ChunkBuffer) { ResponseWriter.AddHash(RawHash); ResponsePackage.AddAttachment( CbAttachment(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)), RawHash)); } } ResponseWriter.EndArray(); ResponsePackage.SetObject(ResponseWriter.Save()); } CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault); return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } else if (Method == "putchunks"sv) { if (!AreDiskWritesAllowed()) { return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); } std::span Attachments = Package.GetAttachments(); for (const CbAttachment& Attachment : Attachments) { IoHash RawHash = Attachment.GetHash(); CompressedBuffer Compressed = Attachment.AsCompressedBinary(); m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), RawHash, CidStore::InsertMode::kCopyOnly); } return HttpReq.WriteResponse(HttpResponseCode::OK); } else if (Method == "snapshot"sv) { if (!AreDiskWritesAllowed()) { return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); } // Snapshot all referenced files. This brings the content of all // files into the CID store int OpCount = 0; uint64_t InlinedBytes = 0; uint64_t InlinedFiles = 0; uint64_t TotalBytes = 0; uint64_t TotalFiles = 0; std::vector NewOps; Oplog->IterateOplog([&](CbObject Op) { bool OpRewritten = false; bool AllOk = true; CbWriter Cbo; Cbo.BeginArray("files"sv); for (CbFieldView& Field : Op["files"sv]) { bool CopyField = true; if (CbObjectView View = Field.AsObjectView()) { const IoHash DataHash = View["data"sv].AsHash(); if (DataHash == IoHash::Zero) { std::string_view ServerPath = View["serverpath"sv].AsString(); std::filesystem::path FilePath = Project->RootDir / ServerPath; BasicFile DataFile; std::error_code Ec; DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec); if (Ec) { // Error... ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message()); AllOk = false; } else { // Read file contents into memory, compress and store in CidStore IoBuffer FileIoBuffer = DataFile.ReadAll(); CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(FileIoBuffer)); const IoHash RawHash = Compressed.DecodeRawHash(); const uint64_t RawSize = Compressed.DecodeRawSize(); IoBuffer CompressedBuffer = Compressed.GetCompressed().Flatten().AsIoBuffer(); CidStore::InsertResult Result = m_CidStore.AddChunk(CompressedBuffer, RawHash); TotalBytes += RawSize; ++TotalFiles; if (Result.New) { InlinedBytes += RawSize; ++InlinedFiles; } // Rewrite file array entry with new data reference CbObject RewrittenOp = RewriteCbObject(View, [&](CbObjectWriter& Writer, CbFieldView Field) -> bool { if (Field.GetName() == "data"sv) { Writer.AddBinaryAttachment("data"sv, RawHash); return true; } return false; }); Cbo.AddObject(std::move(RewrittenOp)); CopyField = false; } } } if (CopyField) { Cbo.AddField(Field); } else { OpRewritten = true; } } if (OpRewritten && AllOk) { Cbo.EndArray(); CbArray FilesArray = Cbo.Save().AsArray(); CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { if (Field.GetName() == "files"sv) { NewWriter.AddArray("files"sv, FilesArray); return true; } return false; }); NewOps.push_back(std::move(RewrittenOp)); } OpCount++; }); CbObjectWriter ResponseObj; // Persist rewritten oplog entries if (!NewOps.empty()) { ResponseObj.BeginArray("rewritten_ops"); for (CbObject& NewOp : NewOps) { uint32_t NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp)); ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn); ResponseObj.AddInteger(NewLsn); } ResponseObj.EndArray(); } ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles; ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles; ZEN_INFO("rewrote {} oplog entries (out of {})", NewOps.size(), OpCount); return HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save()); } return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method)); } std::pair ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) { ZEN_TRACE_CPU("ProjectStore::Export"); using namespace std::literals; size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u); size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); bool Force = Params["force"sv].AsBool(false); std::pair, std::string> RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize); if (RemoteStoreResult.first == nullptr) { return {HttpResponseCode::BadRequest, RemoteStoreResult.second}; } std::unique_ptr RemoteStore = std::move(RemoteStoreResult.first); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); ZEN_INFO("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description, NiceBytes(MaxBlockSize), NiceBytes(MaxChunkEmbedSize)); RemoteProjectStore::Result Result = SaveOplog(m_CidStore, *RemoteStore, Oplog, MaxBlockSize, MaxChunkEmbedSize, StoreInfo.CreateBlocks, StoreInfo.UseTempBlockFiles, Force); return ConvertResult(Result); } std::pair ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) { ZEN_TRACE_CPU("ProjectStore::Import"); using namespace std::literals; size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u); size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); bool Force = Params["force"sv].AsBool(false); std::pair, std::string> RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize); if (RemoteStoreResult.first == nullptr) { return {HttpResponseCode::BadRequest, RemoteStoreResult.second}; } std::unique_ptr RemoteStore = std::move(RemoteStoreResult.first); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *RemoteStore, Oplog, Force); return ConvertResult(Result); } bool ProjectStore::AreDiskWritesAllowed() const { return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS namespace testutils { using namespace std::literals; std::string OidAsString(const Oid& Id) { StringBuilder<25> OidStringBuilder; Id.ToString(OidStringBuilder); return OidStringBuilder.ToString(); } CbPackage CreateOplogPackage(const Oid& Id, const std::span>& Attachments) { CbPackage Package; CbObjectWriter Object; Object << "key"sv << OidAsString(Id); if (!Attachments.empty()) { Object.BeginArray("bulkdata"); for (const auto& Attachment : Attachments) { CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); Object.BeginObject(); Object << "id"sv << Attachment.first; Object << "type"sv << "Standard"sv; Object << "data"sv << Attach; Object.EndObject(); Package.AddAttachment(Attach); } Object.EndArray(); } Package.SetObject(Object.Save()); return Package; }; std::vector> CreateAttachments(const std::span& Sizes) { std::vector> Result; Result.reserve(Sizes.size()); for (size_t Size : Sizes) { std::vector Data; Data.resize(Size); uint16_t* DataPtr = reinterpret_cast(Data.data()); for (size_t Idx = 0; Idx < Size / 2; ++Idx) { DataPtr[Idx] = static_cast(Idx % 0xffffu); } if (Size & 1) { Data[Size - 1] = static_cast((Size - 1) & 0xff); } CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); Result.emplace_back(std::pair(Oid::NewOid(), Compressed)); } return Result; } uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset) { if (RawOffset > 0) { uint64 BlockSize = 0; OodleCompressor Compressor; OodleCompressionLevel CompressionLevel; if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) { return 0; } return BlockSize > 0 ? RawOffset % BlockSize : 0; } return 0; } } // namespace testutils TEST_CASE("project.store.create") { using namespace std::literals; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::string_view ProjectName("proj1"sv); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; ProjectStore ProjectStore(CidStore, BasePath, Gc); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; Ref Project(ProjectStore.NewProject(BasePath / ProjectName, ProjectName, RootDir.string(), EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string())); CHECK(ProjectStore.DeleteProject(ProjectName)); CHECK(!Project->Exists(BasePath)); } TEST_CASE("project.store.lifetimes") { using namespace std::literals; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; ProjectStore ProjectStore(CidStore, BasePath, Gc); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; Ref Project(ProjectStore.NewProject(BasePath / "proj1"sv, "proj1"sv, RootDir.string(), EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string())); ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {}); CHECK(Oplog != nullptr); std::filesystem::path DeletePath; CHECK(Project->PrepareForDelete(DeletePath)); CHECK(!DeletePath.empty()); CHECK(Project->OpenOplog("oplog1") == nullptr); // Oplog is now invalid, but pointer can still be accessed since we store old oplog pointers CHECK(Oplog->OplogCount() == 0); // Project is still valid since we have a Ref to it CHECK(Project->Identifier == "proj1"sv); } TEST_CASE("project.store.gc") { using namespace std::literals; using namespace testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; ProjectStore ProjectStore(CidStore, BasePath, Gc); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path Project1RootDir = TempDir.Path() / "game1"; std::filesystem::path Project1FilePath = TempDir.Path() / "game1" / "game.uproject"; { CreateDirectories(Project1FilePath.parent_path()); BasicFile ProjectFile; ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); } std::filesystem::path Project1OplogPath = TempDir.Path() / "game1" / "saves" / "cooked" / ".projectstore"; { CreateDirectories(Project1OplogPath.parent_path()); BasicFile OplogFile; OplogFile.Open(Project1OplogPath, BasicFile::Mode::kTruncate); } std::filesystem::path Project2RootDir = TempDir.Path() / "game2"; std::filesystem::path Project2FilePath = TempDir.Path() / "game2" / "game.uproject"; { CreateDirectories(Project2FilePath.parent_path()); BasicFile ProjectFile; ProjectFile.Open(Project2FilePath, BasicFile::Mode::kTruncate); } std::filesystem::path Project2OplogPath = TempDir.Path() / "game1" / "saves" / "cooked" / ".projectstore"; { CreateDirectories(Project2OplogPath.parent_path()); BasicFile OplogFile; OplogFile.Open(Project2OplogPath, BasicFile::Mode::kTruncate); } { Ref Project1(ProjectStore.NewProject(BasePath / "proj1"sv, "proj1"sv, RootDir.string(), EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1", Project1OplogPath); CHECK(Oplog != nullptr); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{77}))); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{7123, 583, 690, 99}))); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{55, 122}))); } { Ref Project2(ProjectStore.NewProject(BasePath / "proj2"sv, "proj2"sv, RootDir.string(), EngineRootDir.string(), Project2RootDir.string(), Project2FilePath.string())); ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog2", Project2OplogPath); CHECK(Oplog != nullptr); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{177}))); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{9123, 383, 590, 96}))); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{535, 221}))); } { GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24)); ProjectStore.GatherReferences(GcCtx); size_t RefCount = 0; GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; }); CHECK(RefCount == 14); ProjectStore.CollectGarbage(GcCtx); CHECK(ProjectStore.OpenProject("proj1"sv)); CHECK(ProjectStore.OpenProject("proj2"sv)); } { GcContext GcCtx(GcClock::Now() + std::chrono::hours(24), GcClock::Now() + std::chrono::hours(24)); ProjectStore.GatherReferences(GcCtx); size_t RefCount = 0; GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; }); CHECK(RefCount == 14); ProjectStore.CollectGarbage(GcCtx); CHECK(ProjectStore.OpenProject("proj1"sv)); CHECK(ProjectStore.OpenProject("proj2"sv)); } std::filesystem::remove(Project1FilePath); { GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24)); ProjectStore.GatherReferences(GcCtx); size_t RefCount = 0; GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; }); CHECK(RefCount == 14); ProjectStore.CollectGarbage(GcCtx); CHECK(ProjectStore.OpenProject("proj1"sv)); CHECK(ProjectStore.OpenProject("proj2"sv)); } { GcContext GcCtx(GcClock::Now() + std::chrono::hours(24), GcClock::Now() + std::chrono::hours(24)); ProjectStore.GatherReferences(GcCtx); size_t RefCount = 0; GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; }); CHECK(RefCount == 7); ProjectStore.CollectGarbage(GcCtx); CHECK(!ProjectStore.OpenProject("proj1"sv)); CHECK(ProjectStore.OpenProject("proj2"sv)); } std::filesystem::remove(Project2OplogPath); { GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24)); ProjectStore.GatherReferences(GcCtx); size_t RefCount = 0; GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; }); CHECK(RefCount == 7); ProjectStore.CollectGarbage(GcCtx); CHECK(!ProjectStore.OpenProject("proj1"sv)); CHECK(ProjectStore.OpenProject("proj2"sv)); } { GcContext GcCtx(GcClock::Now() + std::chrono::hours(24), GcClock::Now() + std::chrono::hours(24)); ProjectStore.GatherReferences(GcCtx); size_t RefCount = 0; GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; }); CHECK(RefCount == 0); ProjectStore.CollectGarbage(GcCtx); CHECK(!ProjectStore.OpenProject("proj1"sv)); CHECK(ProjectStore.OpenProject("proj2"sv)); } std::filesystem::remove(Project2FilePath); { GcContext GcCtx(GcClock::Now() + std::chrono::hours(24), GcClock::Now() + std::chrono::hours(24)); ProjectStore.GatherReferences(GcCtx); size_t RefCount = 0; GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; }); CHECK(RefCount == 0); ProjectStore.CollectGarbage(GcCtx); CHECK(!ProjectStore.OpenProject("proj1"sv)); CHECK(!ProjectStore.OpenProject("proj2"sv)); } } TEST_CASE("project.store.partial.read") { using namespace std::literals; using namespace testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; ProjectStore ProjectStore(CidStore, BasePath, Gc); std::filesystem::path RootDir = TempDir.Path() / "root"sv; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv; std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv; { CreateDirectories(Project1FilePath.parent_path()); BasicFile ProjectFile; ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); } std::vector OpIds; OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()}); std::unordered_map>, Oid::Hasher> Attachments; { Ref Project1(ProjectStore.NewProject(BasePath / "proj1"sv, "proj1"sv, RootDir.string(), EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, {}); CHECK(Oplog != nullptr); Attachments[OpIds[0]] = {}; Attachments[OpIds[1]] = CreateAttachments(std::initializer_list{77}); Attachments[OpIds[2]] = CreateAttachments(std::initializer_list{7123, 9583, 690, 99}); Attachments[OpIds[3]] = CreateAttachments(std::initializer_list{55, 122}); for (auto It : Attachments) { Oplog->AppendNewOplogEntry(CreateOplogPackage(It.first, It.second)); } } { IoBuffer Chunk; CHECK(ProjectStore .GetChunk("proj1"sv, "oplog1"sv, Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(), HttpContentType::kCompressedBinary, Chunk) .first == HttpResponseCode::OK); IoHash RawHash; uint64_t RawSize; CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); CHECK(RawSize == Attachments[OpIds[1]][0].second.DecodeRawSize()); } IoBuffer ChunkResult; CHECK(ProjectStore .GetChunkRange("proj1"sv, "oplog1"sv, OidAsString(Attachments[OpIds[2]][1].first), 0, ~0ull, HttpContentType::kCompressedBinary, ChunkResult) .first == HttpResponseCode::OK); CHECK(ChunkResult); CHECK(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult)).DecodeRawSize() == Attachments[OpIds[2]][1].second.DecodeRawSize()); IoBuffer PartialChunkResult; CHECK(ProjectStore .GetChunkRange("proj1"sv, "oplog1"sv, OidAsString(Attachments[OpIds[2]][1].first), 5, 1773, HttpContentType::kCompressedBinary, PartialChunkResult) .first == HttpResponseCode::OK); CHECK(PartialChunkResult); IoHash PartialRawHash; uint64_t PartialRawSize; CompressedBuffer PartialCompressedResult = CompressedBuffer::FromCompressed(SharedBuffer(PartialChunkResult), PartialRawHash, PartialRawSize); CHECK(PartialRawSize >= 1773); uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5); SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed); SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress(); const uint8_t* FullDataPtr = &(reinterpret_cast(FullDecompressed.GetView().GetData())[5]); const uint8_t* PartialDataPtr = reinterpret_cast(PartialDecompressed.GetView().GetData()); CHECK(FullDataPtr[0] == PartialDataPtr[0]); } TEST_CASE("project.store.block") { using namespace std::literals; using namespace testutils; std::vector AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, 2257, 3685, 3489, 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251, 491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225}); std::vector> AttachmentsWithId = CreateAttachments(AttachmentSizes); std::vector Chunks; Chunks.reserve(AttachmentSizes.size()); for (const auto& It : AttachmentsWithId) { Chunks.push_back(It.second.GetCompressed().Flatten()); } CompressedBuffer Block = GenerateBlock(std::move(Chunks)); IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer(); CHECK(IterateBlock(std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {})); } #endif void prj_forcelink() { } } // namespace zen