diff options
| author | Dan Engelbrecht <[email protected]> | 2023-02-01 01:00:23 -0800 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-02-01 01:00:23 -0800 |
| commit | 149e8636b2965ec0cea0e25285a7d90e312d2b71 (patch) | |
| tree | 243b56d4b15c50e29c323d87c624dd2026e63a58 /zenserver/projectstore/projectstore.cpp | |
| parent | fix formatting of zenutil/include/zenutil/zenserverprocess.h (diff) | |
| download | zen-149e8636b2965ec0cea0e25285a7d90e312d2b71.tar.xz zen-149e8636b2965ec0cea0e25285a7d90e312d2b71.zip | |
Clean up project store file structure (#218)
* move project store to separate folder
* moved import/export project commands into projectstore cmd files
Diffstat (limited to 'zenserver/projectstore/projectstore.cpp')
| -rw-r--r-- | zenserver/projectstore/projectstore.cpp | 3097 |
1 files changed, 3097 insertions, 0 deletions
diff --git a/zenserver/projectstore/projectstore.cpp b/zenserver/projectstore/projectstore.cpp new file mode 100644 index 000000000..216a8cbf6 --- /dev/null +++ b/zenserver/projectstore/projectstore.cpp @@ -0,0 +1,3097 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "projectstore.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/compactbinaryvalue.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/stream.h> +#include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> +#include <zencore/timer.h> +#include <zencore/trace.h> +#include <zencore/workthreadpool.h> +#include <zenhttp/httpshared.h> +#include <zenstore/caslog.h> +#include <zenstore/scrubcontext.h> +#include <zenutil/basicfile.h> + +#include "config.h" + +#include <latch> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <xxh3.h> +ZEN_THIRD_PARTY_INCLUDES_END + +#if ZEN_WITH_TESTS +#endif // ZEN_WITH_TESTS + +namespace zen { + +namespace { + bool PrepareDirectoryDelete(const std::filesystem::path& Dir, std::filesystem::path& OutDeleteDir) + { + int DropIndex = 0; + do + { + if (!std::filesystem::exists(Dir)) + { + return true; + } + + std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); + std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; + if (std::filesystem::exists(DroppedBucketPath)) + { + DropIndex++; + continue; + } + + std::error_code Ec; + std::filesystem::rename(Dir, DroppedBucketPath, Ec); + if (!Ec) + { + OutDeleteDir = DroppedBucketPath; + return true; + } + if (Ec && !std::filesystem::exists(DroppedBucketPath)) + { + // We can't move our folder, probably because it is busy, bail.. + return false; + } + Sleep(100); + } while (true); + } +} // namespace + +////////////////////////////////////////////////////////////////////////// + +Oid +OpKeyStringAsOId(std::string_view OpKey) +{ + using namespace std::literals; + + CbObjectWriter Writer; + Writer << "key"sv << OpKey; + + XXH3_128Stream KeyHasher; + Writer.Save()["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + XXH3_128 KeyHash = KeyHasher.GetHash(); + + Oid OpId; + memcpy(OpId.OidBits, &KeyHash, sizeof(OpId.OidBits)); + + return OpId; +} + +////////////////////////////////////////////////////////////////////////// + +struct ProjectStore::OplogStorage : public RefCounted +{ + OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath) + { + } + + ~OplogStorage() + { + ZEN_INFO("closing oplog storage at {}", m_OplogStoragePath); + Flush(); + } + + [[nodiscard]] bool Exists() { return Exists(m_OplogStoragePath); } + [[nodiscard]] static bool Exists(std::filesystem::path BasePath) + { + return std::filesystem::exists(BasePath / "ops.zlog") && std::filesystem::exists(BasePath / "ops.zops"); + } + + static bool Delete(std::filesystem::path BasePath) { return DeleteDirectories(BasePath); } + + uint64_t OpBlobsSize() const + { + RwLock::SharedLockScope _(m_RwLock); + return m_NextOpsOffset; + } + + void Open(bool IsCreate) + { + using namespace std::literals; + + ZEN_INFO("initializing oplog storage at '{}'", m_OplogStoragePath); + + if (IsCreate) + { + DeleteDirectories(m_OplogStoragePath); + CreateDirectories(m_OplogStoragePath); + } + + m_Oplog.Open(m_OplogStoragePath / "ops.zlog"sv, IsCreate ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); + m_Oplog.Initialize(); + + m_OpBlobs.Open(m_OplogStoragePath / "ops.zops"sv, IsCreate ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); + + ZEN_ASSERT(IsPow2(m_OpsAlign)); + ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1))); + } + + void ReplayLog(std::function<void(CbObject, const OplogEntry&)>&& Handler) + { + ZEN_TRACE_CPU("ProjectStore::OplogStorage::ReplayLog"); + + // This could use memory mapping or do something clever but for now it just reads the file sequentially + + ZEN_INFO("replaying log for '{}'", m_OplogStoragePath); + + Stopwatch Timer; + + uint64_t InvalidEntries = 0; + + IoBuffer OpBuffer; + m_Oplog.Replay( + [&](const OplogEntry& LogEntry) { + if (LogEntry.OpCoreSize == 0) + { + ++InvalidEntries; + + return; + } + + if (OpBuffer.GetSize() < LogEntry.OpCoreSize) + { + OpBuffer = IoBuffer(LogEntry.OpCoreSize); + } + + const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; + + m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); + + // Verify checksum, ignore op data if incorrect + const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), LogEntry.OpCoreSize) & 0xffffFFFF); + + if (OpCoreHash != LogEntry.OpCoreHash) + { + ZEN_WARN("skipping oplog entry with bad checksum!"); + return; + } + + CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), LogEntry.OpCoreSize)); + + m_NextOpsOffset = + Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); + m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn); + + Handler(Op, LogEntry); + }, + 0); + + if (InvalidEntries) + { + ZEN_WARN("ignored {} zero-sized oplog entries", InvalidEntries); + } + + ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}", + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + m_MaxLsn, + m_NextOpsOffset); + } + + void ReplayLog(const std::vector<OplogEntryAddress>& Entries, std::function<void(CbObject)>&& Handler) + { + for (const OplogEntryAddress& Entry : Entries) + { + CbObject Op = GetOp(Entry); + Handler(Op); + } + } + + CbObject GetOp(const OplogEntryAddress& Entry) + { + IoBuffer OpBuffer(Entry.Size); + + const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign; + m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset); + + return CbObject(SharedBuffer(std::move(OpBuffer))); + } + + OplogEntry AppendOp(CbObject Op) + { + ZEN_TRACE_CPU("ProjectStore::OplogStorage::AppendOp"); + + using namespace std::literals; + + SharedBuffer Buffer = Op.GetBuffer(); + const uint64_t WriteSize = Buffer.GetSize(); + const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF); + + ZEN_ASSERT(WriteSize != 0); + + XXH3_128Stream KeyHasher; + Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + XXH3_128 KeyHash = KeyHasher.GetHash(); + + RwLock::ExclusiveLockScope _(m_RwLock); + const uint64_t WriteOffset = m_NextOpsOffset; + const uint32_t OpLsn = ++m_MaxLsn; + + m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign); + + ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign)); + + OplogEntry Entry = {.OpLsn = OpLsn, + .OpCoreOffset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign), + .OpCoreSize = uint32_t(Buffer.GetSize()), + .OpCoreHash = OpCoreHash, + .OpKeyHash = KeyHash}; + + m_Oplog.Append(Entry); + + m_OpBlobs.Write(Buffer.GetData(), WriteSize, WriteOffset); + + return Entry; + } + + void Flush() + { + m_Oplog.Flush(); + m_OpBlobs.Flush(); + } + + spdlog::logger& Log() { return m_OwnerOplog->Log(); } + +private: + ProjectStore::Oplog* m_OwnerOplog; + std::filesystem::path m_OplogStoragePath; + mutable RwLock m_RwLock; + TCasLogFile<OplogEntry> m_Oplog; + BasicFile m_OpBlobs; + std::atomic<uint64_t> m_NextOpsOffset{0}; + uint64_t m_OpsAlign = 32; + std::atomic<uint32_t> m_MaxLsn{0}; +}; + +////////////////////////////////////////////////////////////////////////// + +ProjectStore::Oplog::Oplog(std::string_view Id, + Project* Project, + CidStore& Store, + std::filesystem::path BasePath, + const std::filesystem::path& MarkerPath) +: m_OuterProject(Project) +, m_CidStore(Store) +, m_BasePath(BasePath) +, m_MarkerPath(MarkerPath) +, m_OplogId(Id) +{ + using namespace std::literals; + + m_Storage = new OplogStorage(this, m_BasePath); + const bool StoreExists = m_Storage->Exists(); + m_Storage->Open(/* IsCreate */ !StoreExists); + + m_TempPath = m_BasePath / "temp"sv; + + CleanDirectory(m_TempPath); +} + +ProjectStore::Oplog::~Oplog() +{ + if (m_Storage) + { + Flush(); + } +} + +void +ProjectStore::Oplog::Flush() +{ + ZEN_ASSERT(m_Storage); + m_Storage->Flush(); +} + +void +ProjectStore::Oplog::Scrub(ScrubContext& Ctx) const +{ + ZEN_UNUSED(Ctx); +} + +void +ProjectStore::Oplog::GatherReferences(GcContext& GcCtx) +{ + RwLock::SharedLockScope _(m_OplogLock); + + std::vector<IoHash> Hashes; + Hashes.reserve(Max(m_ChunkMap.size(), m_MetaMap.size())); + + for (const auto& Kv : m_ChunkMap) + { + Hashes.push_back(Kv.second); + } + + GcCtx.AddRetainedCids(Hashes); + + Hashes.clear(); + + for (const auto& Kv : m_MetaMap) + { + Hashes.push_back(Kv.second); + } + + GcCtx.AddRetainedCids(Hashes); +} + +uint64_t +ProjectStore::Oplog::TotalSize() const +{ + RwLock::SharedLockScope _(m_OplogLock); + if (m_Storage) + { + return m_Storage->OpBlobsSize(); + } + return 0; +} + +bool +ProjectStore::Oplog::IsExpired() const +{ + if (m_MarkerPath.empty()) + { + return false; + } + return !std::filesystem::exists(m_MarkerPath); +} + +std::filesystem::path +ProjectStore::Oplog::PrepareForDelete(bool MoveFolder) +{ + RwLock::ExclusiveLockScope _(m_OplogLock); + m_ChunkMap.clear(); + m_MetaMap.clear(); + m_FileMap.clear(); + m_OpAddressMap.clear(); + m_LatestOpMap.clear(); + m_Storage = {}; + if (!MoveFolder) + { + return {}; + } + std::filesystem::path MovedDir; + if (PrepareDirectoryDelete(m_BasePath, MovedDir)) + { + return MovedDir; + } + return {}; +} + +bool +ProjectStore::Oplog::ExistsAt(std::filesystem::path BasePath) +{ + using namespace std::literals; + + std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; + return std::filesystem::is_regular_file(StateFilePath); +} + +void +ProjectStore::Oplog::Read() +{ + using namespace std::literals; + + std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; + if (std::filesystem::is_regular_file(StateFilePath)) + { + ZEN_INFO("reading config for oplog '{}' in project '{}' from {}", m_OplogId, m_OuterProject->Identifier, StateFilePath); + + BasicFile Blob; + Blob.Open(StateFilePath, BasicFile::Mode::kRead); + + IoBuffer Obj = Blob.ReadAll(); + CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); + + if (ValidationError != CbValidateError::None) + { + ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), StateFilePath); + return; + } + + CbObject Cfg = LoadCompactBinaryObject(Obj); + + m_MarkerPath = Cfg["gcpath"sv].AsString(); + } + else + { + ZEN_INFO("config for oplog '{}' in project '{}' not found at {}. Assuming legacy store", + m_OplogId, + m_OuterProject->Identifier, + StateFilePath); + } + ReplayLog(); +} + +void +ProjectStore::Oplog::Write() +{ + using namespace std::literals; + + BinaryWriter Mem; + + CbObjectWriter Cfg; + + Cfg << "gcpath"sv << PathToUtf8(m_MarkerPath); + + Cfg.Save(Mem); + + std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; + + ZEN_INFO("persisting config for oplog '{}' in project '{}' to {}", m_OplogId, m_OuterProject->Identifier, StateFilePath); + + BasicFile Blob; + Blob.Open(StateFilePath, BasicFile::Mode::kTruncate); + Blob.Write(Mem.Data(), Mem.Size(), 0); + Blob.Flush(); +} + +void +ProjectStore::Oplog::ReplayLog() +{ + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + if (!m_Storage) + { + return; + } + m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, Op, OpEntry, kUpdateReplay); }); +} + +IoBuffer +ProjectStore::Oplog::FindChunk(Oid ChunkId) +{ + RwLock::SharedLockScope OplogLock(m_OplogLock); + if (!m_Storage) + { + return IoBuffer{}; + } + + if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end()) + { + IoHash ChunkHash = ChunkIt->second; + OplogLock.ReleaseNow(); + + IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash); + Chunk.SetContentType(ZenContentType::kCompressedBinary); + + return Chunk; + } + + if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) + { + std::filesystem::path FilePath = m_OuterProject->RootDir / FileIt->second.ServerPath; + + OplogLock.ReleaseNow(); + + IoBuffer FileChunk = IoBufferBuilder::MakeFromFile(FilePath); + FileChunk.SetContentType(ZenContentType::kBinary); + + return FileChunk; + } + + if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) + { + IoHash ChunkHash = MetaIt->second; + OplogLock.ReleaseNow(); + + IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash); + Chunk.SetContentType(ZenContentType::kCompressedBinary); + + return Chunk; + } + + return {}; +} + +void +ProjectStore::Oplog::IterateFileMap( + std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn) +{ + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return; + } + + for (const auto& Kv : m_FileMap) + { + Fn(Kv.first, Kv.second.ServerPath, Kv.second.ClientPath); + } +} + +void +ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler) +{ + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return; + } + + std::vector<OplogEntryAddress> Entries; + Entries.reserve(m_LatestOpMap.size()); + + for (const auto& Kv : m_LatestOpMap) + { + const auto AddressEntry = m_OpAddressMap.find(Kv.second); + ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); + + Entries.push_back(AddressEntry->second); + } + + std::sort(Entries.begin(), Entries.end(), [](const OplogEntryAddress& Lhs, const OplogEntryAddress& Rhs) { + return Lhs.Offset < Rhs.Offset; + }); + + m_Storage->ReplayLog(Entries, [&](CbObject Op) { Handler(Op); }); +} + +std::optional<CbObject> +ProjectStore::Oplog::GetOpByKey(const Oid& Key) +{ + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return {}; + } + + if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end()) + { + const auto AddressEntry = m_OpAddressMap.find(LatestOp->second); + ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); + + return m_Storage->GetOp(AddressEntry->second); + } + + return {}; +} + +std::optional<CbObject> +ProjectStore::Oplog::GetOpByIndex(int Index) +{ + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return {}; + } + + if (const auto AddressEntryIt = m_OpAddressMap.find(Index); AddressEntryIt != m_OpAddressMap.end()) + { + return m_Storage->GetOp(AddressEntryIt->second); + } + + return {}; +} + +bool +ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, + Oid FileId, + IoHash Hash, + std::string_view ServerPath, + std::string_view ClientPath) +{ + if (ServerPath.empty() || ClientPath.empty()) + { + return false; + } + + if (Hash != IoHash::Zero) + { + m_ChunkMap.insert_or_assign(FileId, Hash); + } + + FileMapEntry Entry; + Entry.ServerPath = ServerPath; + Entry.ClientPath = ClientPath; + + m_FileMap[FileId] = std::move(Entry); + + if (Hash != IoHash::Zero) + { + m_ChunkMap.insert_or_assign(FileId, Hash); + } + + return true; +} + +void +ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash) +{ + m_ChunkMap.insert_or_assign(ChunkId, Hash); +} + +void +ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash) +{ + m_MetaMap.insert_or_assign(ChunkId, Hash); +} + +uint32_t +ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, + CbObject Core, + const OplogEntry& OpEntry, + UpdateType TypeOfUpdate) +{ + ZEN_TRACE_CPU("ProjectStore::Oplog::RegisterOplogEntry"); + + ZEN_UNUSED(TypeOfUpdate); + + // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing + // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however + + using namespace std::literals; + + // Update chunk id maps + + if (Core["package"sv]) + { + CbObjectView PkgObj = Core["package"sv].AsObjectView(); + Oid PackageId = PkgObj["id"sv].AsObjectId(); + IoHash PackageHash = PkgObj["data"sv].AsBinaryAttachment(); + + AddChunkMapping(OplogLock, PackageId, PackageHash); + + ZEN_DEBUG("package data {} -> {}", PackageId, PackageHash); + } + + for (CbFieldView& Entry : Core["bulkdata"sv]) + { + CbObjectView BulkObj = Entry.AsObjectView(); + + Oid BulkDataId = BulkObj["id"sv].AsObjectId(); + IoHash BulkDataHash = BulkObj["data"sv].AsBinaryAttachment(); + + AddChunkMapping(OplogLock, BulkDataId, BulkDataHash); + + ZEN_DEBUG("bulkdata {} -> {}", BulkDataId, BulkDataHash); + } + + if (Core["files"sv]) + { + Stopwatch Timer; + int32_t FileCount = 0; + int32_t ChunkCount = 0; + + for (CbFieldView& Entry : Core["files"sv]) + { + CbObjectView FileObj = Entry.AsObjectView(); + const Oid FileId = FileObj["id"sv].AsObjectId(); + IoHash FileDataHash = FileObj["data"sv].AsBinaryAttachment(); + std::string_view ServerPath = FileObj["serverpath"sv].AsString(); + std::string_view ClientPath = FileObj["clientpath"sv].AsString(); + + if (AddFileMapping(OplogLock, FileId, FileDataHash, ServerPath, ClientPath)) + { + ++FileCount; + } + else + { + ZEN_WARN("invalid file"); + } + } + + ZEN_DEBUG("added {} file(s), {} as files and {} as chunks in {}", + FileCount + ChunkCount, + FileCount, + ChunkCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + + for (CbFieldView& Entry : Core["meta"sv]) + { + CbObjectView MetaObj = Entry.AsObjectView(); + const Oid MetaId = MetaObj["id"sv].AsObjectId(); + auto NameString = MetaObj["name"sv].AsString(); + IoHash MetaDataHash = MetaObj["data"sv].AsBinaryAttachment(); + + AddMetaMapping(OplogLock, MetaId, MetaDataHash); + + ZEN_DEBUG("meta data ({}) {} -> {}", NameString, MetaId, MetaDataHash); + } + + m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); + m_LatestOpMap[OpEntry.OpKeyAsOId()] = OpEntry.OpLsn; + + return OpEntry.OpLsn; +} + +uint32_t +ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) +{ + ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); + + using namespace std::literals; + + const CbObject& Core = OpPackage.GetObject(); + const uint32_t EntryId = AppendNewOplogEntry(Core); + if (EntryId == 0xffffffffu) + { + // The oplog has been deleted so just drop this + return EntryId; + } + + // Persist attachments after oplog entry so GC won't find attachments without references + + uint64_t AttachmentBytes = 0; + uint64_t NewAttachmentBytes = 0; + + auto Attachments = OpPackage.GetAttachments(); + + for (const auto& Attach : Attachments) + { + ZEN_ASSERT(Attach.IsCompressedBinary()); + + CompressedBuffer AttachmentData = Attach.AsCompressedBinary(); + const uint64_t AttachmentSize = AttachmentData.DecodeRawSize(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData.GetCompressed().Flatten().AsIoBuffer(), Attach.GetHash()); + + if (InsertResult.New) + { + NewAttachmentBytes += AttachmentSize; + } + AttachmentBytes += AttachmentSize; + } + + ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes)); + + return EntryId; +} + +uint32_t +ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) +{ + ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); + + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + if (!m_Storage) + { + return 0xffffffffu; + } + + using namespace std::literals; + + const OplogEntry OpEntry = m_Storage->AppendOp(Core); + const uint32_t EntryId = RegisterOplogEntry(OplogLock, Core, OpEntry, kUpdateNewEntry); + + return EntryId; +} + +////////////////////////////////////////////////////////////////////////// + +ProjectStore::Project::Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath) +: m_ProjectStore(PrjStore) +, m_CidStore(Store) +, m_OplogStoragePath(BasePath) +{ +} + +ProjectStore::Project::~Project() +{ +} + +bool +ProjectStore::Project::Exists(std::filesystem::path BasePath) +{ + return std::filesystem::exists(BasePath / "Project.zcb"); +} + +void +ProjectStore::Project::Read() +{ + using namespace std::literals; + + std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv; + + ZEN_INFO("reading config for project '{}' from {}", Identifier, ProjectStateFilePath); + + BasicFile Blob; + Blob.Open(ProjectStateFilePath, BasicFile::Mode::kRead); + + IoBuffer Obj = Blob.ReadAll(); + CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); + + if (ValidationError == CbValidateError::None) + { + CbObject Cfg = LoadCompactBinaryObject(Obj); + + Identifier = Cfg["id"sv].AsString(); + RootDir = Cfg["root"sv].AsString(); + ProjectRootDir = Cfg["project"sv].AsString(); + EngineRootDir = Cfg["engine"sv].AsString(); + ProjectFilePath = Cfg["projectfile"sv].AsString(); + } + else + { + ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), ProjectStateFilePath); + } +} + +void +ProjectStore::Project::Write() +{ + using namespace std::literals; + + BinaryWriter Mem; + + CbObjectWriter Cfg; + Cfg << "id"sv << Identifier; + Cfg << "root"sv << PathToUtf8(RootDir); + Cfg << "project"sv << ProjectRootDir; + Cfg << "engine"sv << EngineRootDir; + Cfg << "projectfile"sv << ProjectFilePath; + + Cfg.Save(Mem); + + CreateDirectories(m_OplogStoragePath); + + std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv; + + ZEN_INFO("persisting config for project '{}' to {}", Identifier, ProjectStateFilePath); + + BasicFile Blob; + Blob.Open(ProjectStateFilePath, BasicFile::Mode::kTruncate); + Blob.Write(Mem.Data(), Mem.Size(), 0); + Blob.Flush(); +} + +spdlog::logger& +ProjectStore::Project::Log() +{ + return m_ProjectStore->Log(); +} + +std::filesystem::path +ProjectStore::Project::BasePathForOplog(std::string_view OplogId) +{ + return m_OplogStoragePath / OplogId; +} + +ProjectStore::Oplog* +ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath) +{ + RwLock::ExclusiveLockScope _(m_ProjectLock); + + std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); + + try + { + Oplog* Log = m_Oplogs + .try_emplace(std::string{OplogId}, + std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, MarkerPath)) + .first->second.get(); + + Log->Write(); + return Log; + } + catch (std::exception&) + { + // In case of failure we need to ensure there's no half constructed entry around + // + // (This is probably already ensured by the try_emplace implementation?) + + m_Oplogs.erase(std::string{OplogId}); + + return nullptr; + } +} + +ProjectStore::Oplog* +ProjectStore::Project::OpenOplog(std::string_view OplogId) +{ + { + RwLock::SharedLockScope _(m_ProjectLock); + + auto OplogIt = m_Oplogs.find(std::string(OplogId)); + + if (OplogIt != m_Oplogs.end()) + { + return OplogIt->second.get(); + } + } + + RwLock::ExclusiveLockScope _(m_ProjectLock); + + std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); + + if (Oplog::ExistsAt(OplogBasePath)) + { + // Do open of existing oplog + + try + { + Oplog* Log = + m_Oplogs + .try_emplace(std::string{OplogId}, + std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, std::filesystem::path{})) + .first->second.get(); + Log->Read(); + + return Log; + } + catch (std::exception& ex) + { + ZEN_WARN("failed to open oplog '{}' @ '{}': {}", OplogId, OplogBasePath, ex.what()); + + m_Oplogs.erase(std::string{OplogId}); + } + } + + return nullptr; +} + +void +ProjectStore::Project::DeleteOplog(std::string_view OplogId) +{ + std::filesystem::path DeletePath; + { + RwLock::ExclusiveLockScope _(m_ProjectLock); + + auto OplogIt = m_Oplogs.find(std::string(OplogId)); + + if (OplogIt != m_Oplogs.end()) + { + std::unique_ptr<Oplog>& Oplog = OplogIt->second; + DeletePath = Oplog->PrepareForDelete(true); + m_DeletedOplogs.emplace_back(std::move(Oplog)); + m_Oplogs.erase(OplogIt); + } + } + + // Erase content on disk + if (!DeletePath.empty()) + { + OplogStorage::Delete(DeletePath); + } +} + +std::vector<std::string> +ProjectStore::Project::ScanForOplogs() const +{ + DirectoryContent DirContent; + GetDirectoryContent(m_OplogStoragePath, DirectoryContent::IncludeDirsFlag, DirContent); + std::vector<std::string> Oplogs; + Oplogs.reserve(DirContent.Directories.size()); + for (const std::filesystem::path& DirPath : DirContent.Directories) + { + Oplogs.push_back(DirPath.filename().string()); + } + return Oplogs; +} + +void +ProjectStore::Project::IterateOplogs(std::function<void(const Oplog&)>&& Fn) const +{ + RwLock::SharedLockScope _(m_ProjectLock); + + for (auto& Kv : m_Oplogs) + { + Fn(*Kv.second); + } +} + +void +ProjectStore::Project::IterateOplogs(std::function<void(Oplog&)>&& Fn) +{ + RwLock::SharedLockScope _(m_ProjectLock); + + for (auto& Kv : m_Oplogs) + { + Fn(*Kv.second); + } +} + +void +ProjectStore::Project::Flush() +{ + // We only need to flush oplogs that we have already loaded + IterateOplogs([&](Oplog& Ops) { Ops.Flush(); }); +} + +void +ProjectStore::Project::Scrub(ScrubContext& Ctx) +{ + // Scrubbing needs to check all existing oplogs + std::vector<std::string> OpLogs = ScanForOplogs(); + for (const std::string& OpLogId : OpLogs) + { + OpenOplog(OpLogId); + } + IterateOplogs([&](const Oplog& Ops) { + if (!Ops.IsExpired()) + { + Ops.Scrub(Ctx); + } + }); +} + +void +ProjectStore::Project::GatherReferences(GcContext& GcCtx) +{ + ZEN_TRACE_CPU("ProjectStore::Project::GatherReferences"); + + Stopwatch Timer; + const auto Guard = MakeGuard([&] { + ZEN_DEBUG("gathered references from project store project {} in {}", Identifier, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + // GatherReferences needs to check all existing oplogs + std::vector<std::string> OpLogs = ScanForOplogs(); + for (const std::string& OpLogId : OpLogs) + { + OpenOplog(OpLogId); + } + IterateOplogs([&](Oplog& Ops) { + if (!Ops.IsExpired()) + { + Ops.GatherReferences(GcCtx); + } + }); +} + +uint64_t +ProjectStore::Project::TotalSize() const +{ + uint64_t Result = 0; + { + RwLock::SharedLockScope _(m_ProjectLock); + for (const auto& It : m_Oplogs) + { + Result += It.second->TotalSize(); + } + } + return Result; +} + +bool +ProjectStore::Project::PrepareForDelete(std::filesystem::path& OutDeletePath) +{ + RwLock::ExclusiveLockScope _(m_ProjectLock); + + for (auto& It : m_Oplogs) + { + // We don't care about the moved folder + It.second->PrepareForDelete(false); + m_DeletedOplogs.emplace_back(std::move(It.second)); + } + + m_Oplogs.clear(); + + bool Success = PrepareDirectoryDelete(m_OplogStoragePath, OutDeletePath); + if (!Success) + { + return false; + } + m_OplogStoragePath.clear(); + return true; +} + +bool +ProjectStore::Project::IsExpired() const +{ + if (ProjectFilePath.empty()) + { + return false; + } + return !std::filesystem::exists(ProjectFilePath); +} + +////////////////////////////////////////////////////////////////////////// + +ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc) +: GcStorage(Gc) +, GcContributor(Gc) +, m_Log(logging::Get("project")) +, m_CidStore(Store) +, m_ProjectBasePath(BasePath) +{ + ZEN_INFO("initializing project store at '{}'", BasePath); + // m_Log.set_level(spdlog::level::debug); +} + +ProjectStore::~ProjectStore() +{ + ZEN_INFO("closing project store ('{}')", m_ProjectBasePath); +} + +std::filesystem::path +ProjectStore::BasePathForProject(std::string_view ProjectId) +{ + return m_ProjectBasePath / ProjectId; +} + +void +ProjectStore::DiscoverProjects() +{ + if (!std::filesystem::exists(m_ProjectBasePath)) + { + return; + } + + DirectoryContent DirContent; + GetDirectoryContent(m_ProjectBasePath, DirectoryContent::IncludeDirsFlag, DirContent); + + for (const std::filesystem::path& DirPath : DirContent.Directories) + { + std::string DirName = PathToUtf8(DirPath.filename()); + OpenProject(DirName); + } +} + +void +ProjectStore::IterateProjects(std::function<void(Project& Prj)>&& Fn) +{ + RwLock::SharedLockScope _(m_ProjectsLock); + + for (auto& Kv : m_Projects) + { + Fn(*Kv.second.Get()); + } +} + +void +ProjectStore::Flush() +{ + std::vector<Ref<Project>> Projects; + { + RwLock::SharedLockScope _(m_ProjectsLock); + Projects.reserve(m_Projects.size()); + + for (auto& Kv : m_Projects) + { + Projects.push_back(Kv.second); + } + } + for (const Ref<Project>& Project : Projects) + { + Project->Flush(); + } +} + +void +ProjectStore::Scrub(ScrubContext& Ctx) +{ + DiscoverProjects(); + + std::vector<Ref<Project>> Projects; + { + RwLock::SharedLockScope _(m_ProjectsLock); + Projects.reserve(m_Projects.size()); + + for (auto& Kv : m_Projects) + { + if (Kv.second->IsExpired()) + { + continue; + } + Projects.push_back(Kv.second); + } + } + for (const Ref<Project>& Project : Projects) + { + Project->Scrub(Ctx); + } +} + +void +ProjectStore::GatherReferences(GcContext& GcCtx) +{ + ZEN_TRACE_CPU("ProjectStore::GatherReferences"); + + size_t ProjectCount = 0; + size_t ExpiredProjectCount = 0; + Stopwatch Timer; + const auto Guard = MakeGuard([&] { + ZEN_DEBUG("gathered references from '{}' in {}, found {} active projects and {} expired projects", + m_ProjectBasePath.string(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + ProjectCount, + ExpiredProjectCount); + }); + + DiscoverProjects(); + + std::vector<Ref<Project>> Projects; + { + RwLock::SharedLockScope _(m_ProjectsLock); + Projects.reserve(m_Projects.size()); + + for (auto& Kv : m_Projects) + { + if (Kv.second->IsExpired()) + { + ExpiredProjectCount++; + continue; + } + Projects.push_back(Kv.second); + } + } + ProjectCount = Projects.size(); + for (const Ref<Project>& Project : Projects) + { + Project->GatherReferences(GcCtx); + } +} + +void +ProjectStore::CollectGarbage(GcContext& GcCtx) +{ + ZEN_TRACE_CPU("ProjectStore::CollectGarbage"); + + size_t ProjectCount = 0; + size_t ExpiredProjectCount = 0; + + Stopwatch Timer; + const auto Guard = MakeGuard([&] { + ZEN_DEBUG("garbage collect from '{}' DONE after {}, found {} active projects and {} expired projects", + m_ProjectBasePath.string(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + ProjectCount, + ExpiredProjectCount); + }); + std::vector<Ref<Project>> ExpiredProjects; + std::vector<Ref<Project>> Projects; + + { + RwLock::SharedLockScope _(m_ProjectsLock); + for (auto& Kv : m_Projects) + { + if (Kv.second->IsExpired()) + { + ExpiredProjects.push_back(Kv.second); + ExpiredProjectCount++; + continue; + } + Projects.push_back(Kv.second); + ProjectCount++; + } + } + + if (!GcCtx.IsDeletionMode()) + { + ZEN_DEBUG("garbage collect DISABLED, for '{}' ", m_ProjectBasePath.string()); + return; + } + + for (const Ref<Project>& Project : Projects) + { + std::vector<std::string> ExpiredOplogs; + { + RwLock::ExclusiveLockScope _(m_ProjectsLock); + Project->IterateOplogs([&ExpiredOplogs](ProjectStore::Oplog& Oplog) { + if (Oplog.IsExpired()) + { + ExpiredOplogs.push_back(Oplog.OplogId()); + } + }); + } + for (const std::string& OplogId : ExpiredOplogs) + { + ZEN_DEBUG("ProjectStore::CollectGarbage garbage collected oplog '{}' in project '{}'. Removing storage on disk", + OplogId, + Project->Identifier); + Project->DeleteOplog(OplogId); + } + } + + if (ExpiredProjects.empty()) + { + ZEN_DEBUG("garbage collect for '{}', no expired projects found", m_ProjectBasePath.string()); + return; + } + + for (const Ref<Project>& Project : ExpiredProjects) + { + std::filesystem::path PathToRemove; + std::string ProjectId; + { + RwLock::ExclusiveLockScope _(m_ProjectsLock); + if (!Project->IsExpired()) + { + ZEN_DEBUG("ProjectStore::CollectGarbage skipped garbage collect of project '{}'. Project no longer expired.", ProjectId); + continue; + } + bool Success = Project->PrepareForDelete(PathToRemove); + if (!Success) + { + ZEN_DEBUG("ProjectStore::CollectGarbage skipped garbage collect of project '{}'. Project folder is locked.", ProjectId); + continue; + } + m_Projects.erase(Project->Identifier); + ProjectId = Project->Identifier; + } + + ZEN_DEBUG("ProjectStore::CollectGarbage garbage collected project '{}'. Removing storage on disk", ProjectId); + if (PathToRemove.empty()) + { + continue; + } + + DeleteDirectories(PathToRemove); + } +} + +GcStorageSize +ProjectStore::StorageSize() const +{ + GcStorageSize Result; + { + RwLock::SharedLockScope _(m_ProjectsLock); + for (auto& Kv : m_Projects) + { + const Ref<Project>& Project = Kv.second; + Result.DiskSize += Project->TotalSize(); + } + } + return Result; +} + +Ref<ProjectStore::Project> +ProjectStore::OpenProject(std::string_view ProjectId) +{ + { + RwLock::SharedLockScope _(m_ProjectsLock); + + auto ProjIt = m_Projects.find(std::string{ProjectId}); + + if (ProjIt != m_Projects.end()) + { + return ProjIt->second; + } + } + + RwLock::ExclusiveLockScope _(m_ProjectsLock); + + std::filesystem::path BasePath = BasePathForProject(ProjectId); + + if (Project::Exists(BasePath)) + { + try + { + ZEN_INFO("opening project {} @ {}", ProjectId, BasePath); + + Ref<Project>& Prj = + m_Projects + .try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath))) + .first->second; + Prj->Identifier = ProjectId; + Prj->Read(); + return Prj; + } + catch (std::exception& e) + { + ZEN_WARN("failed to open {} @ {} ({})", ProjectId, BasePath, e.what()); + m_Projects.erase(std::string{ProjectId}); + } + } + + return nullptr; +} + +Ref<ProjectStore::Project> +ProjectStore::NewProject(std::filesystem::path BasePath, + std::string_view ProjectId, + std::string_view RootDir, + std::string_view EngineRootDir, + std::string_view ProjectRootDir, + std::string_view ProjectFilePath) +{ + RwLock::ExclusiveLockScope _(m_ProjectsLock); + + Ref<Project>& Prj = + m_Projects.try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath))) + .first->second; + Prj->Identifier = ProjectId; + Prj->RootDir = RootDir; + Prj->EngineRootDir = EngineRootDir; + Prj->ProjectRootDir = ProjectRootDir; + Prj->ProjectFilePath = ProjectFilePath; + Prj->Write(); + + return Prj; +} + +bool +ProjectStore::DeleteProject(std::string_view ProjectId) +{ + ZEN_INFO("deleting project {}", ProjectId); + + RwLock::ExclusiveLockScope ProjectsLock(m_ProjectsLock); + + auto ProjIt = m_Projects.find(std::string{ProjectId}); + + if (ProjIt == m_Projects.end()) + { + return true; + } + + std::filesystem::path DeletePath; + bool Success = ProjIt->second->PrepareForDelete(DeletePath); + + if (!Success) + { + return false; + } + m_Projects.erase(ProjIt); + ProjectsLock.ReleaseNow(); + + if (!DeletePath.empty()) + { + DeleteDirectories(DeletePath); + } + return true; +} + +bool +ProjectStore::Exists(std::string_view ProjectId) +{ + return Project::Exists(BasePathForProject(ProjectId)); +} + +CbArray +ProjectStore::GetProjectsList() +{ + using namespace std::literals; + + DiscoverProjects(); + + CbWriter Response; + Response.BeginArray(); + + IterateProjects([&Response](ProjectStore::Project& Prj) { + Response.BeginObject(); + Response << "Id"sv << Prj.Identifier; + Response << "RootDir"sv << Prj.RootDir.string(); + Response << "ProjectRootDir"sv << Prj.ProjectRootDir; + Response << "EngineRootDir"sv << Prj.EngineRootDir; + Response << "ProjectFilePath"sv << Prj.ProjectFilePath; + Response.EndObject(); + }); + Response.EndArray(); + return Response.Save().AsArray(); +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::GetProjectFiles(const std::string_view ProjectId, const std::string_view OplogId, bool FilterClient, CbObject& OutPayload) +{ + using namespace std::literals; + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Project files request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return {HttpResponseCode::NotFound, fmt::format("Project files for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + CbObjectWriter Response; + Response.BeginArray("files"sv); + + FoundLog->IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) { + Response.BeginObject(); + Response << "id"sv << Id; + Response << "clientpath"sv << ClientPath; + if (!FilterClient) + { + Response << "serverpath"sv << ServerPath; + } + Response.EndObject(); + }); + + Response.EndArray(); + OutPayload = Response.Save(); + return {HttpResponseCode::OK, {}}; +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::GetChunkInfo(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view ChunkId, + CbObject& OutPayload) +{ + using namespace std::literals; + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) + { + return {HttpResponseCode::BadRequest, + fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)}; + } + + const Oid Obj = Oid::FromHexString(ChunkId); + + IoBuffer Chunk = FoundLog->FindChunk(Obj); + if (!Chunk) + { + return {HttpResponseCode::NotFound, {}}; + } + + uint64_t ChunkSize = Chunk.GetSize(); + if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) + { + IoHash RawHash; + uint64_t RawSize; + bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize); + ZEN_ASSERT(IsCompressed); + ChunkSize = RawSize; + } + + CbObjectWriter Response; + Response << "size"sv << ChunkSize; + OutPayload = Response.Save(); + return {HttpResponseCode::OK, {}}; +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::GetChunkRange(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view ChunkId, + uint64_t Offset, + uint64_t Size, + ZenContentType AcceptType, + IoBuffer& OutChunk) +{ + bool IsOffset = Offset != 0 || Size != ~(0ull); + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) + { + return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)}; + } + + const Oid Obj = Oid::FromHexString(ChunkId); + + IoBuffer Chunk = FoundLog->FindChunk(Obj); + if (!Chunk) + { + return {HttpResponseCode::NotFound, {}}; + } + + OutChunk = Chunk; + HttpContentType ContentType = Chunk.GetContentType(); + + if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); + ZEN_ASSERT(!Compressed.IsNull()); + + if (IsOffset) + { + if ((Offset + Size) > RawSize) + { + Size = RawSize - Offset; + } + + if (AcceptType == HttpContentType::kBinary) + { + OutChunk = Compressed.Decompress(Offset, Size).AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kBinary); + } + else + { + // Value will be a range of compressed blocks that covers the requested range + // The client will have to compensate for any offsets that do not land on an even block size multiple + OutChunk = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kCompressedBinary); + } + } + else + { + if (AcceptType == HttpContentType::kBinary) + { + OutChunk = Compressed.Decompress().AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kBinary); + } + else + { + OutChunk = Compressed.GetCompressed().Flatten().AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kCompressedBinary); + } + } + } + else if (IsOffset) + { + if ((Offset + Size) > Chunk.GetSize()) + { + Size = Chunk.GetSize() - Offset; + } + OutChunk = IoBuffer(Chunk, Offset, Size); + OutChunk.SetContentType(ContentType); + } + + return {HttpResponseCode::OK, {}}; +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::GetChunk(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view Cid, + ZenContentType AcceptType, + IoBuffer& OutChunk) +{ + using namespace std::literals; + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + if (Cid.length() != IoHash::StringLength) + { + return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, Cid)}; + } + + const IoHash Hash = IoHash::FromHexString(Cid); + OutChunk = m_CidStore.FindChunkByCid(Hash); + + if (!OutChunk) + { + return {HttpResponseCode::NotFound, fmt::format("chunk - '{}' MISSING", Cid)}; + } + + if (AcceptType == HttpContentType::kBinary) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(OutChunk)); + OutChunk = Compressed.Decompress().AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kBinary); + } + else + { + OutChunk.SetContentType(HttpContentType::kCompressedBinary); + } + return {HttpResponseCode::OK, {}}; +} + +////////////////////////////////////////////////////////////////////////// + +HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) +: m_Log(logging::Get("project")) +, m_CidStore(Store) +, m_ProjectStore(Projects) +{ + using namespace std::literals; + + m_Router.AddPattern("project", "([[:alnum:]_.]+)"); + m_Router.AddPattern("log", "([[:alnum:]_.]+)"); + m_Router.AddPattern("op", "([[:digit:]]+?)"); + m_Router.AddPattern("chunk", "([[:xdigit:]]{24})"); + m_Router.AddPattern("hash", "([[:xdigit:]]{40})"); + + m_Router.RegisterRoute( + "", + [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_ProjectStore->GetProjectsList()); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "list", + [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_ProjectStore->GetProjectsList()); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/batch", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + // Parse Request + + IoBuffer Payload = HttpReq.ReadPayload(); + BinaryReader Reader(Payload); + + struct RequestHeader + { + enum + { + kMagic = 0xAAAA'77AC + }; + uint32_t Magic; + uint32_t ChunkCount; + uint32_t Reserved1; + uint32_t Reserved2; + }; + + struct RequestChunkEntry + { + Oid ChunkId; + uint32_t CorrelationId; + uint64_t Offset; + uint64_t RequestBytes; + }; + + if (Payload.Size() <= sizeof(RequestHeader)) + { + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + RequestHeader RequestHdr; + Reader.Read(&RequestHdr, sizeof RequestHdr); + + if (RequestHdr.Magic != RequestHeader::kMagic) + { + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + std::vector<RequestChunkEntry> RequestedChunks; + RequestedChunks.resize(RequestHdr.ChunkCount); + Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount); + + // Make Response + + struct ResponseHeader + { + uint32_t Magic = 0xbada'b00f; + uint32_t ChunkCount; + uint32_t Reserved1 = 0; + uint32_t Reserved2 = 0; + }; + + struct ResponseChunkEntry + { + uint32_t CorrelationId; + uint32_t Flags = 0; + uint64_t ChunkSize; + }; + + std::vector<IoBuffer> OutBlobs; + OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry)); + for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) + { + const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; + IoBuffer FoundChunk = FoundLog->FindChunk(RequestedChunk.ChunkId); + if (FoundChunk) + { + if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1)) + { + uint64_t Offset = RequestedChunk.Offset; + if (Offset > FoundChunk.Size()) + { + Offset = FoundChunk.Size(); + } + uint64_t Size = RequestedChunk.RequestBytes; + if ((Offset + Size) > FoundChunk.Size()) + { + Size = FoundChunk.Size() - Offset; + } + FoundChunk = IoBuffer(FoundChunk, Offset, Size); + } + } + OutBlobs.emplace_back(std::move(FoundChunk)); + } + uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData()); + ResponseHeader ResponseHdr; + ResponseHdr.ChunkCount = RequestHdr.ChunkCount; + memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr)); + ResponsePtr += sizeof(ResponseHdr); + for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) + { + // const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; + const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]); + ResponseChunkEntry ResponseChunk; + ResponseChunk.CorrelationId = ChunkIndex; + if (FoundChunk) + { + ResponseChunk.ChunkSize = FoundChunk.Size(); + } + else + { + ResponseChunk.ChunkSize = uint64_t(-1); + } + memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk)); + ResponsePtr += sizeof(ResponseChunk); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/files", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + // File manifest fetch, returns the client file list + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + + const bool FilterClient = Params.GetValue("filter"sv) == "client"sv; + + CbObject ResponsePayload; + std::pair<HttpResponseCode, std::string> Result = + m_ProjectStore->GetProjectFiles(ProjectId, OplogId, FilterClient, ResponsePayload); + if (Result.first == HttpResponseCode::OK) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); + } + else + { + ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", + ToString(HttpReq.RequestVerb()), + HttpReq.QueryString(), + static_cast<int>(Result.first), + Result.second); + } + + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/{chunk}/info", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + const auto& ChunkId = Req.GetCapture(3); + + CbObject ResponsePayload; + std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->GetChunkInfo(ProjectId, OplogId, ChunkId, ResponsePayload); + if (Result.first == HttpResponseCode::OK) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); + } + else if (Result.first == HttpResponseCode::NotFound) + { + ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); + } + else + { + ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", + ToString(HttpReq.RequestVerb()), + HttpReq.QueryString(), + static_cast<int>(Result.first), + Result.second); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/{chunk}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + const auto& ChunkId = Req.GetCapture(3); + + uint64_t Offset = 0; + uint64_t Size = ~(0ull); + + auto QueryParms = Req.ServerRequest().GetQueryParams(); + + if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false) + { + if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm)) + { + Offset = OffsetVal.value(); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + } + + if (auto SizeParm = QueryParms.GetValue("size"); SizeParm.empty() == false) + { + if (auto SizeVal = ParseInt<uint64_t>(SizeParm)) + { + Size = SizeVal.value(); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + } + + HttpContentType AcceptType = HttpReq.AcceptContentType(); + + IoBuffer Chunk; + std::pair<HttpResponseCode, std::string> Result = + m_ProjectStore->GetChunkRange(ProjectId, OplogId, ChunkId, Offset, Size, AcceptType, Chunk); + if (Result.first == HttpResponseCode::OK) + { + ZEN_DEBUG("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(Chunk.GetContentType())); + return HttpReq.WriteResponse(HttpResponseCode::OK, Chunk.GetContentType(), Chunk); + } + else if (Result.first == HttpResponseCode::NotFound) + { + ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); + } + else + { + ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", + ToString(HttpReq.RequestVerb()), + HttpReq.QueryString(), + static_cast<int>(Result.first), + Result.second); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + }, + HttpVerb::kGet | HttpVerb::kHead); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/{hash}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + const auto& Cid = Req.GetCapture(3); + HttpContentType AcceptType = HttpReq.AcceptContentType(); + + IoBuffer Value; + std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value); + + if (Result.first == HttpResponseCode::OK) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value); + } + else if (Result.first == HttpResponseCode::NotFound) + { + ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid); + } + else + { + ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", + ToString(HttpReq.RequestVerb()), + HttpReq.QueryString(), + static_cast<int>(Result.first), + Result.second); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/prep", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + // This operation takes a list of referenced hashes and decides which + // chunks are not present on this server. This list is then returned in + // the "need" list in the response + + IoBuffer Payload = HttpReq.ReadPayload(); + CbObject RequestObject = LoadCompactBinaryObject(Payload); + + std::vector<IoHash> NeedList; + + for (auto Entry : RequestObject["have"sv]) + { + const IoHash FileHash = Entry.AsHash(); + + if (!m_CidStore.ContainsChunk(FileHash)) + { + ZEN_DEBUG("prep - NEED: {}", FileHash); + + NeedList.push_back(FileHash); + } + } + + CbObjectWriter Cbo; + Cbo.BeginArray("need"); + + for (const IoHash& Hash : NeedList) + { + Cbo << Hash; + } + + Cbo.EndArray(); + CbObject Response = Cbo.Save(); + + return HttpReq.WriteResponse(HttpResponseCode::OK, Response); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/new", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + + bool IsUsingSalt = false; + IoHash SaltHash = IoHash::Zero; + + if (std::string_view SaltParam = Params.GetValue("salt"); SaltParam.empty() == false) + { + const uint32_t Salt = std::stoi(std::string(SaltParam)); + SaltHash = IoHash::HashBuffer(&Salt, sizeof Salt); + IsUsingSalt = true; + } + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog& Oplog = *FoundLog; + + IoBuffer Payload = HttpReq.ReadPayload(); + + // This will attempt to open files which may not exist for the case where + // the prep step rejected the chunk. This should be fixed since there's + // a performance cost associated with any file system activity + + bool IsValid = true; + std::vector<IoHash> MissingChunks; + + CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer { + if (m_CidStore.ContainsChunk(Hash)) + { + // Return null attachment as we already have it, no point in reading it and storing it again + return {}; + } + + IoHash AttachmentId; + if (IsUsingSalt) + { + IoHash AttachmentSpec[]{SaltHash, Hash}; + AttachmentId = IoHash::HashBuffer(MakeMemoryView(AttachmentSpec)); + } + else + { + AttachmentId = Hash; + } + + std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString(); + if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath)) + { + return SharedBuffer(std::move(Data)); + } + else + { + IsValid = false; + MissingChunks.push_back(Hash); + + return {}; + } + }; + + CbPackage Package; + + if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver)) + { + std::filesystem::path BadPackagePath = + Oplog.TempPath() / "bad_packages"sv / fmt::format("session{}_request{}"sv, HttpReq.SessionId(), HttpReq.RequestId()); + + ZEN_WARN("Received malformed package! Saving payload to '{}'", BadPackagePath); + + WriteFile(BadPackagePath, Payload); + + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"); + } + + if (!IsValid) + { + // TODO: emit diagnostics identifying missing chunks + + return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing chunk reference"); + } + + CbObject Core = Package.GetObject(); + + if (!Core["key"sv]) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified"); + } + + // Write core to oplog + + const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Package); + + if (OpLsn == ProjectStore::Oplog::kInvalidOp) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Payload.Size()), Core["key"sv].AsString()); + + HttpReq.WriteResponse(HttpResponseCode::Created); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/{op}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const std::string& ProjectId = Req.GetCapture(1); + const std::string& OplogId = Req.GetCapture(2); + const std::string& OpIdString = Req.GetCapture(3); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog& Oplog = *FoundLog; + + if (const std::optional<int32_t> OpId = ParseInt<uint32_t>(OpIdString)) + { + if (std::optional<CbObject> MaybeOp = Oplog.GetOpByIndex(OpId.value())) + { + CbObject& Op = MaybeOp.value(); + if (Req.ServerRequest().AcceptContentType() == ZenContentType::kCbPackage) + { + CbPackage Package; + Package.SetObject(Op); + + Op.IterateAttachments([&](CbFieldView FieldView) { + const IoHash AttachmentHash = FieldView.AsAttachment(); + IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash); + + // We force this for now as content type is not consistently tracked (will + // be fixed in CidStore refactor) + Payload.SetContentType(ZenContentType::kCompressedBinary); + + if (Payload) + { + switch (Payload.GetContentType()) + { + case ZenContentType::kCbObject: + if (CbObject Object = LoadCompactBinaryObject(Payload)) + { + Package.AddAttachment(CbAttachment(Object)); + } + else + { + // Error - malformed object + + ZEN_WARN("malformed object returned for {}", AttachmentHash); + } + break; + + case ZenContentType::kCompressedBinary: + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload))) + { + Package.AddAttachment(CbAttachment(Compressed, AttachmentHash)); + } + else + { + // Error - not compressed! + + ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash); + } + break; + + default: + Package.AddAttachment(CbAttachment(SharedBuffer(Payload))); + break; + } + } + }); + + return HttpReq.WriteResponse(HttpResponseCode::Accepted, Package); + } + else + { + // Client cannot accept a package, so we only send the core object + return HttpReq.WriteResponse(HttpResponseCode::Accepted, Op); + } + } + } + + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/archive", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + switch (Req.ServerRequest().RequestVerb()) + { + case HttpVerb::kGet: + { + CbObjectWriter Response; + Response.BeginArray("entries"sv); + std::unordered_set<IoHash> AttachementHashes; + size_t OpCount = 0; + IoHashStream Hasher; + + FoundLog->IterateOplog([this, &Hasher, &Response, &AttachementHashes, &OpCount](CbObject Op) { + SharedBuffer Buffer = Op.GetBuffer(); + Hasher.Append(Buffer.GetView()); + Response << Op; + Op.IterateAttachments([this, &AttachementHashes, &OpCount](CbFieldView FieldView) { + const IoHash AttachmentHash = FieldView.AsAttachment(); + AttachementHashes.emplace(AttachmentHash); + }); + OpCount++; + }); + Response.EndArray(); + + IoHash Checksum = Hasher.GetHash(); + Response.AddHash("checksum"sv, Checksum); + + ZEN_INFO("Exporting {} ops and {} chunks from '{}/{}' with checksum '{}'", + OpCount, + AttachementHashes.size(), + ProjectId, + OplogId, + Checksum); + + CbPackage ResponsePackage; + ResponsePackage.SetObject(Response.Save()); + + std::vector<CbAttachment> Attachments; + Attachments.reserve(AttachementHashes.size()); + for (const IoHash& AttachmentHash : AttachementHashes) + { + IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash); + if (Payload) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload)); + ZEN_ASSERT(Compressed); + Attachments.emplace_back(CbAttachment(Compressed, AttachmentHash)); + } + } + ResponsePackage.AddAttachments(Attachments); + + std::vector<IoBuffer> ResponsePayload = FormatPackageMessage(ResponsePackage, FormatFlags::kAllowLocalReferences); + const ZenContentType AcceptType = HttpReq.AcceptContentType(); + if (AcceptType == ZenContentType::kCompressedBinary) + { + std::vector<SharedBuffer> Parts; + Parts.reserve(ResponsePayload.size()); + for (const auto& I : ResponsePayload) + { + Parts.emplace_back(SharedBuffer(I)); + } + CompositeBuffer Cmp(std::move(Parts)); + CompressedBuffer CompressedResponse = CompressedBuffer::Compress(Cmp); + HttpReq.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCompressedBinary, + CompressedResponse.GetCompressed().Flatten().AsIoBuffer()); + } + else if (AcceptType == ZenContentType::kCbPackage) + { + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponsePayload); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + } + break; + case HttpVerb::kPost: + { + ZEN_INFO("Importing oplog '{}/{}'", ProjectId, OplogId); + IoBuffer CompressedPayload = HttpReq.ReadPayload(); + IoBuffer Payload = + CompressedBuffer::FromCompressedNoValidate(std::move(CompressedPayload)).Decompress().AsIoBuffer(); + + CbPackage RequestPackage = ParsePackageMessage(Payload); + CbObject Request = RequestPackage.GetObject(); + IoHash Checksum = Request["checksum"sv].AsHash(); + + std::span<const CbAttachment> Attachments = RequestPackage.GetAttachments(); + zen ::CbArrayView Entries = Request["entries"sv].AsArrayView(); + + ZEN_INFO("Importing oplog with {} ops and {} attachments with checksum '{}' to '{}/{}'", + Entries.Num(), + Attachments.size(), + Checksum, + ProjectId, + OplogId); + std::vector<CbObject> Ops; + Ops.reserve(Entries.Num()); + IoHashStream Hasher; + for (auto& OpEntry : Entries) + { + CbObjectView Core = OpEntry.AsObjectView(); + + if (!Core["key"sv]) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "No oplog entry key specified"); + } + + BinaryWriter Writer; + Core.CopyTo(Writer); + MemoryView OpView = Writer.GetView(); + Hasher.Append(OpView); + IoBuffer OpBuffer(IoBuffer::Clone, OpView.GetData(), OpView.GetSize()); + CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); + Ops.emplace_back(Op); + } + IoHash CalculatedChecksum = Hasher.GetHash(); + if (CalculatedChecksum != Checksum) + { + ZEN_WARN("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum); + return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); + } + + ZEN_INFO("Writing {} ops for '{}/{}'", Ops.size(), ProjectId, OplogId); + for (const CbObject& Op : Ops) + { + const uint32_t OpLsn = FoundLog->AppendNewOplogEntry(Op); + ZEN_DEBUG("oplog entry #{}", OpLsn); + + if (OpLsn == ProjectStore::Oplog::kInvalidOp) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", + ProjectId, + OplogId, + OpLsn, + NiceBytes(Op.GetSize()), + Op["key"sv].AsString()); + } + + // Persist attachments after oplog entry so GC won't find attachments without references + ZEN_INFO("Writing {} attachments for '{}/{}'", Attachments.size(), ProjectId, OplogId); + + // We are creating a worker thread pool here since we are storing a lot of attachments in one go + // Doing import is a rare and transient occation so we don't want to keep a WorkerThreadPool alive. + WorkerThreadPool WorkerPool(Min(std::thread::hardware_concurrency(), 16u)); + Latch JobCount{gsl::narrow_cast<std::ptrdiff_t>(Attachments.size())}; + for (const CbAttachment& Attachment : Attachments) + { + WorkerPool.ScheduleWork([this, &Attachment, &JobCount, ProjectId, OplogId]() { + try + { + CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary(); + m_CidStore.AddChunk(AttachmentBody.GetCompressed().Flatten().AsIoBuffer(), + Attachment.GetHash(), + CidStore::InsertMode::kCopyOnly); + } + catch (std::exception& e) + { + ZEN_ERROR("Failed to store attachment {} for '{}/{}', reason: '{}'", + Attachment.GetHash(), + ProjectId, + OplogId, + e.what()); + } + JobCount.CountDown(); + }); + } + JobCount.Wait(); + + ZEN_INFO("Imported {} ops and {} attachments to '{}/{}'", Entries.Num(), Attachments.size(), ProjectId, OplogId); + return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); + } + break; + default: + break; + } + }, + HttpVerb::kPost | HttpVerb::kGet); + m_Router.RegisterRoute( + "{project}/oplog/{log}", + [this](HttpRouterRequest& Req) { + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + + if (!Project) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("project {} not found", ProjectId)); + } + + switch (Req.ServerRequest().RequestVerb()) + { + case HttpVerb::kGet: + { + ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId); + + if (!OplogIt) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("oplog {} not found in project {}", OplogId, ProjectId)); + } + + ProjectStore::Oplog& Log = *OplogIt; + + CbObjectWriter Cb; + Cb << "id"sv << Log.OplogId() << "project"sv << Project->Identifier << "tempdir"sv << Log.TempPath().c_str() + << "markerpath"sv << Log.MarkerPath().c_str() << "totalsize"sv << Log.TotalSize() << "opcount" + << Log.OplogCount() << "expired"sv << Log.IsExpired(); + + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cb.Save()); + } + break; + + case HttpVerb::kPost: + { + std::filesystem::path OplogMarkerPath; + IoBuffer Payload = Req.ServerRequest().ReadPayload(); + if (Payload.GetSize() > 0) + { + CbObject Params = LoadCompactBinaryObject(Payload); + OplogMarkerPath = Params["gcpath"sv].AsString(); + } + + ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId); + + if (!OplogIt) + { + if (!Project->NewOplog(OplogId, OplogMarkerPath)) + { + // TODO: indicate why the operation failed! + return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError); + } + + ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); + + return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); + } + + // I guess this should ultimately be used to execute RPCs but for now, it + // does absolutely nothing + + return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); + } + break; + + case HttpVerb::kDelete: + { + ZEN_INFO("deleting oplog '{}/{}'", ProjectId, OplogId); + + Project->DeleteOplog(OplogId); + + return Req.ServerRequest().WriteResponse(HttpResponseCode::OK); + } + break; + + default: + break; + } + }, + HttpVerb::kPost | HttpVerb::kGet | HttpVerb::kDelete); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/entries", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + CbObjectWriter Response; + + if (FoundLog->OplogCount() > 0) + { + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + + if (auto OpKey = Params.GetValue("opkey"); !OpKey.empty()) + { + Oid OpKeyId = OpKeyStringAsOId(OpKey); + std::optional<CbObject> Op = FoundLog->GetOpByKey(OpKeyId); + + if (Op.has_value()) + { + Response << "entry"sv << Op.value(); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + } + else + { + Response.BeginArray("entries"sv); + + FoundLog->IterateOplog([&Response](CbObject Op) { Response << Op; }); + + Response.EndArray(); + } + } + + return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}", + [this](HttpRouterRequest& Req) { + const std::string ProjectId = Req.GetCapture(1); + + switch (Req.ServerRequest().RequestVerb()) + { + case HttpVerb::kPost: + { + IoBuffer Payload = Req.ServerRequest().ReadPayload(); + CbObject Params = LoadCompactBinaryObject(Payload); + std::string_view Id = Params["id"sv].AsString(); + std::string_view Root = Params["root"sv].AsString(); + std::string_view EngineRoot = Params["engine"sv].AsString(); + std::string_view ProjectRoot = Params["project"sv].AsString(); + std::string_view ProjectFilePath = Params["projectfile"sv].AsString(); + + const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId; + m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath); + + ZEN_INFO("established project - {} (id: '{}', roots: '{}', '{}', '{}', '{}'{})", + ProjectId, + Id, + Root, + EngineRoot, + ProjectRoot, + ProjectFilePath, + ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : ""); + + Req.ServerRequest().WriteResponse(HttpResponseCode::Created); + } + break; + + case HttpVerb::kGet: + { + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + + if (!Project) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("project {} not found", ProjectId)); + } + + std::vector<std::string> OpLogs = Project->ScanForOplogs(); + + CbObjectWriter Response; + Response << "id"sv << Project->Identifier; + Response << "root"sv << PathToUtf8(Project->RootDir); + Response << "engine"sv << PathToUtf8(Project->EngineRootDir); + Response << "project"sv << PathToUtf8(Project->ProjectRootDir); + Response << "projectfile"sv << PathToUtf8(Project->ProjectFilePath); + + Response.BeginArray("oplogs"sv); + for (const std::string& OplogId : OpLogs) + { + Response.BeginObject(); + Response << "id"sv << OplogId; + Response.EndObject(); + } + Response.EndArray(); // oplogs + + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save()); + } + break; + + case HttpVerb::kDelete: + { + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + + if (!Project) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("project {} not found", ProjectId)); + } + + ZEN_INFO("deleting project '{}'", ProjectId); + if (!m_ProjectStore->DeleteProject(ProjectId)) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::Locked, + HttpContentType::kText, + fmt::format("project {} is in use", ProjectId)); + } + + return Req.ServerRequest().WriteResponse(HttpResponseCode::NoContent); + } + break; + + default: + break; + } + }, + HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kDelete); +} + +HttpProjectService::~HttpProjectService() +{ +} + +const char* +HttpProjectService::BaseUri() const +{ + return "/prj/"; +} + +void +HttpProjectService::HandleRequest(HttpServerRequest& Request) +{ + if (m_Router.HandleRequest(Request) == false) + { + ZEN_WARN("No route found for {0}", Request.RelativeUri()); + } +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +namespace testutils { + using namespace std::literals; + + std::string OidAsString(const Oid& Id) + { + StringBuilder<25> OidStringBuilder; + Id.ToString(OidStringBuilder); + return OidStringBuilder.ToString(); + } + + CbPackage CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) + { + CbPackage Package; + CbObjectWriter Object; + Object << "key"sv << OidAsString(Id); + if (!Attachments.empty()) + { + Object.BeginArray("bulkdata"); + for (const auto& Attachment : Attachments) + { + CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); + Object.BeginObject(); + Object << "id"sv << Attachment.first; + Object << "type"sv + << "Standard"sv; + Object << "data"sv << Attach; + Object.EndObject(); + + Package.AddAttachment(Attach); + } + Object.EndArray(); + } + Package.SetObject(Object.Save()); + return Package; + }; + + std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes) + { + std::vector<std::pair<Oid, CompressedBuffer>> Result; + Result.reserve(Sizes.size()); + for (size_t Size : Sizes) + { + std::vector<uint8_t> Data; + Data.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) + { + Data[Idx] = Idx % 255; + } + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); + Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); + } + return Result; + } + + uint64 GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset) + { + if (RawOffset > 0) + { + uint64 BlockSize = 0; + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + { + return 0; + } + return BlockSize > 0 ? RawOffset % BlockSize : 0; + } + return 0; + } + +} // namespace testutils + +TEST_CASE("project.store.create") +{ + using namespace std::literals; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::string_view ProjectName("proj1"sv); + std::filesystem::path BasePath = TempDir.Path() / "projectstore"; + ProjectStore ProjectStore(CidStore, BasePath, Gc); + std::filesystem::path RootDir = TempDir.Path() / "root"; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; + std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; + std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; + + Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / ProjectName, + ProjectName, + RootDir.string(), + EngineRootDir.string(), + ProjectRootDir.string(), + ProjectFilePath.string())); + CHECK(ProjectStore.DeleteProject(ProjectName)); + CHECK(!Project->Exists(BasePath)); +} + +TEST_CASE("project.store.lifetimes") +{ + using namespace std::literals; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::filesystem::path BasePath = TempDir.Path() / "projectstore"; + ProjectStore ProjectStore(CidStore, BasePath, Gc); + std::filesystem::path RootDir = TempDir.Path() / "root"; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; + std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; + std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; + + Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + ProjectRootDir.string(), + ProjectFilePath.string())); + ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {}); + CHECK(Oplog != nullptr); + + std::filesystem::path DeletePath; + CHECK(Project->PrepareForDelete(DeletePath)); + CHECK(!DeletePath.empty()); + CHECK(Project->OpenOplog("oplog1") == nullptr); + // Oplog is now invalid, but pointer can still be accessed since we store old oplog pointers + CHECK(Oplog->OplogCount() == 0); + // Project is still valid since we have a Ref to it + CHECK(Project->Identifier == "proj1"sv); +} + +TEST_CASE("project.store.gc") +{ + using namespace std::literals; + using namespace testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::filesystem::path BasePath = TempDir.Path() / "projectstore"; + ProjectStore ProjectStore(CidStore, BasePath, Gc); + std::filesystem::path RootDir = TempDir.Path() / "root"; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; + + std::filesystem::path Project1RootDir = TempDir.Path() / "game1"; + std::filesystem::path Project1FilePath = TempDir.Path() / "game1" / "game.uproject"; + { + CreateDirectories(Project1FilePath.parent_path()); + BasicFile ProjectFile; + ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); + } + + std::filesystem::path Project2RootDir = TempDir.Path() / "game2"; + std::filesystem::path Project2FilePath = TempDir.Path() / "game2" / "game.uproject"; + { + CreateDirectories(Project2FilePath.parent_path()); + BasicFile ProjectFile; + ProjectFile.Open(Project2FilePath, BasicFile::Mode::kTruncate); + } + + { + Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + Project1RootDir.string(), + Project1FilePath.string())); + ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1", {}); + CHECK(Oplog != nullptr); + + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}))); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122}))); + } + + { + Ref<ProjectStore::Project> Project2(ProjectStore.NewProject(BasePath / "proj2"sv, + "proj2"sv, + RootDir.string(), + EngineRootDir.string(), + Project2RootDir.string(), + Project2FilePath.string())); + ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog1", {}); + CHECK(Oplog != nullptr); + + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{177}))); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9123, 383, 590, 96}))); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{535, 221}))); + } + + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + ProjectStore.GatherReferences(GcCtx); + size_t RefCount = 0; + GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; }); + CHECK(RefCount == 14); + ProjectStore.CollectGarbage(GcCtx); + CHECK(ProjectStore.OpenProject("proj1"sv)); + CHECK(ProjectStore.OpenProject("proj2"sv)); + } + + std::filesystem::remove(Project1FilePath); + + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + ProjectStore.GatherReferences(GcCtx); + size_t RefCount = 0; + GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; }); + CHECK(RefCount == 7); + ProjectStore.CollectGarbage(GcCtx); + CHECK(!ProjectStore.OpenProject("proj1"sv)); + CHECK(ProjectStore.OpenProject("proj2"sv)); + } +} + +TEST_CASE("project.store.partial.read") +{ + using namespace std::literals; + using namespace testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; + ProjectStore ProjectStore(CidStore, BasePath, Gc); + std::filesystem::path RootDir = TempDir.Path() / "root"sv; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; + + std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv; + std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv; + { + CreateDirectories(Project1FilePath.parent_path()); + BasicFile ProjectFile; + ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); + } + + std::vector<Oid> OpIds; + OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()}); + std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; + { + Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + Project1RootDir.string(), + Project1FilePath.string())); + ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, {}); + CHECK(Oplog != nullptr); + Attachments[OpIds[0]] = {}; + Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77}); + Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{7123, 9583, 690, 99}); + Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122}); + for (auto It : Attachments) + { + Oplog->AppendNewOplogEntry(CreateOplogPackage(It.first, It.second)); + } + } + { + IoBuffer Chunk; + CHECK(ProjectStore + .GetChunk("proj1"sv, + "oplog1"sv, + Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(), + HttpContentType::kCompressedBinary, + Chunk) + .first == HttpResponseCode::OK); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); + CHECK(RawSize == Attachments[OpIds[1]][0].second.DecodeRawSize()); + } + + IoBuffer ChunkResult; + CHECK(ProjectStore + .GetChunkRange("proj1"sv, + "oplog1"sv, + OidAsString(Attachments[OpIds[2]][1].first), + 0, + ~0ull, + HttpContentType::kCompressedBinary, + ChunkResult) + .first == HttpResponseCode::OK); + CHECK(ChunkResult); + CHECK(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult)).DecodeRawSize() == + Attachments[OpIds[2]][1].second.DecodeRawSize()); + + IoBuffer PartialChunkResult; + CHECK(ProjectStore + .GetChunkRange("proj1"sv, + "oplog1"sv, + OidAsString(Attachments[OpIds[2]][1].first), + 5, + 1773, + HttpContentType::kCompressedBinary, + PartialChunkResult) + .first == HttpResponseCode::OK); + CHECK(PartialChunkResult); + IoHash PartialRawHash; + uint64_t PartialRawSize; + CompressedBuffer PartialCompressedResult = + CompressedBuffer::FromCompressed(SharedBuffer(PartialChunkResult), PartialRawHash, PartialRawSize); + CHECK(PartialRawSize >= 1773); + + uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5); + SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed); + SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress(); + const uint8_t* FullDataPtr = &(reinterpret_cast<const uint8_t*>(FullDecompressed.GetView().GetData())[5]); + const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData()); + CHECK(FullDataPtr[0] == PartialDataPtr[0]); +} +#endif + +void +prj_forcelink() +{ +} + +} // namespace zen |