diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-02 16:35:22 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-02 16:35:22 +0200 |
| commit | 289379b55d19e08c47afb54a363fda9478623b9d (patch) | |
| tree | da2eb6aa2f95833c0c7063c6f71dc9576512d7c1 /src/zenstore/projectstore.cpp | |
| parent | fix for RPC replay issue (wrong content-type) (#536) (diff) | |
| download | zen-289379b55d19e08c47afb54a363fda9478623b9d.tar.xz zen-289379b55d19e08c47afb54a363fda9478623b9d.zip | |
move projectstore to zenstore (#541)
Diffstat (limited to 'src/zenstore/projectstore.cpp')
| -rw-r--r-- | src/zenstore/projectstore.cpp | 8098 |
1 files changed, 8098 insertions, 0 deletions
diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp new file mode 100644 index 000000000..9a02cc7f0 --- /dev/null +++ b/src/zenstore/projectstore.cpp @@ -0,0 +1,8098 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenstore/projectstore.h> + +#include <zencore/assertfmt.h> +#include <zencore/compactbinaryutil.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/except.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/memory/llm.h> +#include <zencore/scopeguard.h> +#include <zencore/stream.h> +#include <zencore/timer.h> +#include <zencore/trace.h> +#include <zenstore/caslog.h> +#include <zenstore/cidstore.h> +#include <zenstore/scrubcontext.h> +#include <zenutil/parallelwork.h> +#include <zenutil/referencemetadata.h> +#include <zenutil/workerpools.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_set.h> +#include <xxh3.h> +ZEN_THIRD_PARTY_INCLUDES_END + +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zenutil/chunkblock.h> + +# include <unordered_map> +#endif // ZEN_WITH_TESTS + +namespace zen { + +const FLLMTag& +GetProjectstoreTag() +{ + static FLLMTag _("store", FLLMTag("project")); + + return _; +} + +namespace { + bool PrepareDirectoryDelete(const std::filesystem::path& Dir, std::filesystem::path& OutDeleteDir) + { + std::filesystem::path DroppedBucketPath; + do + { + if (!IsDir(Dir)) + { + return true; + } + + StringBuilder<64> MovedId; + Oid::NewOid().ToString(MovedId); + + std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), MovedId); + DroppedBucketPath = Dir.parent_path() / DroppedName; + if (IsDir(DroppedBucketPath)) + { + if (!DeleteDirectories(DroppedBucketPath)) + { + ZEN_INFO("Drop directory '{}' for '{}' already exists but could not be removed, attempting different name.", + DroppedBucketPath, + Dir); + continue; + } + if 
(IsDir(DroppedBucketPath)) + { + ZEN_INFO("Drop directory '{}' for '{}' still exists after remove, attempting different name.", DroppedBucketPath, Dir); + continue; + } + } + + int RenameAttempt = 0; + do + { + std::error_code Ec; + RenameDirectory(Dir, DroppedBucketPath, Ec); + if (!Ec) + { + OutDeleteDir = DroppedBucketPath; + return true; + } + if (IsDir(DroppedBucketPath)) + { + ZEN_INFO("Can't rename '{}' to still existing drop directory '{}'. Reason: '{}'. Attempting different name.", + Dir, + DroppedBucketPath, + Ec.message()); + break; + } + if (++RenameAttempt == 10) + { + ZEN_INFO("Can't rename '{}' to drop directory '{}' after {} attempts. Reason: {}.", + Dir, + DroppedBucketPath, + RenameAttempt, + Ec.message()); + return false; + } + ZEN_INFO("Can't rename '{}' to drop directory '{}', pausing and retrying. Reason: {}.", + Dir, + DroppedBucketPath, + Ec.message()); + Sleep(100); + } while (true); + + } while (true); + + return false; + } + + bool IsFileOlderThan(const std::filesystem::path& CheckPath, const std::filesystem::path& ReferencePath) + { + std::error_code Ec; + std::filesystem::file_time_type CheckWriteTime = std::filesystem::last_write_time(CheckPath, Ec); + if (Ec) + { + return true; + } + std::filesystem::file_time_type ReferenceWriteTime = std::filesystem::last_write_time(ReferencePath, Ec); + if (Ec) + { + return true; + } + return CheckWriteTime < ReferenceWriteTime; + } + + std::pair<int32_t, int32_t> GetPagedRange(int32_t TotalSize, const ProjectStore::Oplog::Paging& EntryPaging) + { + int32_t Start = std::clamp(EntryPaging.Start, 0, TotalSize); + int32_t End = EntryPaging.Count < 0 ? 
TotalSize : (Start + std::min<int32_t>(EntryPaging.Count, TotalSize - Start)); + return {Start, End}; + } + +#pragma pack(push) +#pragma pack(1) + struct OplogIndexHeader + { + OplogIndexHeader() {} + + struct BodyV1 + { + uint64_t LogPosition = 0; + uint32_t LSNCount = 0; + uint64_t KeyCount = 0; + uint32_t OpAddressMapCount = 0; + uint32_t LatestOpMapCount = 0; + uint64_t ChunkMapCount = 0; + uint64_t MetaMapCount = 0; + uint64_t FileMapCount = 0; + }; + + struct BodyV2 + { + uint64_t LogPosition = 0; + uint64_t KeyCount = 0; + uint32_t OpPayloadIndexCount = 0; + uint32_t OpPayloadCount = 0; + uint32_t ChunkMapCount = 0; + uint32_t MetaMapCount = 0; + uint32_t FileMapCount = 0; + uint32_t MaxLSN = 0; + uint32_t NextOpCoreOffset = 0; // note: Multiple of oplog data alignment! + uint32_t Reserved[2] = {0, 0}; + }; + + static constexpr uint32_t ExpectedMagic = 0x7569647a; // 'zidx'; + static constexpr uint32_t Version1 = 1; + static constexpr uint32_t Version2 = 2; + static constexpr uint32_t CurrentVersion = Version2; + static constexpr uint64_t DataAlignment = 8; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + + union + { + BodyV1 V1; + BodyV2 V2; + }; + + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const OplogIndexHeader& Header) + { + return XXH32(&Header.Magic, sizeof(OplogIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } + }; + +#pragma pack(pop) + + static_assert(sizeof(OplogIndexHeader) == 64); + + static std::uint64_t GetModificationTagFromRawHash(const IoHash& Hash) + { + IoHash::Hasher H; + return H(Hash); + } + + static std::uint64_t GetModificationTagFromModificationTime(IoBuffer FileBuffer) + { + IoBufferFileReference FileRef; + if (FileBuffer.GetFileReference(FileRef)) + { + std::error_code Ec; + uint64_t ModificationTick = GetModificationTickFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + return ModificationTick; + } + } + return {}; + } + +} // namespace + 
+////////////////////////////////////////////////////////////////////////// + +Oid +ComputeOpKey(const CbObjectView& Op) +{ + using namespace std::literals; + + eastl::fixed_vector<uint8_t, 256> KeyData; + + Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { + auto Begin = reinterpret_cast<const uint8_t*>(Data); + auto End = Begin + Size; + KeyData.insert(KeyData.end(), Begin, End); + }); + + XXH3_128 KeyHash128; + + // This logic currently exists to work around a problem caused by misusing the xxhash + // functions in the past. Short keys are evaluated using the old and buggy + // path but longer paths are evaluated properly. In the future all key lengths + // should be evaluated using the proper path, this is a temporary workaround to + // maintain compatibility with existing disk state. + if (KeyData.size() < 240) + { + XXH3_128Stream_deprecated KeyHasher; + KeyHasher.Append(KeyData.data(), KeyData.size()); + KeyHash128 = KeyHasher.GetHash(); + } + else + { + KeyHash128 = XXH3_128::HashMemory(KeyData.data(), KeyData.size()); + } + + Oid KeyHash; + memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash); + + return KeyHash; +} + +struct ProjectStore::OplogStorage : public RefCounted +{ + OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath) + { + } + + ~OplogStorage() + { + ZEN_DEBUG("oplog '{}/{}': closing oplog storage at {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + m_OplogStoragePath); + try + { + m_Oplog.Close(); + m_OpBlobs.Close(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("oplog '{}/{}': flushing oplog at '{}' failed. 
Reason: '{}'", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + m_OplogStoragePath, + Ex.what()); + } + } + + [[nodiscard]] bool Exists() const { return Exists(m_OplogStoragePath); } + [[nodiscard]] static bool Exists(const std::filesystem::path& BasePath) + { + return IsFile(GetLogPath(BasePath)) && IsFile(GetBlobsPath(BasePath)); + } + [[nodiscard]] bool IsValid() const { return IsValid(m_OplogStoragePath); } + [[nodiscard]] static bool IsValid(const std::filesystem::path& BasePath) + { + return TCasLogFile<OplogEntry>::IsValid(GetLogPath(BasePath)); + } + void WipeState() const + { + std::error_code Ec; + RemoveFile(GetLogPath(), Ec); + RemoveFile(GetBlobsPath(), Ec); + } + + static bool Delete(const std::filesystem::path& BasePath) { return DeleteDirectories(BasePath); } + + uint64_t OpBlobsSize() const { return FileSizeFromPath(GetBlobsPath()); } + + uint64_t OpsSize() const { return OpsSize(m_OplogStoragePath); } + static uint64_t OpsSize(const std::filesystem::path& BasePath) + { + if (Exists(BasePath)) + { + std::error_code DummyEc; + return FileSizeFromPath(GetLogPath(BasePath)) + FileSizeFromPath(GetBlobsPath(BasePath)); + } + return 0; + } + + uint32_t MaxLSN() const { return m_MaxLsn; } + void SetMaxLSNAndNextWriteAddress(uint32_t MaxLSN, const OplogEntryAddress& NextOpFileOffset) + { + m_MaxLsn.store(MaxLSN); + m_NextOpsOffset = RoundUp((NextOpFileOffset.Offset * m_OpsAlign) + NextOpFileOffset.Size, m_OpsAlign); + } + + void SetMaxLSNAndNextOpOffset(uint32_t MaxLSN, uint32_t NextOpOffset) + { + m_MaxLsn.store(MaxLSN); + m_NextOpsOffset = NextOpOffset * m_OpsAlign; + } + + uint32_t GetNextOpOffset() const { return gsl::narrow<uint32_t>(m_NextOpsOffset / m_OpsAlign); } + + enum EMode + { + Create, + Read, + Write + }; + + void Open(EMode InMode) + { + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::OplogStorage::Open"); + + CasLogFile::Mode LogMode = CasLogFile::Mode::kRead; + BasicFile::Mode BlobsMode = 
BasicFile::Mode::kRead; + + if (InMode == EMode::Create) + { + ZEN_DEBUG("oplog '{}/{}': initializing storage at '{}'", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + m_OplogStoragePath); + + DeleteDirectories(m_OplogStoragePath); + CreateDirectories(m_OplogStoragePath); + + LogMode = CasLogFile::Mode::kTruncate; + BlobsMode = BasicFile::Mode::kTruncate; + } + else if (InMode == EMode::Write) + { + LogMode = CasLogFile::Mode::kWrite; + BlobsMode = BasicFile::Mode::kWrite; + ZEN_DEBUG("oplog '{}/{}': opening storage at '{}'", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + m_OplogStoragePath); + } + else if (InMode == EMode::Read) + { + ZEN_DEBUG("oplog '{}/{}': opening read only storage at '{}'", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + m_OplogStoragePath); + } + + m_Oplog.Open(GetLogPath(m_OplogStoragePath), LogMode); + m_Oplog.Initialize(); + + m_OpBlobs.Open(GetBlobsPath(m_OplogStoragePath), BlobsMode); + + ZEN_ASSERT(IsPow2(m_OpsAlign)); + ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1))); + } + + IoBuffer GetOpBuffer(BasicFileBuffer& OpBlobsBuffer, const OplogEntry& LogEntry) const + { + ZEN_MEMSCOPE(GetProjectstoreTag()); + + const uint64_t OpFileOffset = LogEntry.OpCoreAddress.Offset * m_OpsAlign; + const MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreAddress.Size, OpFileOffset); + if (OpBufferView.GetSize() == LogEntry.OpCoreAddress.Size) + { + return IoBuffer(IoBuffer::Wrap, OpBufferView.GetData(), OpBufferView.GetSize()); + } + else + { + IoBuffer OpBuffer(LogEntry.OpCoreAddress.Size); + OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreAddress.Size, OpFileOffset); + return OpBuffer; + } + } + + uint64_t GetEffectiveBlobsSize(std::span<const Oplog::OplogPayload> Addresses) const + { + uint64_t EffectiveSize = 0; + for (const Oplog::OplogPayload& OpPayload : Addresses) + { + EffectiveSize += RoundUp(OpPayload.Address.Size, m_OpsAlign); + } + 
return EffectiveSize; + } + + void Compact(std::span<const ProjectStore::LogSequenceNumber> LSNs, + std::function<void(const Oid& OpKeyHash, + ProjectStore::LogSequenceNumber OldLSN, + ProjectStore::LogSequenceNumber NewLSN, + const OplogEntryAddress& NewAddress)>&& Callback, + bool RetainLSNs, + bool DryRun) + { + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::OplogStorage::Compact"); + + ZEN_INFO("oplog '{}/{}': compacting at '{}'", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + m_OplogStoragePath); + + Stopwatch Timer; + + StringBuilder<64> OplogName; + Oid::NewOid().ToString(OplogName); + + std::filesystem::path OplogPath = m_OplogStoragePath / OplogName.c_str(); + + std::error_code Ec; + TCasLogFile<OplogEntry> Oplog; + Oplog.Open(OplogPath, CasLogFile::Mode::kTruncate); + (void)Oplog.Initialize(); + + TemporaryFile OpBlobs; + OpBlobs.CreateTemporary(m_OplogStoragePath, Ec); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to create temp file for op blob at '{}'", m_OplogStoragePath)); + } + + try + { + std::vector<OplogEntry> Ops; + Ops.reserve(LSNs.size()); + + ProjectStore::LsnMap<size_t> LSNToIndex; + LSNToIndex.reserve(LSNs.size()); + for (ProjectStore::LogSequenceNumber LSN : LSNs) + { + LSNToIndex[LSN] = (size_t)-1; + } + + RwLock::ExclusiveLockScope Lock(m_RwLock); + + const uint64_t SkipEntryCount = 0; + m_Oplog.Replay( + [&](const OplogEntry& LogEntry) { + if (auto It = LSNToIndex.find(LogEntry.OpLsn); It != LSNToIndex.end()) + { + if (It->second != (size_t)-1) + { + Ops[It->second] = LogEntry; + } + else + { + LSNToIndex[LogEntry.OpLsn] = Ops.size(); + Ops.push_back(LogEntry); + } + } + }, + SkipEntryCount); + + std::sort(Ops.begin(), Ops.end(), [&](const OplogEntry& Lhs, const OplogEntry& Rhs) { + return Lhs.OpCoreAddress.Offset < Rhs.OpCoreAddress.Offset; + }); + + std::vector<ProjectStore::LogSequenceNumber> OldLSNs; + OldLSNs.reserve(Ops.size()); + + uint64_t OpWriteOffset = 0; + uint32_t 
MaxLSN = 0; + { + BasicFileBuffer OldBlobsBuffer(m_OpBlobs, 65536); + BasicFileWriter NewOpBlobsBuffer(OpBlobs, 65536); + + for (OplogEntry& LogEntry : Ops) + { + OldLSNs.push_back(LogEntry.OpLsn); + + IoBuffer OpBuffer = GetOpBuffer(OldBlobsBuffer, LogEntry); + if (RetainLSNs) + { + MaxLSN = Max(MaxLSN, LogEntry.OpLsn.Number); + } + else + { + LogEntry.OpLsn = ProjectStore::LogSequenceNumber(++MaxLSN); + } + LogEntry.OpCoreAddress.Offset = gsl::narrow<uint32_t>(OpWriteOffset / m_OpsAlign); + NewOpBlobsBuffer.Write(OpBuffer.GetData(), LogEntry.OpCoreAddress.Size, OpWriteOffset); + OpWriteOffset = RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign); + } + Oplog.Append(Ops); + } + + uint64_t OldOpLogSize = m_Oplog.GetLogSize(); + uint64_t OldOpBlobsSize = m_OpBlobs.FileSize(); + + if (!DryRun) + { + m_Oplog.Close(); + m_OpBlobs.Close(); + + Oplog.Close(); + RenameFile(OplogPath, GetLogPath(), Ec); + if (Ec) + { + throw std::system_error( + Ec, + fmt::format("Oplog::Compact failed to rename temporary oplog blob storage file from '{}' to '{}'", + OplogPath, + GetLogPath())); + } + OpBlobs.MoveTemporaryIntoPlace(GetBlobsPath(), Ec); + if (Ec) + { + // We failed late - clean everything up as best we can + RemoveFile(OpBlobs.GetPath(), Ec); + RemoveFile(GetLogPath(), Ec); + RemoveFile(GetBlobsPath(), Ec); + throw std::system_error(Ec, + fmt::format("Oplog::Compact failed to rename temporary oplog file from '{}' to '{}'", + OpBlobs.GetPath(), + GetBlobsPath())); + } + + m_Oplog.Open(GetLogPath(), CasLogFile::Mode::kWrite); + m_Oplog.Initialize(); + + m_OpBlobs.Open(GetBlobsPath(), BasicFile::Mode::kWrite); + + m_MaxLsn.store(MaxLSN); + m_NextOpsOffset.store(OpWriteOffset); + } + + for (size_t Index = 0; Index < Ops.size(); Index++) + { + const OplogEntry& LogEntry = Ops[Index]; + Callback(LogEntry.OpKeyHash, OldLSNs[Index], LogEntry.OpLsn, LogEntry.OpCoreAddress); + } + + ZEN_INFO("oplog '{}/{}': compact completed in {} - Max 
LSN# {}, New size: {}, old size {}.", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + m_MaxLsn.load(), + NiceBytes(m_Oplog.GetLogSize() + m_OpBlobs.FileSize()), + NiceBytes(OldOpLogSize + OldOpBlobsSize)); + } + catch (const std::exception& /*Ex*/) + { + RemoveFile(OpBlobs.GetPath(), Ec); + throw; + } + } + + static std::filesystem::path GetLogPath(const std::filesystem::path& OplogStoragePath) + { + using namespace std::literals; + return OplogStoragePath / "ops.zlog"sv; + } + + static std::filesystem::path GetBlobsPath(const std::filesystem::path& OplogStoragePath) + { + using namespace std::literals; + return OplogStoragePath / "ops.zops"sv; + } + + std::filesystem::path GetLogPath() const { return GetLogPath(m_OplogStoragePath); } + + std::filesystem::path GetBlobsPath() const { return GetBlobsPath(m_OplogStoragePath); } + + void ReadOplogEntriesFromLog(std::vector<OplogEntry>& OutOpLogEntries, uint64_t& OutInvalidEntries, uint64_t SkipEntryCount) + { + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::OplogStorage::ReadOplogEntriesFromLog"); + + if (m_Oplog.GetLogCount() == SkipEntryCount) + { + return; + } + + Stopwatch Timer; + + uint64_t OpsBlockSize = m_OpBlobs.FileSize(); + + { + tsl::robin_map<Oid, size_t, Oid::Hasher> LatestKeys; + if (m_Oplog.GetLogCount() > SkipEntryCount) + { + LatestKeys.reserve(m_Oplog.GetLogCount() - SkipEntryCount); + } + + m_Oplog.Replay( + [&](const OplogEntry& LogEntry) { + if (LogEntry.IsTombstone()) + { + if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It == LatestKeys.end()) + { + ZEN_SCOPED_WARN("found tombstone referencing unknown key {}", LogEntry.OpKeyHash); + ++OutInvalidEntries; + return; + } + } + else if (LogEntry.OpCoreAddress.Size == 0) + { + ZEN_SCOPED_WARN("skipping zero size op {}", LogEntry.OpKeyHash); + ++OutInvalidEntries; + return; + } + else if (LogEntry.OpLsn.Number == 0) + { + ZEN_SCOPED_WARN("skipping zero lsn op {}", 
LogEntry.OpKeyHash); + ++OutInvalidEntries; + return; + } + + const uint64_t OpFileOffset = LogEntry.OpCoreAddress.Offset * m_OpsAlign; + if ((OpFileOffset + LogEntry.OpCoreAddress.Size) > OpsBlockSize) + { + ZEN_SCOPED_WARN("skipping out of bounds op {}", LogEntry.OpKeyHash); + ++OutInvalidEntries; + return; + } + + if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It != LatestKeys.end()) + { + OplogEntry& Entry = OutOpLogEntries[It->second]; + + if (LogEntry.IsTombstone() && Entry.IsTombstone()) + { + ZEN_SCOPED_WARN("found double tombstone - {}", LogEntry.OpKeyHash); + } + + Entry = LogEntry; + } + else + { + const size_t OpIndex = OutOpLogEntries.size(); + LatestKeys[LogEntry.OpKeyHash] = OpIndex; + OutOpLogEntries.push_back(LogEntry); + } + }, + SkipEntryCount); + } + } + + void ReplayLog(std::function<void(CbObjectView, const OplogEntry&)>&& Handler, uint64_t SkipEntryCount = 0) + { + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::OplogStorage::ReplayLog"); + + if (m_Oplog.GetLogCount() == SkipEntryCount) + { + return; + } + + Stopwatch Timer; + + std::vector<OplogEntry> OpLogEntries; + uint64_t InvalidEntries = 0; + + ReadOplogEntriesFromLog(OpLogEntries, InvalidEntries, SkipEntryCount); + + uint64_t TombstoneEntries = 0; + + BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); + + uint32_t MaxOpLsn = m_MaxLsn; + uint64_t NextOpFileOffset = m_NextOpsOffset; + + std::sort(OpLogEntries.begin(), OpLogEntries.end(), [&](const OplogEntry& Lhs, const OplogEntry& Rhs) { + return Lhs.OpCoreAddress.Offset < Rhs.OpCoreAddress.Offset; + }); + + for (const OplogEntry& LogEntry : OpLogEntries) + { + if (LogEntry.IsTombstone()) + { + TombstoneEntries++; + } + else + { + IoBuffer OpBuffer = GetOpBuffer(OpBlobsBuffer, LogEntry); + + // Verify checksum, ignore op data if incorrect + + const uint32_t ExpectedOpCoreHash = LogEntry.OpCoreHash; + const uint32_t OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.GetData(), LogEntry.OpCoreAddress.Size) & 0xffffFFFF); + + if 
(OpCoreHash != ExpectedOpCoreHash) + { + ZEN_WARN("oplog '{}/{}': skipping bad checksum op - {}. Expected: {}, found: {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + LogEntry.OpKeyHash, + ExpectedOpCoreHash, + OpCoreHash); + } + else if (CbValidateError Err = ValidateCompactBinary(OpBuffer.GetView(), CbValidateMode::Default); + Err != CbValidateError::None) + { + ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Error: '{}'", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + LogEntry.OpKeyHash, + ToString(Err)); + } + else + { + CbObjectView OpView(OpBuffer.GetData()); + if (OpView.GetSize() != OpBuffer.GetSize()) + { + ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + LogEntry.OpKeyHash, + OpView.GetSize(), + OpBuffer.GetSize()); + } + else + { + Handler(OpView, LogEntry); + MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn.Number); + const uint64_t EntryNextOpFileOffset = + RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign); + NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset); + } + } + } + } + + m_MaxLsn = MaxOpLsn; + m_NextOpsOffset = NextOpFileOffset; + + ZEN_INFO("oplog '{}/{}': replay from '{}' completed in {} - Max LSN# {}, Next offset: {}, {} tombstones, {} invalid entries", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + m_OplogStoragePath, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + m_MaxLsn.load(), + m_NextOpsOffset.load(), + TombstoneEntries, + InvalidEntries); + } + + void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries, + const std::span<const Oplog::PayloadIndex> Order, + std::function<void(LogSequenceNumber Lsn, CbObjectView)>&& Handler) + { + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries"); + + BasicFileBuffer 
OpBlobsBuffer(m_OpBlobs, 65536); + + for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order) + { + const Oplog::OplogPayload& Entry = Entries[EntryOffset]; + + const uint64_t OpFileOffset = Entry.Address.Offset * m_OpsAlign; + MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset); + if (OpBufferView.GetSize() == Entry.Address.Size) + { + if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error == CbValidateError::None) + { + CbObjectView OpView(OpBufferView.GetData()); + if (OpView.GetSize() != OpBufferView.GetSize()) + { + ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + OpView.GetSize(), + OpBufferView.GetSize()); + } + else + { + Handler(Entry.Lsn, OpView); + } + } + else + { + ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + ToString(Error)); + } + } + else + { + IoBuffer OpBuffer(Entry.Address.Size); + OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset); + OpBufferView = OpBuffer.GetView(); + if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error == CbValidateError::None) + { + CbObjectView OpView(OpBuffer.Data()); + if (OpView.GetSize() != OpBuffer.GetSize()) + { + ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + OpView.GetSize(), + OpBuffer.GetSize()); + } + else + { + Handler(Entry.Lsn, OpView); + } + } + else + { + ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. 
Validation error: {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + ToString(Error)); + } + } + } + } + + CbObject GetOp(const OplogEntryAddress& Entry) + { + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::OplogStorage::GetOp"); + + IoBuffer OpBuffer(Entry.Size); + + const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign; + m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset); + + return CbObject(SharedBuffer(std::move(OpBuffer))); + } + + struct AppendOpData + { + MemoryView Buffer; + uint32_t OpCoreHash; + Oid KeyHash; + }; + + static OplogStorage::AppendOpData GetAppendOpData(const CbObjectView& Core) + { + ZEN_MEMSCOPE(GetProjectstoreTag()); + using namespace std::literals; + + AppendOpData OpData; + + OpData.Buffer = Core.GetView(); + const uint64_t WriteSize = OpData.Buffer.GetSize(); + OpData.OpCoreHash = uint32_t(XXH3_64bits(OpData.Buffer.GetData(), WriteSize) & 0xffffFFFF); + + ZEN_ASSERT(WriteSize != 0); + + OpData.KeyHash = ComputeOpKey(Core); + + return OpData; + } + + OplogEntry AppendOp(const AppendOpData& OpData) + { + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::OplogStorage::AppendOp"); + + uint64_t WriteSize = OpData.Buffer.GetSize(); + + RwLock::ExclusiveLockScope Lock(m_RwLock); + const uint64_t WriteOffset = m_NextOpsOffset; + const LogSequenceNumber OpLsn(++m_MaxLsn); + if (!OpLsn.Number) + { + ZEN_ERROR("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId()); + throw std::runtime_error(fmt::format("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId())); + } + m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign); + Lock.ReleaseNow(); + + ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign)); + + OplogEntry Entry = {.OpLsn = OpLsn, + .OpCoreAddress = {.Offset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign), .Size = uint32_t(WriteSize)}, + .OpCoreHash = OpData.OpCoreHash, + .OpKeyHash = 
OpData.KeyHash}; + + m_OpBlobs.Write(OpData.Buffer.GetData(), WriteSize, WriteOffset); + m_Oplog.Append(Entry); + + return Entry; + } + + std::vector<OplogEntry> AppendOps(std::span<const AppendOpData> Ops) + { + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::OplogStorage::AppendOps"); + + size_t OpCount = Ops.size(); + std::vector<std::pair<uint64_t, uint64_t>> OffsetAndSizes; + std::vector<LogSequenceNumber> OpLsns; + OffsetAndSizes.resize(OpCount); + OpLsns.resize(OpCount); + + for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) + { + OffsetAndSizes[OpIndex].second = Ops[OpIndex].Buffer.GetSize(); + } + + uint64_t WriteStart = 0; + uint64_t WriteLength = 0; + { + RwLock::ExclusiveLockScope Lock(m_RwLock); + WriteStart = m_NextOpsOffset; + ZEN_ASSERT(IsMultipleOf(WriteStart, m_OpsAlign)); + uint64_t WriteOffset = WriteStart; + for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) + { + OffsetAndSizes[OpIndex].first = WriteOffset - WriteStart; + OpLsns[OpIndex] = LogSequenceNumber(++m_MaxLsn); + if (!OpLsns[OpIndex]) + { + ZEN_ERROR("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId()); + throw std::runtime_error(fmt::format("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId())); + } + WriteOffset = RoundUp(WriteOffset + OffsetAndSizes[OpIndex].second, m_OpsAlign); + } + WriteLength = WriteOffset - WriteStart; + m_NextOpsOffset = RoundUp(WriteOffset, m_OpsAlign); + } + + IoBuffer WriteBuffer(WriteLength); + + std::vector<OplogEntry> Entries; + Entries.resize(OpCount); + for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) + { + MutableMemoryView WriteBufferView = WriteBuffer.GetMutableView().RightChop(OffsetAndSizes[OpIndex].first); + WriteBufferView.CopyFrom(Ops[OpIndex].Buffer); + Entries[OpIndex] = { + .OpLsn = OpLsns[OpIndex], + .OpCoreAddress = {.Offset = gsl::narrow_cast<uint32_t>((WriteStart + OffsetAndSizes[OpIndex].first) / m_OpsAlign), + .Size = 
uint32_t(OffsetAndSizes[OpIndex].second)}, + .OpCoreHash = Ops[OpIndex].OpCoreHash, + .OpKeyHash = Ops[OpIndex].KeyHash}; + } + + m_OpBlobs.Write(WriteBuffer.GetData(), WriteBuffer.GetSize(), WriteStart); + m_Oplog.Append(Entries); + + return Entries; + } + + void AppendTombstone(Oid KeyHash) + { + OplogEntry Entry = {.OpKeyHash = KeyHash}; + Entry.MakeTombstone(); + + m_Oplog.Append(Entry); + } + + void Flush() + { + m_Oplog.Flush(); + m_OpBlobs.Flush(); + } + uint64_t LogCount() const { return m_Oplog.GetLogCount(); } + + LoggerRef Log() { return m_OwnerOplog->Log(); } + +private: + ProjectStore::Oplog* m_OwnerOplog; + std::filesystem::path m_OplogStoragePath; + mutable RwLock m_RwLock; + TCasLogFile<OplogEntry> m_Oplog; + BasicFile m_OpBlobs; + std::atomic<uint64_t> m_NextOpsOffset{0}; + uint64_t m_OpsAlign = 32; + std::atomic<uint32_t> m_MaxLsn{0}; +}; + +////////////////////////////////////////////////////////////////////////// + +ProjectStore::Oplog::Oplog(const LoggerRef& InLog, + std::string_view ProjectIdentifier, + std::string_view Id, + CidStore& Store, + const std::filesystem::path& BasePath, + const std::filesystem::path& MarkerPath, + EMode State) +: m_Log(InLog) +, m_OuterProjectId(ProjectIdentifier) +, m_OplogId(Id) +, m_CidStore(Store) +, m_BasePath(BasePath) +, m_MarkerPath(MarkerPath) +, m_TempPath(m_BasePath / std::string_view("temp")) +, m_MetaPath(m_BasePath / std::string_view("ops.meta")) +, m_Mode(State) +, m_MetaValid(false) +, m_PendingPrepOpAttachmentsRetainEnd(GcClock::Now()) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + using namespace std::literals; + + m_Storage = new OplogStorage(this, m_BasePath); + OplogStorage::EMode StorageMode = OplogStorage::EMode::Write; + if (m_Mode == EMode::kBasicReadOnly) + { + StorageMode = OplogStorage::EMode::Read; + } + else + { + bool StoreExists = m_Storage->Exists(); + if (StoreExists) + { + if (!m_Storage->IsValid()) + { + ZEN_WARN("Invalid oplog found at '{}'. 
Wiping state for oplog.", m_BasePath); + m_Storage->WipeState(); + std::error_code DummyEc; + RemoveFile(m_MetaPath, DummyEc); + StoreExists = false; + } + } + if (!StoreExists) + { + StorageMode = OplogStorage::EMode::Create; + } + } + m_Storage->Open(StorageMode); + m_MetaValid = !IsFileOlderThan(m_MetaPath, m_Storage->GetBlobsPath()); + + CleanDirectory(m_TempPath, /*ForceRemoveReadOnlyFiles*/ false); +} + +ProjectStore::Oplog::~Oplog() +{ + if (m_Storage) + { + Flush(); + } +} + +void +ProjectStore::Oplog::Flush() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + ZEN_TRACE_CPU("Oplog::Flush"); + + if (m_Mode == EMode::kFull) + { + RwLock::SharedLockScope Lock(m_OplogLock); + + if (!m_Storage) + { + return; + } + + m_Storage->Flush(); + if (!m_MetaValid) + { + std::error_code DummyEc; + RemoveFile(m_MetaPath, DummyEc); + } + + uint64_t LogCount = m_Storage->LogCount(); + if (m_LogFlushPosition != LogCount || m_IsLegacySnapshot) + { + WriteIndexSnapshot(); + } + } +} + +void +ProjectStore::Oplog::RefreshLsnToPayloadOffsetMap(RwLock::ExclusiveLockScope&) +{ + if (!m_LsnToPayloadOffsetMap) + { + m_LsnToPayloadOffsetMap = std::make_unique<LsnMap<PayloadIndex>>(); + m_LsnToPayloadOffsetMap->reserve(m_OpLogPayloads.size()); + for (uint32_t PayloadOffset = 0; PayloadOffset < m_OpLogPayloads.size(); PayloadOffset++) + { + const OplogPayload& Payload = m_OpLogPayloads[PayloadOffset]; + if (Payload.Lsn.Number != 0 && Payload.Address.Size != 0) + { + m_LsnToPayloadOffsetMap->insert_or_assign(Payload.Lsn, PayloadIndex(PayloadOffset)); + } + } + } +} + +void +ProjectStore::Oplog::Scrub(ScrubContext& Ctx) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + ZEN_ASSERT(m_Mode == EMode::kFull); + + std::vector<std::pair<LogSequenceNumber, Oid>> BadEntries; + + using namespace std::literals; + + IterateOplogWithKey([&](LogSequenceNumber Lsn, const Oid& Key, CbObjectView Op) { + { + const Oid KeyHash = ComputeOpKey(Op); + if (KeyHash != Key) + { + BadEntries.push_back({Lsn, Key}); + 
ZEN_WARN("Scrub: oplog data does not match information from index (op:{} != index:{})", KeyHash, Key); + return; + } + } + + // TODO: Should we really delete an Op because it points to a missing or malformed Cid chunk? + + Op.IterateAttachments([&](CbFieldView Visitor) { + const IoHash Cid = Visitor.AsAttachment(); + if (Ctx.IsBadCid(Cid)) + { + // oplog entry references a CAS chunk which has been flagged as bad + BadEntries.push_back({Lsn, Key}); + return; + } + if (!m_CidStore.ContainsChunk(Cid)) + { + // oplog entry references a CAS chunk which is not present + BadEntries.push_back({Lsn, Key}); + return; + } + }); + }); + + if (!BadEntries.empty()) + { + if (Ctx.RunRecovery()) + { + ZEN_WARN("oplog '{}/{}': scrubbing found {} bad ops in oplog @ '{}', these will be removed from the index", + m_OuterProjectId, + m_OplogId, + BadEntries.size(), + m_BasePath); + + // Actually perform some clean-up + RwLock::ExclusiveLockScope Lock(m_OplogLock); + + RefreshLsnToPayloadOffsetMap(Lock); + + for (const auto& BadEntry : BadEntries) + { + const LogSequenceNumber BadLsn = BadEntry.first; + const Oid& BadKey = BadEntry.second; + if (auto It = m_OpToPayloadOffsetMap.find(BadKey); It != m_OpToPayloadOffsetMap.end()) + { + OplogPayload& Payload = m_OpLogPayloads[It->second]; + if (Payload.Lsn == BadLsn) + { + m_OpToPayloadOffsetMap.erase(It); + m_Storage->AppendTombstone(BadKey); + } + } + if (auto LsnIt = m_LsnToPayloadOffsetMap->find(BadLsn); LsnIt != m_LsnToPayloadOffsetMap->end()) + { + const PayloadIndex LsnPayloadOffset = LsnIt->second; + OplogPayload& LsnPayload = m_OpLogPayloads[LsnPayloadOffset]; + LsnPayload = {.Lsn = LogSequenceNumber(0), .Address = {.Offset = 0, .Size = 0}}; + } + m_LsnToPayloadOffsetMap->erase(BadLsn); + } + if (!BadEntries.empty()) + { + m_MetaValid = false; + } + } + else + { + ZEN_WARN("oplog '{}/{}': scrubbing found {} bad ops in oplog @ '{}' but no cleanup will be performed", + m_OuterProjectId, + m_OplogId, + BadEntries.size(), + 
m_BasePath); + } + } +} + +uint64_t +ProjectStore::Oplog::TotalSize(const std::filesystem::path& BasePath) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + using namespace std::literals; + + uint64_t Size = OplogStorage::OpsSize(BasePath); + std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; + if (IsFile(StateFilePath)) + { + Size += FileSizeFromPath(StateFilePath); + } + std::filesystem::path MetaFilePath = BasePath / "ops.meta"sv; + if (IsFile(MetaFilePath)) + { + Size += FileSizeFromPath(MetaFilePath); + } + std::filesystem::path IndexFilePath = BasePath / "ops.zidx"sv; + if (IsFile(IndexFilePath)) + { + Size += FileSizeFromPath(IndexFilePath); + } + + return Size; +} + +uint64_t +ProjectStore::Oplog::TotalSize() const +{ + return TotalSize(m_BasePath); +} + +void +ProjectStore::Oplog::ResetState() +{ + RwLock::ExclusiveLockScope _(m_OplogLock); + m_ChunkMap.clear(); + m_MetaMap.clear(); + m_FileMap.clear(); + m_OpToPayloadOffsetMap.clear(); + m_OpLogPayloads.clear(); + m_LsnToPayloadOffsetMap.reset(); + m_Storage = {}; +} + +bool +ProjectStore::Oplog::PrepareForDelete(std::filesystem::path& OutRemoveDirectory) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + RwLock::ExclusiveLockScope _(m_OplogLock); + m_UpdateCaptureRefCounter = 0; + m_CapturedOps.reset(); + m_CapturedAttachments.reset(); + m_PendingPrepOpAttachments.clear(); + m_ChunkMap.clear(); + m_MetaMap.clear(); + m_FileMap.clear(); + m_OpToPayloadOffsetMap.clear(); + m_OpLogPayloads.clear(); + m_LsnToPayloadOffsetMap.reset(); + m_Storage = {}; + if (PrepareDirectoryDelete(m_BasePath, OutRemoveDirectory)) + { + return true; + } + return false; +} + +bool +ProjectStore::Oplog::ExistsAt(const std::filesystem::path& BasePath) +{ + using namespace std::literals; + + std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; + return IsFile(StateFilePath); +} + +bool +ProjectStore::Oplog::Exists() const +{ + return ExistsAt(m_BasePath); +} + +void +ProjectStore::Oplog::Read() +{ + 
ZEN_MEMSCOPE(GetProjectstoreTag()); + + using namespace std::literals; + ZEN_TRACE_CPU("Oplog::Read"); + ZEN_LOG_SCOPE("Oplog::Read '{}'", m_OplogId); + + ZEN_DEBUG("oplog '{}/{}': reading from '{}'. State: {}", + m_OuterProjectId, + m_OplogId, + m_BasePath, + m_Mode == EMode::kFull ? "Full" : "BasicNoLookups"); + + std::optional<CbObject> Config = ReadStateFile(m_BasePath, [this]() { return Log(); }); + if (Config.has_value()) + { + if (Config.value().GetSize() == 0) + { + // Invalid config file + return; + } + m_MarkerPath = Config.value()["gcpath"sv].AsU8String(); + } + + if (!m_MetaValid) + { + std::error_code DummyEc; + RemoveFile(m_MetaPath, DummyEc); + } + + ZEN_ASSERT(!m_LsnToPayloadOffsetMap); + + ReadIndexSnapshot(); + + if (m_Mode == EMode::kFull) + { + m_Storage->ReplayLog( + [&](CbObjectView Op, const OplogEntry& OpEntry) { + const OplogEntryMapping OpMapping = GetMapping(Op); + // Update chunk id maps + for (const ChunkMapping& Chunk : OpMapping.Chunks) + { + m_ChunkMap.insert_or_assign(Chunk.Id, Chunk.Hash); + } + + for (const FileMapping& File : OpMapping.Files) + { + if (File.Hash != IoHash::Zero) + { + m_ChunkMap.insert_or_assign(File.Id, File.Hash); + } + m_FileMap.insert_or_assign(File.Id, + FileMapEntry{.ServerPath = File.Hash == IoHash::Zero ? 
File.ServerPath : std::string(), + .ClientPath = File.ClientPath}); + } + + for (const ChunkMapping& Meta : OpMapping.Meta) + { + m_MetaMap.insert_or_assign(Meta.Id, Meta.Hash); + } + + const PayloadIndex PayloadOffset(m_OpLogPayloads.size()); + + m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, PayloadOffset); + m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress}); + }, + m_LogFlushPosition); + + if (m_Storage->LogCount() != m_LogFlushPosition || m_IsLegacySnapshot) + { + WriteIndexSnapshot(); + } + } + else + { + std::vector<OplogEntry> OpLogEntries; + uint64_t InvalidEntries; + m_Storage->ReadOplogEntriesFromLog(OpLogEntries, InvalidEntries, m_LogFlushPosition); + for (const OplogEntry& OpEntry : OpLogEntries) + { + const PayloadIndex PayloadOffset(m_OpLogPayloads.size()); + + m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, PayloadOffset); + m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress}); + } + } +} + +void +ProjectStore::Oplog::Write() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_ASSERT(m_Mode != EMode::kBasicReadOnly); + + using namespace std::literals; + + BinaryWriter Mem; + + CbObjectWriter Cfg; + + Cfg << "gcpath"sv << PathToUtf8(m_MarkerPath); + + Cfg.Save(Mem); + + std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; + + ZEN_DEBUG("oplog '{}/{}': persisting config to '{}'", m_OuterProjectId, m_OplogId, StateFilePath); + + TemporaryFile::SafeWriteFile(StateFilePath, Mem.GetView()); +} + +void +ProjectStore::Oplog::Update(const std::filesystem::path& MarkerPath) +{ + if (m_MarkerPath == MarkerPath) + { + return; + } + Write(); +} + +bool +ProjectStore::Oplog::Reset() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_ASSERT(m_Mode == EMode::kFull); + std::filesystem::path MovedDir; + + { + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + m_Storage = {}; + if (!PrepareDirectoryDelete(m_BasePath, MovedDir)) + { + m_Storage = new OplogStorage(this, 
m_BasePath); + const bool StoreExists = m_Storage->Exists(); + m_Storage->Open(StoreExists ? OplogStorage::EMode::Write : OplogStorage::EMode::Create); + m_MetaValid = !IsFileOlderThan(m_MetaPath, m_Storage->GetBlobsPath()); + return false; + } + m_ChunkMap.clear(); + m_MetaMap.clear(); + m_FileMap.clear(); + m_OpToPayloadOffsetMap.clear(); + m_OpLogPayloads.clear(); + m_LsnToPayloadOffsetMap.reset(); + m_Storage = new OplogStorage(this, m_BasePath); + m_Storage->Open(OplogStorage::EMode::Create); + m_MetaValid = false; + CleanDirectory(m_TempPath, /*ForceRemoveReadOnlyFiles*/ false); + Write(); + } + // Erase content on disk + if (!MovedDir.empty()) + { + OplogStorage::Delete(MovedDir); + } + return true; +} + +bool +ProjectStore::Oplog::CanUnload() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + RwLock::SharedLockScope _(m_OplogLock); + + uint64_t LogCount = m_Storage->LogCount(); + if (m_LogFlushPosition != LogCount) + { + return false; // The oplog is not flushed so likely this is an active oplog + } + + if (!m_PendingPrepOpAttachments.empty()) + { + return false; // We have a pending oplog prep operation in flight + } + if (m_UpdateCaptureRefCounter > 0) + { + return false; // GC capture is enable for the oplog + } + return true; +} + +std::optional<CbObject> +ProjectStore::Oplog::ReadStateFile(const std::filesystem::path& BasePath, std::function<LoggerRef()>&& Log) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Oplog::ReadStateFile"); + using namespace std::literals; + + std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; + if (IsFile(StateFilePath)) + { + // ZEN_INFO("oplog '{}/{}': config read from '{}'", m_OuterProjectId, m_OplogId, StateFilePath); + + BasicFile Blob; + Blob.Open(StateFilePath, BasicFile::Mode::kRead); + + IoBuffer Obj = Blob.ReadAll(); + CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); + + if (ValidationError != CbValidateError::None) + { + 
ZEN_ERROR("validation error {} hit for oplog config at '{}'", ToString(ValidationError), StateFilePath); + return CbObject(); + } + + return LoadCompactBinaryObject(Obj); + } + ZEN_INFO("config for oplog not found at '{}'. Assuming legacy store", StateFilePath); + return {}; +} + +ProjectStore::Oplog::ValidationResult +ProjectStore::Oplog::Validate(const std::filesystem::path& ProjectRootDir, + std::atomic_bool& IsCancelledFlag, + WorkerThreadPool* OptionalWorkerPool) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + using namespace std::literals; + + ValidationResult Result; + + const size_t OpCount = OplogCount(); + + std::vector<Oid> KeyHashes; + std::vector<std::string> Keys; + std::vector<std::vector<IoHash>> Attachments; + std::vector<OplogEntryMapping> Mappings; + + KeyHashes.reserve(OpCount); + Keys.reserve(OpCount); + Mappings.reserve(OpCount); + + IterateOplogWithKey([&](LogSequenceNumber LSN, const Oid& Key, CbObjectView OpView) { + Result.LSNLow = Min(Result.LSNLow, LSN); + Result.LSNHigh = Max(Result.LSNHigh, LSN); + KeyHashes.push_back(Key); + Keys.emplace_back(std::string(OpView["key"sv].AsString())); + + std::vector<IoHash> OpAttachments; + OpView.IterateAttachments([&OpAttachments](CbFieldView Attachment) { OpAttachments.push_back(Attachment.AsAttachment()); }); + Attachments.emplace_back(std::move(OpAttachments)); + + Mappings.push_back(GetMapping(OpView)); + }); + + Result.OpCount = gsl::narrow<uint32_t>(Keys.size()); + + RwLock ResultLock; + + auto ValidateOne = [&](uint32_t OpIndex) { + const Oid& KeyHash = KeyHashes[OpIndex]; + const std::string& Key = Keys[OpIndex]; + const OplogEntryMapping& Mapping(Mappings[OpIndex]); + bool HasMissingEntries = false; + for (const ChunkMapping& Chunk : Mapping.Chunks) + { + if (!m_CidStore.ContainsChunk(Chunk.Hash)) + { + ResultLock.WithExclusiveLock([&]() { Result.MissingChunks.push_back({KeyHash, Chunk}); }); + HasMissingEntries = true; + } + } + + for (const ChunkMapping& Meta : Mapping.Meta) + { + if 
(!m_CidStore.ContainsChunk(Meta.Hash)) + { + ResultLock.WithExclusiveLock([&]() { Result.MissingMetas.push_back({KeyHash, Meta}); }); + HasMissingEntries = true; + } + } + + for (const FileMapping& File : Mapping.Files) + { + if (File.Hash == IoHash::Zero) + { + std::filesystem::path FilePath = ProjectRootDir / File.ServerPath; + if (!IsFile(FilePath)) + { + ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); + HasMissingEntries = true; + } + } + else if (!m_CidStore.ContainsChunk(File.Hash)) + { + ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); + HasMissingEntries = true; + } + } + + const std::vector<IoHash>& OpAttachments = Attachments[OpIndex]; + for (const IoHash& Attachment : OpAttachments) + { + if (!m_CidStore.ContainsChunk(Attachment)) + { + ResultLock.WithExclusiveLock([&]() { Result.MissingAttachments.push_back({KeyHash, Attachment}); }); + HasMissingEntries = true; + } + } + + if (HasMissingEntries) + { + ResultLock.WithExclusiveLock([&]() { Result.OpKeys.push_back({KeyHash, Key}); }); + } + }; + + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); + try + { + for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) + { + if (AbortFlag) + { + break; + } + if (OptionalWorkerPool) + { + Work.ScheduleWork(*OptionalWorkerPool, [&ValidateOne, Index = OpIndex](std::atomic<bool>& AbortFlag) { + ZEN_MEMSCOPE(GetProjectstoreTag()); + if (AbortFlag) + { + return; + } + ValidateOne(Index); + }); + } + else + { + ValidateOne(OpIndex); + } + } + } + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed validating oplogs in {}. Reason: '{}'", m_BasePath, Ex.what()); + } + Work.Wait(); + + { + // Check if we were deleted while we were checking the references without a lock... 
+ RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + Result = {}; + } + } + + return Result; +} + +void +ProjectStore::Oplog::WriteIndexSnapshot() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Oplog::WriteIndexSnapshot"); + + ZEN_ASSERT(m_Mode == EMode::kFull); + + ZEN_DEBUG("oplog '{}/{}': write store snapshot at '{}'", m_OuterProjectId, m_OplogId, m_BasePath); + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("oplog '{}/{}': wrote store snapshot for '{}' containing {} entries in {}", + m_OuterProjectId, + m_OplogId, + m_BasePath, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + namespace fs = std::filesystem; + + const fs::path IndexPath = m_BasePath / "ops.zidx"; + try + { + std::vector<Oid> Keys; + std::vector<PayloadIndex> OpPayloadOffsets; + std::vector<IoHash> ChunkMapEntries; + std::vector<IoHash> MetaMapEntries; + std::vector<uint32_t> FilePathLengths; + std::vector<std::string> FilePaths; + uint64_t IndexLogPosition = 0; + { + IndexLogPosition = m_Storage->LogCount(); + Keys.reserve(m_OpToPayloadOffsetMap.size() + m_ChunkMap.size() + m_MetaMap.size() + m_FileMap.size()); + OpPayloadOffsets.reserve(m_OpToPayloadOffsetMap.size()); + for (const auto& Kv : m_OpToPayloadOffsetMap) + { + Keys.push_back(Kv.first); + OpPayloadOffsets.push_back(Kv.second); + } + + ChunkMapEntries.reserve(m_ChunkMap.size()); + for (const auto& It : m_ChunkMap) + { + Keys.push_back(It.first); + ChunkMapEntries.push_back(It.second); + } + + MetaMapEntries.reserve(m_MetaMap.size()); + for (const auto& It : m_MetaMap) + { + Keys.push_back(It.first); + MetaMapEntries.push_back(It.second); + } + + FilePathLengths.reserve(m_FileMap.size() * 2); + FilePaths.reserve(m_FileMap.size() * 2); + for (const auto& It : m_FileMap) + { + Keys.push_back(It.first); + FilePathLengths.push_back(gsl::narrow<uint32_t>(It.second.ServerPath.length())); + 
FilePathLengths.push_back(gsl::narrow<uint32_t>(It.second.ClientPath.length())); + FilePaths.push_back(It.second.ServerPath); + FilePaths.push_back(It.second.ClientPath); + } + } + + TemporaryFile ObjectIndexFile; + std::error_code Ec; + ObjectIndexFile.CreateTemporary(IndexPath.parent_path(), Ec); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to create temp file for index snapshot at '{}'", IndexPath)); + } + + { + BasicFileWriter IndexFile(ObjectIndexFile, 65536u); + + OplogIndexHeader Header; + Header.V2 = {.LogPosition = IndexLogPosition, + .KeyCount = gsl::narrow<uint64_t>(Keys.size()), + .OpPayloadIndexCount = gsl::narrow<uint32_t>(OpPayloadOffsets.size()), + .OpPayloadCount = gsl::narrow<uint32_t>(m_OpLogPayloads.size()), + .ChunkMapCount = gsl::narrow<uint32_t>(ChunkMapEntries.size()), + .MetaMapCount = gsl::narrow<uint32_t>(MetaMapEntries.size()), + .FileMapCount = gsl::narrow<uint32_t>(FilePathLengths.size() / 2), + .MaxLSN = m_Storage->MaxLSN(), + .NextOpCoreOffset = m_Storage->GetNextOpOffset()}; + + Header.Checksum = OplogIndexHeader::ComputeChecksum(Header); + + uint64_t Offset = 0; + IndexFile.Write(&Header, sizeof(OplogIndexHeader), Offset); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); + + IndexFile.Write(Keys.data(), Keys.size() * sizeof(Oid), Offset); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); + + IndexFile.Write(OpPayloadOffsets.data(), OpPayloadOffsets.size() * sizeof(PayloadIndex), Offset); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); + + IndexFile.Write(m_OpLogPayloads.data(), m_OpLogPayloads.size() * sizeof(OplogPayload), Offset); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); + + IndexFile.Write(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); + + IndexFile.Write(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset); + Offset = 
IndexFile.AlignTo(OplogIndexHeader::DataAlignment); + + IndexFile.Write(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); + + for (const auto& FilePath : FilePaths) + { + IndexFile.Write(FilePath.c_str(), FilePath.length(), Offset); + Offset += FilePath.length(); + } + } + + ObjectIndexFile.Flush(); + ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); + if (Ec) + { + throw std::system_error(Ec, + fmt::format("Snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", + ObjectIndexFile.GetPath(), + IndexPath, + Ec.message())); + } + EntryCount = m_OpLogPayloads.size(); + m_LogFlushPosition = IndexLogPosition; + m_IsLegacySnapshot = false; + } + catch (const std::exception& Err) + { + ZEN_WARN("oplog '{}/{}': snapshot FAILED, reason: '{}'", m_OuterProjectId, m_OplogId, Err.what()); + } +} + +void +ProjectStore::Oplog::ReadIndexSnapshot() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Oplog::ReadIndexSnapshot"); + + const std::filesystem::path IndexPath = m_BasePath / "ops.zidx"; + if (IsFile(IndexPath)) + { + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("oplog '{}/{}': index read from '{}' containing {} entries in {}", + m_OuterProjectId, + m_OplogId, + IndexPath, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + try + { + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(OplogIndexHeader)) + { + OplogIndexHeader Header; + uint64_t Offset = 0; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + Offset += sizeof(OplogIndexHeader); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + + if (Header.Magic == OplogIndexHeader::ExpectedMagic) + { + uint32_t Checksum = OplogIndexHeader::ComputeChecksum(Header); + if (Header.Checksum != Checksum) + { + ZEN_WARN("oplog '{}/{}': skipping 
invalid index file '{}'. Checksum mismatch. Expected: {}, Found: {}", + m_OuterProjectId, + m_OplogId, + IndexPath, + Header.Checksum, + Checksum); + return; + } + + if (Header.Version == OplogIndexHeader::Version1) + { + uint32_t MaxLSN = 0; + OplogEntryAddress LastOpAddress{.Offset = 0, .Size = 0}; + + if (Header.V1.LatestOpMapCount + Header.V1.ChunkMapCount + Header.V1.MetaMapCount + Header.V1.FileMapCount != + Header.V1.KeyCount) + { + ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Key count mismatch. Expected: {}, Found: {}", + m_OuterProjectId, + m_OplogId, + IndexPath, + Header.V1.LatestOpMapCount + Header.V1.ChunkMapCount + Header.V1.MetaMapCount + Header.V1.FileMapCount, + Header.V1.KeyCount); + return; + } + + std::vector<uint32_t> LSNEntries(Header.V1.LSNCount); + ObjectIndexFile.Read(LSNEntries.data(), LSNEntries.size() * sizeof(uint32_t), Offset); + Offset += LSNEntries.size() * sizeof(uint32_t); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + size_t LSNOffset = 0; + + std::vector<Oid> Keys(Header.V1.KeyCount); + ObjectIndexFile.Read(Keys.data(), Keys.size() * sizeof(Oid), Offset); + Offset += Keys.size() * sizeof(Oid); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + size_t KeyOffset = 0; + + { + std::vector<OplogEntryAddress> AddressMapEntries(Header.V1.OpAddressMapCount); + ObjectIndexFile.Read(AddressMapEntries.data(), AddressMapEntries.size() * sizeof(OplogEntryAddress), Offset); + Offset += AddressMapEntries.size() * sizeof(OplogEntryAddress); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + + tsl::robin_map<uint32_t, PayloadIndex> LsnToPayloadOffset; + LsnToPayloadOffset.reserve(AddressMapEntries.size()); + + m_OpLogPayloads.reserve(AddressMapEntries.size()); + for (const OplogEntryAddress& Address : AddressMapEntries) + { + const uint32_t LSN = LSNEntries[LSNOffset++]; + LsnToPayloadOffset.insert_or_assign(LSN, PayloadIndex(m_OpLogPayloads.size())); + + m_OpLogPayloads.push_back({.Lsn = 
LogSequenceNumber(LSN), .Address = Address}); + if (Address.Offset > LastOpAddress.Offset) + { + LastOpAddress = Address; + } + MaxLSN = Max(MaxLSN, LSN); + } + + { + std::vector<uint32_t> LatestOpMapEntries(Header.V1.LatestOpMapCount); + ObjectIndexFile.Read(LatestOpMapEntries.data(), LatestOpMapEntries.size() * sizeof(uint32_t), Offset); + Offset += LatestOpMapEntries.size() * sizeof(uint32_t); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + m_OpToPayloadOffsetMap.reserve(LatestOpMapEntries.size()); + for (uint32_t Lsn : LatestOpMapEntries) + { + if (auto It = LsnToPayloadOffset.find(Lsn); It != LsnToPayloadOffset.end()) + { + m_OpToPayloadOffsetMap.insert_or_assign(Keys[KeyOffset++], It->second); + MaxLSN = Max(MaxLSN, Lsn); + } + } + } + } + if (m_Mode == EMode::kFull) + { + { + std::vector<IoHash> ChunkMapEntries(Header.V1.ChunkMapCount); + ObjectIndexFile.Read(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset); + Offset += ChunkMapEntries.size() * sizeof(IoHash); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + m_ChunkMap.reserve(ChunkMapEntries.size()); + for (const IoHash& ChunkId : ChunkMapEntries) + { + m_ChunkMap.insert_or_assign(Keys[KeyOffset++], ChunkId); + } + } + { + std::vector<IoHash> MetaMapEntries(Header.V1.MetaMapCount); + ObjectIndexFile.Read(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset); + Offset += MetaMapEntries.size() * sizeof(IoHash); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + m_MetaMap.reserve(MetaMapEntries.size()); + for (const IoHash& ChunkId : MetaMapEntries) + { + m_MetaMap.insert_or_assign(Keys[KeyOffset++], ChunkId); + } + } + { + m_FileMap.reserve(Header.V1.FileMapCount); + + std::vector<uint32_t> FilePathLengths(Header.V1.FileMapCount * 2); + ObjectIndexFile.Read(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset); + Offset += FilePathLengths.size() * sizeof(uint32_t); + Offset = RoundUp(Offset, 
OplogIndexHeader::DataAlignment); + + BasicFileBuffer IndexFile(ObjectIndexFile, 65536); + auto ReadString([&IndexFile, &Offset](uint32_t Length) -> std::string { + MemoryView StringData = IndexFile.MakeView(Length, Offset); + if (StringData.GetSize() != Length) + { + throw std::runtime_error(fmt::format("Invalid format. Expected to read {} bytes but got {}", + Length, + uint32_t(StringData.GetSize()))); + } + Offset += Length; + return std::string((const char*)StringData.GetData(), Length); + }); + for (uint64_t FileLengthOffset = 0; FileLengthOffset < FilePathLengths.size();) + { + std::string ServerPath = ReadString(FilePathLengths[FileLengthOffset++]); + std::string ClientPath = ReadString(FilePathLengths[FileLengthOffset++]); + m_FileMap.insert_or_assign( + Keys[KeyOffset++], + FileMapEntry{.ServerPath = std::move(ServerPath), .ClientPath = std::move(ClientPath)}); + } + } + } + m_LogFlushPosition = Header.V1.LogPosition; + m_Storage->SetMaxLSNAndNextWriteAddress(MaxLSN, LastOpAddress); + EntryCount = Header.V1.LSNCount; + m_IsLegacySnapshot = true; + } + else if (Header.Version == OplogIndexHeader::Version2) + { + std::vector<Oid> Keys(Header.V2.KeyCount); + + ObjectIndexFile.Read(Keys.data(), Keys.size() * sizeof(Oid), Offset); + Offset += Keys.size() * sizeof(Oid); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + + std::vector<PayloadIndex> OpPayloadOffsets(Header.V2.OpPayloadIndexCount); + ObjectIndexFile.Read(OpPayloadOffsets.data(), OpPayloadOffsets.size() * sizeof(PayloadIndex), Offset); + Offset += OpPayloadOffsets.size() * sizeof(PayloadIndex); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + + m_OpLogPayloads.resize(Header.V2.OpPayloadCount); + ObjectIndexFile.Read(m_OpLogPayloads.data(), m_OpLogPayloads.size() * sizeof(OplogPayload), Offset); + Offset += m_OpLogPayloads.size() * sizeof(OplogPayload); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + + size_t KeyOffset = 0; + + 
m_OpToPayloadOffsetMap.reserve(Header.V2.OpPayloadIndexCount); + for (const PayloadIndex PayloadOffset : OpPayloadOffsets) + { + const Oid& Key = Keys[KeyOffset++]; + ZEN_ASSERT_SLOW(PayloadOffset.Index < Header.V2.OpPayloadCount); + m_OpToPayloadOffsetMap.insert({Key, PayloadOffset}); + } + + if (m_Mode == EMode::kFull) + { + { + std::vector<IoHash> ChunkMapEntries(Header.V2.ChunkMapCount); + ObjectIndexFile.Read(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset); + Offset += ChunkMapEntries.size() * sizeof(IoHash); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + m_ChunkMap.reserve(ChunkMapEntries.size()); + for (const IoHash& ChunkId : ChunkMapEntries) + { + m_ChunkMap.insert_or_assign(Keys[KeyOffset++], ChunkId); + } + } + { + std::vector<IoHash> MetaMapEntries(Header.V2.MetaMapCount); + ObjectIndexFile.Read(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset); + Offset += MetaMapEntries.size() * sizeof(IoHash); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + m_MetaMap.reserve(MetaMapEntries.size()); + for (const IoHash& ChunkId : MetaMapEntries) + { + m_MetaMap.insert_or_assign(Keys[KeyOffset++], ChunkId); + } + } + { + m_FileMap.reserve(Header.V2.FileMapCount); + + std::vector<uint32_t> FilePathLengths(Header.V2.FileMapCount * 2); + ObjectIndexFile.Read(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset); + Offset += FilePathLengths.size() * sizeof(uint32_t); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + + BasicFileBuffer IndexFile(ObjectIndexFile, 65536); + auto ReadString([&IndexFile, &Offset](uint32_t Length) -> std::string { + MemoryView StringData = IndexFile.MakeView(Length, Offset); + if (StringData.GetSize() != Length) + { + throw std::runtime_error(fmt::format("Invalid format. 
Expected to read {} bytes but got {}", + Length, + uint32_t(StringData.GetSize()))); + } + Offset += Length; + return std::string((const char*)StringData.GetData(), Length); + }); + for (uint64_t FileLengthOffset = 0; FileLengthOffset < FilePathLengths.size();) + { + std::string ServerPath = ReadString(FilePathLengths[FileLengthOffset++]); + std::string ClientPath = ReadString(FilePathLengths[FileLengthOffset++]); + m_FileMap.insert_or_assign( + Keys[KeyOffset++], + FileMapEntry{.ServerPath = std::move(ServerPath), .ClientPath = std::move(ClientPath)}); + } + } + } + m_LogFlushPosition = Header.V2.LogPosition; + m_Storage->SetMaxLSNAndNextOpOffset(Header.V2.MaxLSN, Header.V2.NextOpCoreOffset); + EntryCount = Header.V2.OpPayloadCount; + m_IsLegacySnapshot = false; + } + else + { + ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Unsupported version number {}", + m_OuterProjectId, + m_OplogId, + IndexPath, + Header.Version); + return; + } + } + else + { + ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'", m_OuterProjectId, m_OplogId, IndexPath); + } + } + } + catch (const std::exception& Ex) + { + m_OpToPayloadOffsetMap.clear(); + m_OpLogPayloads.clear(); + m_ChunkMap.clear(); + m_MetaMap.clear(); + m_FileMap.clear(); + m_LogFlushPosition = 0; + ZEN_ERROR("oplog '{}/{}': failed reading index snapshot from '{}'. 
Reason: '{}'", + m_OuterProjectId, + m_OplogId, + IndexPath, + Ex.what()); + } + } +} + +uint32_t +ProjectStore::Oplog::GetUnusedSpacePercent() const +{ + RwLock::SharedLockScope OplogLock(m_OplogLock); + return GetUnusedSpacePercentLocked(); +} + +uint32_t +ProjectStore::Oplog::GetUnusedSpacePercentLocked() const +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + const uint64_t ActualBlobsSize = m_Storage->OpBlobsSize(); + if (ActualBlobsSize == 0) + { + return 0; + } + + const uint64_t EffectiveBlobsSize = m_Storage->GetEffectiveBlobsSize(m_OpLogPayloads); + + if (EffectiveBlobsSize < ActualBlobsSize) + { + return gsl::narrow<uint32_t>((100 * (ActualBlobsSize - EffectiveBlobsSize)) / ActualBlobsSize); + } + + return 0; +} + +void +ProjectStore::Oplog::Compact(bool DryRun, bool RetainLSNs, std::string_view LogPrefix) +{ + RwLock::ExclusiveLockScope Lock(m_OplogLock); + Compact(Lock, DryRun, RetainLSNs, LogPrefix); +} + +void +ProjectStore::Oplog::Compact(RwLock::ExclusiveLockScope&, bool DryRun, bool RetainLSNs, std::string_view LogPrefix) +{ + ZEN_ASSERT(m_Mode == EMode::kFull); + + ZEN_MEMSCOPE(GetProjectstoreTag()); + + Stopwatch Timer; + + std::vector<LogSequenceNumber> LatestLSNs; + LatestLSNs.reserve(m_OpLogPayloads.size()); + for (const auto& Kv : m_OpToPayloadOffsetMap) + { + const OplogPayload& OpPayload = m_OpLogPayloads[Kv.second]; + LatestLSNs.push_back(OpPayload.Lsn); + } + + std::vector<OplogPayload> OpPayloads; + OpPayloads.reserve(LatestLSNs.size()); + + OidMap<PayloadIndex> OpToPayloadOffsetMap; + OpToPayloadOffsetMap.reserve(LatestLSNs.size()); + + uint64_t PreSize = TotalSize(); + + m_Storage->Compact( + LatestLSNs, + [&](const Oid& OpKeyHash, LogSequenceNumber, LogSequenceNumber NewLsn, const OplogEntryAddress& NewAddress) { + OpToPayloadOffsetMap.insert({OpKeyHash, PayloadIndex(OpPayloads.size())}); + OpPayloads.push_back({.Lsn = NewLsn, .Address = NewAddress}); + }, + RetainLSNs, + /*DryRun*/ DryRun); + + if (!DryRun) + { + 
m_OpToPayloadOffsetMap.swap(OpToPayloadOffsetMap); + m_OpLogPayloads.swap(OpPayloads); + m_LsnToPayloadOffsetMap.reset(); + WriteIndexSnapshot(); + } + + uint64_t PostSize = TotalSize(); + uint64_t FreedSize = (PreSize > PostSize) ? (PreSize - PostSize) : 0; + + ZEN_INFO("{} oplog '{}/{}': Compacted in {}. New size: {}, freeing: {}", + LogPrefix, + m_OuterProjectId, + m_OplogId, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + NiceBytes(PostSize), + NiceBytes(FreedSize)); +} + +IoBuffer +ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash) +{ + ZEN_ASSERT(m_Mode == EMode::kFull); + return m_CidStore.FindChunkByCid(RawHash); +} + +bool +ProjectStore::Oplog::IterateChunks(std::span<IoHash> RawHashes, + bool IncludeModTag, + const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) +{ + ZEN_ASSERT(m_Mode == EMode::kFull); + return m_CidStore.IterateChunks( + RawHashes, + [&](size_t Index, const IoBuffer& Payload) { + return AsyncCallback(Index, Payload, IncludeModTag ? 
GetModificationTagFromRawHash(RawHashes[Index]) : 0); + }, + OptionalWorkerPool, + LargeSizeLimit); +} + +bool +ProjectStore::Oplog::IterateChunks(const std::filesystem::path& ProjectRootDir, + std::span<Oid> ChunkIds, + bool IncludeModTag, + const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) +{ + ZEN_ASSERT(m_Mode == EMode::kFull); + ZEN_MEMSCOPE(GetProjectstoreTag()); + + std::vector<size_t> CidChunkIndexes; + std::vector<IoHash> CidChunkHashes; + std::vector<size_t> FileChunkIndexes; + std::vector<std::filesystem::path> FileChunkPaths; + { + RwLock::SharedLockScope OplogLock(m_OplogLock); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkIds.size(); ChunkIndex++) + { + const Oid& ChunkId = ChunkIds[ChunkIndex]; + if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end()) + { + CidChunkIndexes.push_back(ChunkIndex); + CidChunkHashes.push_back(ChunkIt->second); + } + else if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) + { + CidChunkIndexes.push_back(ChunkIndex); + CidChunkHashes.push_back(ChunkIt->second); + } + else if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) + { + FileChunkIndexes.push_back(ChunkIndex); + FileChunkPaths.emplace_back(ProjectRootDir / FileIt->second.ServerPath); + } + } + } + if (OptionalWorkerPool) + { + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); + try + { + for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) + { + if (AbortFlag) + { + break; + } + Work.ScheduleWork( + *OptionalWorkerPool, + [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback]( + std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + return; + } + size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; + const std::filesystem::path& 
FilePath = FileChunkPaths[ChunkIndex]; + try + { + IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); + if (!Payload) + { + ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[FileChunkIndex], FilePath); + } + + if (!AsyncCallback(FileChunkIndex, + Payload, + IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0)) + { + AbortFlag.store(true); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'", + m_OuterProjectId, + m_OplogId, + FileChunkIndex, + FilePath, + Ex.what()); + } + }); + } + + if (!CidChunkHashes.empty() && !AbortFlag) + { + m_CidStore.IterateChunks( + CidChunkHashes, + [&](size_t Index, const IoBuffer& Payload) { + size_t CidChunkIndex = CidChunkIndexes[Index]; + if (AbortFlag) + { + return false; + } + return AsyncCallback(CidChunkIndex, + Payload, + IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0); + }, + OptionalWorkerPool, + LargeSizeLimit); + } + } + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed iterating oplog chunks in {}. Reason: '{}'", m_BasePath, Ex.what()); + } + + Work.Wait(); + + return !AbortFlag; + } + else + { + if (!CidChunkHashes.empty()) + { + m_CidStore.IterateChunks( + CidChunkHashes, + [&](size_t Index, const IoBuffer& Payload) { + size_t CidChunkIndex = CidChunkIndexes[Index]; + return AsyncCallback(CidChunkIndex, Payload, IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0); + }, + OptionalWorkerPool, + LargeSizeLimit); + } + + for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) + { + size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; + const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; + IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); + if (Payload) + { + bool Result = AsyncCallback(FileChunkIndex, Payload, IncludeModTag ? 
GetModificationTagFromModificationTime(Payload) : 0); + if (!Result) + { + return false; + } + } + } + } + return true; +} + +IoBuffer +ProjectStore::Oplog::FindChunk(const std::filesystem::path& ProjectRootDir, const Oid& ChunkId, uint64_t* OptOutModificationTag) +{ + ZEN_ASSERT(m_Mode == EMode::kFull); + + RwLock::SharedLockScope OplogLock(m_OplogLock); + if (!m_Storage) + { + return IoBuffer{}; + } + + if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end()) + { + IoHash ChunkHash = ChunkIt->second; + OplogLock.ReleaseNow(); + + IoBuffer Result = m_CidStore.FindChunkByCid(ChunkHash); + if (Result && OptOutModificationTag != nullptr) + { + *OptOutModificationTag = GetModificationTagFromRawHash(ChunkHash); + } + return Result; + } + + if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) + { + std::filesystem::path FilePath = ProjectRootDir / FileIt->second.ServerPath; + + OplogLock.ReleaseNow(); + + IoBuffer Result = IoBufferBuilder::MakeFromFile(FilePath); + if (!Result) + { + ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkId, FilePath); + } + else if (OptOutModificationTag != nullptr) + { + *OptOutModificationTag = GetModificationTagFromModificationTime(Result); + } + return Result; + } + + if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) + { + IoHash ChunkHash = MetaIt->second; + OplogLock.ReleaseNow(); + + IoBuffer Result = m_CidStore.FindChunkByCid(ChunkHash); + if (Result && OptOutModificationTag != nullptr) + { + *OptOutModificationTag = GetModificationTagFromRawHash(ChunkHash); + } + return Result; + } + + return {}; +} + +std::vector<ProjectStore::Oplog::ChunkInfo> +ProjectStore::Oplog::GetAllChunksInfo(const std::filesystem::path& ProjectRootDir) +{ + ZEN_ASSERT(m_Mode == EMode::kFull); + + ZEN_MEMSCOPE(GetProjectstoreTag()); + + // First just capture all the chunk ids + + std::vector<ChunkInfo> InfoArray; + + { + RwLock::SharedLockScope _(m_OplogLock); + + if (m_Storage) + { + 
const size_t NumEntries = m_FileMap.size() + m_ChunkMap.size(); + + InfoArray.reserve(NumEntries); + + for (const auto& Kv : m_FileMap) + { + InfoArray.push_back({.ChunkId = Kv.first}); + } + + for (const auto& Kv : m_ChunkMap) + { + InfoArray.push_back({.ChunkId = Kv.first}); + } + } + } + + for (ChunkInfo& Info : InfoArray) + { + if (IoBuffer Chunk = FindChunk(ProjectRootDir, Info.ChunkId, nullptr)) + { + Info.ChunkSize = Chunk.GetSize(); + } + } + + return InfoArray; +} + +void +ProjectStore::Oplog::IterateChunkMap(std::function<void(const Oid&, const IoHash&)>&& Fn) +{ + ZEN_ASSERT(m_Mode == EMode::kFull); + + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return; + } + + for (const auto& Kv : m_ChunkMap) + { + Fn(Kv.first, Kv.second); + } +} + +void +ProjectStore::Oplog::IterateFileMap( + std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn) +{ + ZEN_ASSERT(m_Mode == EMode::kFull); + + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return; + } + + for (const auto& Kv : m_FileMap) + { + Fn(Kv.first, Kv.second.ServerPath, Kv.second.ClientPath); + } +} + +void +ProjectStore::Oplog::IterateOplog(std::function<void(CbObjectView)>&& Handler, const Paging& EntryPaging) +{ + RwLock::SharedLockScope _(m_OplogLock); + IterateOplogLocked(std::move(Handler), EntryPaging); +} + +std::vector<ProjectStore::Oplog::PayloadIndex> +ProjectStore::Oplog::GetSortedOpPayloadRangeLocked(const Paging& EntryPaging, + tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher>* OutOptionalReverseKeyMap) +{ + std::pair<int32_t, int32_t> StartAndEnd = GetPagedRange(int32_t(m_OpToPayloadOffsetMap.size()), EntryPaging); + if (StartAndEnd.first == StartAndEnd.second) + { + return {}; + } + + const int32_t ReplayCount = StartAndEnd.second - StartAndEnd.first; + + auto Start = m_OpToPayloadOffsetMap.cbegin(); + std::advance(Start, StartAndEnd.first); + + auto End = Start; + std::advance(End, ReplayCount); + + 
std::vector<PayloadIndex> ReplayOrder; + ReplayOrder.reserve(ReplayCount); + + for (auto It = Start; It != End; It++) + { + const PayloadIndex PayloadOffset = It->second; + if (OutOptionalReverseKeyMap != nullptr) + { + OutOptionalReverseKeyMap->insert_or_assign(PayloadOffset, It->first); + } + ReplayOrder.push_back(PayloadOffset); + } + + std::sort(ReplayOrder.begin(), ReplayOrder.end(), [this](const PayloadIndex Lhs, const PayloadIndex Rhs) { + const OplogEntryAddress& LhsEntry = m_OpLogPayloads[Lhs].Address; + const OplogEntryAddress& RhsEntry = m_OpLogPayloads[Rhs].Address; + return LhsEntry.Offset < RhsEntry.Offset; + }); + + return ReplayOrder; +} + +void +ProjectStore::Oplog::IterateOplogLocked(std::function<void(CbObjectView)>&& Handler, const Paging& EntryPaging) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::Oplog::IterateOplogLocked"); + if (!m_Storage) + { + return; + } + + std::vector<PayloadIndex> ReplayOrder = GetSortedOpPayloadRangeLocked(EntryPaging, nullptr); + if (!ReplayOrder.empty()) + { + m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber, CbObjectView Op) { Handler(Op); }); + } +} + +void +ProjectStore::Oplog::IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Handler) +{ + IterateOplogWithKey(std::move(Handler), Paging{}); +} + +void +ProjectStore::Oplog::IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Handler, + const Paging& EntryPaging) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher> ReverseKeyMap; + std::vector<PayloadIndex> ReplayOrder; + + { + RwLock::SharedLockScope _(m_OplogLock); + if (m_Storage) + { + ReplayOrder = GetSortedOpPayloadRangeLocked(EntryPaging, &ReverseKeyMap); + if (!ReplayOrder.empty()) + { + uint32_t EntryIndex = 0; + m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, CbObjectView Op) { + const PayloadIndex 
PayloadOffset = ReplayOrder[EntryIndex]; + Handler(Lsn, ReverseKeyMap.at(PayloadOffset), Op); + EntryIndex++; + }); + } + } + } +} + +static constexpr uint32_t OplogMetaDataExpectedMagic = 0x6f'74'6d'62; // 'omta'; + +void +ProjectStore::Oplog::GetAttachmentsLocked(std::vector<IoHash>& OutAttachments, bool StoreMetaDataOnDisk) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::Oplog::GetAttachmentsLocked"); + if (!m_Storage) + { + return; + } + + if (StoreMetaDataOnDisk && m_MetaValid) + { + IoBuffer MetadataPayload = IoBufferBuilder::MakeFromFile(m_MetaPath); + if (MetadataPayload) + { + ZEN_TRACE_CPU("Store::Oplog::GetAttachmentsFromMetaData"); + if (GetAttachmentsFromMetaData<Oid, IoHash>( + MetadataPayload, + OplogMetaDataExpectedMagic, + [&](std::span<const Oid> Keys, std::span<const uint32_t> AttachmentCounts, std::span<const IoHash> Attachments) { + ZEN_UNUSED(Keys); + ZEN_UNUSED(AttachmentCounts); + OutAttachments.insert(OutAttachments.end(), Attachments.begin(), Attachments.end()); + })) + { + return; + } + } + } + + std::vector<Oid> Keys; + std::vector<uint32_t> AttachmentCounts; + size_t AttachmentOffset = OutAttachments.size(); + + IterateOplogLocked( + [&](CbObjectView Op) { + using namespace std::literals; + size_t CurrentAttachmentCount = OutAttachments.size(); + Op.IterateAttachments([&](CbFieldView Visitor) { OutAttachments.emplace_back(Visitor.AsAttachment()); }); + if (StoreMetaDataOnDisk) + { + const Oid KeyHash = ComputeOpKey(Op); + Keys.push_back(KeyHash); + AttachmentCounts.push_back(gsl::narrow<uint32_t>(OutAttachments.size() - CurrentAttachmentCount)); + } + }, + Paging{}); + if (StoreMetaDataOnDisk) + { + const IoHash* FirstAttachment = OutAttachments.data() + AttachmentOffset; + size_t AttachmentCount = OutAttachments.size() - AttachmentOffset; + IoBuffer MetaPayload = BuildReferenceMetaData<Oid>(OplogMetaDataExpectedMagic, + Keys, + AttachmentCounts, + std::span<const IoHash>(FirstAttachment, AttachmentCount)) + .Flatten() 
+ .AsIoBuffer(); + + const std::filesystem::path MetaPath = m_MetaPath; + std::error_code Ec; + TemporaryFile::SafeWriteFile(MetaPath, MetaPayload.GetView(), Ec); + if (Ec) + { + m_MetaValid = false; + ZEN_WARN("oplog '{}/{}': unable to set meta data meta path: '{}'. Reason: '{}'", + m_OuterProjectId, + m_OplogId, + MetaPath, + Ec.message()); + } + else + { + m_MetaValid = true; + } + } +} + +size_t +ProjectStore::Oplog::GetOplogEntryCount() const +{ + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return 0; + } + return m_OpToPayloadOffsetMap.size(); +} + +ProjectStore::LogSequenceNumber +ProjectStore::Oplog::GetOpIndexByKey(const Oid& Key) +{ + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return {}; + } + if (const auto LatestOp = m_OpToPayloadOffsetMap.find(Key); LatestOp != m_OpToPayloadOffsetMap.end()) + { + return m_OpLogPayloads[LatestOp->second].Lsn; + } + return {}; +} + +std::optional<CbObject> +ProjectStore::Oplog::GetOpByKey(const Oid& Key) +{ + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return {}; + } + + if (const auto LatestOp = m_OpToPayloadOffsetMap.find(Key); LatestOp != m_OpToPayloadOffsetMap.end()) + { + return m_Storage->GetOp(m_OpLogPayloads[LatestOp->second].Address); + } + + return {}; +} + +std::optional<CbObject> +ProjectStore::Oplog::GetOpByIndex(ProjectStore::LogSequenceNumber Index) +{ + { + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return {}; + } + + if (m_LsnToPayloadOffsetMap) + { + if (auto It = m_LsnToPayloadOffsetMap->find(Index); It != m_LsnToPayloadOffsetMap->end()) + { + return m_Storage->GetOp(m_OpLogPayloads[It->second].Address); + } + } + } + + RwLock::ExclusiveLockScope Lock(m_OplogLock); + if (!m_Storage) + { + return {}; + } + + RefreshLsnToPayloadOffsetMap(Lock); + if (auto It = m_LsnToPayloadOffsetMap->find(Index); It != m_LsnToPayloadOffsetMap->end()) + { + return m_Storage->GetOp(m_OpLogPayloads[It->second].Address); + } + return {}; +} + 
+void +ProjectStore::Oplog::CaptureAddedAttachments(std::span<const IoHash> AttachmentHashes) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + m_OplogLock.WithExclusiveLock([this, AttachmentHashes]() { + if (m_CapturedAttachments) + { + m_CapturedAttachments->reserve(m_CapturedAttachments->size() + AttachmentHashes.size()); + m_CapturedAttachments->insert(m_CapturedAttachments->end(), AttachmentHashes.begin(), AttachmentHashes.end()); + } + }); +} + +void +ProjectStore::Oplog::EnableUpdateCapture() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + m_OplogLock.WithExclusiveLock([&]() { + if (m_UpdateCaptureRefCounter == 0) + { + ZEN_ASSERT(!m_CapturedOps); + ZEN_ASSERT(!m_CapturedAttachments); + m_CapturedOps = std::make_unique<std::vector<Oid>>(); + m_CapturedAttachments = std::make_unique<std::vector<IoHash>>(); + } + else + { + ZEN_ASSERT(m_CapturedOps); + ZEN_ASSERT(m_CapturedAttachments); + } + m_UpdateCaptureRefCounter++; + }); +} + +void +ProjectStore::Oplog::DisableUpdateCapture() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + m_OplogLock.WithExclusiveLock([&]() { + ZEN_ASSERT(m_CapturedOps); + ZEN_ASSERT(m_CapturedAttachments); + ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); + m_UpdateCaptureRefCounter--; + if (m_UpdateCaptureRefCounter == 0) + { + m_CapturedOps.reset(); + m_CapturedAttachments.reset(); + } + }); +} + +void +ProjectStore::Oplog::IterateCapturedOpsLocked( + std::function<bool(const Oid& Key, ProjectStore::LogSequenceNumber LSN, const CbObjectView& UpdateOp)>&& Callback) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + if (m_CapturedOps) + { + if (!m_Storage) + { + return; + } + for (const Oid& OpKey : *m_CapturedOps) + { + if (const auto AddressEntryIt = m_OpToPayloadOffsetMap.find(OpKey); AddressEntryIt != m_OpToPayloadOffsetMap.end()) + { + const OplogPayload& OpPayload = m_OpLogPayloads[AddressEntryIt->second]; + Callback(OpKey, OpPayload.Lsn, m_Storage->GetOp(OpPayload.Address)); + } + } + } +} + +std::vector<IoHash> 
+ProjectStore::Oplog::GetCapturedAttachmentsLocked() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + if (m_CapturedAttachments) + { + return *m_CapturedAttachments; + } + return {}; +} + +std::vector<IoHash> +ProjectStore::Oplog::CheckPendingChunkReferences(std::span<const IoHash> ChunkHashes, const GcClock::Duration& RetainTime) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + m_OplogLock.WithExclusiveLock([&]() { + GcClock::TimePoint Now = GcClock::Now(); + if (m_PendingPrepOpAttachmentsRetainEnd < Now) + { + m_PendingPrepOpAttachments.clear(); + } + m_PendingPrepOpAttachments.insert(ChunkHashes.begin(), ChunkHashes.end()); + GcClock::TimePoint NewEndPoint = Now + RetainTime; + if (m_PendingPrepOpAttachmentsRetainEnd < NewEndPoint) + { + m_PendingPrepOpAttachmentsRetainEnd = NewEndPoint; + } + }); + + std::vector<IoHash> MissingChunks; + MissingChunks.reserve(ChunkHashes.size()); + for (const IoHash& FileHash : ChunkHashes) + { + if (!m_CidStore.ContainsChunk(FileHash)) + { + MissingChunks.push_back(FileHash); + } + } + + return MissingChunks; +} + +void +ProjectStore::Oplog::RemovePendingChunkReferences(std::span<const IoHash> ChunkHashes) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + m_OplogLock.WithExclusiveLock([&]() { + GcClock::TimePoint Now = GcClock::Now(); + if (m_PendingPrepOpAttachmentsRetainEnd < Now) + { + m_PendingPrepOpAttachments.clear(); + } + else + { + for (const IoHash& Chunk : ChunkHashes) + { + m_PendingPrepOpAttachments.erase(Chunk); + } + } + }); +} + +std::vector<IoHash> +ProjectStore::Oplog::GetPendingChunkReferencesLocked() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + std::vector<IoHash> Result; + Result.reserve(m_PendingPrepOpAttachments.size()); + Result.insert(Result.end(), m_PendingPrepOpAttachments.begin(), m_PendingPrepOpAttachments.end()); + GcClock::TimePoint Now = GcClock::Now(); + if (m_PendingPrepOpAttachmentsRetainEnd < Now) + { + m_PendingPrepOpAttachments.clear(); + } + return Result; +} + +void 
+ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, + const Oid& FileId, + const IoHash& Hash, + std::string_view ServerPath, + std::string_view ClientPath) +{ + FileMapEntry Entry; + + if (Hash != IoHash::Zero) + { + m_ChunkMap.insert_or_assign(FileId, Hash); + } + else + { + Entry.ServerPath = ServerPath; + } + + Entry.ClientPath = ClientPath; + + m_FileMap[FileId] = std::move(Entry); +} + +void +ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, const Oid& ChunkId, const IoHash& Hash) +{ + m_ChunkMap.insert_or_assign(ChunkId, Hash); +} + +void +ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, const Oid& ChunkId, const IoHash& Hash) +{ + m_MetaMap.insert_or_assign(ChunkId, Hash); +} + +ProjectStore::Oplog::OplogEntryMapping +ProjectStore::Oplog::GetMapping(CbObjectView Core) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + using namespace std::literals; + + OplogEntryMapping Result; + + // Update chunk id maps + for (CbFieldView Field : Core) + { + std::string_view FieldName = Field.GetName(); + if (FieldName == "package"sv) + { + CbObjectView PackageObj = Field.AsObjectView(); + Oid Id = PackageObj["id"sv].AsObjectId(); + IoHash Hash = PackageObj["data"sv].AsBinaryAttachment(); + Result.Chunks.emplace_back(ChunkMapping{Id, Hash}); + ZEN_DEBUG("oplog {}/{}: package data {} -> {}", m_OuterProjectId, m_OplogId, Id, Hash); + continue; + } + if (FieldName == "bulkdata"sv) + { + CbArrayView BulkDataArray = Field.AsArrayView(); + for (CbFieldView& Entry : BulkDataArray) + { + CbObjectView BulkObj = Entry.AsObjectView(); + Oid Id = BulkObj["id"sv].AsObjectId(); + IoHash Hash = BulkObj["data"sv].AsBinaryAttachment(); + Result.Chunks.emplace_back(ChunkMapping{Id, Hash}); + ZEN_DEBUG("oplog {}/{}: bulkdata {} -> {}", m_OuterProjectId, m_OplogId, Id, Hash); + } + continue; + } + if (FieldName == "packagedata"sv) + { + CbArrayView PackageDataArray = Field.AsArrayView(); + for (CbFieldView& Entry : PackageDataArray) + { 
+ CbObjectView PackageDataObj = Entry.AsObjectView(); + Oid Id = PackageDataObj["id"sv].AsObjectId(); + IoHash Hash = PackageDataObj["data"sv].AsBinaryAttachment(); + Result.Chunks.emplace_back(ChunkMapping{Id, Hash}); + ZEN_DEBUG("oplog {}/{}: package {} -> {}", m_OuterProjectId, m_OplogId, Id, Hash); + } + continue; + } + if (FieldName == "files"sv) + { + CbArrayView FilesArray = Field.AsArrayView(); + Result.Files.reserve(FilesArray.Num()); + for (CbFieldView& Entry : FilesArray) + { + CbObjectView FileObj = Entry.AsObjectView(); + + std::string_view ServerPath = FileObj["serverpath"sv].AsString(); + std::string_view ClientPath = FileObj["clientpath"sv].AsString(); + Oid Id = FileObj["id"sv].AsObjectId(); + IoHash Hash = FileObj["data"sv].AsBinaryAttachment(); + if (ServerPath.empty() && Hash == IoHash::Zero) + { + ZEN_WARN("oplog {}/{}: invalid file for entry '{}', missing both 'serverpath' and 'data' fields", + m_OuterProjectId, + m_OplogId, + Id); + continue; + } + if (ClientPath.empty()) + { + ZEN_WARN("oplog {}/{}: invalid file for entry '{}', missing 'clientpath' field", m_OuterProjectId, m_OplogId, Id); + continue; + } + + Result.Files.emplace_back(FileMapping{Id, Hash, std::string(ServerPath), std::string(ClientPath)}); + ZEN_DEBUG("oplog {}/{}: file {} -> {}, ServerPath: {}, ClientPath: {}", + m_OuterProjectId, + m_OplogId, + Id, + Hash, + ServerPath, + ClientPath); + } + continue; + } + if (FieldName == "meta"sv) + { + CbArrayView MetaArray = Field.AsArrayView(); + Result.Meta.reserve(MetaArray.Num()); + for (CbFieldView& Entry : MetaArray) + { + CbObjectView MetaObj = Entry.AsObjectView(); + Oid Id = MetaObj["id"sv].AsObjectId(); + IoHash Hash = MetaObj["data"sv].AsBinaryAttachment(); + Result.Meta.emplace_back(ChunkMapping{Id, Hash}); + auto NameString = MetaObj["name"sv].AsString(); + ZEN_DEBUG("oplog {}/{}: meta data ({}) {} -> {}", m_OuterProjectId, m_OplogId, NameString, Id, Hash); + } + continue; + } + } + + return Result; +} + 
+ProjectStore::LogSequenceNumber +ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, + const OplogEntryMapping& OpMapping, + const OplogEntry& OpEntry) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing + // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however + + using namespace std::literals; + + // Update chunk id maps + for (const ChunkMapping& Chunk : OpMapping.Chunks) + { + AddChunkMapping(OplogLock, Chunk.Id, Chunk.Hash); + } + + for (const FileMapping& File : OpMapping.Files) + { + AddFileMapping(OplogLock, File.Id, File.Hash, File.ServerPath, File.ClientPath); + } + + for (const ChunkMapping& Meta : OpMapping.Meta) + { + AddMetaMapping(OplogLock, Meta.Id, Meta.Hash); + } + + const PayloadIndex PayloadOffset(m_OpLogPayloads.size()); + m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, PayloadOffset); + m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress}); + + if (m_LsnToPayloadOffsetMap) + { + m_LsnToPayloadOffsetMap->insert_or_assign(OpEntry.OpLsn, PayloadOffset); + } + return OpEntry.OpLsn; +} + +ProjectStore::LogSequenceNumber +ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) +{ + ZEN_ASSERT(m_Mode == EMode::kFull); + + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntry"); + + const CbObject& Core = OpPackage.GetObject(); + const ProjectStore::LogSequenceNumber EntryId = AppendNewOplogEntry(Core); + if (!EntryId) + { + // The oplog has been deleted so just drop this + return EntryId; + } + + // Persist attachments after oplog entry so GC won't find attachments without references + + uint64_t AttachmentBytes = 0; + uint64_t NewAttachmentBytes = 0; + + auto Attachments = OpPackage.GetAttachments(); + + if (!Attachments.empty()) + { + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> 
WriteRawHashes; + std::vector<uint64_t> WriteRawSizes; + + WriteAttachmentBuffers.reserve(Attachments.size()); + WriteRawHashes.reserve(Attachments.size()); + WriteRawSizes.reserve(Attachments.size()); + + for (const auto& Attach : Attachments) + { + ZEN_ASSERT(Attach.IsCompressedBinary()); + + const CompressedBuffer& AttachmentData = Attach.AsCompressedBinary(); + const uint64_t AttachmentSize = AttachmentData.DecodeRawSize(); + WriteAttachmentBuffers.push_back(AttachmentData.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(Attach.GetHash()); + WriteRawSizes.push_back(AttachmentSize); + AttachmentBytes += AttachmentSize; + } + + std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < InsertResults.size(); Index++) + { + if (InsertResults[Index].New) + { + NewAttachmentBytes += WriteRawSizes[Index]; + } + } + } + + ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId.Number, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes)); + + return EntryId; +} + +RefPtr<ProjectStore::OplogStorage> +ProjectStore::Oplog::GetStorage() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + RefPtr<OplogStorage> Storage; + { + RwLock::SharedLockScope _(m_OplogLock); + Storage = m_Storage; + } + return Storage; +} + +ProjectStore::LogSequenceNumber +ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core) +{ + ZEN_ASSERT(m_Mode == EMode::kFull); + + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntry"); + + using namespace std::literals; + + RefPtr<OplogStorage> Storage = GetStorage(); + if (!Storage) + { + return {}; + } + + OplogEntryMapping Mapping = GetMapping(Core); + OplogStorage::AppendOpData OpData = OplogStorage::GetAppendOpData(Core); + + const OplogEntry OpEntry = Storage->AppendOp(OpData); + + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + const ProjectStore::LogSequenceNumber EntryId = 
RegisterOplogEntry(OplogLock, Mapping, OpEntry); + if (m_CapturedOps) + { + m_CapturedOps->push_back(OpData.KeyHash); + } + m_MetaValid = false; + + return EntryId; +} + +std::vector<ProjectStore::LogSequenceNumber> +ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores) +{ + ZEN_ASSERT(m_Mode == EMode::kFull); + + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntries"); + + using namespace std::literals; + + RefPtr<OplogStorage> Storage = GetStorage(); + if (!Storage) + { + return std::vector<ProjectStore::LogSequenceNumber>(Cores.size(), LogSequenceNumber{}); + } + + size_t OpCount = Cores.size(); + std::vector<OplogEntryMapping> Mappings; + std::vector<OplogStorage::AppendOpData> OpDatas; + Mappings.resize(OpCount); + OpDatas.resize(OpCount); + + for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) + { + const CbObjectView& Core = Cores[OpIndex]; + OpDatas[OpIndex] = OplogStorage::GetAppendOpData(Core); + Mappings[OpIndex] = GetMapping(Core); + } + + std::vector<OplogEntry> OpEntries = Storage->AppendOps(OpDatas); + + std::vector<ProjectStore::LogSequenceNumber> EntryIds; + EntryIds.resize(OpCount); + { + { + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + if (m_CapturedOps) + { + m_CapturedOps->reserve(m_CapturedOps->size() + OpCount); + } + for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) + { + EntryIds[OpIndex] = RegisterOplogEntry(OplogLock, Mappings[OpIndex], OpEntries[OpIndex]); + if (m_CapturedOps) + { + m_CapturedOps->push_back(OpDatas[OpIndex].KeyHash); + } + } + } + m_MetaValid = false; + } + return EntryIds; +} + +////////////////////////////////////////////////////////////////////////// + +ProjectStore::Project::Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath) +: m_ProjectStore(PrjStore) +, m_CidStore(Store) +, m_OplogStoragePath(BasePath) +, m_LastAccessTimes({std::make_pair(std::string(), GcClock::TickCount())}) +{ +} + 
+ProjectStore::Project::~Project() +{ + // Only write access times if we have not been explicitly deleted + if (!m_OplogStoragePath.empty()) + { + WriteAccessTimes(); + } +} + +bool +ProjectStore::Project::Exists(const std::filesystem::path& BasePath) +{ + return IsFile(BasePath / "Project.zcb"); +} + +void +ProjectStore::Project::Read() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Project::Read"); + + using namespace std::literals; + + std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv; + + ZEN_DEBUG("project '{}': reading config from '{}'", Identifier, ProjectStateFilePath); + + BasicFile Blob; + Blob.Open(ProjectStateFilePath, BasicFile::Mode::kRead); + + IoBuffer Obj = Blob.ReadAll(); + CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); + + if (ValidationError == CbValidateError::None) + { + CbObject Cfg = LoadCompactBinaryObject(Obj); + + Identifier = std::filesystem::path(Cfg["id"sv].AsU8String()).string(); + RootDir = std::filesystem::path(Cfg["root"sv].AsU8String()).string(); + ProjectRootDir = std::filesystem::path(Cfg["project"sv].AsU8String()).string(); + EngineRootDir = std::filesystem::path(Cfg["engine"sv].AsU8String()).string(); + ProjectFilePath = std::filesystem::path(Cfg["projectfile"sv].AsU8String()).string(); + } + else + { + ZEN_ERROR("validation error {} hit for '{}'", ToString(ValidationError), ProjectStateFilePath); + } + + ReadAccessTimes(); +} + +void +ProjectStore::Project::Write() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Project::Write"); + + using namespace std::literals; + + BinaryWriter Mem; + + CbObjectWriter Cfg; + Cfg << "id"sv << Identifier; + Cfg << "root"sv << PathToUtf8(RootDir); + Cfg << "project"sv << PathToUtf8(ProjectRootDir); + Cfg << "engine"sv << PathToUtf8(EngineRootDir); + Cfg << "projectfile"sv << PathToUtf8(ProjectFilePath); + + Cfg.Save(Mem); + + CreateDirectories(m_OplogStoragePath); + + 
std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv; + + ZEN_INFO("project '{}': persisting config to '{}'", Identifier, ProjectStateFilePath); + + TemporaryFile::SafeWriteFile(ProjectStateFilePath, Mem.GetView()); +} + +void +ProjectStore::Project::ReadAccessTimes() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + using namespace std::literals; + + std::filesystem::path ProjectAccessTimesFilePath = m_OplogStoragePath / "AccessTimes.zcb"sv; + if (!IsFile(ProjectAccessTimesFilePath)) + { + return; + } + + ZEN_DEBUG("project '{}': reading access times '{}'", Identifier, ProjectAccessTimesFilePath); + + BasicFile Blob; + Blob.Open(ProjectAccessTimesFilePath, BasicFile::Mode::kRead); + + IoBuffer Obj = Blob.ReadAll(); + CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); + + if (ValidationError == CbValidateError::None) + { + CbObject Reader = LoadCompactBinaryObject(Obj); + + uint64_t Count = Reader["count"sv].AsUInt64(0); + if (Count > 0) + { + std::vector<uint64_t> Ticks; + Ticks.reserve(Count); + CbArrayView TicksArray = Reader["ticks"sv].AsArrayView(); + for (CbFieldView& TickView : TicksArray) + { + Ticks.emplace_back(TickView.AsUInt64()); + } + CbArrayView IdArray = Reader["ids"sv].AsArrayView(); + uint64_t Index = 0; + + for (CbFieldView& IdView : IdArray) + { + std::string_view Id = IdView.AsString(); + m_LastAccessTimes.insert_or_assign(std::string(Id), Ticks[Index++]); + } + } + + ////// Legacy format read + { + CbArrayView LastAccessTimes = Reader["lastaccess"sv].AsArrayView(); + for (CbFieldView& Entry : LastAccessTimes) + { + CbObjectView AccessTime = Entry.AsObjectView(); + std::string_view Id = AccessTime["id"sv].AsString(); + GcClock::Tick AccessTick = AccessTime["tick"sv].AsUInt64(); + m_LastAccessTimes.insert_or_assign(std::string(Id), AccessTick); + } + } + } + else + { + ZEN_WARN("project '{}': validation error {} hit for '{}'", Identifier, 
ToString(ValidationError), ProjectAccessTimesFilePath); + } +} + +void +ProjectStore::Project::WriteAccessTimes() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + using namespace std::literals; + + CbObjectWriter Writer(32 + (m_LastAccessTimes.size() * 16)); + + { + RwLock::SharedLockScope _(m_LastAccessTimesLock); + + Writer.AddInteger("count", gsl::narrow<uint64_t>(m_LastAccessTimes.size())); + Writer.BeginArray("ids"); + + for (const auto& It : m_LastAccessTimes) + { + Writer << It.first; + } + Writer.EndArray(); + Writer.BeginArray("ticks"); + for (const auto& It : m_LastAccessTimes) + { + Writer << gsl::narrow<uint64_t>(It.second); + } + Writer.EndArray(); + } + + CbObject Data = Writer.Save(); + + try + { + CreateDirectories(m_OplogStoragePath); + + std::filesystem::path ProjectAccessTimesFilePath = m_OplogStoragePath / "AccessTimes.zcb"sv; + + ZEN_DEBUG("project '{}': persisting access times for '{}'", Identifier, ProjectAccessTimesFilePath); + + TemporaryFile::SafeWriteFile(ProjectAccessTimesFilePath, Data.GetView()); + } + catch (const std::exception& Err) + { + ZEN_WARN("project '{}': writing access times FAILED, reason: '{}'", Identifier, Err.what()); + } +} + +LoggerRef +ProjectStore::Project::Log() const +{ + return m_ProjectStore->Log(); +} + +std::filesystem::path +ProjectStore::Project::BasePathForOplog(std::string_view OplogId) const +{ + return m_OplogStoragePath / OplogId; +} + +Ref<ProjectStore::Oplog> +ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + RwLock::ExclusiveLockScope _(m_ProjectLock); + + std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); + + try + { + Stopwatch Timer; + + Ref<Oplog> NewLog(new Oplog(Log(), Identifier, OplogId, m_CidStore, OplogBasePath, MarkerPath, Oplog::EMode::kFull)); + m_Oplogs.insert_or_assign(std::string{OplogId}, NewLog); + + NewLog->Write(); + + ZEN_INFO("oplog '{}/{}': created oplog at '{}' in {}", + 
Identifier, + OplogId, + OplogBasePath, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + + if (m_CapturedOplogs) + { + m_CapturedOplogs->push_back(std::string(OplogId)); + } + + return NewLog; + } + catch (const std::exception&) + { + // In case of failure we need to ensure there's no half constructed entry around + // + // (This is probably already ensured by the try_emplace implementation?) + + m_Oplogs.erase(std::string{OplogId}); + + return {}; + } +} + +Ref<ProjectStore::Oplog> +ProjectStore::Project::ReadOplog(std::string_view OplogId) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::OpenOplog"); + + std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); + + RwLock::SharedLockScope Lock(m_ProjectLock); + + if (auto It = m_Oplogs.find(std::string{OplogId}); It != m_Oplogs.end()) + { + if (Oplog::ExistsAt(OplogBasePath)) + { + return It->second; + } + else + { + return {}; + } + } + + if (Oplog::ExistsAt(OplogBasePath)) + { + Stopwatch Timer; + + Ref<Oplog> ExistingLog(new Oplog(m_ProjectStore->Log(), + Identifier, + OplogId, + m_CidStore, + OplogBasePath, + std::filesystem::path{}, + Oplog::EMode::kBasicReadOnly)); + + ExistingLog->Read(); + Lock.ReleaseNow(); + + ZEN_INFO("oplog '{}/{}': read oplog at '{}' in {}", Identifier, OplogId, OplogBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + + return ExistingLog; + } + return {}; +} + +Ref<ProjectStore::Oplog> +ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bool VerifyPathOnDisk) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::OpenOplog"); + + { + RwLock::SharedLockScope ProjectLock(m_ProjectLock); + + auto OplogIt = m_Oplogs.find(std::string(OplogId)); + + if (OplogIt != m_Oplogs.end()) + { + bool ReOpen = false; + + if (VerifyPathOnDisk) + { + std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); + + if (!Oplog::ExistsAt(OplogBasePath)) + { + // Somebody deleted the oplog on disk behind our back + ProjectLock.ReleaseNow(); 
+ std::filesystem::path DeletePath; + if (!RemoveOplog(OplogId, DeletePath)) + { + ZEN_WARN("Failed to clean up deleted oplog {}/{}", Identifier, OplogId, OplogBasePath); + } + + ReOpen = true; + } + } + + if (!ReOpen) + { + return OplogIt->second; + } + } + } + + std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); + + RwLock::ExclusiveLockScope Lock(m_ProjectLock); + if (auto It = m_Oplogs.find(std::string{OplogId}); It != m_Oplogs.end()) + { + return It->second; + } + if (Oplog::ExistsAt(OplogBasePath)) + { + try + { + Stopwatch Timer; + + Ref<Oplog> ExistingLog(new Oplog(m_ProjectStore->Log(), + Identifier, + OplogId, + m_CidStore, + OplogBasePath, + std::filesystem::path{}, + Oplog::EMode::kFull)); + + m_Oplogs.insert_or_assign(std::string{OplogId}, ExistingLog); + ExistingLog->Read(); + Lock.ReleaseNow(); + + ZEN_INFO("oplog '{}/{}': opened oplog at '{}' in {}", + Identifier, + OplogId, + OplogBasePath, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + + if (AllowCompact) + { + const uint32_t CompactUnusedThreshold = 50; + ExistingLog->CompactIfUnusedExceeds(/*DryRun*/ false, + CompactUnusedThreshold, + fmt::format("Compact on initial open of oplog {}/{}: ", Identifier, OplogId)); + } + return ExistingLog; + } + catch (const std::exception& Ex) + { + ZEN_WARN("oplog '{}/{}': failed to open oplog at '{}': {}", Identifier, OplogId, OplogBasePath, Ex.what()); + m_Oplogs.erase(std::string{OplogId}); + } + } + + return {}; +} + +void +ProjectStore::Oplog::CompactIfUnusedExceeds(bool DryRun, uint32_t CompactUnusedThreshold, std::string_view LogPrefix) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + if (!m_Storage) + { + return; + } + uint32_t UnusedPercent = GetUnusedSpacePercentLocked(); + if (UnusedPercent >= CompactUnusedThreshold) + { + Compact(OplogLock, + DryRun, + /*RetainLSNs*/ m_Storage->MaxLSN() <= + 0xff000000ul, // If we have less than 16 miln entries left of our LSN range, allow renumbering of 
LSNs + LogPrefix); + } +} + +bool +ProjectStore::Project::TryUnloadOplog(std::string_view OplogId) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + RwLock::ExclusiveLockScope _(m_ProjectLock); + if (auto OplogIt = m_Oplogs.find(std::string(OplogId)); OplogIt != m_Oplogs.end()) + { + Ref<Oplog>& Oplog = OplogIt->second; + + if (Oplog->CanUnload()) + { + m_Oplogs.erase(OplogIt); + return true; + } + return false; + } + + return false; +} + +bool +ProjectStore::Project::RemoveOplog(std::string_view OplogId, std::filesystem::path& OutDeletePath) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + { + RwLock::ExclusiveLockScope _(m_ProjectLock); + + if (auto OplogIt = m_Oplogs.find(std::string(OplogId)); OplogIt == m_Oplogs.end()) + { + std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); + + if (Oplog::ExistsAt(OplogBasePath)) + { + if (!PrepareDirectoryDelete(OplogBasePath, OutDeletePath)) + { + return false; + } + } + } + else + { + Ref<Oplog>& Oplog = OplogIt->second; + if (!Oplog->PrepareForDelete(OutDeletePath)) + { + return false; + } + m_Oplogs.erase(OplogIt); + } + } + m_LastAccessTimesLock.WithExclusiveLock([&]() { m_LastAccessTimes.erase(std::string(OplogId)); }); + return true; +} + +bool +ProjectStore::Project::DeleteOplog(std::string_view OplogId) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + std::filesystem::path DeletePath; + if (!RemoveOplog(OplogId, DeletePath)) + { + return false; + } + + // Erase content on disk + if (!DeletePath.empty()) + { + if (!OplogStorage::Delete(DeletePath)) + { + ZEN_WARN("oplog '{}/{}': failed to remove old oplog path '{}'", Identifier, OplogId, DeletePath); + return false; + } + } + return true; +} + +std::vector<std::string> +ProjectStore::Project::ScanForOplogs() const +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + RwLock::SharedLockScope _(m_ProjectLock); + + std::vector<std::string> Oplogs; + if (Project::Exists(m_OplogStoragePath)) + { + DirectoryContent DirContent; + GetDirectoryContent(m_OplogStoragePath, 
DirectoryContentFlags::IncludeDirs, DirContent); + Oplogs.reserve(DirContent.Directories.size()); + for (const std::filesystem::path& DirPath : DirContent.Directories) + { + std::string DirName = PathToUtf8(DirPath.filename()); + if (DirName.starts_with("[dropped]")) + { + continue; + } + if (Oplog::ExistsAt(DirPath)) + { + Oplogs.push_back(DirPath.filename().string()); + } + } + } + + return Oplogs; +} + +void +ProjectStore::Project::IterateOplogs(std::function<void(const RwLock::SharedLockScope&, const Oplog&)>&& Fn) const +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + RwLock::SharedLockScope Lock(m_ProjectLock); + + for (auto& Kv : m_Oplogs) + { + Fn(Lock, *Kv.second); + } +} + +void +ProjectStore::Project::IterateOplogs(std::function<void(const RwLock::SharedLockScope&, Oplog&)>&& Fn) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + RwLock::SharedLockScope Lock(m_ProjectLock); + + for (auto& Kv : m_Oplogs) + { + Fn(Lock, *Kv.second); + } +} + +void +ProjectStore::Project::Flush() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Project::Flush"); + + // We only need to flush oplogs that we have already loaded + IterateOplogs([&](const RwLock::SharedLockScope&, Oplog& Ops) { Ops.Flush(); }); + WriteAccessTimes(); +} + +void +ProjectStore::Project::Scrub(ScrubContext& Ctx) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + // Scrubbing needs to check all existing oplogs + std::vector<std::string> OpLogs = ScanForOplogs(); + for (const std::string& OpLogId : OpLogs) + { + OpenOplog(OpLogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); + } + IterateOplogs([&](const RwLock::SharedLockScope&, Oplog& Ops) { + if (!IsExpired(GcClock::TimePoint::min(), Ops)) + { + Ops.Scrub(Ctx); + } + }); +} + +uint64_t +ProjectStore::Project::TotalSize(const std::filesystem::path& BasePath) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + using namespace std::literals; + + uint64_t Size = 0; + std::filesystem::path AccessTimesFilePath = BasePath / "AccessTimes.zcb"sv; + if 
(IsFile(AccessTimesFilePath)) + { + Size += FileSizeFromPath(AccessTimesFilePath); + } + std::filesystem::path ProjectFilePath = BasePath / "Project.zcb"sv; + if (IsFile(ProjectFilePath)) + { + Size += FileSizeFromPath(ProjectFilePath); + } + + return Size; +} + +uint64_t +ProjectStore::Project::TotalSize() const +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + uint64_t Result = TotalSize(m_OplogStoragePath); + { + std::vector<std::string> OpLogs = ScanForOplogs(); + for (const std::string& OpLogId : OpLogs) + { + std::filesystem::path OplogBasePath = BasePathForOplog(OpLogId); + Result += Oplog::TotalSize(OplogBasePath); + } + } + return Result; +} + +bool +ProjectStore::Project::PrepareForDelete(std::filesystem::path& OutDeletePath) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + RwLock::ExclusiveLockScope _(m_ProjectLock); + + for (auto& It : m_Oplogs) + { + It.second->ResetState(); + } + + m_Oplogs.clear(); + + bool Success = PrepareDirectoryDelete(m_OplogStoragePath, OutDeletePath); + if (!Success) + { + return false; + } + m_OplogStoragePath.clear(); + return true; +} + +void +ProjectStore::Project::EnableUpdateCapture() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + m_ProjectLock.WithExclusiveLock([&]() { + if (m_UpdateCaptureRefCounter == 0) + { + ZEN_ASSERT(!m_CapturedOplogs); + m_CapturedOplogs = std::make_unique<std::vector<std::string>>(); + } + else + { + ZEN_ASSERT(m_CapturedOplogs); + } + m_UpdateCaptureRefCounter++; + }); +} + +void +ProjectStore::Project::DisableUpdateCapture() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + m_ProjectLock.WithExclusiveLock([&]() { + ZEN_ASSERT(m_CapturedOplogs); + ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); + m_UpdateCaptureRefCounter--; + if (m_UpdateCaptureRefCounter == 0) + { + m_CapturedOplogs.reset(); + } + }); +} + +std::vector<std::string> +ProjectStore::Project::GetCapturedOplogsLocked() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + if (m_CapturedOplogs) + { + return *m_CapturedOplogs; + } + return {}; +} + 
+std::vector<RwLock::SharedLockScope> +ProjectStore::Project::GetGcReferencerLocks() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + std::vector<RwLock::SharedLockScope> Locks; + Locks.emplace_back(RwLock::SharedLockScope(m_ProjectLock)); + Locks.reserve(1 + m_Oplogs.size()); + for (auto& Kv : m_Oplogs) + { + Locks.emplace_back(Kv.second->GetGcReferencerLock()); + } + return Locks; +} + +bool +ProjectStore::Project::IsExpired(const std::string& EntryName, + const std::filesystem::path& MarkerPath, + const GcClock::TimePoint ExpireTime) const +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + if (!MarkerPath.empty()) + { + std::error_code Ec; + if (IsFile(MarkerPath, Ec)) + { + if (Ec) + { + ZEN_WARN("{} '{}{}{}', Failed to check expiry via marker file '{}', assuming not expired", + EntryName.empty() ? "project"sv : "oplog"sv, + Identifier, + EntryName.empty() ? ""sv : "/"sv, + EntryName, + MarkerPath.string()); + return false; + } + return false; + } + } + + const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); + + RwLock::SharedLockScope _(m_LastAccessTimesLock); + if (auto It = m_LastAccessTimes.find(EntryName); It != m_LastAccessTimes.end()) + { + if (It->second <= ExpireTicks) + { + return true; + } + } + return false; +} + +bool +ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime) const +{ + return IsExpired(std::string(), ProjectFilePath, ExpireTime); +} + +bool +ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog) const +{ + return IsExpired(Oplog.OplogId(), Oplog.MarkerPath(), ExpireTime); +} + +bool +ProjectStore::Project::IsOplogTouchedSince(const GcClock::TimePoint TouchTime, std::string_view Oplog) const +{ + const GcClock::Tick TouchTicks = TouchTime.time_since_epoch().count(); + + RwLock::ExclusiveLockScope _(m_LastAccessTimesLock); + if (auto It = m_LastAccessTimes.find(std::string(Oplog)); It != m_LastAccessTimes.end()) + { + if (It->second > TouchTicks) + { + return true; + 
} + } + return false; +} + +bool +ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime, std::string_view OplogId) const +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + using namespace std::literals; + + { + RwLock::SharedLockScope Lock(m_ProjectLock); + auto OplogIt = m_Oplogs.find(std::string(OplogId)); + + if (OplogIt != m_Oplogs.end()) + { + Lock.ReleaseNow(); + return IsExpired(ExpireTime, *OplogIt->second); + } + } + + std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); + std::optional<CbObject> OplogConfig = Oplog::ReadStateFile(OplogBasePath, [this]() { return Log(); }); + std::filesystem::path MarkerPath = OplogConfig.has_value() ? OplogConfig.value()["gcpath"sv].AsU8String() : std::u8string(); + return IsExpired(std::string(OplogId), MarkerPath, ExpireTime); +} + +void +ProjectStore::Project::TouchProject() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + RwLock::ExclusiveLockScope _(m_LastAccessTimesLock); + m_LastAccessTimes.insert_or_assign(std::string(), GcClock::TickCount()); +} + +void +ProjectStore::Project::TouchOplog(std::string_view Oplog) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_ASSERT(!Oplog.empty()); + + RwLock::ExclusiveLockScope _(m_LastAccessTimesLock); + m_LastAccessTimes.insert_or_assign(std::string(Oplog), GcClock::TickCount()); +} + +GcClock::TimePoint +ProjectStore::Project::LastOplogAccessTime(std::string_view Oplog) const +{ + RwLock::SharedLockScope _(m_LastAccessTimesLock); + if (auto It = m_LastAccessTimes.find(std::string(Oplog)); It != m_LastAccessTimes.end()) + { + return GcClock::TimePointFromTick(It->second); + } + return GcClock::TimePoint::min(); +} + +////////////////////////////////////////////////////////////////////////// + +ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, const Configuration& Config) +: m_Log(logging::Get("project")) +, m_Gc(Gc) +, m_CidStore(Store) +, m_ProjectBasePath(BasePath) +, m_Config(Config) +, 
m_DiskWriteBlocker(Gc.GetDiskWriteBlocker()) +{ + ZEN_INFO("initializing project store at '{}'", m_ProjectBasePath); + // m_Log.set_level(spdlog::level::debug); + m_Gc.AddGcStorage(this); + m_Gc.AddGcReferencer(*this); + m_Gc.AddGcReferenceLocker(*this); +} + +ProjectStore::~ProjectStore() +{ + ZEN_INFO("closing project store at '{}'", m_ProjectBasePath); + m_Gc.RemoveGcReferenceLocker(*this); + m_Gc.RemoveGcReferencer(*this); + m_Gc.RemoveGcStorage(this); +} + +std::filesystem::path +ProjectStore::BasePathForProject(std::string_view ProjectId) +{ + return m_ProjectBasePath / ProjectId; +} + +void +ProjectStore::DiscoverProjects() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + if (!IsDir(m_ProjectBasePath)) + { + return; + } + + DirectoryContent DirContent; + GetDirectoryContent(m_ProjectBasePath, DirectoryContentFlags::IncludeDirs, DirContent); + + for (const std::filesystem::path& DirPath : DirContent.Directories) + { + std::string DirName = PathToUtf8(DirPath.filename()); + if (DirName.starts_with("[dropped]")) + { + continue; + } + OpenProject(DirName); + } +} + +void +ProjectStore::IterateProjects(std::function<void(Project& Prj)>&& Fn) +{ + RwLock::SharedLockScope _(m_ProjectsLock); + + for (auto& Kv : m_Projects) + { + Fn(*Kv.second.Get()); + } +} + +void +ProjectStore::Flush() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::Flush"); + + ZEN_INFO("flushing project store at '{}'", m_ProjectBasePath); + std::vector<Ref<Project>> Projects; + { + RwLock::SharedLockScope _(m_ProjectsLock); + Projects.reserve(m_Projects.size()); + + for (auto& Kv : m_Projects) + { + Projects.push_back(Kv.second); + } + } + + WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); + try + { + for (const Ref<Project>& Project : Projects) + { + Work.ScheduleWork( + WorkerPool, + [this, 
Project](std::atomic<bool>&) { + try + { + Project->Flush(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what()); + } + }, + 0); + } + } + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed projects in {}. Reason: '{}'", m_ProjectBasePath, Ex.what()); + } + + Work.Wait(); +} + +void +ProjectStore::ScrubStorage(ScrubContext& Ctx) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_INFO("scrubbing '{}'", m_ProjectBasePath); + + DiscoverProjects(); + + std::vector<Ref<Project>> Projects; + { + RwLock::SharedLockScope Lock(m_ProjectsLock); + Projects.reserve(m_Projects.size()); + + for (auto& Kv : m_Projects) + { + if (Kv.second->IsExpired(GcClock::TimePoint::min())) + { + continue; + } + Projects.push_back(Kv.second); + } + } + for (const Ref<Project>& Project : Projects) + { + Project->Scrub(Ctx); + } +} + +GcStorageSize +ProjectStore::StorageSize() const +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::StorageSize"); + + using namespace std::literals; + + GcStorageSize Result; + { + if (IsDir(m_ProjectBasePath)) + { + DirectoryContent ProjectsFolderContent; + GetDirectoryContent(m_ProjectBasePath, DirectoryContentFlags::IncludeDirs, ProjectsFolderContent); + + for (const std::filesystem::path& ProjectBasePath : ProjectsFolderContent.Directories) + { + std::filesystem::path ProjectStateFilePath = ProjectBasePath / "Project.zcb"sv; + if (IsFile(ProjectStateFilePath)) + { + Result.DiskSize += Project::TotalSize(ProjectBasePath); + DirectoryContent DirContent; + GetDirectoryContent(ProjectBasePath, DirectoryContentFlags::IncludeDirs, DirContent); + for (const std::filesystem::path& OplogBasePath : DirContent.Directories) + { + Result.DiskSize += Oplog::TotalSize(OplogBasePath); + } + } + } + } + } + return Result; +} + +Ref<ProjectStore::Project> +ProjectStore::OpenProject(std::string_view ProjectId) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + 
ZEN_TRACE_CPU("Store::OpenProject"); + + { + RwLock::SharedLockScope _(m_ProjectsLock); + if (auto ProjIt = m_Projects.find(std::string{ProjectId}); ProjIt != m_Projects.end()) + { + return ProjIt->second; + } + } + + RwLock::ExclusiveLockScope _(m_ProjectsLock); + if (auto ProjIt = m_Projects.find(std::string{ProjectId}); ProjIt != m_Projects.end()) + { + return ProjIt->second; + } + + std::filesystem::path BasePath = BasePathForProject(ProjectId); + + if (Project::Exists(BasePath)) + { + try + { + ZEN_INFO("project '{}': opening project at '{}'", ProjectId, BasePath); + + Ref<Project>& Prj = + m_Projects + .try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath))) + .first->second; + Prj->Identifier = ProjectId; + Prj->Read(); + return Prj; + } + catch (const std::exception& e) + { + ZEN_WARN("project '{}': failed to open at {} ({})", ProjectId, BasePath, e.what()); + m_Projects.erase(std::string{ProjectId}); + } + } + + return {}; +} + +Ref<ProjectStore::Project> +ProjectStore::NewProject(const std::filesystem::path& BasePath, + std::string_view ProjectId, + const std::filesystem::path& RootDir, + const std::filesystem::path& EngineRootDir, + const std::filesystem::path& ProjectRootDir, + const std::filesystem::path& ProjectFilePath) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::NewProject"); + + RwLock::ExclusiveLockScope _(m_ProjectsLock); + + ZEN_INFO("project '{}': creating project at '{}'", ProjectId, BasePath); + + Ref<Project>& Prj = + m_Projects.try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath))) + .first->second; + Prj->Identifier = ProjectId; + Prj->RootDir = RootDir; + Prj->EngineRootDir = EngineRootDir; + Prj->ProjectRootDir = ProjectRootDir; + Prj->ProjectFilePath = ProjectFilePath; + Prj->Write(); + + if (m_CapturedProjects) + { + m_CapturedProjects->push_back(std::string(ProjectId)); + } + + return Prj; 
+} + +bool +ProjectStore::UpdateProject(std::string_view ProjectId, + const std::filesystem::path& RootDir, + const std::filesystem::path& EngineRootDir, + const std::filesystem::path& ProjectRootDir, + const std::filesystem::path& ProjectFilePath) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::UpdateProject"); + + RwLock::ExclusiveLockScope ProjectsLock(m_ProjectsLock); + + auto ProjIt = m_Projects.find(std::string{ProjectId}); + + if (ProjIt == m_Projects.end()) + { + return false; + } + Ref<ProjectStore::Project> Prj = ProjIt->second; + + Prj->RootDir = RootDir; + Prj->EngineRootDir = EngineRootDir; + Prj->ProjectRootDir = ProjectRootDir; + Prj->ProjectFilePath = ProjectFilePath; + Prj->Write(); + + ZEN_INFO("project '{}': updated", ProjectId); + + return true; +} + +bool +ProjectStore::RemoveProject(std::string_view ProjectId, std::filesystem::path& OutDeletePath) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + RwLock::ExclusiveLockScope ProjectsLock(m_ProjectsLock); + + auto ProjIt = m_Projects.find(std::string{ProjectId}); + + if (ProjIt == m_Projects.end()) + { + return true; + } + + bool Success = ProjIt->second->PrepareForDelete(OutDeletePath); + + if (!Success) + { + return false; + } + m_Projects.erase(ProjIt); + return true; +} + +bool +ProjectStore::DeleteProject(std::string_view ProjectId) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::DeleteProject"); + + ZEN_INFO("project '{}': deleting", ProjectId); + + std::filesystem::path DeletePath; + if (!RemoveProject(ProjectId, DeletePath)) + { + return false; + } + + if (!DeletePath.empty()) + { + if (!DeleteDirectories(DeletePath)) + { + ZEN_WARN("project '{}': failed to remove old project path '{}'", ProjectId, DeletePath); + return false; + } + } + + return true; +} + +bool +ProjectStore::Exists(std::string_view ProjectId) +{ + return Project::Exists(BasePathForProject(ProjectId)); +} + +CbArray +ProjectStore::GetProjectsList() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + 
ZEN_TRACE_CPU("Store::GetProjectsList"); + + using namespace std::literals; + + DiscoverProjects(); + + CbWriter Response; + Response.BeginArray(); + + IterateProjects([&Response](Project& Prj) { + Response.BeginObject(); + Response << "Id"sv << Prj.Identifier; + Response << "RootDir"sv << Prj.RootDir.string(); + Response << "ProjectRootDir"sv << PathToUtf8(Prj.ProjectRootDir); + Response << "EngineRootDir"sv << PathToUtf8(Prj.EngineRootDir); + Response << "ProjectFilePath"sv << PathToUtf8(Prj.ProjectFilePath); + Response.EndObject(); + }); + Response.EndArray(); + return Response.Save().AsArray(); +} + +CbObject +ProjectStore::GetProjectFiles(LoggerRef InLog, Project& Project, Oplog& Oplog, const std::unordered_set<std::string>& WantedFieldNames) +{ + auto Log = [&InLog]() { return InLog; }; + + using namespace std::literals; + + const bool WantsAllFields = WantedFieldNames.empty(); + + const bool WantsIdField = WantsAllFields || WantedFieldNames.contains("id"); + const bool WantsClientPathField = WantsAllFields || WantedFieldNames.contains("clientpath"); + const bool WantsServerPathField = WantsAllFields || WantedFieldNames.contains("serverpath"); + const bool WantsRawSizeField = WantsAllFields || WantedFieldNames.contains("rawsize"); + const bool WantsSizeField = WantsAllFields || WantedFieldNames.contains("size"); + + std::vector<Oid> Ids; + std::vector<std::string> ServerPaths; + std::vector<std::string> ClientPaths; + std::vector<uint64_t> Sizes; + std::vector<uint64_t> RawSizes; + + size_t Count = 0; + Oplog.IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) { + if (WantsIdField || WantsRawSizeField || WantsSizeField) + { + Ids.push_back(Id); + } + if (WantsServerPathField) + { + ServerPaths.push_back(std::string(ServerPath)); + } + if (WantsClientPathField) + { + ClientPaths.push_back(std::string(ClientPath)); + } + Count++; + }); + if (WantsRawSizeField || WantsSizeField) + { + if (WantsSizeField) + { 
+ Sizes.resize(Ids.size(), (uint64_t)-1); + } + if (WantsRawSizeField) + { + RawSizes.resize(Ids.size(), (uint64_t)-1); + } + + Oplog.IterateChunks( + Project.RootDir, + Ids, + false, + [&](size_t Index, const IoBuffer& Payload, uint64_t /*ModTag*/) { + try + { + if (Payload) + { + if (Payload.GetContentType() == ZenContentType::kCompressedBinary) + { + if (WantsRawSizeField) + { + IoHash _; + if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index])) + { + if (WantsSizeField) + { + Sizes[Index] = Payload.GetSize(); + } + } + else + { + ZEN_WARN("oplog '{}/{}': payload for project file info for id {} is not a valid compressed binary.", + Project.Identifier, + Oplog.OplogId(), + Ids[Index]); + } + } + else if (WantsSizeField) + { + Sizes[Index] = Payload.GetSize(); + } + } + else + { + if (WantsSizeField) + { + Sizes[Index] = Payload.GetSize(); + } + if (WantsRawSizeField) + { + RawSizes[Index] = Payload.GetSize(); + } + } + } + else + { + ZEN_WARN("oplog '{}/{}': failed getting payload for project file info for id {}.", + Project.Identifier, + Oplog.OplogId(), + Ids[Index]); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("oplog '{}/{}': failed getting project file info for id {}. Reason: '{}'", + Project.Identifier, + Oplog.OplogId(), + Ids[Index], + Ex.what()); + } + return true; + }, + &GetSmallWorkerPool(EWorkloadType::Burst), + 256u * 1024u); + } + + CbObjectWriter Response(64u + Count * ((WantsIdField ? (5 + sizeof(Oid::OidBits)) : 0) + (WantsServerPathField ? 64 : 0) + + (WantsClientPathField ? 64 : 0) + (WantsSizeField ? 16 : 0) + (WantsRawSizeField ? 
16 : 0))); + Response.BeginArray("files"sv); + for (size_t Index = 0; Index < Count; Index++) + { + Response.BeginObject(); + if (WantsIdField) + { + Response << "id"sv << Ids[Index]; + } + if (WantsServerPathField) + { + Response << "serverpath"sv << ServerPaths[Index]; + } + if (WantsClientPathField) + { + Response << "clientpath"sv << ClientPaths[Index]; + } + if (WantsSizeField && Sizes[Index] != (uint64_t)-1) + { + Response << "size"sv << Sizes[Index]; + } + if (WantsRawSizeField && RawSizes[Index] != (uint64_t)-1) + { + Response << "rawsize"sv << RawSizes[Index]; + } + Response.EndObject(); + } + Response.EndArray(); + + return Response.Save(); +} + +CbObject +ProjectStore::GetProjectChunkInfos(LoggerRef InLog, Project& Project, Oplog& Oplog, const std::unordered_set<std::string>& WantedFieldNames) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("ProjectStore::GetProjectChunkInfos"); + + auto Log = [&InLog]() { return InLog; }; + + using namespace std::literals; + + const bool WantsAllFields = WantedFieldNames.empty(); + + const bool WantsIdField = WantsAllFields || WantedFieldNames.contains("id"); + const bool WantsRawHashField = WantsAllFields || WantedFieldNames.contains("rawhash"); + const bool WantsRawSizeField = WantsAllFields || WantedFieldNames.contains("rawsize"); + const bool WantsSizeField = WantsAllFields || WantedFieldNames.contains("size"); + + std::vector<Oid> Ids; + std::vector<IoHash> Hashes; + std::vector<uint64_t> RawSizes; + std::vector<uint64_t> Sizes; + + size_t Count = 0; + size_t EstimatedCount = Oplog.OplogCount(); + if (WantsIdField) + { + Ids.reserve(EstimatedCount); + } + if (WantsRawHashField || WantsRawSizeField || WantsSizeField) + { + Hashes.reserve(EstimatedCount); + } + Oplog.IterateChunkMap([&](const Oid& Id, const IoHash& Hash) { + if (WantsIdField) + { + Ids.push_back(Id); + } + if (WantsRawHashField || WantsRawSizeField || WantsSizeField) + { + Hashes.push_back(Hash); + } + Count++; + }); + + if 
(WantsRawSizeField || WantsSizeField) + { + if (WantsRawSizeField) + { + RawSizes.resize(Hashes.size(), (uint64_t)-1); + } + if (WantsSizeField) + { + Sizes.resize(Hashes.size(), (uint64_t)-1); + } + + WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); + (void)Oplog.IterateChunks( + Hashes, + false, + [&](size_t Index, const IoBuffer& Payload, uint64_t /*ModTag*/) -> bool { + try + { + if (Payload) + { + if (Payload.GetContentType() == ZenContentType::kCompressedBinary) + { + if (WantsRawSizeField) + { + ZEN_ASSERT_SLOW(RawSizes[Index] == (uint64_t)-1); + IoHash _; + if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index])) + { + if (WantsSizeField) + { + ZEN_ASSERT_SLOW(Sizes[Index] == (uint64_t)-1); + Sizes[Index] = Payload.GetSize(); + } + } + else + { + ZEN_WARN("oplog '{}/{}': payload for project chunk for id {} is not a valid compressed binary.", + Project.Identifier, + Oplog.OplogId(), + Ids[Index]); + } + } + else if (WantsSizeField) + { + ZEN_ASSERT_SLOW(Sizes[Index] == (uint64_t)-1); + Sizes[Index] = Payload.GetSize(); + } + } + else + { + if (WantsSizeField) + { + ZEN_ASSERT_SLOW(Sizes[Index] == (uint64_t)-1); + Sizes[Index] = Payload.GetSize(); + } + if (WantsRawSizeField) + { + ZEN_ASSERT_SLOW(Sizes[Index] == (uint64_t)-1); + RawSizes[Index] = Payload.GetSize(); + } + } + } + else + { + ZEN_WARN("oplog '{}/{}': failed getting payload for chunk for id {}", + Project.Identifier, + Oplog.OplogId(), + Ids[Index]); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("oplog '{}/{}': failed getting project chunk info for id {}. Reason: '{}'", + Project.Identifier, + Oplog.OplogId(), + Ids[Index], + Ex.what()); + } + return true; + }, + &WorkerPool, + 256u * 1024u); + } + + CbObjectWriter Response(64u + Count * ((WantsIdField ? (5 + sizeof(Oid::OidBits)) : 0) + + (WantsRawHashField ? (10 + sizeof(IoHash::Hash)) : 0) + (WantsSizeField ? 16 : 0) + + (WantsRawSizeField ? 
16 : 0))); + Response.BeginArray("chunkinfos"sv); + + for (size_t Index = 0; Index < Count; Index++) + { + Response.BeginObject(); + if (WantsIdField) + { + Response << "id"sv << Ids[Index]; + } + if (WantsRawHashField) + { + Response << "rawhash"sv << Hashes[Index]; + } + if (WantsSizeField && Sizes[Index] != (uint64_t)-1) + { + Response << "size"sv << Sizes[Index]; + } + if (WantsRawSizeField && RawSizes[Index] != (uint64_t)-1) + { + Response << "rawsize"sv << RawSizes[Index]; + } + Response.EndObject(); + } + Response.EndArray(); + + return Response.Save(); +} + +CbObject +ProjectStore::GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, const Oid& ChunkId) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("ProjectStore::GetChunkInfo"); + + using namespace std::literals; + + auto Log = [&InLog]() { return InLog; }; + + IoBuffer Chunk = Oplog.FindChunk(Project.RootDir, ChunkId, nullptr); + if (!Chunk) + { + return {}; + } + + uint64_t ChunkSize = Chunk.GetSize(); + if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash RawHash; + uint64_t RawSize; + bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize); + if (!IsCompressed) + { + throw std::runtime_error( + fmt::format("Chunk info request for malformed chunk id '{}/{}'/'{}'", Project.Identifier, Oplog.OplogId(), ChunkId)); + } + ChunkSize = RawSize; + } + + CbObjectWriter Response; + Response << "size"sv << ChunkSize; + return Response.Save(); +} + +static ProjectStore::GetChunkRangeResult +ExtractRange(IoBuffer&& Chunk, uint64_t Offset, uint64_t Size, ZenContentType AcceptType) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + ProjectStore::GetChunkRangeResult Result; + + Result.ContentType = Chunk.GetContentType(); + + if (Result.ContentType == ZenContentType::kCompressedBinary) + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize); + if 
(!Compressed) + { + Result.Error = ProjectStore::GetChunkRangeResult::EError::MalformedContent; + Result.ErrorDescription = fmt::format("Malformed payload, not a compressed buffer"); + return Result; + } + + const bool IsFullRange = (Offset == 0) && ((Size == ~(0ull)) || (Size == RawSize)); + + if (IsFullRange) + { + if (AcceptType == ZenContentType::kBinary) + { + Result.Chunk = Compressed.DecompressToComposite(); + Result.ContentType = ZenContentType::kBinary; + } + else + { + Result.Chunk = Compressed.GetCompressed(); + } + Result.RawSize = 0; + } + else + { + if (Size == ~(0ull) || (Offset + Size) > RawSize) + { + if (Offset < RawSize) + { + Size = RawSize - Offset; + } + else + { + Size = 0; + } + } + + if (Size == 0) + { + Result.Error = ProjectStore::GetChunkRangeResult::EError::OutOfRange; + Result.ErrorDescription = + fmt::format("Chunk request for range outside of compressed chunk. Offset: {}, Size: {}, ChunkSize: {}", + Offset, + Size, + RawSize); + return Result; + } + + if (AcceptType == ZenContentType::kBinary) + { + Result.Chunk = CompositeBuffer(Compressed.Decompress(Offset, Size)); + Result.ContentType = ZenContentType::kBinary; + } + else + { + // Value will be a range of compressed blocks that covers the requested range + // The client will have to compensate for any offsets that do not land on an even block size multiple + Result.Chunk = Compressed.GetRange(Offset, Size).GetCompressed(); + } + Result.RawSize = RawSize; + } + Result.RawHash = RawHash; + } + else + { + const uint64_t ChunkSize = Chunk.GetSize(); + + const bool IsFullRange = (Offset == 0) && ((Size == ~(0ull)) || (Size == ChunkSize)); + if (IsFullRange) + { + Result.Chunk = CompositeBuffer(SharedBuffer(std::move(Chunk))); + Result.RawSize = 0; + } + else + { + if (Size == ~(0ull) || (Offset + Size) > ChunkSize) + { + if (Offset < ChunkSize) + { + Size = ChunkSize - Offset; + } + else + { + Size = 0; + } + } + + if (Size == 0) + { + Result.Error = 
// Looks up the chunk ChunkId in the oplog and returns the requested byte range of it via ExtractRange().
//
// OptionalInOutModificationTag may be null. When it is given, the value it holds on entry is compared with
// the value it holds after the lookup; if the two are equal the result is NotModified and no data is
// returned. A chunk that cannot be found yields NotFound with a description naming project, oplog and chunk.
ProjectStore::GetChunkRangeResult
ProjectStore::GetChunkRange(LoggerRef      InLog,
                            Project&       Project,
                            Oplog&         Oplog,
                            const Oid&     ChunkId,
                            uint64_t       Offset,
                            uint64_t       Size,
                            ZenContentType AcceptType,
                            uint64_t*      OptionalInOutModificationTag)
{
    ZEN_MEMSCOPE(GetProjectstoreTag());

    ZEN_TRACE_CPU("ProjectStore::GetChunkRange");

    auto Log = [&InLog]() { return InLog; };

    const bool HasModificationTag = OptionalInOutModificationTag != nullptr;

    // Remember the caller's tag before FindChunk() gets the chance to overwrite it
    uint64_t CallerTag = 0;
    if (HasModificationTag)
    {
        CallerTag = *OptionalInOutModificationTag;
    }

    IoBuffer Payload = Oplog.FindChunk(Project.RootDir, ChunkId, OptionalInOutModificationTag);
    if (!Payload)
    {
        return GetChunkRangeResult{.Error            = GetChunkRangeResult::EError::NotFound,
                                   .ErrorDescription = fmt::format("Chunk range request for chunk {}/{}/{} failed, payload not found",
                                                                   Project.Identifier,
                                                                   Oplog.OplogId(),
                                                                   ChunkId)};
    }

    const bool IsUnchanged = HasModificationTag && (CallerTag == *OptionalInOutModificationTag);
    if (IsUnchanged)
    {
        return {.Error = GetChunkRangeResult::EError::NotModified};
    }

    return ExtractRange(std::move(Payload), Offset, Size, AcceptType);
}
OplogId, const Oid& ChunkId) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("ProjectStore::GetChunk"); + + Ref<Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {}; + } + Ref<Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact */ false, /*VerifyPathOnDisk*/ false); + if (!Oplog) + { + return {}; + } + return Oplog->FindChunk(Project->RootDir, ChunkId, /*OptOutModificationTag*/ nullptr); +} + +IoBuffer +ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const IoHash& Cid) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("ProjectStore::GetChunk"); + + Ref<Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {}; + } + Ref<Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact */ false, /*VerifyPathOnDisk*/ false); + if (!Oplog) + { + return {}; + } + return m_CidStore.FindChunkByCid(Cid); +} + +bool +ProjectStore::PutChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash, IoBuffer&& Chunk) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("ProjectStore::PutChunk"); + + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); + if (RawHash != ChunkHash) + { + throw std::runtime_error( + fmt::format("Chunk request for invalid payload format for chunk {}/{}/{}", Project.Identifier, Oplog.OplogId(), ChunkHash)); + } + + Oplog.CaptureAddedAttachments(std::vector<IoHash>{ChunkHash}); + CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk, ChunkHash); + return Result.New; +} + +std::vector<ProjectStore::ChunkResult> +ProjectStore::GetChunks(Project& Project, Oplog& Oplog, std::span<const ChunkRequest> Requests) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("ProjectStore::GetChunks"); + + ZEN_ASSERT(!Requests.empty()); + + std::vector<ProjectStore::ChunkResult> Results; + size_t RequestCount = Requests.size(); + + Results.resize(RequestCount); + + if 
(RequestCount > 1) + { + std::vector<IoHash> ChunkRawHashes; + std::vector<size_t> ChunkRawHashesRequestIndex; + std::vector<Oid> ChunkIds; + std::vector<size_t> ChunkIdsRequestIndex; + + ChunkRawHashes.reserve(RequestCount); + ChunkRawHashesRequestIndex.reserve(RequestCount); + ChunkIds.reserve(RequestCount); + ChunkIdsRequestIndex.reserve(RequestCount); + + for (size_t RequestIndex = 0; RequestIndex < Requests.size(); RequestIndex++) + { + const ChunkRequest& Request = Requests[RequestIndex]; + if (Request.Id.index() == 0) + { + ChunkRawHashes.push_back(std::get<IoHash>(Request.Id)); + ChunkRawHashesRequestIndex.push_back(RequestIndex); + } + else + { + ChunkIds.push_back(std::get<Oid>(Request.Id)); + ChunkIdsRequestIndex.push_back(RequestIndex); + } + } + + WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); + if (!ChunkRawHashes.empty()) + { + Oplog.IterateChunks( + ChunkRawHashes, + true, + [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool { + if (Payload) + { + size_t RequestIndex = ChunkRawHashesRequestIndex[Index]; + const ChunkRequest& Request = Requests[RequestIndex]; + ChunkResult& Result = Results[RequestIndex]; + Result.Exists = true; + if (!Request.SkipData) + { + Result.ChunkBuffer = std::move(Payload); + Result.ChunkBuffer.MakeOwned(); + } + Result.ModTag = ModTag; + } + return true; + }, + &WorkerPool, + 8u * 1024); + } + if (!ChunkIdsRequestIndex.empty()) + { + Oplog.IterateChunks( + Project.RootDir, + ChunkIds, + true, + [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool { + if (Payload) + { + size_t RequestIndex = ChunkIdsRequestIndex[Index]; + const ChunkRequest& Request = Requests[RequestIndex]; + ChunkResult& Result = Results[RequestIndex]; + Result.Exists = true; + if (!Request.SkipData) + { + Result.ChunkBuffer = std::move(Payload); + Result.ChunkBuffer.MakeOwned(); + } + Result.ModTag = ModTag; + } + return true; + }, + &WorkerPool, + 8u * 1024); + } + } + else 
+ { + const ChunkRequest& Request = Requests.front(); + ChunkResult& Result = Results.front(); + + if (Request.Id.index() == 0) + { + const IoHash& ChunkHash = std::get<IoHash>(Request.Id); + IoBuffer Payload = m_CidStore.FindChunkByCid(ChunkHash); + if (Payload) + { + Result.Exists = true; + Result.ModTag = GetModificationTagFromRawHash(ChunkHash); + if (!Request.SkipData) + { + Result.ChunkBuffer = std::move(Payload); + Result.ChunkBuffer.MakeOwned(); + } + } + } + else + { + const Oid& ChunkId = std::get<Oid>(Request.Id); + uint64_t ModTag = 0; + IoBuffer Payload = Oplog.FindChunk(Project.RootDir, ChunkId, &ModTag); + if (Payload) + { + Result.Exists = true; + Result.ModTag = ModTag; + if (!Request.SkipData) + { + Result.ChunkBuffer = std::move(Payload); + Result.ChunkBuffer.MakeOwned(); + } + } + } + } + return Results; +} + +std::vector<ProjectStore::ChunkRequest> +ProjectStore::ParseChunksRequests(Project& Project, Oplog& Oplog, const CbObject& Cb) +{ + ZEN_TRACE_CPU("Store::Rpc::getchunks"); + + using namespace std::literals; + + std::vector<ChunkRequest> Requests; + + if (auto RequestFieldView = Cb["Request"sv]; RequestFieldView.IsObject()) + { + CbObjectView RequestView = RequestFieldView.AsObjectView(); + bool SkipData = RequestView["SkipData"].AsBool(false); + CbArrayView ChunksArray = RequestView["Chunks"sv].AsArrayView(); + + size_t RequestCount = ChunksArray.Num(); + if (RequestCount > 0) + { + Requests.reserve(RequestCount); + for (CbFieldView FieldView : ChunksArray) + { + CbObjectView ChunkObject = FieldView.AsObjectView(); + ChunkRequest ChunkRequest = {.Offset = ChunkObject["Offset"sv].AsUInt64(0), + .Size = ChunkObject["Size"sv].AsUInt64((uint64_t)-1), + .SkipData = SkipData}; + if (CbFieldView InputModificationTagView = ChunkObject.FindView("ModTag"); InputModificationTagView.IsInteger()) + { + ChunkRequest.ModTag = InputModificationTagView.AsUInt64(); + } + if (CbFieldView RawHashView = ChunkObject.FindView("RawHash"sv); RawHashView.IsHash()) 
+ { + const IoHash ChunkHash = RawHashView.AsHash(); + ChunkRequest.Id = ChunkHash; + } + else if (CbFieldView IdView = ChunkObject.FindView("Oid"sv); IdView.IsObjectId()) + { + const Oid ChunkId = IdView.AsObjectId(); + ChunkRequest.Id = ChunkId; + } + else + { + throw std::runtime_error( + fmt::format("oplog '{}/{}': malformed getchunks rpc request object, chunk request has no identifier", + Project.Identifier, + Oplog.OplogId())); + } + Requests.emplace_back(std::move(ChunkRequest)); + } + } + } + else if (CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); ChunksArray) + { + // Legacy full chunks only by rawhash + + size_t RequestCount = ChunksArray.Num(); + + Requests.reserve(RequestCount); + + std::vector<IoHash> Cids; + Cids.reserve(ChunksArray.Num()); + for (CbFieldView FieldView : ChunksArray) + { + Requests.push_back(ProjectStore::ChunkRequest{.Id = FieldView.AsHash()}); + } + } + else + { + throw std::runtime_error(fmt::format("oplog '{}/{}': malformed getchunks rpc request object", Project.Identifier, Oplog.OplogId())); + } + return Requests; +} + +CbPackage +ProjectStore::WriteChunksRequestResponse(Project& Project, + Oplog& Oplog, + std::vector<ChunkRequest>&& Requests, + std::vector<ChunkResult>&& Results) +{ + using namespace std::literals; + + CbPackage ResponsePackage; + + CbObjectWriter ResponseWriter(32 + Requests.size() * 64u); + ResponseWriter.BeginArray("Chunks"sv); + { + for (size_t Index = 0; Index < Requests.size(); Index++) + { + const ChunkRequest& Request = Requests[Index]; + ChunkResult& Result = Results[Index]; + if (Result.Exists) + { + ResponseWriter.BeginObject(); + { + if (Request.Id.index() == 0) + { + const IoHash& RawHash = std::get<IoHash>(Request.Id); + ResponseWriter.AddHash("Id", RawHash); + } + else + { + const Oid& Id = std::get<Oid>(Request.Id); + ResponseWriter.AddObjectId("Id", Id); + } + if (!Request.ModTag.has_value() || Request.ModTag.value() != Result.ModTag) + { + ResponseWriter.AddInteger("ModTag", 
Result.ModTag); + if (!Request.SkipData) + { + auto ExtractRangeResult = ExtractRange(std::move(Result.ChunkBuffer), + Request.Offset, + Request.Size, + ZenContentType::kCompressedBinary); + if (ExtractRangeResult.Error == GetChunkRangeResult::EError::Ok) + { + if (ExtractRangeResult.ContentType == ZenContentType::kCompressedBinary) + { + ZEN_ASSERT(ExtractRangeResult.RawHash != IoHash::Zero); + CompressedBuffer CompressedValue = + CompressedBuffer::FromCompressedNoValidate(std::move(ExtractRangeResult.Chunk)); + ZEN_ASSERT(CompressedValue); + + if (ExtractRangeResult.RawSize != 0) + { + // This really could use some thought so we don't send the same data if we get a request for + // multiple ranges from the same chunk block + + uint64_t FragmentRawOffset = 0; + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + uint64_t BlockSize = 0; + if (CompressedValue.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + { + if (BlockSize > 0) + { + FragmentRawOffset = (Request.Offset / BlockSize) * BlockSize; + } + else + { + FragmentRawOffset = Request.Offset; + } + uint64_t FragmentRawLength = CompressedValue.DecodeRawSize(); + + IoHashStream FragmentHashStream; + FragmentHashStream.Append(ExtractRangeResult.RawHash.Hash, + sizeof(ExtractRangeResult.RawHash.Hash)); + FragmentHashStream.Append(&FragmentRawOffset, sizeof(FragmentRawOffset)); + FragmentHashStream.Append(&FragmentRawLength, sizeof(FragmentRawLength)); + IoHash FragmentHash = FragmentHashStream.GetHash(); + + ResponseWriter.AddHash("FragmentHash", FragmentHash); + ResponseWriter.AddInteger("FragmentOffset", FragmentRawOffset); + ResponseWriter.AddInteger("RawSize", ExtractRangeResult.RawSize); + ResponsePackage.AddAttachment(CbAttachment(CompressedValue, FragmentHash)); + } + else + { + std::string ErrorString = "Failed to get compression parameters from partial compressed buffer"; + ResponseWriter.AddString("Error", ErrorString); + ZEN_WARN("oplog '{}/{}': {}", 
Project.Identifier, Oplog.OplogId(), ErrorString); + } + } + else + { + ResponseWriter.AddHash("RawHash"sv, ExtractRangeResult.RawHash); + ResponsePackage.AddAttachment(CbAttachment(std::move(CompressedValue), ExtractRangeResult.RawHash)); + } + } + else + { + IoHashStream HashStream; + ZEN_ASSERT(Request.Id.index() == 1); + const Oid& Id = std::get<Oid>(Request.Id); + HashStream.Append(Id.OidBits, sizeof(Id.OidBits)); + HashStream.Append(&Request.Offset, sizeof(Request.Offset)); + HashStream.Append(&Request.Size, sizeof(Request.Size)); + IoHash Hash = HashStream.GetHash(); + + ResponseWriter.AddHash("Hash"sv, Hash); + if (ExtractRangeResult.RawSize != 0) + { + ResponseWriter.AddInteger("Size", ExtractRangeResult.RawSize); + } + ResponsePackage.AddAttachment(CbAttachment(std::move(ExtractRangeResult.Chunk), Hash)); + } + } + else + { + std::string ErrorString = + fmt::format("Failed fetching chunk range ({})", ExtractRangeResult.ErrorDescription); + ResponseWriter.AddString("Error", ErrorString); + ZEN_WARN("oplog '{}/{}': {}", Project.Identifier, Oplog.OplogId(), ErrorString); + } + } + } + } + ResponseWriter.EndObject(); + } + } + } + ResponseWriter.EndArray(); + ResponsePackage.SetObject(ResponseWriter.Save()); + + return ResponsePackage; +} + +bool +ProjectStore::AreDiskWritesAllowed() const +{ + return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); +} + +void +ProjectStore::EnableUpdateCapture() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + m_ProjectsLock.WithExclusiveLock([&]() { + if (m_UpdateCaptureRefCounter == 0) + { + ZEN_ASSERT(!m_CapturedProjects); + m_CapturedProjects = std::make_unique<std::vector<std::string>>(); + } + else + { + ZEN_ASSERT(m_CapturedProjects); + } + m_UpdateCaptureRefCounter++; + }); +} + +void +ProjectStore::DisableUpdateCapture() +{ + m_ProjectsLock.WithExclusiveLock([&]() { + ZEN_ASSERT(m_CapturedProjects); + ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); + m_UpdateCaptureRefCounter--; + if 
(m_UpdateCaptureRefCounter == 0) + { + m_CapturedProjects.reset(); + } + }); +} + +std::vector<std::string> +ProjectStore::GetCapturedProjectsLocked() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + if (m_CapturedProjects) + { + return *m_CapturedProjects; + } + return {}; +} + +std::string +ProjectStore::GetGcName(GcCtx&) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + return fmt::format("projectstore: '{}'", m_ProjectBasePath.string()); +} + +class ProjectStoreGcStoreCompactor : public GcStoreCompactor +{ +public: + ProjectStoreGcStoreCompactor(ProjectStore& ProjectStore, + const std::filesystem::path& BasePath, + std::vector<std::filesystem::path>&& OplogPathsToRemove, + std::vector<std::filesystem::path>&& ProjectPathsToRemove) + : m_ProjectStore(ProjectStore) + , m_BasePath(BasePath) + , m_OplogPathsToRemove(std::move(OplogPathsToRemove)) + , m_ProjectPathsToRemove(std::move(ProjectPathsToRemove)) + { + } + + virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>&) override + { + ZEN_TRACE_CPU("Store::CompactStore"); + ZEN_MEMSCOPE(GetProjectstoreTag()); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: projectstore [COMPACT] '{}': RemovedDisk: {} in {}", + m_BasePath, + NiceBytes(Stats.RemovedDisk), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + size_t CompactOplogCount = 0; + if (Ctx.Settings.IsDeleteMode) + { + for (const std::filesystem::path& OplogPath : m_OplogPathsToRemove) + { + uint64_t OplogSize = ProjectStore::Oplog::TotalSize(OplogPath); + if (DeleteDirectories(OplogPath)) + { + ZEN_DEBUG("GCV2: projectstore [COMPACT] '{}': removed oplog folder '{}', removed {}", + m_BasePath, + OplogPath, + NiceBytes(OplogSize)); + Stats.RemovedDisk += OplogSize; + } + else + { + ZEN_WARN("GCV2: projectstore [COMPACT] '{}': Failed to remove oplog folder '{}'", m_BasePath, OplogPath); + } + } + + for (const 
std::filesystem::path& ProjectPath : m_ProjectPathsToRemove) + { + uint64_t ProjectSize = ProjectStore::Project::TotalSize(ProjectPath); + if (DeleteDirectories(ProjectPath)) + { + ZEN_DEBUG("GCV2: projectstore [COMPACT] '{}': removed project folder '{}', removed {}", + m_BasePath, + ProjectPath, + NiceBytes(ProjectSize)); + Stats.RemovedDisk += ProjectSize; + } + else + { + ZEN_WARN("GCV2: projectstore [COMPACT] '{}': Failed to remove project folder '{}'", m_BasePath, ProjectPath); + } + } + } + + for (auto ProjectIt : m_ProjectStore.m_Projects) + { + Ref<ProjectStore::Project> Project = ProjectIt.second; + std::vector<std::string> OplogsToCompact = Project->GetOplogsToCompact(); + CompactOplogCount += OplogsToCompact.size(); + for (const std::string& OplogId : OplogsToCompact) + { + Ref<ProjectStore::Oplog> OpLog; + { + RwLock::SharedLockScope __(Project->m_ProjectLock); + if (auto OpIt = Project->m_Oplogs.find(OplogId); OpIt != Project->m_Oplogs.end()) + { + OpLog = OpIt->second; + } + else + { + Stopwatch OplogTimer; + std::filesystem::path OplogBasePath = Project->BasePathForOplog(OplogId); + OpLog = new ProjectStore::Oplog( + Project->Log(), + Project->Identifier, + OplogId, + Project->m_CidStore, + OplogBasePath, + std::filesystem::path{}, + ProjectStore::Oplog::EMode::kFull); // We need it to be a full read so we can write a new index snapshot + OpLog->Read(); + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: projectstore [COMPACT] '{}': read oplog '{}/{}' at '{}' in {}", + m_BasePath, + Project->Identifier, + OplogId, + OplogBasePath, + NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); + } + } + + if (OpLog) + { + const uint64_t PreSize = OpLog->TotalSize(); + + OpLog->Compact(!Ctx.Settings.IsDeleteMode, + /*RetainLSNs*/ true, + fmt::format("GCV2: projectstore [COMPACT] '{}': ", m_BasePath)); + + const uint64_t PostSize = OpLog->TotalSize(); + + const uint64_t FreedSize = (PreSize > PostSize) ? 
(PreSize - PostSize) : 0; + + Stats.RemovedDisk += FreedSize; + } + } + } + } + + if (!Ctx.Settings.IsDeleteMode) + { + ZEN_DEBUG("GCV2: projectstore [COMPACT] '{}': Skipped deleting of {} oplogs and {} projects, skipped compacting {} oplogs", + m_BasePath, + m_OplogPathsToRemove.size(), + m_ProjectPathsToRemove.size(), + CompactOplogCount); + } + + m_ProjectPathsToRemove.clear(); + m_OplogPathsToRemove.clear(); + } + + virtual std::string GetGcName(GcCtx&) override { return fmt::format("projectstore: '{}'", m_BasePath.string()); } + +private: + ProjectStore& m_ProjectStore; + std::filesystem::path m_BasePath; + std::vector<std::filesystem::path> m_OplogPathsToRemove; + std::vector<std::filesystem::path> m_ProjectPathsToRemove; +}; + +GcStoreCompactor* +ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) +{ + ZEN_TRACE_CPU("Store::RemoveExpiredData"); + ZEN_MEMSCOPE(GetProjectstoreTag()); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: projectstore [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {} in {}", + m_ProjectBasePath, + Stats.CheckedCount, + Stats.FoundCount, + Stats.DeletedCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + std::vector<std::filesystem::path> OplogPathsToRemove; + std::vector<std::filesystem::path> ProjectPathsToRemove; + + std::vector<Ref<Project>> ExpiredProjects; + std::vector<Ref<Project>> Projects; + + DiscoverProjects(); + + { + RwLock::SharedLockScope Lock(m_ProjectsLock); + for (auto& Kv : m_Projects) + { + Stats.CheckedCount++; + if (Kv.second->IsExpired(Ctx.Settings.ProjectStoreExpireTime)) + { + ExpiredProjects.push_back(Kv.second); + continue; + } + Projects.push_back(Kv.second); + } + } + + size_t ExpiredOplogCount = 0; + for (const Ref<Project>& Project : Projects) + { + if (Ctx.IsCancelledFlag) + { + break; + } + + std::vector<std::string> ExpiredOplogs; + + std::vector<std::string> 
OpLogs = Project->ScanForOplogs(); + for (const std::string& OplogId : OpLogs) + { + Stats.CheckedCount++; + if (Project->IsExpired(Ctx.Settings.ProjectStoreExpireTime, OplogId)) + { + ExpiredOplogs.push_back(OplogId); + } + else if (!Project->IsOplogTouchedSince(GcClock::Now() - std::chrono::minutes(15), OplogId)) + { + if (Project->TryUnloadOplog(OplogId)) + { + ZEN_INFO("GCV2: projectstore [REMOVE EXPIRED] '{}': Unloaded oplog {}/{} due to inactivity", + m_ProjectBasePath, + Project->Identifier, + OplogId); + } + } + } + + std::filesystem::path ProjectPath = BasePathForProject(Project->Identifier); + ExpiredOplogCount += ExpiredOplogs.size(); + if (Ctx.Settings.IsDeleteMode) + { + for (const std::string& OplogId : ExpiredOplogs) + { + std::filesystem::path RemovePath; + if (Project->RemoveOplog(OplogId, RemovePath)) + { + if (!RemovePath.empty()) + { + OplogPathsToRemove.push_back(RemovePath); + } + Stats.DeletedCount++; + } + } + Project->Flush(); + } + } + + if (Ctx.Settings.IsDeleteMode) + { + for (const Ref<Project>& Project : ExpiredProjects) + { + std::string ProjectId = Project->Identifier; + { + { + if (!Project->IsExpired(Ctx.Settings.ProjectStoreExpireTime)) + { + ZEN_DEBUG( + "GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. Project no longer " + "expired.", + m_ProjectBasePath, + ProjectId); + continue; + } + } + std::filesystem::path RemovePath; + bool Success = RemoveProject(ProjectId, RemovePath); + if (!Success) + { + ZEN_DEBUG( + "GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. 
Project folder is locked.", + m_ProjectBasePath, + ProjectId); + continue; + } + if (!RemovePath.empty()) + { + ProjectPathsToRemove.push_back(RemovePath); + } + } + } + Stats.DeletedCount += ExpiredProjects.size(); + } + + size_t ExpiredProjectCount = ExpiredProjects.size(); + Stats.FoundCount += ExpiredOplogCount + ExpiredProjectCount; + return new ProjectStoreGcStoreCompactor(*this, m_ProjectBasePath, std::move(OplogPathsToRemove), std::move(ProjectPathsToRemove)); +} + +class ProjectStoreReferenceChecker : public GcReferenceChecker +{ +public: + ProjectStoreReferenceChecker(ProjectStore& InProjectStore) : m_ProjectStore(InProjectStore) { m_ProjectStore.EnableUpdateCapture(); } + + virtual ~ProjectStoreReferenceChecker() + { + try + { + m_ProjectStore.DisableUpdateCapture(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~ProjectStoreReferenceChecker threw exception: '{}'", Ex.what()); + } + } + + virtual std::string GetGcName(GcCtx&) override { return "projectstore"; } + virtual void PreCache(GcCtx&) override {} + + virtual void UpdateLockedState(GcCtx& Ctx) override + { + ZEN_TRACE_CPU("Store::UpdateLockedState"); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + Stopwatch Timer; + + std::vector<Ref<ProjectStore::Oplog>> AddedOplogs; + + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} in {} new oplogs", + "projectstore", + m_References.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + AddedOplogs.size()); + }); + + std::vector<std::string> AddedProjects = m_ProjectStore.GetCapturedProjectsLocked(); + for (const std::string& AddedProject : AddedProjects) + { + if (auto It = m_ProjectStore.m_Projects.find(AddedProject); It != m_ProjectStore.m_Projects.end()) + { + ProjectStore::Project& Project = *It->second; + for (auto& OplogPair : Project.m_Oplogs) + { + Ref<ProjectStore::Oplog> Oplog = OplogPair.second; + AddedOplogs.push_back(Oplog); + } + 
} + } + for (auto& ProjectPair : m_ProjectStore.m_Projects) + { + ProjectStore::Project& Project = *ProjectPair.second; + + std::vector<std::string> AddedOplogNames(Project.GetCapturedOplogsLocked()); + for (const std::string& OplogName : AddedOplogNames) + { + if (auto It = Project.m_Oplogs.find(OplogName); It != Project.m_Oplogs.end()) + { + Ref<ProjectStore::Oplog> Oplog = It->second; + AddedOplogs.push_back(Oplog); + } + } + } + + for (const Ref<ProjectStore::Oplog>& Oplog : AddedOplogs) + { + size_t BaseReferenceCount = m_References.size(); + + Stopwatch InnerTimer; + const auto __ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} from {}", + Oplog->m_BasePath, + m_References.size() - BaseReferenceCount, + NiceTimeSpanMs(InnerTimer.GetElapsedTimeMs()), + Oplog->OplogId()); + }); + + Oplog->GetAttachmentsLocked(m_References, Ctx.Settings.StoreProjectAttachmentMetaData); + if (std::vector<IoHash> PendingChunkReferences = Oplog->GetPendingChunkReferencesLocked(); !PendingChunkReferences.empty()) + { + m_References.insert(m_References.end(), PendingChunkReferences.begin(), PendingChunkReferences.end()); + } + } + + FilterReferences(Ctx, fmt::format("projectstore [LOCKSTATE] '{}'", "projectstore"), m_References); + } + + virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override + { + ZEN_TRACE_CPU("Store::GetUnusedReferences"); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + size_t InitialCount = IoCids.size(); + size_t UsedCount = InitialCount; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", + "projectstore", + UsedCount, + InitialCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_References, IoCids); + 
UsedCount = IoCids.size() - UnusedReferences.size(); + return UnusedReferences; + } + +private: + ProjectStore& m_ProjectStore; + std::vector<IoHash> m_References; +}; + +class ProjectStoreOplogReferenceChecker : public GcReferenceChecker +{ +public: + ProjectStoreOplogReferenceChecker(ProjectStore& InProjectStore, Ref<ProjectStore::Project> InProject, std::string_view InOplog) + : m_ProjectStore(InProjectStore) + , m_Project(InProject) + , m_OplogId(InOplog) + { + m_Project->EnableUpdateCapture(); + } + + virtual ~ProjectStoreOplogReferenceChecker() + { + try + { + m_Project->DisableUpdateCapture(); + + if (m_OplogHasUpdateCapture) + { + RwLock::SharedLockScope _(m_Project->m_ProjectLock); + if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) + { + Ref<ProjectStore::Oplog> Oplog = It->second; + Oplog->DisableUpdateCapture(); + m_OplogHasUpdateCapture = false; + } + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~ProjectStoreOplogReferenceChecker threw exception: '{}'", Ex.what()); + } + } + + virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}/{}'", m_Project->Identifier, m_OplogId); } + + virtual void PreCache(GcCtx& Ctx) override + { + ZEN_TRACE_CPU("Store::Oplog::PreCache"); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': precached {} references in {} from {}/{}", + m_OplogBasePath, + m_References.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + m_Project->Identifier, + m_OplogId); + }); + + m_OplogBasePath = m_Project->BasePathForOplog(m_OplogId); + { + Ref<ProjectStore::Oplog> Oplog; + + RwLock::SharedLockScope __(m_Project->m_ProjectLock); + if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) + { + Oplog = It->second; + Oplog->EnableUpdateCapture(); + m_OplogHasUpdateCapture = true; + } + else if 
(ProjectStore::Oplog::ExistsAt(m_OplogBasePath)) + { + Stopwatch OplogTimer; + Oplog = new ProjectStore::Oplog(m_Project->Log(), + m_Project->Identifier, + m_OplogId, + m_Project->m_CidStore, + m_OplogBasePath, + std::filesystem::path{}, + ProjectStore::Oplog::EMode::kBasicReadOnly); + Oplog->Read(); + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': read oplog '{}/{}' in {}", + m_OplogBasePath, + m_Project->Identifier, + m_OplogId, + NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); + } + } + else + { + return; + } + + RwLock::SharedLockScope ___(Oplog->m_OplogLock); + if (Ctx.IsCancelledFlag) + { + return; + } + + GcClock::TimePoint CompactExpireTime = GcClock::Now() - std::chrono::minutes(30); + if (!m_Project->IsOplogTouchedSince(CompactExpireTime, m_OplogId)) + { + const uint32_t CompactUnusedThreshold = 25; + if (Oplog->GetUnusedSpacePercent() >= CompactUnusedThreshold) + { + m_Project->AddOplogToCompact(m_OplogId); + } + } + + Oplog->GetAttachmentsLocked(m_References, Ctx.Settings.StoreProjectAttachmentMetaData); + m_OplogAccessTime = m_Project->LastOplogAccessTime(m_OplogId); + } + FilterReferences(Ctx, fmt::format("projectstore [PRECACHE] '{}'", m_OplogBasePath), m_References); + } + + virtual void UpdateLockedState(GcCtx& Ctx) override + { + ZEN_TRACE_CPU("Store::Oplog::UpdateLockedState"); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} from {}/{}", + m_OplogBasePath, + m_AddedReferences.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + m_Project->Identifier, + m_OplogId); + }); + + if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) + { + Ref<ProjectStore::Oplog> Oplog = It->second; + Oplog->IterateCapturedOpsLocked([&](const Oid& Key, ProjectStore::LogSequenceNumber LSN, const CbObjectView& UpdateOp) -> bool { + 
ZEN_UNUSED(Key, LSN); + UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_AddedReferences.emplace_back(Visitor.AsAttachment()); }); + return true; + }); + std::vector<IoHash> AddedAttachments = Oplog->GetCapturedAttachmentsLocked(); + m_AddedReferences.insert(m_AddedReferences.end(), AddedAttachments.begin(), AddedAttachments.end()); + if (std::vector<IoHash> PendingChunkReferences = Oplog->GetPendingChunkReferencesLocked(); !PendingChunkReferences.empty()) + { + m_AddedReferences.insert(m_AddedReferences.end(), PendingChunkReferences.begin(), PendingChunkReferences.end()); + } + } + else if (m_Project->LastOplogAccessTime(m_OplogId) > m_OplogAccessTime && ProjectStore::Oplog::ExistsAt(m_OplogBasePath)) + { + Stopwatch OplogTimer; + { + Ref<ProjectStore::Oplog> Oplog(new ProjectStore::Oplog(m_Project->Log(), + m_Project->Identifier, + m_OplogId, + m_Project->m_CidStore, + m_OplogBasePath, + std::filesystem::path{}, + ProjectStore::Oplog::EMode::kBasicReadOnly)); + Oplog->Read(); + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': read oplog '{}/{}' in {}", + m_OplogBasePath, + m_Project->Identifier, + m_OplogId, + NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); + } + + OplogTimer.Reset(); + + Oplog->GetAttachmentsLocked(m_AddedReferences, Ctx.Settings.StoreProjectAttachmentMetaData); + } + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': read referenced attachments from oplog '{}/{}' in {}", + m_OplogBasePath, + m_Project->Identifier, + m_OplogId, + NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); + } + } + FilterReferences(Ctx, fmt::format("projectstore [LOCKSTATE] '{}'", m_OplogBasePath), m_AddedReferences); + } + + virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override + { + ZEN_TRACE_CPU("Store::Oplog::GetUnusedReferences"); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + const size_t InitialCount = IoCids.size(); + size_t UsedCount = InitialCount; + + 
Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {} from {}/{}", + m_OplogBasePath, + UsedCount, + InitialCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + m_Project->Identifier, + m_OplogId); + }); + + std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_References, IoCids); + UnusedReferences = KeepUnusedReferences(m_AddedReferences, UnusedReferences); + UsedCount = IoCids.size() - UnusedReferences.size(); + return UnusedReferences; + } + + ProjectStore& m_ProjectStore; + Ref<ProjectStore::Project> m_Project; + std::string m_OplogId; + std::filesystem::path m_OplogBasePath; + bool m_OplogHasUpdateCapture = false; + std::vector<IoHash> m_References; + std::vector<IoHash> m_AddedReferences; + GcClock::TimePoint m_OplogAccessTime; +}; + +std::vector<GcReferenceChecker*> +ProjectStore::CreateReferenceCheckers(GcCtx& Ctx) +{ + ZEN_TRACE_CPU("Store::CreateReferenceCheckers"); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + size_t ProjectCount = 0; + size_t OplogCount = 0; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: projectstore [CREATE CHECKERS] '{}': opened {} projects and {} oplogs in {}", + m_ProjectBasePath, + ProjectCount, + OplogCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + DiscoverProjects(); + + std::vector<Ref<ProjectStore::Project>> Projects; + std::vector<GcReferenceChecker*> Checkers; + Checkers.emplace_back(new ProjectStoreReferenceChecker(*this)); + { + RwLock::SharedLockScope Lock(m_ProjectsLock); + Projects.reserve(m_Projects.size()); + + for (auto& Kv : m_Projects) + { + Projects.push_back(Kv.second); + } + } + ProjectCount += Projects.size(); + try + { + for (const Ref<ProjectStore::Project>& Project : Projects) + { + std::vector<std::string> OpLogs = Project->ScanForOplogs(); + 
Checkers.reserve(Checkers.size() + OpLogs.size()); + for (const std::string& OpLogId : OpLogs) + { + Checkers.emplace_back(new ProjectStoreOplogReferenceChecker(*this, Project, OpLogId)); + OplogCount++; + } + } + } + catch (const std::exception&) + { + while (!Checkers.empty()) + { + delete Checkers.back(); + Checkers.pop_back(); + } + throw; + } + + return Checkers; +} + +std::vector<RwLock::SharedLockScope> +ProjectStore::LockState(GcCtx& Ctx) +{ + ZEN_TRACE_CPU("Store::LockState"); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + std::vector<RwLock::SharedLockScope> Locks; + Locks.emplace_back(RwLock::SharedLockScope(m_ProjectsLock)); + for (auto& ProjectIt : m_Projects) + { + std::vector<RwLock::SharedLockScope> ProjectLocks = ProjectIt.second->GetGcReferencerLocks(); + for (auto It = std::make_move_iterator(ProjectLocks.begin()); It != std::make_move_iterator(ProjectLocks.end()); It++) + { + Locks.emplace_back(std::move(*It)); + } + } + return Locks; +} + +class ProjectStoreOplogReferenceValidator : public GcReferenceValidator +{ +public: + ProjectStoreOplogReferenceValidator(ProjectStore& InProjectStore, std::string_view InProject, std::string_view InOplog) + : m_ProjectStore(InProjectStore) + , m_ProjectId(InProject) + , m_OplogId(InOplog) + { + } + + virtual ~ProjectStoreOplogReferenceValidator() {} + + virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}/{}'", m_ProjectId, m_OplogId); } + + virtual void Validate(GcCtx& Ctx, GcReferenceValidatorStats& Stats) override + { + ZEN_TRACE_CPU("Store::Validate"); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + ProjectStore::Oplog::ValidationResult Result; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + std::string Status = Result.IsEmpty() ? "OK" : "Missing data"; + ZEN_INFO("GCV2: projectstore [VALIDATE] '{}/{}': Validated in {}. 
OpCount: {}, MinLSN: {}, MaxLSN: {}, Status: {}", + m_ProjectId, + m_OplogId, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + Result.OpCount, + Result.LSNLow.Number, + Result.LSNHigh.Number, + Status); + }); + Ref<ProjectStore::Oplog> Oplog; + Ref<ProjectStore::Project> Project = m_ProjectStore.OpenProject(m_ProjectId); + if (Project) + { + { + RwLock::SharedLockScope __(Project->m_ProjectLock); + if (auto It = Project->m_Oplogs.find(m_OplogId); It != Project->m_Oplogs.end()) + { + Oplog = It->second; + } + else + { + Stopwatch OplogTimer; + + std::filesystem::path OplogBasePath = Project->BasePathForOplog(m_OplogId); + Oplog = Ref<ProjectStore::Oplog>(new ProjectStore::Oplog(Project->Log(), + Project->Identifier, + m_OplogId, + Project->m_CidStore, + OplogBasePath, + std::filesystem::path{}, + ProjectStore::Oplog::EMode::kBasicReadOnly)); + Oplog->Read(); + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: projectstore [VALIDATE] '{}': read oplog '{}/{}' in {}", + OplogBasePath, + Project->Identifier, + m_OplogId, + NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); + } + + if (Ctx.IsCancelledFlag) + { + return; + } + } + } + + if (Oplog) + { + Result = Oplog->Validate(Project->RootDir, Ctx.IsCancelledFlag, nullptr); + if (Ctx.IsCancelledFlag) + { + return; + } + Stats.CheckedCount = Result.OpCount; + Stats.MissingChunks = Result.MissingChunks.size(); + Stats.MissingFiles = Result.MissingFiles.size(); + Stats.MissingMetas = Result.MissingMetas.size(); + Stats.MissingAttachments = Result.MissingAttachments.size(); + } + + if (!Result.IsEmpty()) + { + ZEN_WARN("GCV2: projectstore [VALIDATE] '{}/{}': Missing data: Files: {}, Chunks: {}, Metas: {}, Attachments: {}", + m_ProjectId, + m_OplogId, + Result.MissingFiles.size(), + Result.MissingChunks.size(), + Result.MissingMetas.size(), + Result.MissingAttachments.size()); + } + } + } + + ProjectStore& m_ProjectStore; + std::string m_ProjectId; + std::string m_OplogId; +}; + +std::vector<GcReferenceValidator*> 
+ProjectStore::CreateReferenceValidators(GcCtx& Ctx) +{ + if (Ctx.Settings.SkipCidDelete) + { + return {}; + } + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + DiscoverProjects(); + + std::vector<std::pair<std::string, std::string>> Oplogs; + { + RwLock::SharedLockScope _(m_ProjectsLock); + for (auto& ProjectPair : m_Projects) + { + ProjectStore::Project& Project = *ProjectPair.second; + std::vector<std::string> OpLogs = Project.ScanForOplogs(); + for (const std::string& OplogName : OpLogs) + { + Oplogs.push_back({Project.Identifier, OplogName}); + } + } + } + std::vector<GcReferenceValidator*> Validators; + Validators.reserve(Oplogs.size()); + for (const std::pair<std::string, std::string>& Oplog : Oplogs) + { + Validators.push_back(new ProjectStoreOplogReferenceValidator(*this, Oplog.first, Oplog.second)); + } + + return Validators; +} + +////////////////////////////////////////////////////////////////////////// + +Oid +OpKeyStringAsOid(std::string_view OpKey) +{ + eastl::fixed_vector<uint8_t, 512> Buffer; + return OpKeyStringAsOid(OpKey, Buffer); +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +namespace testutils { + using namespace std::literals; + + static std::string OidAsString(const Oid& Id) + { + StringBuilder<25> OidStringBuilder; + Id.ToString(OidStringBuilder); + return OidStringBuilder.ToString(); + } + + static CbPackage CreateBulkDataOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) + { + CbPackage Package; + CbObjectWriter Object; + Object << "key"sv << OidAsString(Id); + if (!Attachments.empty()) + { + Object.BeginArray("bulkdata"); + for (const auto& Attachment : Attachments) + { + CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); + Object.BeginObject(); + Object << "id"sv << Attachment.first; + Object << "type"sv + << "Standard"sv; + Object << "data"sv << Attach; + Object.EndObject(); + + 
Package.AddAttachment(Attach); + } + Object.EndArray(); + } + Package.SetObject(Object.Save()); + return Package; + }; + + static CbPackage CreateFilesOplogPackage(const Oid& Id, + const std::filesystem::path ProjectRootDir, + const std::span<const std::pair<Oid, std::filesystem::path>>& Attachments) + { + CbPackage Package; + CbObjectWriter Object; + Object << "key"sv << OidAsString(Id); + if (!Attachments.empty()) + { + Object.BeginArray("files"); + for (const auto& Attachment : Attachments) + { + std::filesystem::path ServerPath = std::filesystem::relative(Attachment.second, ProjectRootDir); + std::filesystem::path ClientPath = ServerPath; // dummy + Object.BeginObject(); + Object << "id"sv << Attachment.first; + Object << "serverpath"sv << ServerPath.string(); + Object << "clientpath"sv << ClientPath.string(); + Object.EndObject(); + } + Object.EndArray(); + } + Package.SetObject(Object.Save()); + return Package; + }; + + static std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments( + const std::span<const size_t>& Sizes, + OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast, + uint64_t BlockSize = 0) + { + std::vector<std::pair<Oid, CompressedBuffer>> Result; + Result.reserve(Sizes.size()); + for (size_t Size : Sizes) + { + CompressedBuffer Compressed = + CompressedBuffer::Compress(SharedBuffer(CreateSemiRandomBlob(Size)), OodleCompressor::Mermaid, CompressionLevel, BlockSize); + Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); + } + return Result; + } + + static uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64_t RawOffset) + { + if (RawOffset > 0) + { + uint64_t BlockSize = 0; + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + { + return 0; + } + return BlockSize > 0 ? 
RawOffset % BlockSize : 0; + } + return 0; + } + + template<typename ChunkType> + CbObject BuildChunksRequest(bool SkipData, + std::string_view IdName, + const std::vector<ChunkType>& Chunks, + const std::vector<std::pair<size_t, size_t>>& Ranges, + const std::vector<uint64_t>& ModTags) + { + CbObjectWriter Request; + Request.BeginObject("Request"sv); + { + if (SkipData) + { + Request.AddBool("SkipData"sv, true); + } + if (!Chunks.empty()) + { + Request.BeginArray("Chunks"); + for (size_t Index = 0; Index < Chunks.size(); Index++) + { + Request.BeginObject(); + { + Request << IdName << Chunks[Index]; + if (!ModTags.empty()) + { + Request << "ModTag" << ModTags[Index]; + } + if (!Ranges.empty()) + { + Request << "Offset" << Ranges[Index].first; + Request << "Size" << Ranges[Index].second; + } + } + Request.EndObject(); + } + Request.EndArray(); + } + } + Request.EndObject(); + return Request.Save(); + }; + + CbObject BuildChunksRequest(bool SkipData, + const std::vector<Oid>& Chunks, + const std::vector<std::pair<size_t, size_t>>& Ranges, + const std::vector<uint64_t>& ModTags) + { + return BuildChunksRequest<Oid>(SkipData, "Oid", Chunks, Ranges, ModTags); + } + + CbObject BuildChunksRequest(bool SkipData, + const std::vector<IoHash>& Chunks, + const std::vector<std::pair<size_t, size_t>>& Ranges, + const std::vector<uint64_t>& ModTags) + { + return BuildChunksRequest<IoHash>(SkipData, "RawHash", Chunks, Ranges, ModTags); + } + +} // namespace testutils + +TEST_CASE("project.opkeys") +{ + using namespace std::literals; + + const std::string_view LongKey = + "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" + "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" + "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"sv; + + // Not a test per se, this code just exercises the key computation logic to ensure all + // edge 
cases are handled by the bug workaround logic + + for (int i = 1; i < 300; ++i) + { + CbObjectWriter Cbo; + Cbo << "key"sv << LongKey.substr(0, i); + + const Oid KeyId = ComputeOpKey(Cbo.Save()); + } + + { + CbObjectWriter Cbo; + Cbo << "key"sv + << "abcdef"; + + const Oid KeyId = ComputeOpKey(Cbo.Save()); + const Oid CorrectId = Oid::FromHexString( + "7a03540e" + "ecb0daa9" + "00f2949e"); + + CHECK(KeyId == CorrectId); + } + + { + CbObjectWriter Cbo; + Cbo << "key"sv + << "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" + "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" + "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"; + + const Oid KeyId = ComputeOpKey(Cbo.Save()); + const Oid CorrectId = Oid::FromHexString( + "c5e88c79" + "06b7fa38" + "7b0d2efd"); + + CHECK(KeyId == CorrectId); + } +} + +TEST_CASE("project.store.create") +{ + using namespace std::literals; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::string_view ProjectName("proj1"sv); + std::filesystem::path BasePath = TempDir.Path() / "projectstore"; + ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); + std::filesystem::path RootDir = TempDir.Path() / "root"; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; + std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; + std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; + + Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / ProjectName, + ProjectName, + RootDir.string(), + EngineRootDir.string(), + ProjectRootDir.string(), + ProjectFilePath.string())); + 
TEST_CASE("project.store.lifetimes")
{
	using namespace std::literals;

	ScopedTemporaryDirectory Scratch;

	// Directories and files the project is told about, all rooted in the scratch directory
	const std::filesystem::path StoreDir = Scratch.Path() / "projectstore";
	const std::filesystem::path WorkspaceDir = Scratch.Path() / "root";
	const std::filesystem::path EngineDir = Scratch.Path() / "engine";
	const std::filesystem::path GameDir = Scratch.Path() / "game";
	const std::filesystem::path GameProjectFile = Scratch.Path() / "game" / "game.uproject";

	// Backing CAS + GC manager required to construct a project store
	GcManager Gc;
	CidStore CidStore(Gc);
	CidStoreConfiguration CidConfig = {.RootDirectory = Scratch.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
	CidStore.Initialize(CidConfig);

	ProjectStore Store(CidStore, StoreDir, Gc, ProjectStore::Configuration{});

	Ref<ProjectStore::Project> Project(Store.NewProject(StoreDir / "proj1"sv,
														"proj1"sv,
														WorkspaceDir.string(),
														EngineDir.string(),
														GameDir.string(),
														GameProjectFile.string()));
	Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog1", {});
	CHECK(Oplog);

	// Preparing the project for deletion must succeed and report where the data was moved to
	std::filesystem::path PendingDeletePath;
	CHECK(Project->PrepareForDelete(PendingDeletePath));
	CHECK(!PendingDeletePath.empty());

	// After that the oplog can no longer be opened through the project...
	CHECK(!Project->OpenOplog("oplog1", /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true));
	// ...but the Ref taken earlier still points at an accessible (now invalid) oplog object
	CHECK(Oplog->OplogCount() == 0);
	// The project object itself is kept alive by our Ref
	CHECK(Project->Identifier == "proj1"sv);
}
ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); + std::filesystem::path RootDir = TempDir.Path() / "root"; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; + + std::filesystem::path Project1RootDir = TempDir.Path() / "game1"; + std::filesystem::path Project1FilePath = TempDir.Path() / "game1" / "game.uproject"; + { + CreateDirectories(Project1FilePath.parent_path()); + BasicFile ProjectFile; + ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); + } + std::filesystem::path Project1OplogPath = TempDir.Path() / "game1" / "saves" / "cooked" / ".projectstore"; + { + CreateDirectories(Project1OplogPath.parent_path()); + BasicFile OplogFile; + OplogFile.Open(Project1OplogPath, BasicFile::Mode::kTruncate); + } + + std::filesystem::path Project2RootDir = TempDir.Path() / "game2"; + std::filesystem::path Project2FilePath = TempDir.Path() / "game2" / "game.uproject"; + { + CreateDirectories(Project2FilePath.parent_path()); + BasicFile ProjectFile; + ProjectFile.Open(Project2FilePath, BasicFile::Mode::kTruncate); + } + std::filesystem::path Project2Oplog1Path = TempDir.Path() / "game1" / "saves" / "cooked" / ".projectstore"; + { + CreateDirectories(Project2Oplog1Path.parent_path()); + BasicFile OplogFile; + OplogFile.Open(Project2Oplog1Path, BasicFile::Mode::kTruncate); + } + std::filesystem::path Project2Oplog2Path = TempDir.Path() / "game2" / "saves" / "cooked" / ".projectstore"; + { + CreateDirectories(Project2Oplog2Path.parent_path()); + BasicFile OplogFile; + OplogFile.Open(Project2Oplog2Path, BasicFile::Mode::kTruncate); + } + + { + Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + Project1RootDir.string(), + Project1FilePath.string())); + Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1", Project1OplogPath); + CHECK(Oplog); + + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); + 
Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}))); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122}))); + } + + { + Ref<ProjectStore::Project> Project2(ProjectStore.NewProject(BasePath / "proj2"sv, + "proj2"sv, + RootDir.string(), + EngineRootDir.string(), + Project2RootDir.string(), + Project2FilePath.string())); + { + Ref<ProjectStore::Oplog> Oplog = Project2->NewOplog("oplog2", Project2Oplog1Path); + CHECK(Oplog); + + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{177}))); + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9123, 383, 590, 96}))); + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{535, 221}))); + } + { + Ref<ProjectStore::Oplog> Oplog = Project2->NewOplog("oplog3", Project2Oplog2Path); + CHECK(Oplog); + + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{137}))); + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9723, 683, 594, 98}))); + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{531, 271}))); + } + } + + { + GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24), + .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24), + .IsDeleteMode = true}; 
+ GcResult Result = Gc.CollectGarbage(Settings); + CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); + CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); + CHECK_EQ(21u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); + CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); + CHECK(ProjectStore.OpenProject("proj1"sv)); + CHECK(ProjectStore.OpenProject("proj2"sv)); + } + + { + GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24), + .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24), + .IsDeleteMode = true}; + GcResult Result = Gc.CollectGarbage(Settings); + CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); + CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); + CHECK_EQ(21u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); + CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); + CHECK(ProjectStore.OpenProject("proj1"sv)); + CHECK(ProjectStore.OpenProject("proj2"sv)); + } + + RemoveFile(Project1FilePath); + + { + GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24), + .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24), + .IsDeleteMode = true}; + GcResult Result = Gc.CollectGarbage(Settings); + CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); + CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); + CHECK_EQ(21u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); + CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); + CHECK(ProjectStore.OpenProject("proj1"sv)); + CHECK(ProjectStore.OpenProject("proj2"sv)); + } + + { + GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24), + .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24), + 
.CollectSmallObjects = true, + .IsDeleteMode = true}; + GcResult Result = Gc.CollectGarbage(Settings); + CHECK_EQ(4u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); + CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); + CHECK_EQ(21u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); + CHECK_EQ(7u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); + CHECK(!ProjectStore.OpenProject("proj1"sv)); + CHECK(ProjectStore.OpenProject("proj2"sv)); + } + + RemoveFile(Project2Oplog1Path); + { + GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24), + .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24), + .CollectSmallObjects = true, + .IsDeleteMode = true}; + GcResult Result = Gc.CollectGarbage(Settings); + CHECK_EQ(3u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); + CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); + CHECK_EQ(14u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); + CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); + CHECK(!ProjectStore.OpenProject("proj1"sv)); + CHECK(ProjectStore.OpenProject("proj2"sv)); + } + + { + GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24), + .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24), + .CollectSmallObjects = true, + .IsDeleteMode = true}; + GcResult Result = Gc.CollectGarbage(Settings); + CHECK_EQ(3u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); + CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); + CHECK_EQ(14u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); + CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); + CHECK(!ProjectStore.OpenProject("proj1"sv)); + CHECK(ProjectStore.OpenProject("proj2"sv)); + } + + RemoveFile(Project2FilePath); + { + 
GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24), + .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24), + .CollectSmallObjects = true, + .IsDeleteMode = true}; + GcResult Result = Gc.CollectGarbage(Settings); + CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); + CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); + CHECK_EQ(14u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); + CHECK_EQ(14u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); + CHECK(!ProjectStore.OpenProject("proj1"sv)); + CHECK(!ProjectStore.OpenProject("proj2"sv)); + } +} + +TEST_CASE("project.store.gc.prep") +{ + using namespace std::literals; + using namespace testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::filesystem::path BasePath = TempDir.Path() / "projectstore"; + ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); + std::filesystem::path RootDir = TempDir.Path() / "root"; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; + + std::filesystem::path Project1RootDir = TempDir.Path() / "game1"; + std::filesystem::path Project1FilePath = TempDir.Path() / "game1" / "game.uproject"; + { + CreateDirectories(Project1FilePath.parent_path()); + BasicFile ProjectFile; + ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); + } + std::filesystem::path Project1OplogPath = TempDir.Path() / "game1" / "saves" / "cooked" / ".projectstore"; + { + CreateDirectories(Project1OplogPath.parent_path()); + BasicFile OplogFile; + OplogFile.Open(Project1OplogPath, BasicFile::Mode::kTruncate); + } + + std::vector<std::pair<Oid, CompressedBuffer>> OpAttachments = 
CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}); + std::vector<IoHash> OpChunkHashes; + for (const auto& Chunk : OpAttachments) + { + OpChunkHashes.push_back(Chunk.second.DecodeRawHash()); + } + + { + Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + Project1RootDir.string(), + Project1FilePath.string())); + Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); + } + { + Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); + Project1->DeleteOplog("oplog1"sv); + } + + // Equivalent of a `prep` existance check call + for (auto Attachment : OpAttachments) + { + CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + } + + { + GcSettings Settings = {.CacheExpireTime = GcClock::Now(), + .ProjectStoreExpireTime = GcClock::Now(), + .CollectSmallObjects = true, + .IsDeleteMode = true}; + GcResult Result = Gc.CollectGarbage(Settings); + } + + // If a gc comes in between our prep and op write the chunks will be removed + for (auto Attachment : OpAttachments) + { + CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + } + + { + // Make sure the chunks are stored but not the referencing op + Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); + Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); + Project1->DeleteOplog("oplog1"sv); + } + { + Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); + Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); + + // Equivalent of a `prep` call with tracking of ops + CHECK(Oplog->CheckPendingChunkReferences(OpChunkHashes, std::chrono::hours(1)).empty()); + } + + for (auto 
Attachment : OpAttachments) + { + CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + } + + { + GcSettings Settings = {.CacheExpireTime = GcClock::Now(), + .ProjectStoreExpireTime = GcClock::Now(), + .CollectSmallObjects = true, + .IsDeleteMode = true}; + GcResult Result = Gc.CollectGarbage(Settings); + } + + // Attachments should now be retained + for (auto Attachment : OpAttachments) + { + CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + } + + { + GcSettings Settings = {.CacheExpireTime = GcClock::Now(), + .ProjectStoreExpireTime = GcClock::Now(), + .CollectSmallObjects = true, + .IsDeleteMode = true}; + GcResult Result = Gc.CollectGarbage(Settings); + } + + // Attachments should now be retained across multiple GCs if retain time is still valud + for (auto Attachment : OpAttachments) + { + CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + } + + { + Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); + Ref<ProjectStore::Oplog> Oplog = Project1->OpenOplog("oplog1"sv, true, true); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); + Oplog->RemovePendingChunkReferences(OpChunkHashes); + CHECK(Oplog->GetPendingChunkReferencesLocked().size() == 0); + } + for (auto Attachment : OpAttachments) + { + CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + } + { + Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); + Project1->DeleteOplog("oplog1"sv); + } + + { + GcSettings Settings = {.CacheExpireTime = GcClock::Now(), + .ProjectStoreExpireTime = GcClock::Now(), + .CollectSmallObjects = true, + .IsDeleteMode = true}; + GcResult Result = Gc.CollectGarbage(Settings); + } + + for (auto Attachment : OpAttachments) + { + CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + } + + { + Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); + Project1->DeleteOplog("oplog1"sv); + } + { + // Make sure 
the chunks are stored but not the referencing op + Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); + Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); + Project1->DeleteOplog("oplog1"sv); + } + + // Caution - putting breakpoints and stepping through this part of the test likely makes it fails due to expiry time of pending + // chunks + { + Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); + Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); + + CHECK(Oplog->CheckPendingChunkReferences(OpChunkHashes, std::chrono::milliseconds(100)).empty()); + } + + // This pass they should be retained and while the ops are picked up in GC we are blocked from adding our op + { + GcSettings Settings = {.CacheExpireTime = GcClock::Now(), + .ProjectStoreExpireTime = GcClock::Now(), + .CollectSmallObjects = true, + .IsDeleteMode = true}; + GcResult Result = Gc.CollectGarbage(Settings); + } + for (auto Attachment : OpAttachments) + { + CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + } + + Sleep(200); + // This pass they should also be retained since our age retention has kept them alive and they will now be picked up and the + // retention cleared + { + GcSettings Settings = {.CacheExpireTime = GcClock::Now(), + .ProjectStoreExpireTime = GcClock::Now(), + .CollectSmallObjects = true, + .IsDeleteMode = true}; + GcResult Result = Gc.CollectGarbage(Settings); + } + for (auto Attachment : OpAttachments) + { + CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + } + + // This pass the retention time has expired and the last GC pass cleared the entries + { + GcSettings Settings = {.CacheExpireTime = GcClock::Now(), + .ProjectStoreExpireTime = GcClock::Now(), + .CollectSmallObjects = true, + .IsDeleteMode = true}; + GcResult Result = 
Gc.CollectGarbage(Settings); + } + + for (auto Attachment : OpAttachments) + { + CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + } +} + +TEST_CASE("project.store.rpc.getchunks") +{ + using namespace std::literals; + using namespace testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; + ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); + std::filesystem::path RootDir = TempDir.Path() / "root"sv; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; + + std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv; + std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv; + { + CreateDirectories(Project1FilePath.parent_path()); + BasicFile ProjectFile; + ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); + } + + std::vector<Oid> OpIds; + OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()}); + std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; + Oid FilesOpId = Oid::NewOid(); + std::vector<std::pair<Oid, std::filesystem::path>> FilesOpIdAttachments; + { + Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + Project1RootDir.string(), + Project1FilePath.string())); + Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, {}); + CHECK(Oplog); + Attachments[OpIds[0]] = {}; + Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77}); + Attachments[OpIds[2]] = + CreateAttachments(std::initializer_list<size_t>{200 * 1024, 314 * 1024, 690, 99}, OodleCompressionLevel::VeryFast, 128 * 
1024); + Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122}); + for (auto It : Attachments) + { + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(It.first, It.second)); + } + + std::filesystem::path UncompressedFilePath = RootDir / "content" / "uncompressed_file"; + CreateDirectories(UncompressedFilePath.parent_path()); + IoBuffer FileBlob = CreateRandomBlob(81823 * 2); + WriteFile(UncompressedFilePath, FileBlob); + FilesOpIdAttachments.push_back({Oid::NewOid(), UncompressedFilePath}); + Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(FilesOpId, RootDir, FilesOpIdAttachments)); + } + + auto GetChunks = [](zen::ProjectStore& ProjectStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, const CbObject& Cb) { + std::vector<ProjectStore::ChunkRequest> Requests = ProjectStore.ParseChunksRequests(Project, Oplog, Cb); + std::vector<ProjectStore::ChunkResult> Results = + Requests.empty() ? std::vector<ProjectStore::ChunkResult>{} : ProjectStore.GetChunks(Project, Oplog, Requests); + return ProjectStore.WriteChunksRequestResponse(Project, Oplog, std::move(Requests), std::move(Results)); + }; + + Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); + CHECK(Project1); + Ref<ProjectStore::Oplog> Oplog1 = Project1->OpenOplog("oplog1"sv, false, true); + CHECK(Oplog1); + // Invalid request + { + CbObjectWriter Request; + Request.BeginObject("WrongName"sv); + Request.EndObject(); + CbPackage Response; + CHECK_THROWS(GetChunks(ProjectStore, *Project1, *Oplog1, Request.Save())); + } + + // Empty request + { + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, std::vector<IoHash>{}, {}, {})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(0, Chunks.Num()); + } + // Single non-existing chunk by RawHash + IoHash NotFoundIoHash = IoHash::Max; + { + const 
CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundIoHash}, {}, {})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(0, Chunks.Num()); + } + { + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundIoHash}, {}, {})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(0, Chunks.Num()); + } + // Single non-existing chunk by Id + Oid NotFoundId = Oid::NewOid(); + { + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundId}, {}, {})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(0, Chunks.Num()); + } + { + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundId}, {}, {})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(0, Chunks.Num()); + } + // Single existing chunk by RawHash + { + // Fresh fetch + IoHash FirstAttachmentHash = Attachments[OpIds[2]][1].second.DecodeRawHash(); + uint64_t ResponseModTag = 0; + { + const CbPackage& Response = GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {})); + + CHECK_EQ(1, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + IoHash Id = Chunk["Id"].AsHash(); + CHECK_EQ(FirstAttachmentHash, Id); + ResponseModTag = Chunk["ModTag"].AsUInt64(); + CHECK_NE(0, ResponseModTag); + IoHash 
AttachmentHash = Chunk["RawHash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompressedBuffer Buffer = Attachment->AsCompressedBinary(); + CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Fetch with matching ModTag + { + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {ResponseModTag})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + IoHash Id = Chunk["Id"].AsHash(); + CHECK_EQ(FirstAttachmentHash, Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Fetch with mismatching ModTag + { + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)})); + + CHECK_EQ(1, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + IoHash Id = Chunk["Id"].AsHash(); + CHECK_EQ(FirstAttachmentHash, Id); + ResponseModTag = Chunk["ModTag"].AsUInt64(); + CHECK_NE(0, ResponseModTag); + IoHash AttachmentHash = Chunk["RawHash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompressedBuffer Buffer = Attachment->AsCompressedBinary(); + CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Fresh modtime query + { + const CbPackage& Response = 
GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + IoHash Id = Chunk["Id"].AsHash(); + CHECK_EQ(FirstAttachmentHash, Id); + uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); + CHECK_EQ(ResponseModTag, ResponseModTag2); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Modtime query with matching ModTag + { + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {ResponseModTag})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + IoHash Id = Chunk["Id"].AsHash(); + CHECK_EQ(FirstAttachmentHash, Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Modtime query with mismatching ModTag + { + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + IoHash Id = Chunk["Id"].AsHash(); + CHECK_EQ(FirstAttachmentHash, Id); + uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); + CHECK_EQ(ResponseModTag, ResponseModTag2); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + // Single existing CID chunk by Id + { + Oid FirstAttachmentId = 
Attachments[OpIds[2]][1].first; + uint64_t ResponseModTag = 0; + { + // Full chunk request + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {})); + CHECK_EQ(1, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + ResponseModTag = Chunk["ModTag"].AsUInt64(); + CHECK_NE(0, ResponseModTag); + IoHash AttachmentHash = Chunk["RawHash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompressedBuffer Buffer = Attachment->AsCompressedBinary(); + CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + { + // Partial chunk request + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{130 * 1024, 8100}}, {})); + CHECK_EQ(1, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + ResponseModTag = Chunk["ModTag"].AsUInt64(); + CHECK_NE(0, ResponseModTag); + IoHash AttachmentHash = Chunk["FragmentHash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + std::uint64_t FragmentStart = Chunk["FragmentOffset"].AsUInt64(); + CompressedBuffer Buffer = Attachment->AsCompressedBinary(); + CHECK(FragmentStart <= 130 * 1024); + CHECK(FragmentStart + Buffer.DecodeRawSize() >= 130 * 1024 + 8100); + auto ResponseDecompressedBuffer = Buffer.Decompress(130 * 1024 - 
FragmentStart, 8100); + auto ExpectedDecompressedBuffer = Attachments[OpIds[2]][1].second.Decompress(130 * 1024, 8100); + CHECK(ResponseDecompressedBuffer.AsIoBuffer().GetView().EqualBytes(ExpectedDecompressedBuffer.AsIoBuffer().GetView())); + CHECK_EQ(Chunk["RawSize"sv].AsUInt64(), Attachments[OpIds[2]][1].second.DecodeRawSize()); + CHECK(!Chunk.FindView("Size")); + } + { + // Fetch with matching ModTag + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + { + // Fetch with mismatching ModTag + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)})); + CHECK_EQ(1, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + uint64_t ResponseModTag3 = Chunk["ModTag"].AsUInt64(); + CHECK_EQ(ResponseModTag, ResponseModTag3); + IoHash AttachmentHash = Chunk["RawHash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompressedBuffer Buffer = Attachment->AsCompressedBinary(); + CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Fresh modtime query + { 
+ const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); + CHECK_EQ(ResponseModTag, ResponseModTag2); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Modtime query with matching ModTag + { + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Modtime query with mismatching ModTag + { + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); + CHECK_EQ(ResponseModTag, ResponseModTag2); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + + // Single existing file chunk by Id + { + Oid 
FirstAttachmentId = FilesOpIdAttachments[0].first; + uint64_t ResponseModTag = 0; + { + // Full chunk request + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {})); + CHECK_EQ(1, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + ResponseModTag = Chunk["ModTag"].AsUInt64(); + CHECK_NE(0, ResponseModTag); + IoHash AttachmentHash = Chunk["Hash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompositeBuffer Buffer = Attachment->AsCompositeBinary(); + CHECK_EQ(IoHash::HashBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten()), IoHash::HashBuffer(Buffer)); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + { + // Partial chunk request + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{81823, 5434}}, {})); + CHECK_EQ(1, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + ResponseModTag = Chunk["ModTag"].AsUInt64(); + CHECK_NE(0, ResponseModTag); + + IoHash AttachmentHash = Chunk["Hash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompositeBuffer Buffer = Attachment->AsCompositeBinary(); + CHECK_EQ(IoHash::HashBuffer(IoBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten(), 81823, 5434)), + IoHash::HashBuffer(Buffer)); + CHECK_EQ(Chunk["Size"sv].AsUInt64(), 
FileSizeFromPath(FilesOpIdAttachments[0].second)); + CHECK(!Chunk.FindView("RawSize")); + } + { + // Fetch with matching ModTag + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("Hash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + { + // Fetch with mismatching ModTag + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)})); + CHECK_EQ(1, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + uint64_t ResponseModTag3 = Chunk["ModTag"].AsUInt64(); + CHECK_EQ(ResponseModTag, ResponseModTag3); + IoHash AttachmentHash = Chunk["Hash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompositeBuffer Buffer = Attachment->AsCompositeBinary(); + CHECK_EQ(IoHash::HashBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten()), IoHash::HashBuffer(Buffer)); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Fresh modtime query + { + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = 
Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); + CHECK_EQ(ResponseModTag, ResponseModTag2); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Modtime query with matching ModTag + { + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("Hash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Modtime query with mismatching ModTag + { + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)})); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); + CHECK_EQ(ResponseModTag, ResponseModTag2); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + + // Multi RawHash Request + { + std::vector<CompressedBuffer> AttachmentBuffers{Attachments[OpIds[1]][0].second, + Attachments[OpIds[2]][0].second, + Attachments[OpIds[2]][1].second}; + std::vector<IoHash> AttachmentHashes{AttachmentBuffers[0].DecodeRawHash(), + 
AttachmentBuffers[1].DecodeRawHash(), + AttachmentBuffers[2].DecodeRawHash()}; + std::vector<uint64_t> ResponseModTags(3, 0); + { + // Fresh fetch + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, {})); + + CHECK_EQ(3, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + IoHash Id = Chunk["Id"].AsHash(); + + auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); + CHECK(It != AttachmentHashes.end()); + ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); + CHECK_EQ(AttachmentHashes[Index], Id); + ResponseModTags[Index] = Chunk["ModTag"].AsUInt64(); + CHECK_NE(0, ResponseModTags[Index]); + IoHash AttachmentHash = Chunk["RawHash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompressedBuffer Buffer = Attachment->AsCompressedBinary(); + CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); + CHECK(AttachmentBuffers[Index].GetCompressed().Flatten().GetView().EqualBytes(Buffer.GetCompressed().Flatten().GetView())); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + { + // Fetch with matching ModTag + const CbPackage& Response = GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, ResponseModTags)); + + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + IoHash Id = Chunk["Id"].AsHash(); + + auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); + CHECK(It != AttachmentHashes.end()); + ptrdiff_t Index = 
std::distance(AttachmentHashes.begin(), It); + CHECK_EQ(AttachmentHashes[Index], Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + { + // Fresh modtime query + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, {})); + + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + IoHash Id = Chunk["Id"].AsHash(); + + auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); + CHECK(It != AttachmentHashes.end()); + ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); + CHECK_EQ(AttachmentHashes[Index], Id); + CHECK_EQ(ResponseModTags[Index], Chunk["ModTag"].AsUInt64()); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + { + // Modtime query with matching ModTags + const CbPackage& Response = GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, ResponseModTags)); + + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + IoHash Id = Chunk["Id"].AsHash(); + + auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); + CHECK(It != AttachmentHashes.end()); + ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); + CHECK_EQ(AttachmentHashes[Index], Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + { + // Modtime query with 
mismatching ModTags + std::vector<uint64_t> MismatchingModTags(ResponseModTags); + for (uint64_t& Tag : MismatchingModTags) + { + Tag++; + } + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, MismatchingModTags)); + + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + IoHash Id = Chunk["Id"].AsHash(); + + auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); + CHECK(It != AttachmentHashes.end()); + ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); + CHECK_EQ(AttachmentHashes[Index], Id); + CHECK(Chunk["ModTag"].AsUInt64() == ResponseModTags[Index]); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + } + // Multi Id Request + { + std::vector<CompressedBuffer> AttachmentBuffers{Attachments[OpIds[1]][0].second, + Attachments[OpIds[2]][0].second, + Attachments[OpIds[2]][1].second}; + std::vector<IoHash> AttachmentHashes{AttachmentBuffers[0].DecodeRawHash(), + AttachmentBuffers[1].DecodeRawHash(), + AttachmentBuffers[2].DecodeRawHash()}; + std::vector<Oid> AttachedIds{Attachments[OpIds[1]][0].first, Attachments[OpIds[2]][0].first, Attachments[OpIds[2]][1].first}; + std::vector<uint64_t> ResponseModTags(3, 0); + { + // Fresh fetch + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, {})); + + CHECK_EQ(3, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + Oid Id = Chunk["Id"].AsObjectId(); + + auto It = 
std::find(AttachedIds.begin(), AttachedIds.end(), Id); + CHECK(It != AttachedIds.end()); + ptrdiff_t Index = std::distance(AttachedIds.begin(), It); + CHECK_EQ(AttachedIds[Index], Id); + ResponseModTags[Index] = Chunk["ModTag"].AsUInt64(); + CHECK_NE(0, ResponseModTags[Index]); + IoHash AttachmentHash = Chunk["RawHash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompressedBuffer Buffer = Attachment->AsCompressedBinary(); + CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); + CHECK(AttachmentBuffers[Index].GetCompressed().Flatten().GetView().EqualBytes(Buffer.GetCompressed().Flatten().GetView())); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + { + // Fetch with matching ModTag + const CbPackage& Response = GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, ResponseModTags)); + + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + Oid Id = Chunk["Id"].AsObjectId(); + + auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); + CHECK(It != AttachedIds.end()); + ptrdiff_t Index = std::distance(AttachedIds.begin(), It); + CHECK_EQ(AttachedIds[Index], Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + { + // Fresh modtime query + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, {})); + + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = 
ChunkView.AsObjectView(); + Oid Id = Chunk["Id"].AsObjectId(); + + auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); + CHECK(It != AttachedIds.end()); + ptrdiff_t Index = std::distance(AttachedIds.begin(), It); + CHECK_EQ(AttachedIds[Index], Id); + CHECK_EQ(ResponseModTags[Index], Chunk["ModTag"].AsUInt64()); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + { + // Modtime query with matching ModTags + const CbPackage& Response = GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, ResponseModTags)); + + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + Oid Id = Chunk["Id"].AsObjectId(); + + auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); + CHECK(It != AttachedIds.end()); + ptrdiff_t Index = std::distance(AttachedIds.begin(), It); + CHECK_EQ(AttachedIds[Index], Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + { + // Modtime query with mismatching ModTags + std::vector<uint64_t> MismatchingModTags(ResponseModTags); + for (uint64_t& Tag : MismatchingModTags) + { + Tag++; + } + const CbPackage& Response = GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, MismatchingModTags)); + + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + Oid Id = Chunk["Id"].AsObjectId(); + + auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); + CHECK(It != AttachedIds.end()); 
+ ptrdiff_t Index = std::distance(AttachedIds.begin(), It); + CHECK_EQ(AttachedIds[Index], Id); + CHECK(Chunk["ModTag"].AsUInt64() == ResponseModTags[Index]); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + } +} + +TEST_CASE("project.store.partial.read") +{ + using namespace std::literals; + using namespace testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; + ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); + std::filesystem::path RootDir = TempDir.Path() / "root"sv; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; + + std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv; + std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv; + { + CreateDirectories(Project1FilePath.parent_path()); + BasicFile ProjectFile; + ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); + } + + std::vector<Oid> OpIds; + OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()}); + std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; + { + Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + Project1RootDir.string(), + Project1FilePath.string())); + Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, {}); + CHECK(Oplog); + Attachments[OpIds[0]] = {}; + Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77}); + Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{7123, 9583, 690, 99}); + Attachments[OpIds[3]] = 
CreateAttachments(std::initializer_list<size_t>{55, 122}); + for (auto It : Attachments) + { + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(It.first, It.second)); + } + } + Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); + CHECK(Project1); + Ref<ProjectStore::Oplog> Oplog1 = Project1->OpenOplog("oplog1"sv, false, true); + CHECK(Oplog1); + { + uint64_t ModificationTag = 0; + + auto Result = ProjectStore.GetChunkRange(Log(), + *Project1, + *Oplog1, + Attachments[OpIds[1]][0].first, + 0, + ~0ull, + ZenContentType::kCompressedBinary, + &ModificationTag); + + CHECK_EQ(ProjectStore::GetChunkRangeResult::EError::Ok, Result.Error); + + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Attachment = CompressedBuffer::FromCompressed(Result.Chunk, RawHash, RawSize); + CHECK(RawSize == Attachments[OpIds[1]][0].second.DecodeRawSize()); + + auto Result2 = ProjectStore.GetChunkRange(Log(), + *Project1, + *Oplog1, + Attachments[OpIds[1]][0].first, + 0, + ~0ull, + ZenContentType::kCompressedBinary, + &ModificationTag); + CHECK_EQ(ProjectStore::GetChunkRangeResult::EError::NotModified, Result2.Error); + } + + { + uint64_t FullChunkModificationTag = 0; + { + auto Result = ProjectStore.GetChunkRange(Log(), + *Project1, + *Oplog1, + Attachments[OpIds[2]][1].first, + 0, + ~0ull, + ZenContentType::kCompressedBinary, + &FullChunkModificationTag); + CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::Ok); + CHECK(Result.Chunk); + CHECK(CompressedBuffer::FromCompressedNoValidate(std::move(Result.Chunk)).DecodeRawSize() == + Attachments[OpIds[2]][1].second.DecodeRawSize()); + } + { + auto Result = ProjectStore.GetChunkRange(Log(), + *Project1, + *Oplog1, + Attachments[OpIds[2]][1].first, + 0, + ~0ull, + ZenContentType::kCompressedBinary, + &FullChunkModificationTag); + CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::NotModified); + } + } + { + uint64_t PartialChunkModificationTag = 0; + { + auto Result = 
ProjectStore.GetChunkRange(Log(), + *Project1, + *Oplog1, + Attachments[OpIds[2]][1].first, + 5, + 1773, + ZenContentType::kCompressedBinary, + &PartialChunkModificationTag); + CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::Ok); + + IoHash PartialRawHash; + uint64_t PartialRawSize; + CompressedBuffer PartialCompressedResult = CompressedBuffer::FromCompressed(Result.Chunk, PartialRawHash, PartialRawSize); + CHECK(PartialRawSize >= 1773); + + uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5); + SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed); + SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress(); + const uint8_t* FullDataPtr = &(reinterpret_cast<const uint8_t*>(FullDecompressed.GetView().GetData())[5]); + const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData()); + CHECK(FullDataPtr[0] == PartialDataPtr[0]); + } + + { + auto Result = ProjectStore.GetChunkRange(Log(), + *Project1, + *Oplog1, + Attachments[OpIds[2]][1].first, + 0, + 1773, + ZenContentType::kCompressedBinary, + &PartialChunkModificationTag); + CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::NotModified); + } + } +} + +TEST_CASE("project.store.block") +{ + using namespace std::literals; + using namespace testutils; + + std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, 2257, 3685, 3489, + 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251, + 491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225}); + + std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes); + std::vector<std::pair<IoHash, FetchChunkFunc>> Chunks; + Chunks.reserve(AttachmentSizes.size()); + for (const auto& It : AttachmentsWithId) + { + Chunks.push_back( + 
std::make_pair(It.second.DecodeRawHash(), [Buffer = It.second](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { + return {Buffer.DecodeRawSize(), Buffer}; + })); + } + ChunkBlockDescription Block; + CompressedBuffer BlockBuffer = GenerateChunkBlock(std::move(Chunks), Block); + uint64_t HeaderSize; + CHECK(IterateChunkBlock( + BlockBuffer.Decompress(), + [](CompressedBuffer&&, const IoHash&) {}, + HeaderSize)); +} + +TEST_CASE("project.store.iterateoplog") +{ + using namespace std::literals; + using namespace testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; + ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); + std::filesystem::path RootDir = TempDir.Path() / "root"sv; + std::filesystem::path EngineRootDir = TempDir.Path() / "enginesv"; + + std::filesystem::path ProjectRootDir = TempDir.Path() / "game"sv; + std::filesystem::path ProjectFilePath = TempDir.Path() / "game"sv / "game.uproject"sv; + { + CreateDirectories(ProjectFilePath.parent_path()); + BasicFile ProjectFile; + ProjectFile.Open(ProjectFilePath, BasicFile::Mode::kTruncate); + } + std::filesystem::path ProjectOplogPath = TempDir.Path() / "game"sv / "saves"sv / "cooked"sv / ".projectstore"sv; + { + CreateDirectories(ProjectOplogPath.parent_path()); + BasicFile OplogFile; + OplogFile.Open(ProjectOplogPath, BasicFile::Mode::kTruncate); + } + + Ref<ProjectStore::Project> TestProject(ProjectStore.NewProject(BasePath / "proj"sv, + "proj"sv, + RootDir.string(), + EngineRootDir.string(), + ProjectRootDir.string(), + ProjectFilePath.string())); + Ref<ProjectStore::Oplog> Oplog = TestProject->NewOplog("oplog"sv, ProjectOplogPath); + CHECK(Oplog); + + struct TestOidData + { + Oid 
KeyAsOidNotOplogId = Oid::NewOid(); + std::string Key = KeyAsOidNotOplogId.ToString(); + bool bFound = false; + }; + constexpr int NumTestOids = 4; + TestOidData TestOids[NumTestOids]; + for (const TestOidData& TestOid : TestOids) + { + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(TestOid.KeyAsOidNotOplogId, {})); + } + int Count = 0; + + auto ResetTest = [&Count, &TestOids]() { + Count = 0; + for (TestOidData& TestOid : TestOids) + { + TestOid.bFound = false; + } + }; + auto IncrementCount = [&Count](CbObjectView /* Op */) { ++Count; }; + auto MarkFound = [&TestOids, &Count](ProjectStore::LogSequenceNumber /* LSN */, const Oid& /* InId */, CbObjectView Op) { + for (TestOidData& TestOid : TestOids) + { + if (Op["key"sv].AsString() == TestOid.Key) + { + TestOid.bFound = true; + ++Count; + } + } + }; + + // Tests of IterateOpLog and IterateOplogWithKey, with various Paging arguments + { + ResetTest(); + Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{}); + CHECK(Count == NumTestOids); + ResetTest(); + Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{}); + CHECK(Count == NumTestOids); + ResetTest(); + Oplog->IterateOplogWithKey(MarkFound); + CHECK(Count == NumTestOids); + + Count = 0; + for (int Start = 0; Start < NumTestOids; ++Start) + { + for (int Size = 0; Size < NumTestOids - Start; ++Size) + { + ResetTest(); + Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{Start, Size}); + CHECK(Count == Size); + ResetTest(); + Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{Start, Size}); + CHECK(Count == Size); + } + + // Out of range Size arguments + ResetTest(); + Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{Start, -1}); + CHECK(Count == NumTestOids - Start); + ResetTest(); + Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{Start, NumTestOids * 2}); + CHECK(Count == NumTestOids - Start); + } + + // Out of range Start arguments + for (int Size = 0; Size < 
NumTestOids; ++Size) + { + ResetTest(); + Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{-1, Size}); + CHECK(Count == Size); + ResetTest(); + Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{-1, Size}); + CHECK(Count == Size); + + ResetTest(); + Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{NumTestOids, Size}); + CHECK(Count == 0); + ResetTest(); + Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{NumTestOids, Size}); + CHECK(Count == 0); + } + // Out of range Start and Size arguments + ResetTest(); + Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{-1, -1}); + CHECK(Count == NumTestOids); + ResetTest(); + Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{-1, -1}); + CHECK(Count == NumTestOids); + + ResetTest(); + Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{-1, 2 * NumTestOids}); + CHECK(Count == NumTestOids); + ResetTest(); + Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{-1, 2 * NumTestOids}); + CHECK(Count == NumTestOids); + + ResetTest(); + Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{NumTestOids, -1}); + CHECK(Count == 0); + ResetTest(); + Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{NumTestOids, -1}); + CHECK(Count == 0); + + ResetTest(); + Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{NumTestOids, 2 * NumTestOids}); + CHECK(Count == 0); + ResetTest(); + Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{NumTestOids, 2 * NumTestOids}); + CHECK(Count == 0); + } +} + +#endif + +void +prj_forcelink() +{ +} + +} // namespace zen |