// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "referencemetadata.h" ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # include # include # include #endif // ZEN_WITH_TESTS namespace zen { const FLLMTag& GetProjectstoreTag() { static FLLMTag _("store", FLLMTag("project")); return _; } namespace { bool PrepareDirectoryDelete(const std::filesystem::path& Dir, std::filesystem::path& OutDeleteDir) { std::filesystem::path DroppedBucketPath; do { if (!IsDir(Dir)) { return true; } StringBuilder<64> MovedId; Oid::NewOid().ToString(MovedId); std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), MovedId); DroppedBucketPath = Dir.parent_path() / DroppedName; if (IsDir(DroppedBucketPath)) { if (!DeleteDirectories(DroppedBucketPath)) { ZEN_INFO("Drop directory '{}' for '{}' already exists but could not be removed, attempting different name.", DroppedBucketPath, Dir); continue; } if (IsDir(DroppedBucketPath)) { ZEN_INFO("Drop directory '{}' for '{}' still exists after remove, attempting different name.", DroppedBucketPath, Dir); continue; } } int RenameAttempt = 0; do { std::error_code Ec; RenameDirectory(Dir, DroppedBucketPath, Ec); if (!Ec) { OutDeleteDir = DroppedBucketPath; return true; } if (IsDir(DroppedBucketPath)) { ZEN_INFO("Can't rename '{}' to still existing drop directory '{}'. Reason: '{}'. Attempting different name.", Dir, DroppedBucketPath, Ec.message()); break; } if (++RenameAttempt == 10) { ZEN_INFO("Can't rename '{}' to drop directory '{}' after {} attempts. Reason: {}.", Dir, DroppedBucketPath, RenameAttempt, Ec.message()); return false; } ZEN_INFO("Can't rename '{}' to drop directory '{}', pausing and retrying. Reason: {}.", Dir, DroppedBucketPath, Ec.message()); Sleep(100); } while (true); } while (true); return false; } bool IsFileOlderThan(const std::filesystem::path& CheckPath, const std::filesystem::path& ReferencePath) { std::error_code Ec; std::filesystem::file_time_type CheckWriteTime = std::filesystem::last_write_time(CheckPath, Ec); if (Ec) { return true; } std::filesystem::file_time_type ReferenceWriteTime = std::filesystem::last_write_time(ReferencePath, Ec); if (Ec) { return true; } return CheckWriteTime < ReferenceWriteTime; } std::pair GetPagedRange(int32_t TotalSize, const ProjectStore::Oplog::Paging& EntryPaging) { int32_t Start = std::clamp(EntryPaging.Start, 0, TotalSize); int32_t End = EntryPaging.Count < 0 ? TotalSize : (Start + std::min(EntryPaging.Count, TotalSize - Start)); return {Start, End}; } #pragma pack(push) #pragma pack(1) struct OplogIndexHeader { OplogIndexHeader() {} struct BodyV1 { uint64_t LogPosition = 0; uint32_t LSNCount = 0; uint64_t KeyCount = 0; uint32_t OpAddressMapCount = 0; uint32_t LatestOpMapCount = 0; uint64_t ChunkMapCount = 0; uint64_t MetaMapCount = 0; uint64_t FileMapCount = 0; }; struct BodyV2 { uint64_t LogPosition = 0; uint64_t KeyCount = 0; uint32_t OpPayloadIndexCount = 0; uint32_t OpPayloadCount = 0; uint32_t ChunkMapCount = 0; uint32_t MetaMapCount = 0; uint32_t FileMapCount = 0; uint32_t MaxLSN = 0; uint32_t NextOpCoreOffset = 0; // note: Multiple of oplog data alignment! uint32_t Reserved[2] = {0, 0}; }; static constexpr uint32_t ExpectedMagic = 0x7569647a; // 'zidx'; static constexpr uint32_t Version1 = 1; static constexpr uint32_t Version2 = 2; static constexpr uint32_t CurrentVersion = Version2; static constexpr uint64_t DataAlignment = 8; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; union { BodyV1 V1; BodyV2 V2; }; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const OplogIndexHeader& Header) { return XXH32(&Header.Magic, sizeof(OplogIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } }; #pragma pack(pop) static_assert(sizeof(OplogIndexHeader) == 64); static std::uint64_t GetModificationTagFromRawHash(const IoHash& Hash) { IoHash::Hasher H; return H(Hash); } static std::uint64_t GetModificationTagFromModificationTime(IoBuffer FileBuffer) { IoBufferFileReference FileRef; if (FileBuffer.GetFileReference(FileRef)) { std::error_code Ec; uint64_t ModificationTick = GetModificationTickFromHandle(FileRef.FileHandle, Ec); if (!Ec) { return ModificationTick; } } return {}; } } // namespace ////////////////////////////////////////////////////////////////////////// Oid ComputeOpKey(const CbObjectView& Op) { using namespace std::literals; eastl::fixed_vector KeyData; Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { auto Begin = reinterpret_cast(Data); auto End = Begin + Size; KeyData.insert(KeyData.end(), Begin, End); }); XXH3_128 KeyHash128; // This logic currently exists to work around a problem caused by misusing the xxhash // functions in the past. Short keys are evaluated using the old and buggy // path but longer paths are evaluated properly. In the future all key lengths // should be evaluated using the proper path, this is a temporary workaround to // maintain compatibility with existing disk state. if (KeyData.size() < 240) { XXH3_128Stream_deprecated KeyHasher; KeyHasher.Append(KeyData.data(), KeyData.size()); KeyHash128 = KeyHasher.GetHash(); } else { KeyHash128 = XXH3_128::HashMemory(KeyData.data(), KeyData.size()); } Oid KeyHash; memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash); return KeyHash; } struct ProjectStore::OplogStorage : public RefCounted { OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath) { } ~OplogStorage() { ZEN_DEBUG("oplog '{}/{}': closing oplog storage at {}", m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), m_OplogStoragePath); try { m_Oplog.Close(); m_OpBlobs.Close(); } catch (const std::exception& Ex) { ZEN_WARN("oplog '{}/{}': flushing oplog at '{}' failed. Reason: '{}'", m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), m_OplogStoragePath, Ex.what()); } } [[nodiscard]] bool Exists() const { return Exists(m_OplogStoragePath); } [[nodiscard]] static bool Exists(const std::filesystem::path& BasePath) { return IsFile(GetLogPath(BasePath)) && IsFile(GetBlobsPath(BasePath)); } [[nodiscard]] bool IsValid() const { return IsValid(m_OplogStoragePath); } [[nodiscard]] static bool IsValid(const std::filesystem::path& BasePath) { return TCasLogFile::IsValid(GetLogPath(BasePath)); } void WipeState() const { std::error_code Ec; RemoveFile(GetLogPath(), Ec); RemoveFile(GetBlobsPath(), Ec); } static bool Delete(const std::filesystem::path& BasePath) { return DeleteDirectories(BasePath); } uint64_t OpBlobsSize() const { return FileSizeFromPath(GetBlobsPath()); } uint64_t OpsSize() const { return OpsSize(m_OplogStoragePath); } static uint64_t OpsSize(const std::filesystem::path& BasePath) { if (Exists(BasePath)) { std::error_code DummyEc; return FileSizeFromPath(GetLogPath(BasePath)) + FileSizeFromPath(GetBlobsPath(BasePath)); } return 0; } uint32_t MaxLSN() const { return m_MaxLsn; } void SetMaxLSNAndNextWriteAddress(uint32_t MaxLSN, const OplogEntryAddress& NextOpFileOffset) { m_MaxLsn.store(MaxLSN); m_NextOpsOffset = RoundUp((NextOpFileOffset.Offset * m_OpsAlign) + NextOpFileOffset.Size, m_OpsAlign); } void SetMaxLSNAndNextOpOffset(uint32_t MaxLSN, uint32_t NextOpOffset) { m_MaxLsn.store(MaxLSN); m_NextOpsOffset = NextOpOffset * m_OpsAlign; } uint32_t GetNextOpOffset() const { return gsl::narrow(m_NextOpsOffset / m_OpsAlign); } enum EMode { Create, Read, Write }; void Open(EMode InMode) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OplogStorage::Open"); CasLogFile::Mode LogMode = CasLogFile::Mode::kRead; BasicFile::Mode BlobsMode = BasicFile::Mode::kRead; if (InMode == EMode::Create) { ZEN_DEBUG("oplog '{}/{}': initializing storage at '{}'", m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), m_OplogStoragePath); DeleteDirectories(m_OplogStoragePath); CreateDirectories(m_OplogStoragePath); LogMode = CasLogFile::Mode::kTruncate; BlobsMode = BasicFile::Mode::kTruncate; } else if (InMode == EMode::Write) { LogMode = CasLogFile::Mode::kWrite; BlobsMode = BasicFile::Mode::kWrite; ZEN_DEBUG("oplog '{}/{}': opening storage at '{}'", m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), m_OplogStoragePath); } else if (InMode == EMode::Read) { ZEN_DEBUG("oplog '{}/{}': opening read only storage at '{}'", m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), m_OplogStoragePath); } m_Oplog.Open(GetLogPath(m_OplogStoragePath), LogMode); m_Oplog.Initialize(); m_OpBlobs.Open(GetBlobsPath(m_OplogStoragePath), BlobsMode); ZEN_ASSERT(IsPow2(m_OpsAlign)); ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1))); } IoBuffer GetOpBuffer(BasicFileBuffer& OpBlobsBuffer, const OplogEntry& LogEntry) const { ZEN_MEMSCOPE(GetProjectstoreTag()); const uint64_t OpFileOffset = LogEntry.OpCoreAddress.Offset * m_OpsAlign; const MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreAddress.Size, OpFileOffset); if (OpBufferView.GetSize() == LogEntry.OpCoreAddress.Size) { return IoBuffer(IoBuffer::Wrap, OpBufferView.GetData(), OpBufferView.GetSize()); } else { IoBuffer OpBuffer(LogEntry.OpCoreAddress.Size); OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreAddress.Size, OpFileOffset); return OpBuffer; } } uint64_t GetEffectiveBlobsSize(std::span Addresses) const { uint64_t EffectiveSize = 0; for (const Oplog::OplogPayload& OpPayload : Addresses) { EffectiveSize += RoundUp(OpPayload.Address.Size, m_OpsAlign); } return EffectiveSize; } void Compact(std::span LSNs, std::function&& Callback, bool RetainLSNs, bool DryRun) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OplogStorage::Compact"); ZEN_INFO("oplog '{}/{}': compacting at '{}'", m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), m_OplogStoragePath); Stopwatch Timer; StringBuilder<64> OplogName; Oid::NewOid().ToString(OplogName); std::filesystem::path OplogPath = m_OplogStoragePath / OplogName.c_str(); std::error_code Ec; TCasLogFile Oplog; Oplog.Open(OplogPath, CasLogFile::Mode::kTruncate); (void)Oplog.Initialize(); TemporaryFile OpBlobs; OpBlobs.CreateTemporary(m_OplogStoragePath, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to create temp file for op blob at '{}'", m_OplogStoragePath)); } try { std::vector Ops; Ops.reserve(LSNs.size()); ProjectStore::LsnMap LSNToIndex; LSNToIndex.reserve(LSNs.size()); for (ProjectStore::LogSequenceNumber LSN : LSNs) { LSNToIndex[LSN] = (size_t)-1; } RwLock::ExclusiveLockScope Lock(m_RwLock); const uint64_t SkipEntryCount = 0; m_Oplog.Replay( [&](const OplogEntry& LogEntry) { if (auto It = LSNToIndex.find(LogEntry.OpLsn); It != LSNToIndex.end()) { if (It->second != (size_t)-1) { Ops[It->second] = LogEntry; } else { LSNToIndex[LogEntry.OpLsn] = Ops.size(); Ops.push_back(LogEntry); } } }, SkipEntryCount); std::sort(Ops.begin(), Ops.end(), [&](const OplogEntry& Lhs, const OplogEntry& Rhs) { return Lhs.OpCoreAddress.Offset < Rhs.OpCoreAddress.Offset; }); std::vector OldLSNs; OldLSNs.reserve(Ops.size()); uint64_t OpWriteOffset = 0; uint32_t MaxLSN = 0; { BasicFileBuffer OldBlobsBuffer(m_OpBlobs, 65536); BasicFileWriter NewOpBlobsBuffer(OpBlobs, 65536); for (OplogEntry& LogEntry : Ops) { OldLSNs.push_back(LogEntry.OpLsn); IoBuffer OpBuffer = GetOpBuffer(OldBlobsBuffer, LogEntry); if (RetainLSNs) { MaxLSN = Max(MaxLSN, LogEntry.OpLsn.Number); } else { LogEntry.OpLsn = ProjectStore::LogSequenceNumber(++MaxLSN); } LogEntry.OpCoreAddress.Offset = gsl::narrow(OpWriteOffset / m_OpsAlign); NewOpBlobsBuffer.Write(OpBuffer.GetData(), LogEntry.OpCoreAddress.Size, OpWriteOffset); OpWriteOffset = RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign); } Oplog.Append(Ops); } uint64_t OldOpLogSize = m_Oplog.GetLogSize(); uint64_t OldOpBlobsSize = m_OpBlobs.FileSize(); if (!DryRun) { m_Oplog.Close(); m_OpBlobs.Close(); Oplog.Close(); RenameFile(OplogPath, GetLogPath(), Ec); if (Ec) { throw std::system_error( Ec, fmt::format("Oplog::Compact failed to rename temporary oplog blob storage file from '{}' to '{}'", OplogPath, GetLogPath())); } OpBlobs.MoveTemporaryIntoPlace(GetBlobsPath(), Ec); if (Ec) { // We failed late - clean everything up as best we can RemoveFile(OpBlobs.GetPath(), Ec); RemoveFile(GetLogPath(), Ec); RemoveFile(GetBlobsPath(), Ec); throw std::system_error(Ec, fmt::format("Oplog::Compact failed to rename temporary oplog file from '{}' to '{}'", OpBlobs.GetPath(), GetBlobsPath())); } m_Oplog.Open(GetLogPath(), CasLogFile::Mode::kWrite); m_Oplog.Initialize(); m_OpBlobs.Open(GetBlobsPath(), BasicFile::Mode::kWrite); m_MaxLsn.store(MaxLSN); m_NextOpsOffset.store(OpWriteOffset); } for (size_t Index = 0; Index < Ops.size(); Index++) { const OplogEntry& LogEntry = Ops[Index]; Callback(LogEntry.OpKeyHash, OldLSNs[Index], LogEntry.OpLsn, LogEntry.OpCoreAddress); } ZEN_INFO("oplog '{}/{}': compact completed in {} - Max LSN# {}, New size: {}, old size {}.", m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()), m_MaxLsn.load(), NiceBytes(m_Oplog.GetLogSize() + m_OpBlobs.FileSize()), NiceBytes(OldOpLogSize + OldOpBlobsSize)); } catch (const std::exception& /*Ex*/) { RemoveFile(OpBlobs.GetPath(), Ec); throw; } } static std::filesystem::path GetLogPath(const std::filesystem::path& OplogStoragePath) { using namespace std::literals; return OplogStoragePath / "ops.zlog"sv; } static std::filesystem::path GetBlobsPath(const std::filesystem::path& OplogStoragePath) { using namespace std::literals; return OplogStoragePath / "ops.zops"sv; } std::filesystem::path GetLogPath() const { return GetLogPath(m_OplogStoragePath); } std::filesystem::path GetBlobsPath() const { return GetBlobsPath(m_OplogStoragePath); } void ReadOplogEntriesFromLog(std::vector& OutOpLogEntries, uint64_t& OutInvalidEntries, uint64_t SkipEntryCount) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OplogStorage::ReadOplogEntriesFromLog"); if (m_Oplog.GetLogCount() == SkipEntryCount) { return; } Stopwatch Timer; uint64_t OpsBlockSize = m_OpBlobs.FileSize(); { tsl::robin_map LatestKeys; if (m_Oplog.GetLogCount() > SkipEntryCount) { LatestKeys.reserve(m_Oplog.GetLogCount() - SkipEntryCount); } m_Oplog.Replay( [&](const OplogEntry& LogEntry) { if (LogEntry.IsTombstone()) { if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It == LatestKeys.end()) { ZEN_SCOPED_WARN("found tombstone referencing unknown key {}", LogEntry.OpKeyHash); ++OutInvalidEntries; return; } } else if (LogEntry.OpCoreAddress.Size == 0) { ZEN_SCOPED_WARN("skipping zero size op {}", LogEntry.OpKeyHash); ++OutInvalidEntries; return; } else if (LogEntry.OpLsn.Number == 0) { ZEN_SCOPED_WARN("skipping zero lsn op {}", LogEntry.OpKeyHash); ++OutInvalidEntries; return; } const uint64_t OpFileOffset = LogEntry.OpCoreAddress.Offset * m_OpsAlign; if ((OpFileOffset + LogEntry.OpCoreAddress.Size) > OpsBlockSize) { ZEN_SCOPED_WARN("skipping out of bounds op {}", LogEntry.OpKeyHash); ++OutInvalidEntries; return; } if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It != LatestKeys.end()) { OplogEntry& Entry = OutOpLogEntries[It->second]; if (LogEntry.IsTombstone() && Entry.IsTombstone()) { ZEN_SCOPED_WARN("found double tombstone - {}", LogEntry.OpKeyHash); } Entry = LogEntry; } else { const size_t OpIndex = OutOpLogEntries.size(); LatestKeys[LogEntry.OpKeyHash] = OpIndex; OutOpLogEntries.push_back(LogEntry); } }, SkipEntryCount); } } void ReplayLog(std::function&& Handler, uint64_t SkipEntryCount = 0) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OplogStorage::ReplayLog"); if (m_Oplog.GetLogCount() == SkipEntryCount) { return; } Stopwatch Timer; std::vector OpLogEntries; uint64_t InvalidEntries = 0; ReadOplogEntriesFromLog(OpLogEntries, InvalidEntries, SkipEntryCount); uint64_t TombstoneEntries = 0; BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); uint32_t MaxOpLsn = m_MaxLsn; uint64_t NextOpFileOffset = m_NextOpsOffset; std::sort(OpLogEntries.begin(), OpLogEntries.end(), [&](const OplogEntry& Lhs, const OplogEntry& Rhs) { return Lhs.OpCoreAddress.Offset < Rhs.OpCoreAddress.Offset; }); for (const OplogEntry& LogEntry : OpLogEntries) { if (LogEntry.IsTombstone()) { TombstoneEntries++; } else { IoBuffer OpBuffer = GetOpBuffer(OpBlobsBuffer, LogEntry); // Verify checksum, ignore op data if incorrect const uint32_t ExpectedOpCoreHash = LogEntry.OpCoreHash; const uint32_t OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.GetData(), LogEntry.OpCoreAddress.Size) & 0xffffFFFF); if (OpCoreHash != ExpectedOpCoreHash) { ZEN_WARN("oplog '{}/{}': skipping bad checksum op - {}. Expected: {}, found: {}", m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), LogEntry.OpKeyHash, ExpectedOpCoreHash, OpCoreHash); } else if (CbValidateError Err = ValidateCompactBinary(OpBuffer.GetView(), CbValidateMode::Default); Err != CbValidateError::None) { ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Error: '{}'", m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), LogEntry.OpKeyHash, ToString(Err)); } else { CbObjectView OpView(OpBuffer.GetData()); if (OpView.GetSize() != OpBuffer.GetSize()) { ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), LogEntry.OpKeyHash, OpView.GetSize(), OpBuffer.GetSize()); } else { Handler(OpView, LogEntry); MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn.Number); const uint64_t EntryNextOpFileOffset = RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign); NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset); } } } } m_MaxLsn = MaxOpLsn; m_NextOpsOffset = NextOpFileOffset; ZEN_INFO("oplog '{}/{}': replay from '{}' completed in {} - Max LSN# {}, Next offset: {}, {} tombstones, {} invalid entries", m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), m_OplogStoragePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs()), m_MaxLsn.load(), m_NextOpsOffset.load(), TombstoneEntries, InvalidEntries); } void ReplayLogEntries(const std::span Entries, const std::span Order, std::function&& Handler) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries"); BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); const uint64_t BlobsSize = m_OpBlobs.FileSize(); for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order) { const Oplog::OplogPayload& Entry = Entries[EntryOffset]; const uint64_t OpFileOffset = Entry.Address.Offset * m_OpsAlign; if (OpFileOffset + Entry.Address.Size > BlobsSize) { ZEN_WARN("oplog '{}/{}': skipping op outside of file size - {}. Op offset: {}, Op size: {}, file size {}", m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), Entry.Lsn.Number, OpFileOffset, Entry.Address.Size, BlobsSize); } else { MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset); if (OpBufferView.GetSize() == Entry.Address.Size) { if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error == CbValidateError::None) { CbObjectView OpView(OpBufferView.GetData()); if (OpView.GetSize() != OpBufferView.GetSize()) { ZEN_WARN( "oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), Entry.Lsn.Number, OpView.GetSize(), OpBufferView.GetSize()); } else { Handler(Entry.Lsn, OpView); } } else { ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation failure: '{}'", m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), Entry.Lsn.Number, ToString(Error)); } } else { IoBuffer OpBuffer(Entry.Address.Size); OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset); OpBufferView = OpBuffer.GetView(); if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error == CbValidateError::None) { CbObjectView OpView(OpBuffer.Data()); if (OpView.GetSize() != OpBuffer.GetSize()) { ZEN_WARN( "oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), Entry.Lsn.Number, OpView.GetSize(), OpBuffer.GetSize()); } else { Handler(Entry.Lsn, OpView); } } else { ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation failure: '{}'", m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), Entry.Lsn.Number, ToString(Error)); } } } } } void ReplayLogEntries(const std::span Entries, const std::span Order, std::function&& Handler) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries"); BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); const uint64_t BlobsSize = m_OpBlobs.FileSize(); for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order) { const Oplog::OplogPayload& Entry = Entries[EntryOffset]; const uint64_t OpFileOffset = Entry.Address.Offset * m_OpsAlign; if (OpFileOffset + Entry.Address.Size > BlobsSize) { Handler(Entry.Lsn, {}); } else { MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset); if (OpBufferView.GetSize() == Entry.Address.Size) { IoBuffer Buffer = IoBuffer(IoBuffer::Wrap, OpBufferView.GetData(), OpBufferView.GetSize()); Handler(Entry.Lsn, Buffer); } else { IoBuffer OpBuffer(Entry.Address.Size); OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset); Handler(Entry.Lsn, OpBuffer); } } } } CbObject GetOp(const OplogEntryAddress& Entry) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OplogStorage::GetOp"); IoBuffer OpBuffer(Entry.Size); const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign; m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset); return CbObject(SharedBuffer(std::move(OpBuffer))); } struct AppendOpData { MemoryView Buffer; uint32_t OpCoreHash; Oid KeyHash; }; static OplogStorage::AppendOpData GetAppendOpData(const CbObjectView& Core) { ZEN_MEMSCOPE(GetProjectstoreTag()); using namespace std::literals; AppendOpData OpData; OpData.Buffer = Core.GetView(); const uint64_t WriteSize = OpData.Buffer.GetSize(); OpData.OpCoreHash = uint32_t(XXH3_64bits(OpData.Buffer.GetData(), WriteSize) & 0xffffFFFF); ZEN_ASSERT(WriteSize != 0); OpData.KeyHash = ComputeOpKey(Core); return OpData; } OplogEntry AppendOp(const AppendOpData& OpData) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OplogStorage::AppendOp"); uint64_t WriteSize = OpData.Buffer.GetSize(); RwLock::ExclusiveLockScope Lock(m_RwLock); const uint64_t WriteOffset = m_NextOpsOffset; const LogSequenceNumber OpLsn(++m_MaxLsn); if (!OpLsn.Number) { ZEN_ERROR("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId()); throw std::runtime_error(fmt::format("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId())); } m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign); Lock.ReleaseNow(); ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign)); OplogEntry Entry = {.OpLsn = OpLsn, .OpCoreAddress = {.Offset = gsl::narrow_cast(WriteOffset / m_OpsAlign), .Size = uint32_t(WriteSize)}, .OpCoreHash = OpData.OpCoreHash, .OpKeyHash = OpData.KeyHash}; m_OpBlobs.Write(OpData.Buffer.GetData(), WriteSize, WriteOffset); m_Oplog.Append(Entry); return Entry; } std::vector AppendOps(std::span Ops) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OplogStorage::AppendOps"); size_t OpCount = Ops.size(); std::vector> OffsetAndSizes; std::vector OpLsns; OffsetAndSizes.resize(OpCount); OpLsns.resize(OpCount); for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) { OffsetAndSizes[OpIndex].second = Ops[OpIndex].Buffer.GetSize(); } uint64_t WriteStart = 0; uint64_t WriteLength = 0; { RwLock::ExclusiveLockScope Lock(m_RwLock); WriteStart = m_NextOpsOffset; ZEN_ASSERT(IsMultipleOf(WriteStart, m_OpsAlign)); uint64_t WriteOffset = WriteStart; for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) { OffsetAndSizes[OpIndex].first = WriteOffset - WriteStart; OpLsns[OpIndex] = LogSequenceNumber(++m_MaxLsn); if (!OpLsns[OpIndex]) { ZEN_ERROR("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId()); throw std::runtime_error(fmt::format("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId())); } WriteOffset = RoundUp(WriteOffset + OffsetAndSizes[OpIndex].second, m_OpsAlign); } WriteLength = WriteOffset - WriteStart; m_NextOpsOffset = RoundUp(WriteOffset, m_OpsAlign); } IoBuffer WriteBuffer(WriteLength); std::vector Entries; Entries.resize(OpCount); for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) { MutableMemoryView WriteBufferView = WriteBuffer.GetMutableView().RightChop(OffsetAndSizes[OpIndex].first); WriteBufferView.CopyFrom(Ops[OpIndex].Buffer); Entries[OpIndex] = { .OpLsn = OpLsns[OpIndex], .OpCoreAddress = {.Offset = gsl::narrow_cast((WriteStart + OffsetAndSizes[OpIndex].first) / m_OpsAlign), .Size = uint32_t(OffsetAndSizes[OpIndex].second)}, .OpCoreHash = Ops[OpIndex].OpCoreHash, .OpKeyHash = Ops[OpIndex].KeyHash}; } m_OpBlobs.Write(WriteBuffer.GetData(), WriteBuffer.GetSize(), WriteStart); m_Oplog.Append(Entries); return Entries; } void AppendTombstone(Oid KeyHash) { OplogEntry Entry = {.OpKeyHash = KeyHash}; Entry.MakeTombstone(); m_Oplog.Append(Entry); } void Flush() { m_Oplog.Flush(); m_OpBlobs.Flush(); } uint64_t LogCount() const { return m_Oplog.GetLogCount(); } LoggerRef Log() { return m_OwnerOplog->Log(); } private: ProjectStore::Oplog* m_OwnerOplog; std::filesystem::path m_OplogStoragePath; mutable RwLock m_RwLock; TCasLogFile m_Oplog; BasicFile m_OpBlobs; std::atomic m_NextOpsOffset{0}; uint64_t m_OpsAlign = 32; std::atomic m_MaxLsn{0}; }; ////////////////////////////////////////////////////////////////////////// ProjectStore::Oplog::Oplog(const LoggerRef& InLog, std::string_view ProjectIdentifier, std::string_view Id, CidStore& Store, const std::filesystem::path& BasePath, const std::filesystem::path& MarkerPath, EMode State) : m_Log(InLog) , m_OuterProjectId(ProjectIdentifier) , m_OplogId(Id) , m_CidStore(Store) , m_BasePath(BasePath) , m_MarkerPath(MarkerPath) , m_TempPath(m_BasePath / std::string_view("temp")) , m_MetaPath(m_BasePath / std::string_view("ops.meta")) , m_Mode(State) , m_MetaValid(false) , m_PendingPrepOpAttachmentsRetainEnd(GcClock::Now()) { ZEN_MEMSCOPE(GetProjectstoreTag()); using namespace std::literals; m_Storage = new OplogStorage(this, m_BasePath); OplogStorage::EMode StorageMode = OplogStorage::EMode::Write; if (m_Mode == EMode::kBasicReadOnly) { StorageMode = OplogStorage::EMode::Read; } else { bool StoreExists = m_Storage->Exists(); if (StoreExists) { if (!m_Storage->IsValid()) { ZEN_WARN("Invalid oplog found at '{}'. Wiping state for oplog.", m_BasePath); m_Storage->WipeState(); std::error_code DummyEc; RemoveFile(m_MetaPath, DummyEc); StoreExists = false; } } if (!StoreExists) { StorageMode = OplogStorage::EMode::Create; } } m_Storage->Open(StorageMode); m_MetaValid = !IsFileOlderThan(m_MetaPath, m_Storage->GetBlobsPath()); CleanDirectory(m_TempPath, /*ForceRemoveReadOnlyFiles*/ false); } ProjectStore::Oplog::~Oplog() { try { RwLock::ExclusiveLockScope Lock(m_OplogLock); if (m_Storage) { if (m_Mode == EMode::kFull) { m_Storage->Flush(); if (!m_MetaValid) { std::error_code DummyEc; RemoveFile(m_MetaPath, DummyEc); } uint64_t LogCount = m_Storage->LogCount(); if (m_LogFlushPosition != LogCount || m_IsLegacySnapshot) { WriteIndexSnapshot(Lock); } } m_Storage = {}; } } catch (const std::exception& Ex) { ZEN_ERROR("Oplog::~Oplog threw exception: '{}'", Ex.what()); } } void ProjectStore::Oplog::Flush() { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Oplog::Flush"); if (m_Mode == EMode::kFull) { RwLock::ExclusiveLockScope Lock(m_OplogLock); if (!m_Storage) { return; } m_Storage->Flush(); if (!m_MetaValid) { std::error_code DummyEc; RemoveFile(m_MetaPath, DummyEc); } uint64_t LogCount = m_Storage->LogCount(); if (m_LogFlushPosition != LogCount || m_IsLegacySnapshot) { WriteIndexSnapshot(Lock); } } } void ProjectStore::Oplog::RefreshLsnToPayloadOffsetMap(RwLock::ExclusiveLockScope&) { if (!m_LsnToPayloadOffsetMap) { m_LsnToPayloadOffsetMap = std::make_unique>(); m_LsnToPayloadOffsetMap->reserve(m_OpLogPayloads.size()); for (uint32_t PayloadOffset = 0; PayloadOffset < m_OpLogPayloads.size(); PayloadOffset++) { const OplogPayload& Payload = m_OpLogPayloads[PayloadOffset]; if (Payload.Lsn.Number != 0 && Payload.Address.Size != 0) { m_LsnToPayloadOffsetMap->insert_or_assign(Payload.Lsn, PayloadIndex(PayloadOffset)); } } } } void ProjectStore::Oplog::Scrub(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_INFO("scrubbing oplog '{}/{}'", m_OuterProjectId, m_OplogId); ZEN_ASSERT(m_Mode == EMode::kFull); Stopwatch Timer; std::atomic_uint64_t OpCount = 0; std::atomic_uint64_t VerifiedOpBytes = 0; auto LogStats = MakeGuard([&] { const uint32_t DurationMs = gsl::narrow(Timer.GetElapsedTimeMs()); ZEN_INFO("oplog '{}/{}' scrubbed {} in {} from {} ops ({})", m_OuterProjectId, m_OplogId, NiceBytes(VerifiedOpBytes.load()), NiceTimeSpanMs(DurationMs), OpCount.load(), NiceRate(VerifiedOpBytes, DurationMs)); }); std::vector> BadEntries; using namespace std::literals; IterateOplogWithKeyRaw([&](LogSequenceNumber Lsn, const Oid& Key, const IoBuffer& Buffer) { Ctx.ThrowIfDeadlineExpired(); OpCount++; VerifiedOpBytes += Buffer.GetSize(); if (!Buffer) { ZEN_WARN("Scrub: oplog data for op {} (Lns: {}) could not be read from disk", Key, Lsn.Number); BadEntries.push_back({Lsn, Key}); return; } { MemoryView OpBufferView = Buffer.GetView(); if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error != CbValidateError::None) { ZEN_WARN("Scrub: oplog data for op {} (Lns: {}) is not valid compact binary. Error: {}", Key, Lsn.Number, ToString(Error)); BadEntries.push_back({Lsn, Key}); return; } } CbObjectView OpView(Buffer.GetData()); if (OpView.GetSize() != Buffer.GetSize()) { ZEN_WARN("Scrub: oplog payload size {} for op {} (Lns: {}) does not match object size {}", Buffer.GetSize(), Key, Lsn.Number, OpView.GetSize()); BadEntries.push_back({Lsn, Key}); return; } { const Oid KeyHash = ComputeOpKey(OpView); if (KeyHash != Key) { BadEntries.push_back({Lsn, Key}); ZEN_WARN("Scrub: oplog data for op {} (Lns: {}) does not match information from index (op:{} != index:{})", Key, Lsn.Number, KeyHash, Key); return; } } }); if (!BadEntries.empty()) { if (Ctx.RunRecovery()) { ZEN_WARN("oplog '{}/{}': scrubbing found {} bad ops in oplog @ '{}', these will be removed from the index", m_OuterProjectId, m_OplogId, BadEntries.size(), m_BasePath); // Actually perform some clean-up RwLock::ExclusiveLockScope Lock(m_OplogLock); RefreshLsnToPayloadOffsetMap(Lock); for (const auto& BadEntry : BadEntries) { const LogSequenceNumber BadLsn = BadEntry.first; const Oid& BadKey = BadEntry.second; if (auto It = m_OpToPayloadOffsetMap.find(BadKey); It != m_OpToPayloadOffsetMap.end()) { OplogPayload& Payload = m_OpLogPayloads[It->second]; if (Payload.Lsn == BadLsn) { m_OpToPayloadOffsetMap.erase(It); m_Storage->AppendTombstone(BadKey); } } if (auto LsnIt = m_LsnToPayloadOffsetMap->find(BadLsn); LsnIt != m_LsnToPayloadOffsetMap->end()) { const PayloadIndex LsnPayloadOffset = LsnIt->second; OplogPayload& LsnPayload = m_OpLogPayloads[LsnPayloadOffset]; LsnPayload = {.Lsn = LogSequenceNumber(0), .Address = {.Offset = 0, .Size = 0}}; } m_LsnToPayloadOffsetMap->erase(BadLsn); } if (!BadEntries.empty()) { m_MetaValid = false; } } else { ZEN_WARN("oplog '{}/{}': scrubbing found {} bad ops in oplog @ '{}' but no cleanup will be performed", m_OuterProjectId, m_OplogId, BadEntries.size(), m_BasePath); } } } uint64_t ProjectStore::Oplog::TotalSize(const std::filesystem::path& BasePath) { ZEN_MEMSCOPE(GetProjectstoreTag()); using namespace std::literals; uint64_t Size = OplogStorage::OpsSize(BasePath); std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; if (IsFile(StateFilePath)) { Size += FileSizeFromPath(StateFilePath); } std::filesystem::path MetaFilePath = BasePath / "ops.meta"sv; if (IsFile(MetaFilePath)) { Size += FileSizeFromPath(MetaFilePath); } std::filesystem::path IndexFilePath = BasePath / "ops.zidx"sv; if (IsFile(IndexFilePath)) { Size += FileSizeFromPath(IndexFilePath); } return Size; } uint64_t ProjectStore::Oplog::TotalSize() const { return TotalSize(m_BasePath); } void ProjectStore::Oplog::ResetState() { RwLock::ExclusiveLockScope _(m_OplogLock); m_ChunkMap.clear(); m_MetaMap.clear(); m_FileMap.clear(); m_OpToPayloadOffsetMap.clear(); m_OpLogPayloads.clear(); m_LsnToPayloadOffsetMap.reset(); m_Storage = {}; } bool ProjectStore::Oplog::PrepareForDelete(std::filesystem::path& OutRemoveDirectory) { ZEN_MEMSCOPE(GetProjectstoreTag()); RwLock::ExclusiveLockScope _(m_OplogLock); m_UpdateCaptureRefCounter = 0; m_CapturedOps.reset(); m_CapturedAttachments.reset(); m_PendingPrepOpAttachments.clear(); m_ChunkMap.clear(); m_MetaMap.clear(); m_FileMap.clear(); m_OpToPayloadOffsetMap.clear(); m_OpLogPayloads.clear(); m_LsnToPayloadOffsetMap.reset(); m_Storage = {}; if (PrepareDirectoryDelete(m_BasePath, OutRemoveDirectory)) { return true; } return false; } bool ProjectStore::Oplog::ExistsAt(const std::filesystem::path& BasePath) { using namespace std::literals; std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; return IsFile(StateFilePath); } bool ProjectStore::Oplog::Exists() const { return ExistsAt(m_BasePath); } void ProjectStore::Oplog::Read() { ZEN_MEMSCOPE(GetProjectstoreTag()); using namespace std::literals; ZEN_TRACE_CPU("Oplog::Read"); ZEN_LOG_SCOPE("Oplog::Read '{}'", m_OplogId); ZEN_DEBUG("oplog '{}/{}': reading from '{}'. State: {}", m_OuterProjectId, m_OplogId, m_BasePath, m_Mode == EMode::kFull ? "Full" : "BasicNoLookups"); std::optional Config = ReadStateFile(m_BasePath, [this]() { return Log(); }); if (Config.has_value()) { if (Config.value().GetSize() == 0) { // Invalid config file return; } m_MarkerPath = Config.value()["gcpath"sv].AsU8String(); } if (!m_MetaValid) { std::error_code DummyEc; RemoveFile(m_MetaPath, DummyEc); } ZEN_ASSERT(!m_LsnToPayloadOffsetMap); RwLock::ExclusiveLockScope Lock(m_OplogLock); ReadIndexSnapshot(Lock); if (m_Mode == EMode::kFull) { m_Storage->ReplayLog( [&](CbObjectView Op, const OplogEntry& OpEntry) { const OplogEntryMapping OpMapping = GetMapping(Op); // Update chunk id maps for (const ChunkMapping& Chunk : OpMapping.Chunks) { m_ChunkMap.insert_or_assign(Chunk.Id, Chunk.Hash); } for (const FileMapping& File : OpMapping.Files) { if (File.Hash != IoHash::Zero) { m_ChunkMap.insert_or_assign(File.Id, File.Hash); } m_FileMap.insert_or_assign(File.Id, FileMapEntry{.ServerPath = File.Hash == IoHash::Zero ? File.ServerPath : std::string(), .ClientPath = File.ClientPath}); } for (const ChunkMapping& Meta : OpMapping.Meta) { m_MetaMap.insert_or_assign(Meta.Id, Meta.Hash); } const PayloadIndex PayloadOffset(m_OpLogPayloads.size()); m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, PayloadOffset); m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress}); }, m_LogFlushPosition); if (m_Storage->LogCount() != m_LogFlushPosition || m_IsLegacySnapshot) { WriteIndexSnapshot(Lock); } } else { std::vector OpLogEntries; uint64_t InvalidEntries; m_Storage->ReadOplogEntriesFromLog(OpLogEntries, InvalidEntries, m_LogFlushPosition); for (const OplogEntry& OpEntry : OpLogEntries) { const PayloadIndex PayloadOffset(m_OpLogPayloads.size()); m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, PayloadOffset); m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress}); } } } void ProjectStore::Oplog::Write() { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_ASSERT(m_Mode != EMode::kBasicReadOnly); using namespace std::literals; BinaryWriter Mem; CbObjectWriter Cfg; Cfg << "gcpath"sv << PathToUtf8(m_MarkerPath); Cfg.Save(Mem); std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; ZEN_DEBUG("oplog '{}/{}': persisting config to '{}'", m_OuterProjectId, m_OplogId, StateFilePath); TemporaryFile::SafeWriteFile(StateFilePath, Mem.GetView()); } void ProjectStore::Oplog::Update(const std::filesystem::path& MarkerPath) { if (m_MarkerPath == MarkerPath) { return; } Write(); } bool ProjectStore::Oplog::Reset() { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_ASSERT(m_Mode == EMode::kFull); std::filesystem::path MovedDir; { RwLock::ExclusiveLockScope OplogLock(m_OplogLock); m_Storage = {}; if (!PrepareDirectoryDelete(m_BasePath, MovedDir)) { m_Storage = new OplogStorage(this, m_BasePath); const bool StoreExists = m_Storage->Exists(); m_Storage->Open(StoreExists ? OplogStorage::EMode::Write : OplogStorage::EMode::Create); m_MetaValid = !IsFileOlderThan(m_MetaPath, m_Storage->GetBlobsPath()); return false; } m_ChunkMap.clear(); m_MetaMap.clear(); m_FileMap.clear(); m_OpToPayloadOffsetMap.clear(); m_OpLogPayloads.clear(); m_LsnToPayloadOffsetMap.reset(); m_Storage = new OplogStorage(this, m_BasePath); m_Storage->Open(OplogStorage::EMode::Create); m_MetaValid = false; CleanDirectory(m_TempPath, /*ForceRemoveReadOnlyFiles*/ false); Write(); } // Erase content on disk if (!MovedDir.empty()) { OplogStorage::Delete(MovedDir); } return true; } bool ProjectStore::Oplog::CanUnload() { ZEN_MEMSCOPE(GetProjectstoreTag()); RwLock::SharedLockScope _(m_OplogLock); uint64_t LogCount = m_Storage->LogCount(); if (m_LogFlushPosition != LogCount) { return false; // The oplog is not flushed so likely this is an active oplog } if (!m_PendingPrepOpAttachments.empty()) { return false; // We have a pending oplog prep operation in flight } if (m_UpdateCaptureRefCounter > 0) { return false; // GC capture is enable for the oplog } return true; } std::optional ProjectStore::Oplog::ReadStateFile(const std::filesystem::path& BasePath, std::function&& Log) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Oplog::ReadStateFile"); using namespace std::literals; std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; if (IsFile(StateFilePath)) { // ZEN_INFO("oplog '{}/{}': config read from '{}'", m_OuterProjectId, m_OplogId, StateFilePath); BasicFile Blob; Blob.Open(StateFilePath, BasicFile::Mode::kRead); IoBuffer Obj = Blob.ReadAll(); CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); if (ValidationError != CbValidateError::None) { ZEN_ERROR("validation error {} hit for oplog config at '{}'", ToString(ValidationError), StateFilePath); return CbObject(); } return LoadCompactBinaryObject(Obj); } ZEN_INFO("config for oplog not found at '{}'. Assuming legacy store", StateFilePath); return {}; } ProjectStore::Oplog::ValidationResult ProjectStore::Oplog::Validate(const std::filesystem::path& ProjectRootDir, std::atomic_bool& IsCancelledFlag, WorkerThreadPool* OptionalWorkerPool) { ZEN_MEMSCOPE(GetProjectstoreTag()); using namespace std::literals; ValidationResult Result; const size_t OpCount = OplogCount(); std::vector KeyHashes; std::vector Keys; std::vector> Attachments; std::vector Mappings; KeyHashes.reserve(OpCount); Keys.reserve(OpCount); Mappings.reserve(OpCount); IterateOplogWithKey([&](LogSequenceNumber LSN, const Oid& Key, CbObjectView OpView) { Result.LSNLow = Min(Result.LSNLow, LSN); Result.LSNHigh = Max(Result.LSNHigh, LSN); KeyHashes.push_back(Key); Keys.emplace_back(std::string(OpView["key"sv].AsString())); std::vector OpAttachments; OpView.IterateAttachments([&OpAttachments](CbFieldView Attachment) { OpAttachments.push_back(Attachment.AsAttachment()); }); Attachments.emplace_back(std::move(OpAttachments)); Mappings.push_back(GetMapping(OpView)); }); Result.OpCount = gsl::narrow(Keys.size()); RwLock ResultLock; auto ValidateOne = [&](uint32_t OpIndex) { const Oid& KeyHash = KeyHashes[OpIndex]; const std::string& Key = Keys[OpIndex]; const OplogEntryMapping& Mapping(Mappings[OpIndex]); bool HasMissingEntries = false; for (const ChunkMapping& Chunk : Mapping.Chunks) { if (!m_CidStore.ContainsChunk(Chunk.Hash)) { ResultLock.WithExclusiveLock([&]() { Result.MissingChunks.push_back({KeyHash, Chunk}); }); HasMissingEntries = true; } } for (const ChunkMapping& Meta : Mapping.Meta) { if (!m_CidStore.ContainsChunk(Meta.Hash)) { ResultLock.WithExclusiveLock([&]() { Result.MissingMetas.push_back({KeyHash, Meta}); }); HasMissingEntries = true; } } for (const FileMapping& File : Mapping.Files) { if (File.Hash == IoHash::Zero) { std::filesystem::path FilePath = ProjectRootDir / File.ServerPath; if (!IsFile(FilePath)) { ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); HasMissingEntries = true; } } else if (!m_CidStore.ContainsChunk(File.Hash)) { ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); HasMissingEntries = true; } } const std::vector& OpAttachments = Attachments[OpIndex]; for (const IoHash& Attachment : OpAttachments) { if (!m_CidStore.ContainsChunk(Attachment)) { ResultLock.WithExclusiveLock([&]() { Result.MissingAttachments.push_back({KeyHash, Attachment}); }); HasMissingEntries = true; } } if (HasMissingEntries) { ResultLock.WithExclusiveLock([&]() { Result.OpKeys.push_back({KeyHash, Key}); }); } }; std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) { if (AbortFlag) { break; } if (OptionalWorkerPool) { Work.ScheduleWork(*OptionalWorkerPool, [&ValidateOne, Index = OpIndex](std::atomic& AbortFlag) { ZEN_MEMSCOPE(GetProjectstoreTag()); if (AbortFlag) { return; } ValidateOne(Index); }); } else { ValidateOne(OpIndex); } } } catch (const std::exception& Ex) { AbortFlag.store(true); ZEN_WARN("Failed validating oplogs in {}. Reason: '{}'", m_BasePath, Ex.what()); } Work.Wait(); { // Check if we were deleted while we were checking the references without a lock... RwLock::SharedLockScope _(m_OplogLock); if (!m_Storage) { Result = {}; } } return Result; } void ProjectStore::Oplog::WriteIndexSnapshot(RwLock::ExclusiveLockScope&) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Oplog::WriteIndexSnapshot"); ZEN_ASSERT(m_Mode == EMode::kFull); ZEN_DEBUG("oplog '{}/{}': write store snapshot at '{}'", m_OuterProjectId, m_OplogId, m_BasePath); uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("oplog '{}/{}': wrote store snapshot for '{}' containing {} entries in {}", m_OuterProjectId, m_OplogId, m_BasePath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); namespace fs = std::filesystem; const fs::path IndexPath = m_BasePath / "ops.zidx"; try { std::vector Keys; std::vector OpPayloadOffsets; std::vector ChunkMapEntries; std::vector MetaMapEntries; std::vector FilePathLengths; std::vector FilePaths; uint64_t IndexLogPosition = 0; { IndexLogPosition = m_Storage->LogCount(); Keys.reserve(m_OpToPayloadOffsetMap.size() + m_ChunkMap.size() + m_MetaMap.size() + m_FileMap.size()); OpPayloadOffsets.reserve(m_OpToPayloadOffsetMap.size()); for (const auto& Kv : m_OpToPayloadOffsetMap) { Keys.push_back(Kv.first); OpPayloadOffsets.push_back(Kv.second); } ChunkMapEntries.reserve(m_ChunkMap.size()); for (const auto& It : m_ChunkMap) { Keys.push_back(It.first); ChunkMapEntries.push_back(It.second); } MetaMapEntries.reserve(m_MetaMap.size()); for (const auto& It : m_MetaMap) { Keys.push_back(It.first); MetaMapEntries.push_back(It.second); } FilePathLengths.reserve(m_FileMap.size() * 2); FilePaths.reserve(m_FileMap.size() * 2); for (const auto& It : m_FileMap) { Keys.push_back(It.first); FilePathLengths.push_back(gsl::narrow(It.second.ServerPath.length())); FilePathLengths.push_back(gsl::narrow(It.second.ClientPath.length())); FilePaths.push_back(It.second.ServerPath); FilePaths.push_back(It.second.ClientPath); } } TemporaryFile ObjectIndexFile; std::error_code Ec; ObjectIndexFile.CreateTemporary(IndexPath.parent_path(), Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to create temp file for index snapshot at '{}'", IndexPath)); } { BasicFileWriter IndexFile(ObjectIndexFile, 65536u); OplogIndexHeader Header; Header.V2 = {.LogPosition = IndexLogPosition, .KeyCount = gsl::narrow(Keys.size()), .OpPayloadIndexCount = gsl::narrow(OpPayloadOffsets.size()), .OpPayloadCount = gsl::narrow(m_OpLogPayloads.size()), .ChunkMapCount = gsl::narrow(ChunkMapEntries.size()), .MetaMapCount = gsl::narrow(MetaMapEntries.size()), .FileMapCount = gsl::narrow(FilePathLengths.size() / 2), .MaxLSN = m_Storage->MaxLSN(), .NextOpCoreOffset = m_Storage->GetNextOpOffset()}; Header.Checksum = OplogIndexHeader::ComputeChecksum(Header); uint64_t Offset = 0; IndexFile.Write(&Header, sizeof(OplogIndexHeader), Offset); Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(Keys.data(), Keys.size() * sizeof(Oid), Offset); Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(OpPayloadOffsets.data(), OpPayloadOffsets.size() * sizeof(PayloadIndex), Offset); Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(m_OpLogPayloads.data(), m_OpLogPayloads.size() * sizeof(OplogPayload), Offset); Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset); Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset); Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset); Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); for (const auto& FilePath : FilePaths) { IndexFile.Write(FilePath.c_str(), FilePath.length(), Offset); Offset += FilePath.length(); } } ObjectIndexFile.Flush(); ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", ObjectIndexFile.GetPath(), IndexPath, Ec.message())); } EntryCount = m_OpLogPayloads.size(); m_LogFlushPosition = IndexLogPosition; m_IsLegacySnapshot = false; } catch (const std::exception& Err) { ZEN_WARN("oplog '{}/{}': snapshot FAILED, reason: '{}'", m_OuterProjectId, m_OplogId, Err.what()); } } void ProjectStore::Oplog::ReadIndexSnapshot(RwLock::ExclusiveLockScope&) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Oplog::ReadIndexSnapshot"); const std::filesystem::path IndexPath = m_BasePath / "ops.zidx"; if (IsFile(IndexPath)) { uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_DEBUG("oplog '{}/{}': index read from '{}' containing {} entries in {}", m_OuterProjectId, m_OplogId, IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); try { BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); uint64_t Size = ObjectIndexFile.FileSize(); if (Size >= sizeof(OplogIndexHeader)) { OplogIndexHeader Header; uint64_t Offset = 0; ObjectIndexFile.Read(&Header, sizeof(Header), 0); Offset += sizeof(OplogIndexHeader); Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); if (Header.Magic == OplogIndexHeader::ExpectedMagic) { uint32_t Checksum = OplogIndexHeader::ComputeChecksum(Header); if (Header.Checksum != Checksum) { ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Checksum mismatch. Expected: {}, Found: {}", m_OuterProjectId, m_OplogId, IndexPath, Header.Checksum, Checksum); return; } if (Header.Version == OplogIndexHeader::Version1) { uint32_t MaxLSN = 0; OplogEntryAddress LastOpAddress{.Offset = 0, .Size = 0}; if (Header.V1.LatestOpMapCount + Header.V1.ChunkMapCount + Header.V1.MetaMapCount + Header.V1.FileMapCount != Header.V1.KeyCount) { ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Key count mismatch. Expected: {}, Found: {}", m_OuterProjectId, m_OplogId, IndexPath, Header.V1.LatestOpMapCount + Header.V1.ChunkMapCount + Header.V1.MetaMapCount + Header.V1.FileMapCount, Header.V1.KeyCount); return; } std::vector LSNEntries(Header.V1.LSNCount); ObjectIndexFile.Read(LSNEntries.data(), LSNEntries.size() * sizeof(uint32_t), Offset); Offset += LSNEntries.size() * sizeof(uint32_t); Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); size_t LSNOffset = 0; std::vector Keys(Header.V1.KeyCount); ObjectIndexFile.Read(Keys.data(), Keys.size() * sizeof(Oid), Offset); Offset += Keys.size() * sizeof(Oid); Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); size_t KeyOffset = 0; { std::vector AddressMapEntries(Header.V1.OpAddressMapCount); ObjectIndexFile.Read(AddressMapEntries.data(), AddressMapEntries.size() * sizeof(OplogEntryAddress), Offset); Offset += AddressMapEntries.size() * sizeof(OplogEntryAddress); Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); tsl::robin_map LsnToPayloadOffset; LsnToPayloadOffset.reserve(AddressMapEntries.size()); m_OpLogPayloads.reserve(AddressMapEntries.size()); for (const OplogEntryAddress& Address : AddressMapEntries) { const uint32_t LSN = LSNEntries[LSNOffset++]; LsnToPayloadOffset.insert_or_assign(LSN, PayloadIndex(m_OpLogPayloads.size())); m_OpLogPayloads.push_back({.Lsn = LogSequenceNumber(LSN), .Address = Address}); if (Address.Offset > LastOpAddress.Offset) { LastOpAddress = Address; } MaxLSN = Max(MaxLSN, LSN); } { std::vector LatestOpMapEntries(Header.V1.LatestOpMapCount); ObjectIndexFile.Read(LatestOpMapEntries.data(), LatestOpMapEntries.size() * sizeof(uint32_t), Offset); Offset += LatestOpMapEntries.size() * sizeof(uint32_t); Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); m_OpToPayloadOffsetMap.reserve(LatestOpMapEntries.size()); for (uint32_t Lsn : LatestOpMapEntries) { if (auto It = LsnToPayloadOffset.find(Lsn); It != LsnToPayloadOffset.end()) { m_OpToPayloadOffsetMap.insert_or_assign(Keys[KeyOffset++], It->second); MaxLSN = Max(MaxLSN, Lsn); } } } } if (m_Mode == EMode::kFull) { { std::vector ChunkMapEntries(Header.V1.ChunkMapCount); ObjectIndexFile.Read(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset); Offset += ChunkMapEntries.size() * sizeof(IoHash); Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); m_ChunkMap.reserve(ChunkMapEntries.size()); for (const IoHash& ChunkId : ChunkMapEntries) { m_ChunkMap.insert_or_assign(Keys[KeyOffset++], ChunkId); } } { std::vector MetaMapEntries(Header.V1.MetaMapCount); ObjectIndexFile.Read(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset); Offset += MetaMapEntries.size() * sizeof(IoHash); Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); m_MetaMap.reserve(MetaMapEntries.size()); for (const IoHash& ChunkId : MetaMapEntries) { m_MetaMap.insert_or_assign(Keys[KeyOffset++], ChunkId); } } { m_FileMap.reserve(Header.V1.FileMapCount); std::vector FilePathLengths(Header.V1.FileMapCount * 2); ObjectIndexFile.Read(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset); Offset += FilePathLengths.size() * sizeof(uint32_t); Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); BasicFileBuffer IndexFile(ObjectIndexFile, 65536); auto ReadString([&IndexFile, &Offset](uint32_t Length) -> std::string { MemoryView StringData = IndexFile.MakeView(Length, Offset); if (StringData.GetSize() != Length) { throw std::runtime_error(fmt::format("Invalid format. Expected to read {} bytes but got {}", Length, uint32_t(StringData.GetSize()))); } Offset += Length; return std::string((const char*)StringData.GetData(), Length); }); for (uint64_t FileLengthOffset = 0; FileLengthOffset < FilePathLengths.size();) { std::string ServerPath = ReadString(FilePathLengths[FileLengthOffset++]); std::string ClientPath = ReadString(FilePathLengths[FileLengthOffset++]); m_FileMap.insert_or_assign( Keys[KeyOffset++], FileMapEntry{.ServerPath = std::move(ServerPath), .ClientPath = std::move(ClientPath)}); } } } m_LogFlushPosition = Header.V1.LogPosition; m_Storage->SetMaxLSNAndNextWriteAddress(MaxLSN, LastOpAddress); EntryCount = Header.V1.LSNCount; m_IsLegacySnapshot = true; } else if (Header.Version == OplogIndexHeader::Version2) { std::vector Keys(Header.V2.KeyCount); ObjectIndexFile.Read(Keys.data(), Keys.size() * sizeof(Oid), Offset); Offset += Keys.size() * sizeof(Oid); Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); std::vector OpPayloadOffsets(Header.V2.OpPayloadIndexCount); ObjectIndexFile.Read(OpPayloadOffsets.data(), OpPayloadOffsets.size() * sizeof(PayloadIndex), Offset); Offset += OpPayloadOffsets.size() * sizeof(PayloadIndex); Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); m_OpLogPayloads.resize(Header.V2.OpPayloadCount); ObjectIndexFile.Read(m_OpLogPayloads.data(), m_OpLogPayloads.size() * sizeof(OplogPayload), Offset); Offset += m_OpLogPayloads.size() * sizeof(OplogPayload); Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); size_t KeyOffset = 0; m_OpToPayloadOffsetMap.reserve(Header.V2.OpPayloadIndexCount); for (const PayloadIndex PayloadOffset : OpPayloadOffsets) { const Oid& Key = Keys[KeyOffset++]; ZEN_ASSERT_SLOW(PayloadOffset.Index < Header.V2.OpPayloadCount); m_OpToPayloadOffsetMap.insert({Key, PayloadOffset}); } if (m_Mode == EMode::kFull) { { std::vector ChunkMapEntries(Header.V2.ChunkMapCount); ObjectIndexFile.Read(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset); Offset += ChunkMapEntries.size() * sizeof(IoHash); Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); m_ChunkMap.reserve(ChunkMapEntries.size()); for (const IoHash& ChunkId : ChunkMapEntries) { m_ChunkMap.insert_or_assign(Keys[KeyOffset++], ChunkId); } } { std::vector MetaMapEntries(Header.V2.MetaMapCount); ObjectIndexFile.Read(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset); Offset += MetaMapEntries.size() * sizeof(IoHash); Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); m_MetaMap.reserve(MetaMapEntries.size()); for (const IoHash& ChunkId : MetaMapEntries) { m_MetaMap.insert_or_assign(Keys[KeyOffset++], ChunkId); } } { m_FileMap.reserve(Header.V2.FileMapCount); std::vector FilePathLengths(Header.V2.FileMapCount * 2); ObjectIndexFile.Read(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset); Offset += FilePathLengths.size() * sizeof(uint32_t); Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); BasicFileBuffer IndexFile(ObjectIndexFile, 65536); auto ReadString([&IndexFile, &Offset](uint32_t Length) -> std::string { MemoryView StringData = IndexFile.MakeView(Length, Offset); if (StringData.GetSize() != Length) { throw std::runtime_error(fmt::format("Invalid format. Expected to read {} bytes but got {}", Length, uint32_t(StringData.GetSize()))); } Offset += Length; return std::string((const char*)StringData.GetData(), Length); }); for (uint64_t FileLengthOffset = 0; FileLengthOffset < FilePathLengths.size();) { std::string ServerPath = ReadString(FilePathLengths[FileLengthOffset++]); std::string ClientPath = ReadString(FilePathLengths[FileLengthOffset++]); m_FileMap.insert_or_assign( Keys[KeyOffset++], FileMapEntry{.ServerPath = std::move(ServerPath), .ClientPath = std::move(ClientPath)}); } } } m_LogFlushPosition = Header.V2.LogPosition; m_Storage->SetMaxLSNAndNextOpOffset(Header.V2.MaxLSN, Header.V2.NextOpCoreOffset); EntryCount = Header.V2.OpPayloadCount; m_IsLegacySnapshot = false; } else { ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Unsupported version number {}", m_OuterProjectId, m_OplogId, IndexPath, Header.Version); return; } } else { ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'", m_OuterProjectId, m_OplogId, IndexPath); } } } catch (const std::exception& Ex) { m_OpToPayloadOffsetMap.clear(); m_OpLogPayloads.clear(); m_ChunkMap.clear(); m_MetaMap.clear(); m_FileMap.clear(); m_LogFlushPosition = 0; ZEN_ERROR("oplog '{}/{}': failed reading index snapshot from '{}'. Reason: '{}'", m_OuterProjectId, m_OplogId, IndexPath, Ex.what()); } } } uint32_t ProjectStore::Oplog::GetUnusedSpacePercent() const { RwLock::SharedLockScope OplogLock(m_OplogLock); return GetUnusedSpacePercentLocked(); } uint32_t ProjectStore::Oplog::GetUnusedSpacePercentLocked() const { ZEN_MEMSCOPE(GetProjectstoreTag()); const uint64_t ActualBlobsSize = m_Storage->OpBlobsSize(); if (ActualBlobsSize == 0) { return 0; } const uint64_t EffectiveBlobsSize = m_Storage->GetEffectiveBlobsSize(m_OpLogPayloads); if (EffectiveBlobsSize < ActualBlobsSize) { return gsl::narrow((100 * (ActualBlobsSize - EffectiveBlobsSize)) / ActualBlobsSize); } return 0; } void ProjectStore::Oplog::Compact(bool DryRun, bool RetainLSNs, std::string_view LogPrefix) { RwLock::ExclusiveLockScope Lock(m_OplogLock); Compact(Lock, DryRun, RetainLSNs, LogPrefix); } void ProjectStore::Oplog::Compact(RwLock::ExclusiveLockScope& Lock, bool DryRun, bool RetainLSNs, std::string_view LogPrefix) { ZEN_ASSERT(m_Mode == EMode::kFull); ZEN_MEMSCOPE(GetProjectstoreTag()); Stopwatch Timer; std::vector LatestLSNs; LatestLSNs.reserve(m_OpLogPayloads.size()); for (const auto& Kv : m_OpToPayloadOffsetMap) { const OplogPayload& OpPayload = m_OpLogPayloads[Kv.second]; LatestLSNs.push_back(OpPayload.Lsn); } std::vector OpPayloads; OpPayloads.reserve(LatestLSNs.size()); OidMap OpToPayloadOffsetMap; OpToPayloadOffsetMap.reserve(LatestLSNs.size()); uint64_t PreSize = TotalSize(); m_Storage->Compact( LatestLSNs, [&](const Oid& OpKeyHash, LogSequenceNumber, LogSequenceNumber NewLsn, const OplogEntryAddress& NewAddress) { OpToPayloadOffsetMap.insert({OpKeyHash, PayloadIndex(OpPayloads.size())}); OpPayloads.push_back({.Lsn = NewLsn, .Address = NewAddress}); }, RetainLSNs, /*DryRun*/ DryRun); if (!DryRun) { m_OpToPayloadOffsetMap.swap(OpToPayloadOffsetMap); m_OpLogPayloads.swap(OpPayloads); m_LsnToPayloadOffsetMap.reset(); WriteIndexSnapshot(Lock); } uint64_t PostSize = TotalSize(); uint64_t FreedSize = (PreSize > PostSize) ? (PreSize - PostSize) : 0; ZEN_INFO("{} oplog '{}/{}': Compacted in {}. New size: {}, freeing: {}", LogPrefix, m_OuterProjectId, m_OplogId, NiceTimeSpanMs(Timer.GetElapsedTimeMs()), NiceBytes(PostSize), NiceBytes(FreedSize)); } IoBuffer ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash) { ZEN_ASSERT(m_Mode == EMode::kFull); return m_CidStore.FindChunkByCid(RawHash); } bool ProjectStore::Oplog::IterateChunks(std::span RawHashes, bool IncludeModTag, const std::function& AsyncCallback, WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit) { ZEN_ASSERT(m_Mode == EMode::kFull); return m_CidStore.IterateChunks( RawHashes, [&](size_t Index, const IoBuffer& Payload) { return AsyncCallback(Index, Payload, IncludeModTag ? GetModificationTagFromRawHash(RawHashes[Index]) : 0); }, OptionalWorkerPool, LargeSizeLimit); } bool ProjectStore::Oplog::IterateChunks(const std::filesystem::path& ProjectRootDir, std::span ChunkIds, bool IncludeModTag, const std::function& AsyncCallback, WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit) { ZEN_ASSERT(m_Mode == EMode::kFull); ZEN_MEMSCOPE(GetProjectstoreTag()); std::vector CidChunkIndexes; std::vector CidChunkHashes; std::vector FileChunkIndexes; std::vector FileChunkPaths; { RwLock::SharedLockScope OplogLock(m_OplogLock); for (size_t ChunkIndex = 0; ChunkIndex < ChunkIds.size(); ChunkIndex++) { const Oid& ChunkId = ChunkIds[ChunkIndex]; if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end()) { CidChunkIndexes.push_back(ChunkIndex); CidChunkHashes.push_back(ChunkIt->second); } else if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) { CidChunkIndexes.push_back(ChunkIndex); CidChunkHashes.push_back(ChunkIt->second); } else if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) { FileChunkIndexes.push_back(ChunkIndex); FileChunkPaths.emplace_back(ProjectRootDir / FileIt->second.ServerPath); } } } if (OptionalWorkerPool) { std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) { if (AbortFlag) { break; } Work.ScheduleWork( *OptionalWorkerPool, [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback]( std::atomic& AbortFlag) { if (AbortFlag) { return; } size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; try { IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); if (!Payload) { ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[FileChunkIndex], FilePath); } if (!AsyncCallback(FileChunkIndex, Payload, IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0)) { AbortFlag.store(true); } } catch (const std::exception& Ex) { ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'", m_OuterProjectId, m_OplogId, FileChunkIndex, FilePath, Ex.what()); } }); } if (!CidChunkHashes.empty() && !AbortFlag) { m_CidStore.IterateChunks( CidChunkHashes, [&](size_t Index, const IoBuffer& Payload) { size_t CidChunkIndex = CidChunkIndexes[Index]; if (AbortFlag) { return false; } return AsyncCallback(CidChunkIndex, Payload, IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0); }, OptionalWorkerPool, LargeSizeLimit); } } catch (const std::exception& Ex) { AbortFlag.store(true); ZEN_WARN("Failed iterating oplog chunks in {}. Reason: '{}'", m_BasePath, Ex.what()); } Work.Wait(); return !AbortFlag; } else { if (!CidChunkHashes.empty()) { m_CidStore.IterateChunks( CidChunkHashes, [&](size_t Index, const IoBuffer& Payload) { size_t CidChunkIndex = CidChunkIndexes[Index]; return AsyncCallback(CidChunkIndex, Payload, IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0); }, OptionalWorkerPool, LargeSizeLimit); } for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) { size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); if (Payload) { bool Result = AsyncCallback(FileChunkIndex, Payload, IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0); if (!Result) { return false; } } } } return true; } IoBuffer ProjectStore::Oplog::FindChunk(const std::filesystem::path& ProjectRootDir, const Oid& ChunkId, uint64_t* OptOutModificationTag) { ZEN_ASSERT(m_Mode == EMode::kFull); RwLock::SharedLockScope OplogLock(m_OplogLock); if (!m_Storage) { return IoBuffer{}; } if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end()) { IoHash ChunkHash = ChunkIt->second; OplogLock.ReleaseNow(); IoBuffer Result = m_CidStore.FindChunkByCid(ChunkHash); if (Result && OptOutModificationTag != nullptr) { *OptOutModificationTag = GetModificationTagFromRawHash(ChunkHash); } return Result; } if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) { std::filesystem::path FilePath = ProjectRootDir / FileIt->second.ServerPath; OplogLock.ReleaseNow(); IoBuffer Result = IoBufferBuilder::MakeFromFile(FilePath); if (!Result) { ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkId, FilePath); } else if (OptOutModificationTag != nullptr) { *OptOutModificationTag = GetModificationTagFromModificationTime(Result); } return Result; } if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) { IoHash ChunkHash = MetaIt->second; OplogLock.ReleaseNow(); IoBuffer Result = m_CidStore.FindChunkByCid(ChunkHash); if (Result && OptOutModificationTag != nullptr) { *OptOutModificationTag = GetModificationTagFromRawHash(ChunkHash); } return Result; } return {}; } std::vector ProjectStore::Oplog::GetAllChunksInfo(const std::filesystem::path& ProjectRootDir) { ZEN_ASSERT(m_Mode == EMode::kFull); ZEN_MEMSCOPE(GetProjectstoreTag()); // First just capture all the chunk ids std::vector InfoArray; { RwLock::SharedLockScope _(m_OplogLock); if (m_Storage) { const size_t NumEntries = m_FileMap.size() + m_ChunkMap.size(); InfoArray.reserve(NumEntries); for (const auto& Kv : m_FileMap) { InfoArray.push_back({.ChunkId = Kv.first}); } for (const auto& Kv : m_ChunkMap) { InfoArray.push_back({.ChunkId = Kv.first}); } } } for (ChunkInfo& Info : InfoArray) { if (IoBuffer Chunk = FindChunk(ProjectRootDir, Info.ChunkId, nullptr)) { Info.ChunkSize = Chunk.GetSize(); } } return InfoArray; } void ProjectStore::Oplog::IterateChunkMap(std::function&& Fn) { ZEN_ASSERT(m_Mode == EMode::kFull); RwLock::SharedLockScope _(m_OplogLock); if (!m_Storage) { return; } for (const auto& Kv : m_ChunkMap) { Fn(Kv.first, Kv.second); } } void ProjectStore::Oplog::IterateFileMap( std::function&& Fn) { ZEN_ASSERT(m_Mode == EMode::kFull); RwLock::SharedLockScope _(m_OplogLock); if (!m_Storage) { return; } for (const auto& Kv : m_FileMap) { Fn(Kv.first, Kv.second.ServerPath, Kv.second.ClientPath); } } void ProjectStore::Oplog::IterateOplog(std::function&& Handler, const Paging& EntryPaging) { RwLock::SharedLockScope _(m_OplogLock); IterateOplogLocked(std::move(Handler), EntryPaging); } std::vector ProjectStore::Oplog::GetSortedOpPayloadRangeLocked(const Paging& EntryPaging, tsl::robin_map* OutOptionalReverseKeyMap) { std::pair StartAndEnd = GetPagedRange(int32_t(m_OpToPayloadOffsetMap.size()), EntryPaging); if (StartAndEnd.first == StartAndEnd.second) { return {}; } const int32_t ReplayCount = StartAndEnd.second - StartAndEnd.first; auto Start = m_OpToPayloadOffsetMap.cbegin(); std::advance(Start, StartAndEnd.first); auto End = Start; std::advance(End, ReplayCount); std::vector ReplayOrder; ReplayOrder.reserve(ReplayCount); for (auto It = Start; It != End; It++) { const PayloadIndex PayloadOffset = It->second; if (OutOptionalReverseKeyMap != nullptr) { OutOptionalReverseKeyMap->insert_or_assign(PayloadOffset, It->first); } ReplayOrder.push_back(PayloadOffset); } std::sort(ReplayOrder.begin(), ReplayOrder.end(), [this](const PayloadIndex Lhs, const PayloadIndex Rhs) { const OplogEntryAddress& LhsEntry = m_OpLogPayloads[Lhs].Address; const OplogEntryAddress& RhsEntry = m_OpLogPayloads[Rhs].Address; return LhsEntry.Offset < RhsEntry.Offset; }); return ReplayOrder; } void ProjectStore::Oplog::IterateOplogLocked(std::function&& Handler, const Paging& EntryPaging) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::Oplog::IterateOplogLocked"); if (!m_Storage) { return; } std::vector ReplayOrder = GetSortedOpPayloadRangeLocked(EntryPaging, nullptr); if (!ReplayOrder.empty()) { m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber, CbObjectView Op) { Handler(Op); }); } } void ProjectStore::Oplog::IterateOplogWithKey(std::function&& Handler) { IterateOplogWithKey(std::move(Handler), Paging{}); } void ProjectStore::Oplog::IterateOplogWithKey(std::function&& Handler, const Paging& EntryPaging) { ZEN_MEMSCOPE(GetProjectstoreTag()); tsl::robin_map ReverseKeyMap; std::vector ReplayOrder; { RwLock::SharedLockScope _(m_OplogLock); if (m_Storage) { ReplayOrder = GetSortedOpPayloadRangeLocked(EntryPaging, &ReverseKeyMap); if (!ReplayOrder.empty()) { uint32_t EntryIndex = 0; m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, CbObjectView Op) { const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex]; Handler(Lsn, ReverseKeyMap.at(PayloadOffset), Op); EntryIndex++; }); } } } } void ProjectStore::Oplog::IterateOplogWithKeyRaw(std::function&& Handler) { ZEN_MEMSCOPE(GetProjectstoreTag()); tsl::robin_map ReverseKeyMap; std::vector ReplayOrder; { RwLock::SharedLockScope _(m_OplogLock); if (m_Storage) { ReplayOrder = GetSortedOpPayloadRangeLocked({}, &ReverseKeyMap); if (!ReplayOrder.empty()) { uint32_t EntryIndex = 0; m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, const IoBuffer& Buffer) { const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex]; Handler(Lsn, ReverseKeyMap.at(PayloadOffset), Buffer); EntryIndex++; }); } } } } static constexpr uint32_t OplogMetaDataExpectedMagic = 0x6f'74'6d'62; // 'omta'; void ProjectStore::Oplog::GetAttachmentsLocked(std::vector& OutAttachments, bool StoreMetaDataOnDisk) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::Oplog::GetAttachmentsLocked"); if (!m_Storage) { return; } if (StoreMetaDataOnDisk && m_MetaValid) { IoBuffer MetadataPayload = IoBufferBuilder::MakeFromFile(m_MetaPath); if (MetadataPayload) { ZEN_TRACE_CPU("Store::Oplog::GetAttachmentsFromMetaData"); if (GetAttachmentsFromMetaData( MetadataPayload, OplogMetaDataExpectedMagic, [&](std::span Keys, std::span AttachmentCounts, std::span Attachments) { ZEN_UNUSED(Keys); ZEN_UNUSED(AttachmentCounts); OutAttachments.insert(OutAttachments.end(), Attachments.begin(), Attachments.end()); })) { return; } } } std::vector Keys; std::vector AttachmentCounts; size_t AttachmentOffset = OutAttachments.size(); IterateOplogLocked( [&](CbObjectView Op) { using namespace std::literals; size_t CurrentAttachmentCount = OutAttachments.size(); Op.IterateAttachments([&](CbFieldView Visitor) { OutAttachments.emplace_back(Visitor.AsAttachment()); }); if (StoreMetaDataOnDisk) { const Oid KeyHash = ComputeOpKey(Op); Keys.push_back(KeyHash); AttachmentCounts.push_back(gsl::narrow(OutAttachments.size() - CurrentAttachmentCount)); } }, Paging{}); if (StoreMetaDataOnDisk) { const IoHash* FirstAttachment = OutAttachments.data() + AttachmentOffset; size_t AttachmentCount = OutAttachments.size() - AttachmentOffset; IoBuffer MetaPayload = BuildReferenceMetaData(OplogMetaDataExpectedMagic, Keys, AttachmentCounts, std::span(FirstAttachment, AttachmentCount)) .Flatten() .AsIoBuffer(); const std::filesystem::path MetaPath = m_MetaPath; std::error_code Ec; TemporaryFile::SafeWriteFile(MetaPath, MetaPayload.GetView(), Ec); if (Ec) { m_MetaValid = false; ZEN_WARN("oplog '{}/{}': unable to set meta data meta path: '{}'. Reason: '{}'", m_OuterProjectId, m_OplogId, MetaPath, Ec.message()); } else { m_MetaValid = true; } } } size_t ProjectStore::Oplog::GetOplogEntryCount() const { RwLock::SharedLockScope _(m_OplogLock); if (!m_Storage) { return 0; } return m_OpToPayloadOffsetMap.size(); } ProjectStore::LogSequenceNumber ProjectStore::Oplog::GetOpIndexByKey(const Oid& Key) { RwLock::SharedLockScope _(m_OplogLock); if (!m_Storage) { return {}; } if (const auto LatestOp = m_OpToPayloadOffsetMap.find(Key); LatestOp != m_OpToPayloadOffsetMap.end()) { return m_OpLogPayloads[LatestOp->second].Lsn; } return {}; } std::optional ProjectStore::Oplog::GetOpByKey(const Oid& Key) { RwLock::SharedLockScope _(m_OplogLock); if (!m_Storage) { return {}; } if (const auto LatestOp = m_OpToPayloadOffsetMap.find(Key); LatestOp != m_OpToPayloadOffsetMap.end()) { return m_Storage->GetOp(m_OpLogPayloads[LatestOp->second].Address); } return {}; } std::optional ProjectStore::Oplog::GetOpByIndex(ProjectStore::LogSequenceNumber Index) { { RwLock::SharedLockScope _(m_OplogLock); if (!m_Storage) { return {}; } if (m_LsnToPayloadOffsetMap) { if (auto It = m_LsnToPayloadOffsetMap->find(Index); It != m_LsnToPayloadOffsetMap->end()) { return m_Storage->GetOp(m_OpLogPayloads[It->second].Address); } } } RwLock::ExclusiveLockScope Lock(m_OplogLock); if (!m_Storage) { return {}; } RefreshLsnToPayloadOffsetMap(Lock); if (auto It = m_LsnToPayloadOffsetMap->find(Index); It != m_LsnToPayloadOffsetMap->end()) { return m_Storage->GetOp(m_OpLogPayloads[It->second].Address); } return {}; } void ProjectStore::Oplog::CaptureAddedAttachments(std::span AttachmentHashes) { ZEN_MEMSCOPE(GetProjectstoreTag()); m_OplogLock.WithExclusiveLock([this, AttachmentHashes]() { if (m_CapturedAttachments) { m_CapturedAttachments->reserve(m_CapturedAttachments->size() + AttachmentHashes.size()); m_CapturedAttachments->insert(m_CapturedAttachments->end(), AttachmentHashes.begin(), AttachmentHashes.end()); } }); } void ProjectStore::Oplog::EnableUpdateCapture() { ZEN_MEMSCOPE(GetProjectstoreTag()); m_OplogLock.WithExclusiveLock([&]() { if (m_UpdateCaptureRefCounter == 0) { ZEN_ASSERT(!m_CapturedOps); ZEN_ASSERT(!m_CapturedAttachments); m_CapturedOps = std::make_unique>(); m_CapturedAttachments = std::make_unique>(); } else { ZEN_ASSERT(m_CapturedOps); ZEN_ASSERT(m_CapturedAttachments); } m_UpdateCaptureRefCounter++; }); } void ProjectStore::Oplog::DisableUpdateCapture() { ZEN_MEMSCOPE(GetProjectstoreTag()); m_OplogLock.WithExclusiveLock([&]() { ZEN_ASSERT(m_CapturedOps); ZEN_ASSERT(m_CapturedAttachments); ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); m_UpdateCaptureRefCounter--; if (m_UpdateCaptureRefCounter == 0) { m_CapturedOps.reset(); m_CapturedAttachments.reset(); } }); } void ProjectStore::Oplog::IterateCapturedOpsLocked( std::function&& Callback) { ZEN_MEMSCOPE(GetProjectstoreTag()); if (m_CapturedOps) { if (!m_Storage) { return; } for (const Oid& OpKey : *m_CapturedOps) { if (const auto AddressEntryIt = m_OpToPayloadOffsetMap.find(OpKey); AddressEntryIt != m_OpToPayloadOffsetMap.end()) { const OplogPayload& OpPayload = m_OpLogPayloads[AddressEntryIt->second]; Callback(OpKey, OpPayload.Lsn, m_Storage->GetOp(OpPayload.Address)); } } } } std::vector ProjectStore::Oplog::GetCapturedAttachmentsLocked() { ZEN_MEMSCOPE(GetProjectstoreTag()); if (m_CapturedAttachments) { return *m_CapturedAttachments; } return {}; } std::vector ProjectStore::Oplog::CheckPendingChunkReferences(std::span ChunkHashes, const GcClock::Duration& RetainTime) { ZEN_MEMSCOPE(GetProjectstoreTag()); m_OplogLock.WithExclusiveLock([&]() { GcClock::TimePoint Now = GcClock::Now(); if (m_PendingPrepOpAttachmentsRetainEnd < Now) { m_PendingPrepOpAttachments.clear(); } m_PendingPrepOpAttachments.insert(ChunkHashes.begin(), ChunkHashes.end()); GcClock::TimePoint NewEndPoint = Now + RetainTime; if (m_PendingPrepOpAttachmentsRetainEnd < NewEndPoint) { m_PendingPrepOpAttachmentsRetainEnd = NewEndPoint; } }); std::vector MissingChunks; MissingChunks.reserve(ChunkHashes.size()); for (const IoHash& FileHash : ChunkHashes) { if (!m_CidStore.ContainsChunk(FileHash)) { MissingChunks.push_back(FileHash); } } return MissingChunks; } void ProjectStore::Oplog::RemovePendingChunkReferences(std::span ChunkHashes) { ZEN_MEMSCOPE(GetProjectstoreTag()); m_OplogLock.WithExclusiveLock([&]() { GcClock::TimePoint Now = GcClock::Now(); if (m_PendingPrepOpAttachmentsRetainEnd < Now) { m_PendingPrepOpAttachments.clear(); } else { for (const IoHash& Chunk : ChunkHashes) { m_PendingPrepOpAttachments.erase(Chunk); } } }); } std::vector ProjectStore::Oplog::GetPendingChunkReferencesLocked() { ZEN_MEMSCOPE(GetProjectstoreTag()); std::vector Result; Result.reserve(m_PendingPrepOpAttachments.size()); Result.insert(Result.end(), m_PendingPrepOpAttachments.begin(), m_PendingPrepOpAttachments.end()); GcClock::TimePoint Now = GcClock::Now(); if (m_PendingPrepOpAttachmentsRetainEnd < Now) { m_PendingPrepOpAttachments.clear(); } return Result; } void ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, const Oid& FileId, const IoHash& Hash, std::string_view ServerPath, std::string_view ClientPath) { FileMapEntry Entry; if (Hash != IoHash::Zero) { m_ChunkMap.insert_or_assign(FileId, Hash); } else { Entry.ServerPath = ServerPath; } Entry.ClientPath = ClientPath; m_FileMap[FileId] = std::move(Entry); } void ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, const Oid& ChunkId, const IoHash& Hash) { m_ChunkMap.insert_or_assign(ChunkId, Hash); } void ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, const Oid& ChunkId, const IoHash& Hash) { m_MetaMap.insert_or_assign(ChunkId, Hash); } ProjectStore::Oplog::OplogEntryMapping ProjectStore::Oplog::GetMapping(CbObjectView Core) { ZEN_MEMSCOPE(GetProjectstoreTag()); using namespace std::literals; OplogEntryMapping Result; // Update chunk id maps for (CbFieldView Field : Core) { std::string_view FieldName = Field.GetName(); if (FieldName == "package"sv) { CbObjectView PackageObj = Field.AsObjectView(); Oid Id = PackageObj["id"sv].AsObjectId(); IoHash Hash = PackageObj["data"sv].AsBinaryAttachment(); Result.Chunks.emplace_back(ChunkMapping{Id, Hash}); ZEN_DEBUG("oplog {}/{}: package data {} -> {}", m_OuterProjectId, m_OplogId, Id, Hash); continue; } if (FieldName == "bulkdata"sv) { CbArrayView BulkDataArray = Field.AsArrayView(); for (CbFieldView& Entry : BulkDataArray) { CbObjectView BulkObj = Entry.AsObjectView(); Oid Id = BulkObj["id"sv].AsObjectId(); IoHash Hash = BulkObj["data"sv].AsBinaryAttachment(); Result.Chunks.emplace_back(ChunkMapping{Id, Hash}); ZEN_DEBUG("oplog {}/{}: bulkdata {} -> {}", m_OuterProjectId, m_OplogId, Id, Hash); } continue; } if (FieldName == "packagedata"sv) { CbArrayView PackageDataArray = Field.AsArrayView(); for (CbFieldView& Entry : PackageDataArray) { CbObjectView PackageDataObj = Entry.AsObjectView(); Oid Id = PackageDataObj["id"sv].AsObjectId(); IoHash Hash = PackageDataObj["data"sv].AsBinaryAttachment(); Result.Chunks.emplace_back(ChunkMapping{Id, Hash}); ZEN_DEBUG("oplog {}/{}: package {} -> {}", m_OuterProjectId, m_OplogId, Id, Hash); } continue; } if (FieldName == "files"sv) { CbArrayView FilesArray = Field.AsArrayView(); Result.Files.reserve(FilesArray.Num()); for (CbFieldView& Entry : FilesArray) { CbObjectView FileObj = Entry.AsObjectView(); std::string_view ServerPath = FileObj["serverpath"sv].AsString(); std::string_view ClientPath = FileObj["clientpath"sv].AsString(); Oid Id = FileObj["id"sv].AsObjectId(); IoHash Hash = FileObj["data"sv].AsBinaryAttachment(); if (ServerPath.empty() && Hash == IoHash::Zero) { ZEN_WARN("oplog {}/{}: invalid file for entry '{}', missing both 'serverpath' and 'data' fields", m_OuterProjectId, m_OplogId, Id); continue; } if (ClientPath.empty()) { ZEN_WARN("oplog {}/{}: invalid file for entry '{}', missing 'clientpath' field", m_OuterProjectId, m_OplogId, Id); continue; } Result.Files.emplace_back(FileMapping{Id, Hash, std::string(ServerPath), std::string(ClientPath)}); ZEN_DEBUG("oplog {}/{}: file {} -> {}, ServerPath: {}, ClientPath: {}", m_OuterProjectId, m_OplogId, Id, Hash, ServerPath, ClientPath); } continue; } if (FieldName == "meta"sv) { CbArrayView MetaArray = Field.AsArrayView(); Result.Meta.reserve(MetaArray.Num()); for (CbFieldView& Entry : MetaArray) { CbObjectView MetaObj = Entry.AsObjectView(); Oid Id = MetaObj["id"sv].AsObjectId(); IoHash Hash = MetaObj["data"sv].AsBinaryAttachment(); Result.Meta.emplace_back(ChunkMapping{Id, Hash}); auto NameString = MetaObj["name"sv].AsString(); ZEN_DEBUG("oplog {}/{}: meta data ({}) {} -> {}", m_OuterProjectId, m_OplogId, NameString, Id, Hash); } continue; } } return Result; } ProjectStore::LogSequenceNumber ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, const OplogEntryMapping& OpMapping, const OplogEntry& OpEntry) { ZEN_MEMSCOPE(GetProjectstoreTag()); // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however using namespace std::literals; // Update chunk id maps for (const ChunkMapping& Chunk : OpMapping.Chunks) { AddChunkMapping(OplogLock, Chunk.Id, Chunk.Hash); } for (const FileMapping& File : OpMapping.Files) { AddFileMapping(OplogLock, File.Id, File.Hash, File.ServerPath, File.ClientPath); } for (const ChunkMapping& Meta : OpMapping.Meta) { AddMetaMapping(OplogLock, Meta.Id, Meta.Hash); } const PayloadIndex PayloadOffset(m_OpLogPayloads.size()); m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, PayloadOffset); m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress}); if (m_LsnToPayloadOffsetMap) { m_LsnToPayloadOffsetMap->insert_or_assign(OpEntry.OpLsn, PayloadOffset); } return OpEntry.OpLsn; } ProjectStore::LogSequenceNumber ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) { ZEN_ASSERT(m_Mode == EMode::kFull); ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntry"); const CbObject& Core = OpPackage.GetObject(); const ProjectStore::LogSequenceNumber EntryId = AppendNewOplogEntry(Core); if (!EntryId) { // The oplog has been deleted so just drop this return EntryId; } // Persist attachments after oplog entry so GC won't find attachments without references uint64_t AttachmentBytes = 0; uint64_t NewAttachmentBytes = 0; auto Attachments = OpPackage.GetAttachments(); if (!Attachments.empty()) { std::vector WriteAttachmentBuffers; std::vector WriteRawHashes; std::vector WriteRawSizes; WriteAttachmentBuffers.reserve(Attachments.size()); WriteRawHashes.reserve(Attachments.size()); WriteRawSizes.reserve(Attachments.size()); for (const auto& Attach : Attachments) { ZEN_ASSERT(Attach.IsCompressedBinary()); const CompressedBuffer& AttachmentData = Attach.AsCompressedBinary(); const uint64_t AttachmentSize = AttachmentData.DecodeRawSize(); WriteAttachmentBuffers.push_back(AttachmentData.GetCompressed().Flatten().AsIoBuffer()); WriteRawHashes.push_back(Attach.GetHash()); WriteRawSizes.push_back(AttachmentSize); AttachmentBytes += AttachmentSize; } std::vector InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (size_t Index = 0; Index < InsertResults.size(); Index++) { if (InsertResults[Index].New) { NewAttachmentBytes += WriteRawSizes[Index]; } } } ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId.Number, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes)); return EntryId; } RefPtr ProjectStore::Oplog::GetStorage() { ZEN_MEMSCOPE(GetProjectstoreTag()); RefPtr Storage; { RwLock::SharedLockScope _(m_OplogLock); Storage = m_Storage; } return Storage; } ProjectStore::LogSequenceNumber ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core) { ZEN_ASSERT(m_Mode == EMode::kFull); ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntry"); using namespace std::literals; RefPtr Storage = GetStorage(); if (!Storage) { return {}; } OplogEntryMapping Mapping = GetMapping(Core); OplogStorage::AppendOpData OpData = OplogStorage::GetAppendOpData(Core); const OplogEntry OpEntry = Storage->AppendOp(OpData); RwLock::ExclusiveLockScope OplogLock(m_OplogLock); const ProjectStore::LogSequenceNumber EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry); if (m_CapturedOps) { m_CapturedOps->push_back(OpData.KeyHash); } m_MetaValid = false; return EntryId; } std::vector ProjectStore::Oplog::AppendNewOplogEntries(std::span Cores) { ZEN_ASSERT(m_Mode == EMode::kFull); ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntries"); using namespace std::literals; RefPtr Storage = GetStorage(); if (!Storage) { return std::vector(Cores.size(), LogSequenceNumber{}); } size_t OpCount = Cores.size(); std::vector Mappings; std::vector OpDatas; Mappings.resize(OpCount); OpDatas.resize(OpCount); for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) { const CbObjectView& Core = Cores[OpIndex]; OpDatas[OpIndex] = OplogStorage::GetAppendOpData(Core); Mappings[OpIndex] = GetMapping(Core); } std::vector OpEntries = Storage->AppendOps(OpDatas); std::vector EntryIds; EntryIds.resize(OpCount); { { RwLock::ExclusiveLockScope OplogLock(m_OplogLock); if (m_CapturedOps) { m_CapturedOps->reserve(m_CapturedOps->size() + OpCount); } for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) { EntryIds[OpIndex] = RegisterOplogEntry(OplogLock, Mappings[OpIndex], OpEntries[OpIndex]); if (m_CapturedOps) { m_CapturedOps->push_back(OpDatas[OpIndex].KeyHash); } } } m_MetaValid = false; } return EntryIds; } ////////////////////////////////////////////////////////////////////////// ProjectStore::Project::Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath) : m_ProjectStore(PrjStore) , m_CidStore(Store) , m_OplogStoragePath(BasePath) , m_LastAccessTimes({std::make_pair(std::string(), GcClock::TickCount())}) { } ProjectStore::Project::~Project() { // Only write access times if we have not been explicitly deleted if (!m_OplogStoragePath.empty()) { WriteAccessTimes(); } } bool ProjectStore::Project::Exists(const std::filesystem::path& BasePath) { return IsFile(BasePath / "Project.zcb"); } void ProjectStore::Project::Read() { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Project::Read"); using namespace std::literals; std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv; ZEN_DEBUG("project '{}': reading config from '{}'", Identifier, ProjectStateFilePath); BasicFile Blob; Blob.Open(ProjectStateFilePath, BasicFile::Mode::kRead); IoBuffer Obj = Blob.ReadAll(); CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); if (ValidationError == CbValidateError::None) { CbObject Cfg = LoadCompactBinaryObject(Obj); Identifier = std::filesystem::path(Cfg["id"sv].AsU8String()).string(); RootDir = std::filesystem::path(Cfg["root"sv].AsU8String()).string(); ProjectRootDir = std::filesystem::path(Cfg["project"sv].AsU8String()).string(); EngineRootDir = std::filesystem::path(Cfg["engine"sv].AsU8String()).string(); ProjectFilePath = std::filesystem::path(Cfg["projectfile"sv].AsU8String()).string(); } else { ZEN_ERROR("validation error {} hit for '{}'", ToString(ValidationError), ProjectStateFilePath); } ReadAccessTimes(); } void ProjectStore::Project::Write() { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Project::Write"); using namespace std::literals; BinaryWriter Mem; CbObjectWriter Cfg; Cfg << "id"sv << Identifier; Cfg << "root"sv << PathToUtf8(RootDir); Cfg << "project"sv << PathToUtf8(ProjectRootDir); Cfg << "engine"sv << PathToUtf8(EngineRootDir); Cfg << "projectfile"sv << PathToUtf8(ProjectFilePath); Cfg.Save(Mem); CreateDirectories(m_OplogStoragePath); std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv; ZEN_INFO("project '{}': persisting config to '{}'", Identifier, ProjectStateFilePath); TemporaryFile::SafeWriteFile(ProjectStateFilePath, Mem.GetView()); } void ProjectStore::Project::ReadAccessTimes() { ZEN_MEMSCOPE(GetProjectstoreTag()); using namespace std::literals; std::filesystem::path ProjectAccessTimesFilePath = m_OplogStoragePath / "AccessTimes.zcb"sv; if (!IsFile(ProjectAccessTimesFilePath)) { return; } ZEN_DEBUG("project '{}': reading access times '{}'", Identifier, ProjectAccessTimesFilePath); BasicFile Blob; Blob.Open(ProjectAccessTimesFilePath, BasicFile::Mode::kRead); IoBuffer Obj = Blob.ReadAll(); CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); if (ValidationError == CbValidateError::None) { CbObject Reader = LoadCompactBinaryObject(Obj); uint64_t Count = Reader["count"sv].AsUInt64(0); if (Count > 0) { std::vector Ticks; Ticks.reserve(Count); CbArrayView TicksArray = Reader["ticks"sv].AsArrayView(); for (CbFieldView& TickView : TicksArray) { Ticks.emplace_back(TickView.AsUInt64()); } CbArrayView IdArray = Reader["ids"sv].AsArrayView(); uint64_t Index = 0; for (CbFieldView& IdView : IdArray) { std::string_view Id = IdView.AsString(); m_LastAccessTimes.insert_or_assign(std::string(Id), Ticks[Index++]); } } ////// Legacy format read { CbArrayView LastAccessTimes = Reader["lastaccess"sv].AsArrayView(); for (CbFieldView& Entry : LastAccessTimes) { CbObjectView AccessTime = Entry.AsObjectView(); std::string_view Id = AccessTime["id"sv].AsString(); GcClock::Tick AccessTick = AccessTime["tick"sv].AsUInt64(); m_LastAccessTimes.insert_or_assign(std::string(Id), AccessTick); } } } else { ZEN_WARN("project '{}': validation error {} hit for '{}'", Identifier, ToString(ValidationError), ProjectAccessTimesFilePath); } } void ProjectStore::Project::WriteAccessTimes() { ZEN_MEMSCOPE(GetProjectstoreTag()); using namespace std::literals; CbObjectWriter Writer(32 + (m_LastAccessTimes.size() * 16)); { RwLock::SharedLockScope _(m_LastAccessTimesLock); Writer.AddInteger("count", gsl::narrow(m_LastAccessTimes.size())); Writer.BeginArray("ids"); for (const auto& It : m_LastAccessTimes) { Writer << It.first; } Writer.EndArray(); Writer.BeginArray("ticks"); for (const auto& It : m_LastAccessTimes) { Writer << gsl::narrow(It.second); } Writer.EndArray(); } CbObject Data = Writer.Save(); try { CreateDirectories(m_OplogStoragePath); std::filesystem::path ProjectAccessTimesFilePath = m_OplogStoragePath / "AccessTimes.zcb"sv; ZEN_DEBUG("project '{}': persisting access times for '{}'", Identifier, ProjectAccessTimesFilePath); TemporaryFile::SafeWriteFile(ProjectAccessTimesFilePath, Data.GetView()); } catch (const std::exception& Err) { ZEN_WARN("project '{}': writing access times FAILED, reason: '{}'", Identifier, Err.what()); } } LoggerRef ProjectStore::Project::Log() const { return m_ProjectStore->Log(); } std::filesystem::path ProjectStore::Project::BasePathForOplog(std::string_view OplogId) const { return m_OplogStoragePath / OplogId; } Ref ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath) { ZEN_MEMSCOPE(GetProjectstoreTag()); RwLock::ExclusiveLockScope _(m_ProjectLock); std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); try { Stopwatch Timer; Ref NewLog(new Oplog(Log(), Identifier, OplogId, m_CidStore, OplogBasePath, MarkerPath, Oplog::EMode::kFull)); m_Oplogs.insert_or_assign(std::string{OplogId}, NewLog); NewLog->Write(); ZEN_INFO("oplog '{}/{}': created oplog at '{}' in {}", Identifier, OplogId, OplogBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); if (m_CapturedOplogs) { m_CapturedOplogs->push_back(std::string(OplogId)); } return NewLog; } catch (const std::exception&) { // In case of failure we need to ensure there's no half constructed entry around // // (This is probably already ensured by the try_emplace implementation?) m_Oplogs.erase(std::string{OplogId}); return {}; } } Ref ProjectStore::Project::ReadOplog(std::string_view OplogId) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OpenOplog"); std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); RwLock::SharedLockScope Lock(m_ProjectLock); if (auto It = m_Oplogs.find(std::string{OplogId}); It != m_Oplogs.end()) { if (Oplog::ExistsAt(OplogBasePath)) { return It->second; } else { return {}; } } if (Oplog::ExistsAt(OplogBasePath)) { Stopwatch Timer; Ref ExistingLog(new Oplog(m_ProjectStore->Log(), Identifier, OplogId, m_CidStore, OplogBasePath, std::filesystem::path{}, Oplog::EMode::kBasicReadOnly)); ExistingLog->Read(); Lock.ReleaseNow(); ZEN_INFO("oplog '{}/{}': read oplog at '{}' in {}", Identifier, OplogId, OplogBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); return ExistingLog; } return {}; } Ref ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bool VerifyPathOnDisk) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OpenOplog"); { RwLock::SharedLockScope ProjectLock(m_ProjectLock); auto OplogIt = m_Oplogs.find(std::string(OplogId)); if (OplogIt != m_Oplogs.end()) { bool ReOpen = false; if (VerifyPathOnDisk) { std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); if (!Oplog::ExistsAt(OplogBasePath)) { // Somebody deleted the oplog on disk behind our back ProjectLock.ReleaseNow(); std::filesystem::path DeletePath; if (!RemoveOplog(OplogId, DeletePath)) { ZEN_WARN("Failed to clean up deleted oplog {}/{}", Identifier, OplogId, OplogBasePath); } ReOpen = true; } } if (!ReOpen) { return OplogIt->second; } } } std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); RwLock::ExclusiveLockScope Lock(m_ProjectLock); if (auto It = m_Oplogs.find(std::string{OplogId}); It != m_Oplogs.end()) { return It->second; } if (Oplog::ExistsAt(OplogBasePath)) { try { Stopwatch Timer; Ref ExistingLog(new Oplog(m_ProjectStore->Log(), Identifier, OplogId, m_CidStore, OplogBasePath, std::filesystem::path{}, Oplog::EMode::kFull)); m_Oplogs.insert_or_assign(std::string{OplogId}, ExistingLog); ExistingLog->Read(); Lock.ReleaseNow(); ZEN_INFO("oplog '{}/{}': opened oplog at '{}' in {}", Identifier, OplogId, OplogBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); if (AllowCompact) { const uint32_t CompactUnusedThreshold = 50; ExistingLog->CompactIfUnusedExceeds(/*DryRun*/ false, CompactUnusedThreshold, fmt::format("Compact on initial open of oplog {}/{}: ", Identifier, OplogId)); } return ExistingLog; } catch (const std::exception& Ex) { ZEN_WARN("oplog '{}/{}': failed to open oplog at '{}': {}", Identifier, OplogId, OplogBasePath, Ex.what()); m_Oplogs.erase(std::string{OplogId}); } } return {}; } void ProjectStore::Oplog::CompactIfUnusedExceeds(bool DryRun, uint32_t CompactUnusedThreshold, std::string_view LogPrefix) { ZEN_MEMSCOPE(GetProjectstoreTag()); RwLock::ExclusiveLockScope OplogLock(m_OplogLock); if (!m_Storage) { return; } uint32_t UnusedPercent = GetUnusedSpacePercentLocked(); if (UnusedPercent >= CompactUnusedThreshold) { Compact(OplogLock, DryRun, /*RetainLSNs*/ m_Storage->MaxLSN() <= 0xff000000ul, // If we have less than 16 miln entries left of our LSN range, allow renumbering of LSNs LogPrefix); } } bool ProjectStore::Project::TryUnloadOplog(std::string_view OplogId) { ZEN_MEMSCOPE(GetProjectstoreTag()); RwLock::ExclusiveLockScope _(m_ProjectLock); if (auto OplogIt = m_Oplogs.find(std::string(OplogId)); OplogIt != m_Oplogs.end()) { Ref& Oplog = OplogIt->second; if (Oplog->CanUnload()) { m_Oplogs.erase(OplogIt); return true; } return false; } return false; } bool ProjectStore::Project::RemoveOplog(std::string_view OplogId, std::filesystem::path& OutDeletePath) { ZEN_MEMSCOPE(GetProjectstoreTag()); { RwLock::ExclusiveLockScope _(m_ProjectLock); if (auto OplogIt = m_Oplogs.find(std::string(OplogId)); OplogIt == m_Oplogs.end()) { std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); if (Oplog::ExistsAt(OplogBasePath)) { if (!PrepareDirectoryDelete(OplogBasePath, OutDeletePath)) { return false; } } } else { Ref& Oplog = OplogIt->second; if (!Oplog->PrepareForDelete(OutDeletePath)) { return false; } m_Oplogs.erase(OplogIt); } } m_LastAccessTimesLock.WithExclusiveLock([&]() { m_LastAccessTimes.erase(std::string(OplogId)); }); return true; } bool ProjectStore::Project::DeleteOplog(std::string_view OplogId) { ZEN_MEMSCOPE(GetProjectstoreTag()); std::filesystem::path DeletePath; if (!RemoveOplog(OplogId, DeletePath)) { return false; } // Erase content on disk if (!DeletePath.empty()) { if (!OplogStorage::Delete(DeletePath)) { ZEN_WARN("oplog '{}/{}': failed to remove old oplog path '{}'", Identifier, OplogId, DeletePath); return false; } } return true; } std::vector ProjectStore::Project::ScanForOplogs() const { ZEN_MEMSCOPE(GetProjectstoreTag()); RwLock::SharedLockScope _(m_ProjectLock); std::vector Oplogs; if (Project::Exists(m_OplogStoragePath)) { DirectoryContent DirContent; GetDirectoryContent(m_OplogStoragePath, DirectoryContentFlags::IncludeDirs, DirContent); Oplogs.reserve(DirContent.Directories.size()); for (const std::filesystem::path& DirPath : DirContent.Directories) { std::string DirName = PathToUtf8(DirPath.filename()); if (DirName.starts_with("[dropped]")) { continue; } if (Oplog::ExistsAt(DirPath)) { Oplogs.push_back(DirPath.filename().string()); } } } return Oplogs; } void ProjectStore::Project::IterateOplogs(std::function&& Fn) const { ZEN_MEMSCOPE(GetProjectstoreTag()); RwLock::SharedLockScope Lock(m_ProjectLock); for (auto& Kv : m_Oplogs) { Fn(Lock, *Kv.second); } } void ProjectStore::Project::IterateOplogs(std::function&& Fn) { ZEN_MEMSCOPE(GetProjectstoreTag()); RwLock::SharedLockScope Lock(m_ProjectLock); for (auto& Kv : m_Oplogs) { Fn(Lock, *Kv.second); } } void ProjectStore::Project::Flush() { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Project::Flush"); // We only need to flush oplogs that we have already loaded IterateOplogs([&](const RwLock::SharedLockScope&, Oplog& Ops) { Ops.Flush(); }); WriteAccessTimes(); } void ProjectStore::Project::Scrub(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_INFO("scrubbing '{}'", m_OplogStoragePath); // Scrubbing needs to check all existing oplogs std::vector OpLogs = ScanForOplogs(); RwLock::SharedLockScope _(m_ProjectLock); std::atomic Abort; std::atomic Pause; ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog); try { for (const std::string& OpLogId : OpLogs) { Ref OpLog; { if (auto OpIt = m_Oplogs.find(OpLogId); OpIt != m_Oplogs.end()) { OpLog = OpIt->second; } else { std::filesystem::path OplogBasePath = BasePathForOplog(OpLogId); if (ProjectStore::Oplog::ExistsAt(OplogBasePath)) { OpLog = new ProjectStore::Oplog( Log(), Identifier, OpLogId, m_CidStore, OplogBasePath, std::filesystem::path{}, ProjectStore::Oplog::EMode::kFull); // We need it to be a full read so we can write a new index snapshot OpLog->Read(); } } } if (OpLog) { Work.ScheduleWork(Ctx.ThreadPool(), [OpLog, &Ctx](std::atomic& AbortFlag) { if (!AbortFlag) { OpLog->Scrub(Ctx); } }); } } Work.Wait(); } catch (const ScrubDeadlineExpiredException&) { ZEN_INFO("Scrubbing deadline expired, operation incomplete"); Abort = true; Work.Wait(); } catch (const std::exception& Ex) { ZEN_WARN("Scrubbing failed: '{}'", Ex.what()); Abort = true; Work.Wait(); } } uint64_t ProjectStore::Project::TotalSize(const std::filesystem::path& BasePath) { ZEN_MEMSCOPE(GetProjectstoreTag()); using namespace std::literals; uint64_t Size = 0; std::filesystem::path AccessTimesFilePath = BasePath / "AccessTimes.zcb"sv; if (IsFile(AccessTimesFilePath)) { Size += FileSizeFromPath(AccessTimesFilePath); } std::filesystem::path ProjectFilePath = BasePath / "Project.zcb"sv; if (IsFile(ProjectFilePath)) { Size += FileSizeFromPath(ProjectFilePath); } return Size; } uint64_t ProjectStore::Project::TotalSize() const { ZEN_MEMSCOPE(GetProjectstoreTag()); uint64_t Result = TotalSize(m_OplogStoragePath); { std::vector OpLogs = ScanForOplogs(); for (const std::string& OpLogId : OpLogs) { std::filesystem::path OplogBasePath = BasePathForOplog(OpLogId); Result += Oplog::TotalSize(OplogBasePath); } } return Result; } bool ProjectStore::Project::PrepareForDelete(std::filesystem::path& OutDeletePath) { ZEN_MEMSCOPE(GetProjectstoreTag()); RwLock::ExclusiveLockScope _(m_ProjectLock); for (auto& It : m_Oplogs) { It.second->ResetState(); } m_Oplogs.clear(); bool Success = PrepareDirectoryDelete(m_OplogStoragePath, OutDeletePath); if (!Success) { return false; } m_OplogStoragePath.clear(); return true; } void ProjectStore::Project::EnableUpdateCapture() { ZEN_MEMSCOPE(GetProjectstoreTag()); m_ProjectLock.WithExclusiveLock([&]() { if (m_UpdateCaptureRefCounter == 0) { ZEN_ASSERT(!m_CapturedOplogs); m_CapturedOplogs = std::make_unique>(); } else { ZEN_ASSERT(m_CapturedOplogs); } m_UpdateCaptureRefCounter++; }); } void ProjectStore::Project::DisableUpdateCapture() { ZEN_MEMSCOPE(GetProjectstoreTag()); m_ProjectLock.WithExclusiveLock([&]() { ZEN_ASSERT(m_CapturedOplogs); ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); m_UpdateCaptureRefCounter--; if (m_UpdateCaptureRefCounter == 0) { m_CapturedOplogs.reset(); } }); } std::vector ProjectStore::Project::GetCapturedOplogsLocked() { ZEN_MEMSCOPE(GetProjectstoreTag()); if (m_CapturedOplogs) { return *m_CapturedOplogs; } return {}; } std::vector ProjectStore::Project::GetGcReferencerLocks() { ZEN_MEMSCOPE(GetProjectstoreTag()); std::vector Locks; Locks.emplace_back(RwLock::SharedLockScope(m_ProjectLock)); Locks.reserve(1 + m_Oplogs.size()); for (auto& Kv : m_Oplogs) { Locks.emplace_back(Kv.second->GetGcReferencerLock()); } return Locks; } bool ProjectStore::Project::IsExpired(const std::string& EntryName, const std::filesystem::path& MarkerPath, const GcClock::TimePoint ExpireTime) const { ZEN_MEMSCOPE(GetProjectstoreTag()); if (!MarkerPath.empty()) { std::error_code Ec; if (IsFile(MarkerPath, Ec)) { if (Ec) { ZEN_WARN("{} '{}{}{}', Failed to check expiry via marker file '{}', assuming not expired", EntryName.empty() ? "project"sv : "oplog"sv, Identifier, EntryName.empty() ? ""sv : "/"sv, EntryName, MarkerPath.string()); return false; } return false; } } const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); RwLock::SharedLockScope _(m_LastAccessTimesLock); if (auto It = m_LastAccessTimes.find(EntryName); It != m_LastAccessTimes.end()) { if (It->second <= ExpireTicks) { return true; } } return false; } bool ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime) const { return IsExpired(std::string(), ProjectFilePath, ExpireTime); } bool ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog) const { return IsExpired(Oplog.OplogId(), Oplog.MarkerPath(), ExpireTime); } bool ProjectStore::Project::IsOplogTouchedSince(const GcClock::TimePoint TouchTime, std::string_view Oplog) const { const GcClock::Tick TouchTicks = TouchTime.time_since_epoch().count(); RwLock::ExclusiveLockScope _(m_LastAccessTimesLock); if (auto It = m_LastAccessTimes.find(std::string(Oplog)); It != m_LastAccessTimes.end()) { if (It->second > TouchTicks) { return true; } } return false; } bool ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime, std::string_view OplogId) const { ZEN_MEMSCOPE(GetProjectstoreTag()); using namespace std::literals; { RwLock::SharedLockScope Lock(m_ProjectLock); auto OplogIt = m_Oplogs.find(std::string(OplogId)); if (OplogIt != m_Oplogs.end()) { Lock.ReleaseNow(); return IsExpired(ExpireTime, *OplogIt->second); } } std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); std::optional OplogConfig = Oplog::ReadStateFile(OplogBasePath, [this]() { return Log(); }); std::filesystem::path MarkerPath = OplogConfig.has_value() ? OplogConfig.value()["gcpath"sv].AsU8String() : std::u8string(); return IsExpired(std::string(OplogId), MarkerPath, ExpireTime); } void ProjectStore::Project::TouchProject() { ZEN_MEMSCOPE(GetProjectstoreTag()); RwLock::ExclusiveLockScope _(m_LastAccessTimesLock); m_LastAccessTimes.insert_or_assign(std::string(), GcClock::TickCount()); } void ProjectStore::Project::TouchOplog(std::string_view Oplog) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_ASSERT(!Oplog.empty()); RwLock::ExclusiveLockScope _(m_LastAccessTimesLock); m_LastAccessTimes.insert_or_assign(std::string(Oplog), GcClock::TickCount()); } GcClock::TimePoint ProjectStore::Project::LastOplogAccessTime(std::string_view Oplog) const { RwLock::SharedLockScope _(m_LastAccessTimesLock); if (auto It = m_LastAccessTimes.find(std::string(Oplog)); It != m_LastAccessTimes.end()) { return GcClock::TimePointFromTick(It->second); } return GcClock::TimePoint::min(); } ////////////////////////////////////////////////////////////////////////// ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, const Configuration& Config) : m_Log(logging::Get("project")) , m_Gc(Gc) , m_CidStore(Store) , m_ProjectBasePath(BasePath) , m_Config(Config) , m_DiskWriteBlocker(Gc.GetDiskWriteBlocker()) { ZEN_INFO("initializing project store at '{}'", m_ProjectBasePath); // m_Log.set_level(spdlog::level::debug); m_Gc.AddGcStorage(this); m_Gc.AddGcReferencer(*this); m_Gc.AddGcReferenceLocker(*this); } ProjectStore::~ProjectStore() { ZEN_INFO("closing project store at '{}'", m_ProjectBasePath); m_Gc.RemoveGcReferenceLocker(*this); m_Gc.RemoveGcReferencer(*this); m_Gc.RemoveGcStorage(this); } std::filesystem::path ProjectStore::BasePathForProject(std::string_view ProjectId) { return m_ProjectBasePath / ProjectId; } void ProjectStore::DiscoverProjects() { ZEN_MEMSCOPE(GetProjectstoreTag()); if (!IsDir(m_ProjectBasePath)) { return; } DirectoryContent DirContent; GetDirectoryContent(m_ProjectBasePath, DirectoryContentFlags::IncludeDirs, DirContent); for (const std::filesystem::path& DirPath : DirContent.Directories) { std::string DirName = PathToUtf8(DirPath.filename()); if (DirName.starts_with("[dropped]")) { continue; } OpenProject(DirName); } } void ProjectStore::IterateProjects(std::function&& Fn) { RwLock::SharedLockScope _(m_ProjectsLock); for (auto& Kv : m_Projects) { Fn(*Kv.second.Get()); } } void ProjectStore::Flush() { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::Flush"); ZEN_INFO("flushing project store at '{}'", m_ProjectBasePath); std::vector> Projects; { RwLock::SharedLockScope _(m_ProjectsLock); Projects.reserve(m_Projects.size()); for (auto& Kv : m_Projects) { Projects.push_back(Kv.second); } } WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); std::atomic AbortFlag; std::atomic PauseFlag; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { for (const Ref& Project : Projects) { Work.ScheduleWork( WorkerPool, [this, Project](std::atomic&) { try { Project->Flush(); } catch (const std::exception& Ex) { ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what()); } }, 0); } } catch (const std::exception& Ex) { AbortFlag.store(true); ZEN_WARN("Failed projects in {}. Reason: '{}'", m_ProjectBasePath, Ex.what()); } Work.Wait(); } void ProjectStore::ScrubStorage(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_INFO("scrubbing '{}'", m_ProjectBasePath); DiscoverProjects(); std::vector> Projects; { RwLock::SharedLockScope Lock(m_ProjectsLock); Projects.reserve(m_Projects.size()); for (auto& Kv : m_Projects) { if (Kv.second->IsExpired(GcClock::TimePoint::min())) { continue; } Projects.push_back(Kv.second); } } for (const Ref& Project : Projects) { Project->Scrub(Ctx); } } GcStorageSize ProjectStore::StorageSize() const { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::StorageSize"); using namespace std::literals; GcStorageSize Result; { if (IsDir(m_ProjectBasePath)) { DirectoryContent ProjectsFolderContent; GetDirectoryContent(m_ProjectBasePath, DirectoryContentFlags::IncludeDirs, ProjectsFolderContent); for (const std::filesystem::path& ProjectBasePath : ProjectsFolderContent.Directories) { std::filesystem::path ProjectStateFilePath = ProjectBasePath / "Project.zcb"sv; if (IsFile(ProjectStateFilePath)) { Result.DiskSize += Project::TotalSize(ProjectBasePath); DirectoryContent DirContent; GetDirectoryContent(ProjectBasePath, DirectoryContentFlags::IncludeDirs, DirContent); for (const std::filesystem::path& OplogBasePath : DirContent.Directories) { Result.DiskSize += Oplog::TotalSize(OplogBasePath); } } } } } return Result; } Ref ProjectStore::OpenProject(std::string_view ProjectId) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OpenProject"); { RwLock::SharedLockScope _(m_ProjectsLock); if (auto ProjIt = m_Projects.find(std::string{ProjectId}); ProjIt != m_Projects.end()) { return ProjIt->second; } } RwLock::ExclusiveLockScope _(m_ProjectsLock); if (auto ProjIt = m_Projects.find(std::string{ProjectId}); ProjIt != m_Projects.end()) { return ProjIt->second; } std::filesystem::path BasePath = BasePathForProject(ProjectId); if (Project::Exists(BasePath)) { try { ZEN_INFO("project '{}': opening project at '{}'", ProjectId, BasePath); Ref& Prj = m_Projects .try_emplace(std::string{ProjectId}, Ref(new ProjectStore::Project(this, m_CidStore, BasePath))) .first->second; Prj->Identifier = ProjectId; Prj->Read(); return Prj; } catch (const std::exception& e) { ZEN_WARN("project '{}': failed to open at {} ({})", ProjectId, BasePath, e.what()); m_Projects.erase(std::string{ProjectId}); } } return {}; } Ref ProjectStore::NewProject(const std::filesystem::path& BasePath, std::string_view ProjectId, const std::filesystem::path& RootDir, const std::filesystem::path& EngineRootDir, const std::filesystem::path& ProjectRootDir, const std::filesystem::path& ProjectFilePath) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::NewProject"); RwLock::ExclusiveLockScope _(m_ProjectsLock); ZEN_INFO("project '{}': creating project at '{}'", ProjectId, BasePath); Ref& Prj = m_Projects.try_emplace(std::string{ProjectId}, Ref(new ProjectStore::Project(this, m_CidStore, BasePath))) .first->second; Prj->Identifier = ProjectId; Prj->RootDir = RootDir; Prj->EngineRootDir = EngineRootDir; Prj->ProjectRootDir = ProjectRootDir; Prj->ProjectFilePath = ProjectFilePath; Prj->Write(); if (m_CapturedProjects) { m_CapturedProjects->push_back(std::string(ProjectId)); } return Prj; } bool ProjectStore::UpdateProject(std::string_view ProjectId, const std::filesystem::path& RootDir, const std::filesystem::path& EngineRootDir, const std::filesystem::path& ProjectRootDir, const std::filesystem::path& ProjectFilePath) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::UpdateProject"); RwLock::ExclusiveLockScope ProjectsLock(m_ProjectsLock); auto ProjIt = m_Projects.find(std::string{ProjectId}); if (ProjIt == m_Projects.end()) { return false; } Ref Prj = ProjIt->second; Prj->RootDir = RootDir; Prj->EngineRootDir = EngineRootDir; Prj->ProjectRootDir = ProjectRootDir; Prj->ProjectFilePath = ProjectFilePath; Prj->Write(); ZEN_INFO("project '{}': updated", ProjectId); return true; } bool ProjectStore::RemoveProject(std::string_view ProjectId, std::filesystem::path& OutDeletePath) { ZEN_MEMSCOPE(GetProjectstoreTag()); RwLock::ExclusiveLockScope ProjectsLock(m_ProjectsLock); auto ProjIt = m_Projects.find(std::string{ProjectId}); if (ProjIt == m_Projects.end()) { return true; } bool Success = ProjIt->second->PrepareForDelete(OutDeletePath); if (!Success) { return false; } m_Projects.erase(ProjIt); return true; } bool ProjectStore::DeleteProject(std::string_view ProjectId) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::DeleteProject"); ZEN_INFO("project '{}': deleting", ProjectId); std::filesystem::path DeletePath; if (!RemoveProject(ProjectId, DeletePath)) { return false; } if (!DeletePath.empty()) { if (!DeleteDirectories(DeletePath)) { ZEN_WARN("project '{}': failed to remove old project path '{}'", ProjectId, DeletePath); return false; } } return true; } bool ProjectStore::Exists(std::string_view ProjectId) { return Project::Exists(BasePathForProject(ProjectId)); } CbArray ProjectStore::GetProjectsList() { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::GetProjectsList"); using namespace std::literals; DiscoverProjects(); CbWriter Response; Response.BeginArray(); IterateProjects([&Response](Project& Prj) { Response.BeginObject(); Response << "Id"sv << Prj.Identifier; Response << "RootDir"sv << Prj.RootDir.string(); Response << "ProjectRootDir"sv << PathToUtf8(Prj.ProjectRootDir); Response << "EngineRootDir"sv << PathToUtf8(Prj.EngineRootDir); Response << "ProjectFilePath"sv << PathToUtf8(Prj.ProjectFilePath); Response.EndObject(); }); Response.EndArray(); return Response.Save().AsArray(); } CbObject ProjectStore::GetProjectFiles(LoggerRef InLog, Project& Project, Oplog& Oplog, const std::unordered_set& WantedFieldNames) { auto Log = [&InLog]() { return InLog; }; using namespace std::literals; const bool WantsAllFields = WantedFieldNames.empty(); const bool WantsIdField = WantsAllFields || WantedFieldNames.contains("id"); const bool WantsClientPathField = WantsAllFields || WantedFieldNames.contains("clientpath"); const bool WantsServerPathField = WantsAllFields || WantedFieldNames.contains("serverpath"); const bool WantsRawSizeField = WantsAllFields || WantedFieldNames.contains("rawsize"); const bool WantsSizeField = WantsAllFields || WantedFieldNames.contains("size"); std::vector Ids; std::vector ServerPaths; std::vector ClientPaths; std::vector Sizes; std::vector RawSizes; size_t Count = 0; Oplog.IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) { if (WantsIdField || WantsRawSizeField || WantsSizeField) { Ids.push_back(Id); } if (WantsServerPathField) { ServerPaths.push_back(std::string(ServerPath)); } if (WantsClientPathField) { ClientPaths.push_back(std::string(ClientPath)); } Count++; }); if (WantsRawSizeField || WantsSizeField) { if (WantsSizeField) { Sizes.resize(Ids.size(), (uint64_t)-1); } if (WantsRawSizeField) { RawSizes.resize(Ids.size(), (uint64_t)-1); } Oplog.IterateChunks( Project.RootDir, Ids, false, [&](size_t Index, const IoBuffer& Payload, uint64_t /*ModTag*/) { try { if (Payload) { if (Payload.GetContentType() == ZenContentType::kCompressedBinary) { if (WantsRawSizeField) { IoHash _; if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index], /*OutOptionalTotalCompressedSize*/ nullptr)) { if (WantsSizeField) { Sizes[Index] = Payload.GetSize(); } } else { ZEN_WARN("oplog '{}/{}': payload for project file info for id {} is not a valid compressed binary.", Project.Identifier, Oplog.OplogId(), Ids[Index]); } } else if (WantsSizeField) { Sizes[Index] = Payload.GetSize(); } } else { if (WantsSizeField) { Sizes[Index] = Payload.GetSize(); } if (WantsRawSizeField) { RawSizes[Index] = Payload.GetSize(); } } } else { ZEN_WARN("oplog '{}/{}': failed getting payload for project file info for id {}.", Project.Identifier, Oplog.OplogId(), Ids[Index]); } } catch (const std::exception& Ex) { ZEN_WARN("oplog '{}/{}': failed getting project file info for id {}. Reason: '{}'", Project.Identifier, Oplog.OplogId(), Ids[Index], Ex.what()); } return true; }, &GetSmallWorkerPool(EWorkloadType::Burst), 256u * 1024u); } CbObjectWriter Response(64u + Count * ((WantsIdField ? (5 + sizeof(Oid::OidBits)) : 0) + (WantsServerPathField ? 64 : 0) + (WantsClientPathField ? 64 : 0) + (WantsSizeField ? 16 : 0) + (WantsRawSizeField ? 16 : 0))); Response.BeginArray("files"sv); for (size_t Index = 0; Index < Count; Index++) { Response.BeginObject(); if (WantsIdField) { Response << "id"sv << Ids[Index]; } if (WantsServerPathField) { Response << "serverpath"sv << ServerPaths[Index]; } if (WantsClientPathField) { Response << "clientpath"sv << ClientPaths[Index]; } if (WantsSizeField && Sizes[Index] != (uint64_t)-1) { Response << "size"sv << Sizes[Index]; } if (WantsRawSizeField && RawSizes[Index] != (uint64_t)-1) { Response << "rawsize"sv << RawSizes[Index]; } Response.EndObject(); } Response.EndArray(); return Response.Save(); } CbObject ProjectStore::GetProjectChunkInfos(LoggerRef InLog, Project& Project, Oplog& Oplog, const std::unordered_set& WantedFieldNames) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("ProjectStore::GetProjectChunkInfos"); auto Log = [&InLog]() { return InLog; }; using namespace std::literals; const bool WantsAllFields = WantedFieldNames.empty(); const bool WantsIdField = WantsAllFields || WantedFieldNames.contains("id"); const bool WantsRawHashField = WantsAllFields || WantedFieldNames.contains("rawhash"); const bool WantsRawSizeField = WantsAllFields || WantedFieldNames.contains("rawsize"); const bool WantsSizeField = WantsAllFields || WantedFieldNames.contains("size"); std::vector Ids; std::vector Hashes; std::vector RawSizes; std::vector Sizes; size_t Count = 0; size_t EstimatedCount = Oplog.OplogCount(); if (WantsIdField) { Ids.reserve(EstimatedCount); } if (WantsRawHashField || WantsRawSizeField || WantsSizeField) { Hashes.reserve(EstimatedCount); } Oplog.IterateChunkMap([&](const Oid& Id, const IoHash& Hash) { if (WantsIdField) { Ids.push_back(Id); } if (WantsRawHashField || WantsRawSizeField || WantsSizeField) { Hashes.push_back(Hash); } Count++; }); if (WantsRawSizeField || WantsSizeField) { if (WantsRawSizeField) { RawSizes.resize(Hashes.size(), (uint64_t)-1); } if (WantsSizeField) { Sizes.resize(Hashes.size(), (uint64_t)-1); } WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); (void)Oplog.IterateChunks( Hashes, false, [&](size_t Index, const IoBuffer& Payload, uint64_t /*ModTag*/) -> bool { try { if (Payload) { if (Payload.GetContentType() == ZenContentType::kCompressedBinary) { if (WantsRawSizeField) { ZEN_ASSERT_SLOW(RawSizes[Index] == (uint64_t)-1); IoHash _; if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index], /*OutOptionalTotalCompressedSize*/ nullptr)) { if (WantsSizeField) { ZEN_ASSERT_SLOW(Sizes[Index] == (uint64_t)-1); Sizes[Index] = Payload.GetSize(); } } else { ZEN_WARN("oplog '{}/{}': payload for project chunk for id {} is not a valid compressed binary.", Project.Identifier, Oplog.OplogId(), Ids[Index]); } } else if (WantsSizeField) { ZEN_ASSERT_SLOW(Sizes[Index] == (uint64_t)-1); Sizes[Index] = Payload.GetSize(); } } else { if (WantsSizeField) { ZEN_ASSERT_SLOW(Sizes[Index] == (uint64_t)-1); Sizes[Index] = Payload.GetSize(); } if (WantsRawSizeField) { ZEN_ASSERT_SLOW(Sizes[Index] == (uint64_t)-1); RawSizes[Index] = Payload.GetSize(); } } } else { ZEN_WARN("oplog '{}/{}': failed getting payload for chunk for id {}", Project.Identifier, Oplog.OplogId(), Ids[Index]); } } catch (const std::exception& Ex) { ZEN_WARN("oplog '{}/{}': failed getting project chunk info for id {}. Reason: '{}'", Project.Identifier, Oplog.OplogId(), Ids[Index], Ex.what()); } return true; }, &WorkerPool, 256u * 1024u); } CbObjectWriter Response(64u + Count * ((WantsIdField ? (5 + sizeof(Oid::OidBits)) : 0) + (WantsRawHashField ? (10 + sizeof(IoHash::Hash)) : 0) + (WantsSizeField ? 16 : 0) + (WantsRawSizeField ? 16 : 0))); Response.BeginArray("chunkinfos"sv); for (size_t Index = 0; Index < Count; Index++) { Response.BeginObject(); if (WantsIdField) { Response << "id"sv << Ids[Index]; } if (WantsRawHashField) { Response << "rawhash"sv << Hashes[Index]; } if (WantsSizeField && Sizes[Index] != (uint64_t)-1) { Response << "size"sv << Sizes[Index]; } if (WantsRawSizeField && RawSizes[Index] != (uint64_t)-1) { Response << "rawsize"sv << RawSizes[Index]; } Response.EndObject(); } Response.EndArray(); return Response.Save(); } CbObject ProjectStore::GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, const Oid& ChunkId) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("ProjectStore::GetChunkInfo"); using namespace std::literals; auto Log = [&InLog]() { return InLog; }; ZEN_UNUSED(Log); IoBuffer Chunk = Oplog.FindChunk(Project.RootDir, ChunkId, nullptr); if (!Chunk) { return {}; } uint64_t ChunkSize = Chunk.GetSize(); if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) { IoHash RawHash; uint64_t RawSize; bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr); if (!IsCompressed) { throw std::runtime_error( fmt::format("Chunk info request for malformed chunk id '{}/{}'/'{}'", Project.Identifier, Oplog.OplogId(), ChunkId)); } ChunkSize = RawSize; } CbObjectWriter Response; Response << "size"sv << ChunkSize; return Response.Save(); } static ProjectStore::GetChunkRangeResult ExtractRange(IoBuffer&& Chunk, uint64_t Offset, uint64_t Size, ZenContentType AcceptType) { ZEN_MEMSCOPE(GetProjectstoreTag()); ProjectStore::GetChunkRangeResult Result; Result.ContentType = Chunk.GetContentType(); if (Result.ContentType == ZenContentType::kCompressedBinary) { IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize); if (!Compressed) { Result.Error = ProjectStore::GetChunkRangeResult::EError::MalformedContent; Result.ErrorDescription = fmt::format("Malformed payload, not a compressed buffer"); return Result; } const bool IsFullRange = (Offset == 0) && ((Size == ~(0ull)) || (Size == RawSize)); if (IsFullRange) { if (AcceptType == ZenContentType::kBinary) { Result.Chunk = Compressed.DecompressToComposite(); Result.ContentType = ZenContentType::kBinary; } else { Result.Chunk = Compressed.GetCompressed(); } Result.RawSize = 0; } else { if (Size == ~(0ull) || (Offset + Size) > RawSize) { if (Offset < RawSize) { Size = RawSize - Offset; } else { Size = 0; } } if (Size == 0) { Result.Error = ProjectStore::GetChunkRangeResult::EError::OutOfRange; Result.ErrorDescription = fmt::format("Chunk request for range outside of compressed chunk. Offset: {}, Size: {}, ChunkSize: {}", Offset, Size, RawSize); return Result; } if (AcceptType == ZenContentType::kBinary) { Result.Chunk = CompositeBuffer(Compressed.Decompress(Offset, Size)); Result.ContentType = ZenContentType::kBinary; } else { // Value will be a range of compressed blocks that covers the requested range // The client will have to compensate for any offsets that do not land on an even block size multiple Result.Chunk = Compressed.GetRange(Offset, Size).GetCompressed(); } Result.RawSize = RawSize; } Result.RawHash = RawHash; } else { const uint64_t ChunkSize = Chunk.GetSize(); const bool IsFullRange = (Offset == 0) && ((Size == ~(0ull)) || (Size == ChunkSize)); if (IsFullRange) { Result.Chunk = CompositeBuffer(SharedBuffer(std::move(Chunk))); Result.RawSize = 0; } else { if (Size == ~(0ull) || (Offset + Size) > ChunkSize) { if (Offset < ChunkSize) { Size = ChunkSize - Offset; } else { Size = 0; } } if (Size == 0) { Result.Error = ProjectStore::GetChunkRangeResult::EError::OutOfRange; Result.ErrorDescription = fmt::format("Chunk request for range outside of compressed chunk. Offset: {}, Size: {}, ChunkSize: {}", Offset, Size, ChunkSize); return Result; } Result.Chunk = CompositeBuffer(SharedBuffer(IoBuffer(std::move(Chunk), Offset, Size))); Result.RawSize = ChunkSize; } } Result.Error = ProjectStore::GetChunkRangeResult::EError::Ok; return Result; } ProjectStore::GetChunkRangeResult ProjectStore::GetChunkRange(LoggerRef InLog, Project& Project, Oplog& Oplog, const Oid& ChunkId, uint64_t Offset, uint64_t Size, ZenContentType AcceptType, uint64_t* OptionalInOutModificationTag) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("ProjectStore::GetChunkRange"); auto Log = [&InLog]() { return InLog; }; ZEN_UNUSED(Log); uint64_t OldTag = OptionalInOutModificationTag == nullptr ? 0 : *OptionalInOutModificationTag; IoBuffer Chunk = Oplog.FindChunk(Project.RootDir, ChunkId, OptionalInOutModificationTag); if (!Chunk) { return GetChunkRangeResult{.Error = GetChunkRangeResult::EError::NotFound, .ErrorDescription = fmt::format("Chunk range request for chunk {}/{}/{} failed, payload not found", Project.Identifier, Oplog.OplogId(), ChunkId)}; } if (OptionalInOutModificationTag != nullptr && OldTag == *OptionalInOutModificationTag) { return {.Error = GetChunkRangeResult::EError::NotModified}; } return ExtractRange(std::move(Chunk), Offset, Size, AcceptType); } IoBuffer ProjectStore::GetChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("ProjectStore::GetChunk"); ZEN_UNUSED(Project, Oplog); IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash); if (!Chunk) { return {}; } Chunk.SetContentType(ZenContentType::kCompressedBinary); return Chunk; } IoBuffer ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const Oid& ChunkId) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("ProjectStore::GetChunk"); Ref Project = OpenProject(ProjectId); if (!Project) { return {}; } Ref Oplog = Project->OpenOplog(OplogId, /*AllowCompact */ false, /*VerifyPathOnDisk*/ false); if (!Oplog) { return {}; } return Oplog->FindChunk(Project->RootDir, ChunkId, /*OptOutModificationTag*/ nullptr); } IoBuffer ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const IoHash& Cid) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("ProjectStore::GetChunk"); Ref Project = OpenProject(ProjectId); if (!Project) { return {}; } Ref Oplog = Project->OpenOplog(OplogId, /*AllowCompact */ false, /*VerifyPathOnDisk*/ false); if (!Oplog) { return {}; } return m_CidStore.FindChunkByCid(Cid); } bool ProjectStore::PutChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash, IoBuffer&& Chunk) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("ProjectStore::PutChunk"); IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); if (RawHash != ChunkHash) { throw std::runtime_error( fmt::format("Chunk request for invalid payload format for chunk {}/{}/{}", Project.Identifier, Oplog.OplogId(), ChunkHash)); } Oplog.CaptureAddedAttachments(std::vector{ChunkHash}); CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk, ChunkHash); return Result.New; } std::vector ProjectStore::GetChunks(Project& Project, Oplog& Oplog, std::span Requests) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("ProjectStore::GetChunks"); ZEN_ASSERT(!Requests.empty()); std::vector Results; size_t RequestCount = Requests.size(); Results.resize(RequestCount); if (RequestCount > 1) { std::vector ChunkRawHashes; std::vector ChunkRawHashesRequestIndex; std::vector ChunkIds; std::vector ChunkIdsRequestIndex; ChunkRawHashes.reserve(RequestCount); ChunkRawHashesRequestIndex.reserve(RequestCount); ChunkIds.reserve(RequestCount); ChunkIdsRequestIndex.reserve(RequestCount); for (size_t RequestIndex = 0; RequestIndex < Requests.size(); RequestIndex++) { const ChunkRequest& Request = Requests[RequestIndex]; if (Request.Id.index() == 0) { ChunkRawHashes.push_back(std::get(Request.Id)); ChunkRawHashesRequestIndex.push_back(RequestIndex); } else { ChunkIds.push_back(std::get(Request.Id)); ChunkIdsRequestIndex.push_back(RequestIndex); } } WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); if (!ChunkRawHashes.empty()) { Oplog.IterateChunks( ChunkRawHashes, true, [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool { if (Payload) { size_t RequestIndex = ChunkRawHashesRequestIndex[Index]; const ChunkRequest& Request = Requests[RequestIndex]; ChunkResult& Result = Results[RequestIndex]; Result.Exists = true; if (!Request.SkipData) { Result.ChunkBuffer = std::move(Payload); Result.ChunkBuffer.MakeOwned(); } Result.ModTag = ModTag; } return true; }, &WorkerPool, 8u * 1024); } if (!ChunkIdsRequestIndex.empty()) { Oplog.IterateChunks( Project.RootDir, ChunkIds, true, [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool { if (Payload) { size_t RequestIndex = ChunkIdsRequestIndex[Index]; const ChunkRequest& Request = Requests[RequestIndex]; ChunkResult& Result = Results[RequestIndex]; Result.Exists = true; if (!Request.SkipData) { Result.ChunkBuffer = std::move(Payload); Result.ChunkBuffer.MakeOwned(); } Result.ModTag = ModTag; } return true; }, &WorkerPool, 8u * 1024); } } else { const ChunkRequest& Request = Requests.front(); ChunkResult& Result = Results.front(); if (Request.Id.index() == 0) { const IoHash& ChunkHash = std::get(Request.Id); IoBuffer Payload = m_CidStore.FindChunkByCid(ChunkHash); if (Payload) { Result.Exists = true; Result.ModTag = GetModificationTagFromRawHash(ChunkHash); if (!Request.SkipData) { Result.ChunkBuffer = std::move(Payload); Result.ChunkBuffer.MakeOwned(); } } } else { const Oid& ChunkId = std::get(Request.Id); uint64_t ModTag = 0; IoBuffer Payload = Oplog.FindChunk(Project.RootDir, ChunkId, &ModTag); if (Payload) { Result.Exists = true; Result.ModTag = ModTag; if (!Request.SkipData) { Result.ChunkBuffer = std::move(Payload); Result.ChunkBuffer.MakeOwned(); } } } } return Results; } std::vector ProjectStore::ParseChunksRequests(Project& Project, Oplog& Oplog, const CbObject& Cb) { ZEN_TRACE_CPU("Store::Rpc::getchunks"); using namespace std::literals; std::vector Requests; if (auto RequestFieldView = Cb["Request"sv]; RequestFieldView.IsObject()) { CbObjectView RequestView = RequestFieldView.AsObjectView(); bool SkipData = RequestView["SkipData"].AsBool(false); CbArrayView ChunksArray = RequestView["Chunks"sv].AsArrayView(); size_t RequestCount = ChunksArray.Num(); if (RequestCount > 0) { Requests.reserve(RequestCount); for (CbFieldView FieldView : ChunksArray) { CbObjectView ChunkObject = FieldView.AsObjectView(); ChunkRequest ChunkRequest = {.Offset = ChunkObject["Offset"sv].AsUInt64(0), .Size = ChunkObject["Size"sv].AsUInt64((uint64_t)-1), .SkipData = SkipData}; if (CbFieldView InputModificationTagView = ChunkObject.FindView("ModTag"); InputModificationTagView.IsInteger()) { ChunkRequest.ModTag = InputModificationTagView.AsUInt64(); } if (CbFieldView RawHashView = ChunkObject.FindView("RawHash"sv); RawHashView.IsHash()) { const IoHash ChunkHash = RawHashView.AsHash(); ChunkRequest.Id = ChunkHash; } else if (CbFieldView IdView = ChunkObject.FindView("Oid"sv); IdView.IsObjectId()) { const Oid ChunkId = IdView.AsObjectId(); ChunkRequest.Id = ChunkId; } else { throw std::runtime_error( fmt::format("oplog '{}/{}': malformed getchunks rpc request object, chunk request has no identifier", Project.Identifier, Oplog.OplogId())); } Requests.emplace_back(std::move(ChunkRequest)); } } } else if (CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); ChunksArray) { // Legacy full chunks only by rawhash size_t RequestCount = ChunksArray.Num(); Requests.reserve(RequestCount); std::vector Cids; Cids.reserve(ChunksArray.Num()); for (CbFieldView FieldView : ChunksArray) { Requests.push_back(ProjectStore::ChunkRequest{.Id = FieldView.AsHash()}); } } else { throw std::runtime_error(fmt::format("oplog '{}/{}': malformed getchunks rpc request object", Project.Identifier, Oplog.OplogId())); } return Requests; } CbPackage ProjectStore::WriteChunksRequestResponse(Project& Project, Oplog& Oplog, std::vector&& Requests, std::vector&& Results) { using namespace std::literals; CbPackage ResponsePackage; CbObjectWriter ResponseWriter(32 + Requests.size() * 64u); ResponseWriter.BeginArray("Chunks"sv); { for (size_t Index = 0; Index < Requests.size(); Index++) { const ChunkRequest& Request = Requests[Index]; ChunkResult& Result = Results[Index]; if (Result.Exists) { ResponseWriter.BeginObject(); { if (Request.Id.index() == 0) { const IoHash& RawHash = std::get(Request.Id); ResponseWriter.AddHash("Id", RawHash); } else { const Oid& Id = std::get(Request.Id); ResponseWriter.AddObjectId("Id", Id); } if (!Request.ModTag.has_value() || Request.ModTag.value() != Result.ModTag) { ResponseWriter.AddInteger("ModTag", Result.ModTag); if (!Request.SkipData) { auto ExtractRangeResult = ExtractRange(std::move(Result.ChunkBuffer), Request.Offset, Request.Size, ZenContentType::kCompressedBinary); if (ExtractRangeResult.Error == GetChunkRangeResult::EError::Ok) { if (ExtractRangeResult.ContentType == ZenContentType::kCompressedBinary) { ZEN_ASSERT(ExtractRangeResult.RawHash != IoHash::Zero); CompressedBuffer CompressedValue = CompressedBuffer::FromCompressedNoValidate(std::move(ExtractRangeResult.Chunk)); ZEN_ASSERT(CompressedValue); if (ExtractRangeResult.RawSize != 0) { // This really could use some thought so we don't send the same data if we get a request for // multiple ranges from the same chunk block uint64_t FragmentRawOffset = 0; OodleCompressor Compressor; OodleCompressionLevel CompressionLevel; uint64_t BlockSize = 0; if (CompressedValue.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) { if (BlockSize > 0) { FragmentRawOffset = (Request.Offset / BlockSize) * BlockSize; } else { FragmentRawOffset = Request.Offset; } uint64_t FragmentRawLength = CompressedValue.DecodeRawSize(); IoHashStream FragmentHashStream; FragmentHashStream.Append(ExtractRangeResult.RawHash.Hash, sizeof(ExtractRangeResult.RawHash.Hash)); FragmentHashStream.Append(&FragmentRawOffset, sizeof(FragmentRawOffset)); FragmentHashStream.Append(&FragmentRawLength, sizeof(FragmentRawLength)); IoHash FragmentHash = FragmentHashStream.GetHash(); ResponseWriter.AddHash("FragmentHash", FragmentHash); ResponseWriter.AddInteger("FragmentOffset", FragmentRawOffset); ResponseWriter.AddInteger("RawSize", ExtractRangeResult.RawSize); ResponsePackage.AddAttachment(CbAttachment(CompressedValue, FragmentHash)); } else { std::string ErrorString = "Failed to get compression parameters from partial compressed buffer"; ResponseWriter.AddString("Error", ErrorString); ZEN_WARN("oplog '{}/{}': {}", Project.Identifier, Oplog.OplogId(), ErrorString); } } else { ResponseWriter.AddHash("RawHash"sv, ExtractRangeResult.RawHash); ResponsePackage.AddAttachment(CbAttachment(std::move(CompressedValue), ExtractRangeResult.RawHash)); } } else { IoHashStream HashStream; ZEN_ASSERT(Request.Id.index() == 1); const Oid& Id = std::get(Request.Id); HashStream.Append(Id.OidBits, sizeof(Id.OidBits)); HashStream.Append(&Request.Offset, sizeof(Request.Offset)); HashStream.Append(&Request.Size, sizeof(Request.Size)); IoHash Hash = HashStream.GetHash(); ResponseWriter.AddHash("Hash"sv, Hash); if (ExtractRangeResult.RawSize != 0) { ResponseWriter.AddInteger("Size", ExtractRangeResult.RawSize); } ResponsePackage.AddAttachment(CbAttachment(std::move(ExtractRangeResult.Chunk), Hash)); } } else { std::string ErrorString = fmt::format("Failed fetching chunk range ({})", ExtractRangeResult.ErrorDescription); ResponseWriter.AddString("Error", ErrorString); ZEN_WARN("oplog '{}/{}': {}", Project.Identifier, Oplog.OplogId(), ErrorString); } } } } ResponseWriter.EndObject(); } } } ResponseWriter.EndArray(); ResponsePackage.SetObject(ResponseWriter.Save()); return ResponsePackage; } bool ProjectStore::AreDiskWritesAllowed() const { return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); } void ProjectStore::EnableUpdateCapture() { ZEN_MEMSCOPE(GetProjectstoreTag()); m_ProjectsLock.WithExclusiveLock([&]() { if (m_UpdateCaptureRefCounter == 0) { ZEN_ASSERT(!m_CapturedProjects); m_CapturedProjects = std::make_unique>(); } else { ZEN_ASSERT(m_CapturedProjects); } m_UpdateCaptureRefCounter++; }); } void ProjectStore::DisableUpdateCapture() { m_ProjectsLock.WithExclusiveLock([&]() { ZEN_ASSERT(m_CapturedProjects); ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); m_UpdateCaptureRefCounter--; if (m_UpdateCaptureRefCounter == 0) { m_CapturedProjects.reset(); } }); } std::vector ProjectStore::GetCapturedProjectsLocked() { ZEN_MEMSCOPE(GetProjectstoreTag()); if (m_CapturedProjects) { return *m_CapturedProjects; } return {}; } std::string ProjectStore::GetGcName(GcCtx&) { ZEN_MEMSCOPE(GetProjectstoreTag()); return fmt::format("projectstore: '{}'", m_ProjectBasePath.string()); } class ProjectStoreGcStoreCompactor : public GcStoreCompactor { public: ProjectStoreGcStoreCompactor(ProjectStore& ProjectStore, const std::filesystem::path& BasePath, std::vector&& OplogPathsToRemove, std::vector&& ProjectPathsToRemove) : m_ProjectStore(ProjectStore) , m_BasePath(BasePath) , m_OplogPathsToRemove(std::move(OplogPathsToRemove)) , m_ProjectPathsToRemove(std::move(ProjectPathsToRemove)) { } virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function&) override { ZEN_TRACE_CPU("Store::CompactStore"); ZEN_MEMSCOPE(GetProjectstoreTag()); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: projectstore [COMPACT] '{}': RemovedDisk: {} in {}", m_BasePath, NiceBytes(Stats.RemovedDisk), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); size_t CompactOplogCount = 0; if (Ctx.Settings.IsDeleteMode) { for (const std::filesystem::path& OplogPath : m_OplogPathsToRemove) { uint64_t OplogSize = ProjectStore::Oplog::TotalSize(OplogPath); if (DeleteDirectories(OplogPath)) { ZEN_DEBUG("GCV2: projectstore [COMPACT] '{}': removed oplog folder '{}', removed {}", m_BasePath, OplogPath, NiceBytes(OplogSize)); Stats.RemovedDisk += OplogSize; } else { ZEN_WARN("GCV2: projectstore [COMPACT] '{}': Failed to remove oplog folder '{}'", m_BasePath, OplogPath); } } for (const std::filesystem::path& ProjectPath : m_ProjectPathsToRemove) { uint64_t ProjectSize = ProjectStore::Project::TotalSize(ProjectPath); if (DeleteDirectories(ProjectPath)) { ZEN_DEBUG("GCV2: projectstore [COMPACT] '{}': removed project folder '{}', removed {}", m_BasePath, ProjectPath, NiceBytes(ProjectSize)); Stats.RemovedDisk += ProjectSize; } else { ZEN_WARN("GCV2: projectstore [COMPACT] '{}': Failed to remove project folder '{}'", m_BasePath, ProjectPath); } } } for (auto ProjectIt : m_ProjectStore.m_Projects) { Ref Project = ProjectIt.second; std::vector OplogsToCompact = Project->GetOplogsToCompact(); CompactOplogCount += OplogsToCompact.size(); for (const std::string& OplogId : OplogsToCompact) { Ref OpLog; { RwLock::SharedLockScope __(Project->m_ProjectLock); if (auto OpIt = Project->m_Oplogs.find(OplogId); OpIt != Project->m_Oplogs.end()) { OpLog = OpIt->second; } else { Stopwatch OplogTimer; std::filesystem::path OplogBasePath = Project->BasePathForOplog(OplogId); OpLog = new ProjectStore::Oplog( Project->Log(), Project->Identifier, OplogId, Project->m_CidStore, OplogBasePath, std::filesystem::path{}, ProjectStore::Oplog::EMode::kFull); // We need it to be a full read so we can write a new index snapshot OpLog->Read(); if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: projectstore [COMPACT] '{}': read oplog '{}/{}' at '{}' in {}", m_BasePath, Project->Identifier, OplogId, OplogBasePath, NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); } } if (OpLog) { const uint64_t PreSize = OpLog->TotalSize(); OpLog->Compact(!Ctx.Settings.IsDeleteMode, /*RetainLSNs*/ true, fmt::format("GCV2: projectstore [COMPACT] '{}': ", m_BasePath)); const uint64_t PostSize = OpLog->TotalSize(); const uint64_t FreedSize = (PreSize > PostSize) ? (PreSize - PostSize) : 0; Stats.RemovedDisk += FreedSize; } } } } if (!Ctx.Settings.IsDeleteMode) { ZEN_DEBUG("GCV2: projectstore [COMPACT] '{}': Skipped deleting of {} oplogs and {} projects, skipped compacting {} oplogs", m_BasePath, m_OplogPathsToRemove.size(), m_ProjectPathsToRemove.size(), CompactOplogCount); } m_ProjectPathsToRemove.clear(); m_OplogPathsToRemove.clear(); } virtual std::string GetGcName(GcCtx&) override { return fmt::format("projectstore: '{}'", m_BasePath.string()); } private: ProjectStore& m_ProjectStore; std::filesystem::path m_BasePath; std::vector m_OplogPathsToRemove; std::vector m_ProjectPathsToRemove; }; GcStoreCompactor* ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { ZEN_TRACE_CPU("Store::RemoveExpiredData"); ZEN_MEMSCOPE(GetProjectstoreTag()); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: projectstore [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {} in {}", m_ProjectBasePath, Stats.CheckedCount, Stats.FoundCount, Stats.DeletedCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); std::vector OplogPathsToRemove; std::vector ProjectPathsToRemove; std::vector> ExpiredProjects; std::vector> Projects; DiscoverProjects(); { RwLock::SharedLockScope Lock(m_ProjectsLock); for (auto& Kv : m_Projects) { Stats.CheckedCount++; if (Kv.second->IsExpired(Ctx.Settings.ProjectStoreExpireTime)) { ExpiredProjects.push_back(Kv.second); continue; } Projects.push_back(Kv.second); } } size_t ExpiredOplogCount = 0; for (const Ref& Project : Projects) { if (Ctx.IsCancelledFlag) { break; } std::vector ExpiredOplogs; std::vector OpLogs = Project->ScanForOplogs(); for (const std::string& OplogId : OpLogs) { Stats.CheckedCount++; if (Project->IsExpired(Ctx.Settings.ProjectStoreExpireTime, OplogId)) { ExpiredOplogs.push_back(OplogId); } else if (!Project->IsOplogTouchedSince(GcClock::Now() - std::chrono::minutes(15), OplogId)) { if (Project->TryUnloadOplog(OplogId)) { ZEN_INFO("GCV2: projectstore [REMOVE EXPIRED] '{}': Unloaded oplog {}/{} due to inactivity", m_ProjectBasePath, Project->Identifier, OplogId); } } } std::filesystem::path ProjectPath = BasePathForProject(Project->Identifier); ExpiredOplogCount += ExpiredOplogs.size(); if (Ctx.Settings.IsDeleteMode) { for (const std::string& OplogId : ExpiredOplogs) { std::filesystem::path RemovePath; if (Project->RemoveOplog(OplogId, RemovePath)) { if (!RemovePath.empty()) { OplogPathsToRemove.push_back(RemovePath); } Stats.DeletedCount++; } } Project->Flush(); } } if (Ctx.Settings.IsDeleteMode) { for (const Ref& Project : ExpiredProjects) { std::string ProjectId = Project->Identifier; { { if (!Project->IsExpired(Ctx.Settings.ProjectStoreExpireTime)) { ZEN_DEBUG( "GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. Project no longer " "expired.", m_ProjectBasePath, ProjectId); continue; } } std::filesystem::path RemovePath; bool Success = RemoveProject(ProjectId, RemovePath); if (!Success) { ZEN_DEBUG( "GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. Project folder is locked.", m_ProjectBasePath, ProjectId); continue; } if (!RemovePath.empty()) { ProjectPathsToRemove.push_back(RemovePath); } } } Stats.DeletedCount += ExpiredProjects.size(); } size_t ExpiredProjectCount = ExpiredProjects.size(); Stats.FoundCount += ExpiredOplogCount + ExpiredProjectCount; return new ProjectStoreGcStoreCompactor(*this, m_ProjectBasePath, std::move(OplogPathsToRemove), std::move(ProjectPathsToRemove)); } class ProjectStoreReferenceChecker : public GcReferenceChecker { public: ProjectStoreReferenceChecker(ProjectStore& InProjectStore) : m_ProjectStore(InProjectStore) { m_ProjectStore.EnableUpdateCapture(); } virtual ~ProjectStoreReferenceChecker() { try { m_ProjectStore.DisableUpdateCapture(); } catch (const std::exception& Ex) { ZEN_ERROR("~ProjectStoreReferenceChecker threw exception: '{}'", Ex.what()); } } virtual std::string GetGcName(GcCtx&) override { return "projectstore"; } virtual void PreCache(GcCtx&) override {} virtual void UpdateLockedState(GcCtx& Ctx) override { ZEN_TRACE_CPU("Store::UpdateLockedState"); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; std::vector> AddedOplogs; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} in {} new oplogs", "projectstore", m_References.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()), AddedOplogs.size()); }); std::vector AddedProjects = m_ProjectStore.GetCapturedProjectsLocked(); for (const std::string& AddedProject : AddedProjects) { if (auto It = m_ProjectStore.m_Projects.find(AddedProject); It != m_ProjectStore.m_Projects.end()) { ProjectStore::Project& Project = *It->second; for (auto& OplogPair : Project.m_Oplogs) { Ref Oplog = OplogPair.second; AddedOplogs.push_back(Oplog); } } } for (auto& ProjectPair : m_ProjectStore.m_Projects) { ProjectStore::Project& Project = *ProjectPair.second; std::vector AddedOplogNames(Project.GetCapturedOplogsLocked()); for (const std::string& OplogName : AddedOplogNames) { if (auto It = Project.m_Oplogs.find(OplogName); It != Project.m_Oplogs.end()) { Ref Oplog = It->second; AddedOplogs.push_back(Oplog); } } } for (const Ref& Oplog : AddedOplogs) { size_t BaseReferenceCount = m_References.size(); Stopwatch InnerTimer; const auto __ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} from {}", Oplog->m_BasePath, m_References.size() - BaseReferenceCount, NiceTimeSpanMs(InnerTimer.GetElapsedTimeMs()), Oplog->OplogId()); }); Oplog->GetAttachmentsLocked(m_References, Ctx.Settings.StoreProjectAttachmentMetaData); if (std::vector PendingChunkReferences = Oplog->GetPendingChunkReferencesLocked(); !PendingChunkReferences.empty()) { m_References.insert(m_References.end(), PendingChunkReferences.begin(), PendingChunkReferences.end()); } } FilterReferences(Ctx, fmt::format("projectstore [LOCKSTATE] '{}'", "projectstore"), m_References); } virtual std::span GetUnusedReferences(GcCtx& Ctx, std::span IoCids) override { ZEN_TRACE_CPU("Store::GetUnusedReferences"); auto Log = [&Ctx]() { return Ctx.Logger; }; size_t InitialCount = IoCids.size(); size_t UsedCount = InitialCount; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", "projectstore", UsedCount, InitialCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); std::span UnusedReferences = KeepUnusedReferences(m_References, IoCids); UsedCount = IoCids.size() - UnusedReferences.size(); return UnusedReferences; } private: ProjectStore& m_ProjectStore; std::vector m_References; }; class ProjectStoreOplogReferenceChecker : public GcReferenceChecker { public: ProjectStoreOplogReferenceChecker(ProjectStore& InProjectStore, Ref InProject, std::string_view InOplog) : m_ProjectStore(InProjectStore) , m_Project(InProject) , m_OplogId(InOplog) { m_Project->EnableUpdateCapture(); } virtual ~ProjectStoreOplogReferenceChecker() { try { m_Project->DisableUpdateCapture(); if (m_OplogHasUpdateCapture) { RwLock::SharedLockScope _(m_Project->m_ProjectLock); if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) { Ref Oplog = It->second; Oplog->DisableUpdateCapture(); m_OplogHasUpdateCapture = false; } } } catch (const std::exception& Ex) { ZEN_ERROR("~ProjectStoreOplogReferenceChecker threw exception: '{}'", Ex.what()); } } virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}/{}'", m_Project->Identifier, m_OplogId); } virtual void PreCache(GcCtx& Ctx) override { ZEN_TRACE_CPU("Store::Oplog::PreCache"); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': precached {} references in {} from {}/{}", m_OplogBasePath, m_References.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()), m_Project->Identifier, m_OplogId); }); m_OplogBasePath = m_Project->BasePathForOplog(m_OplogId); { Ref Oplog; RwLock::SharedLockScope __(m_Project->m_ProjectLock); if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) { Oplog = It->second; Oplog->EnableUpdateCapture(); m_OplogHasUpdateCapture = true; } else if (ProjectStore::Oplog::ExistsAt(m_OplogBasePath)) { Stopwatch OplogTimer; Oplog = new ProjectStore::Oplog(m_Project->Log(), m_Project->Identifier, m_OplogId, m_Project->m_CidStore, m_OplogBasePath, std::filesystem::path{}, ProjectStore::Oplog::EMode::kBasicReadOnly); Oplog->Read(); if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': read oplog '{}/{}' in {}", m_OplogBasePath, m_Project->Identifier, m_OplogId, NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); } } else { return; } RwLock::SharedLockScope ___(Oplog->m_OplogLock); if (Ctx.IsCancelledFlag) { return; } GcClock::TimePoint CompactExpireTime = GcClock::Now() - std::chrono::minutes(30); if (!m_Project->IsOplogTouchedSince(CompactExpireTime, m_OplogId)) { const uint32_t CompactUnusedThreshold = 25; if (Oplog->GetUnusedSpacePercent() >= CompactUnusedThreshold) { m_Project->AddOplogToCompact(m_OplogId); } } Oplog->GetAttachmentsLocked(m_References, Ctx.Settings.StoreProjectAttachmentMetaData); m_OplogAccessTime = m_Project->LastOplogAccessTime(m_OplogId); } FilterReferences(Ctx, fmt::format("projectstore [PRECACHE] '{}'", m_OplogBasePath), m_References); } virtual void UpdateLockedState(GcCtx& Ctx) override { ZEN_TRACE_CPU("Store::Oplog::UpdateLockedState"); auto Log = [&Ctx]() { return Ctx.Logger; }; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} from {}/{}", m_OplogBasePath, m_AddedReferences.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()), m_Project->Identifier, m_OplogId); }); if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) { Ref Oplog = It->second; Oplog->IterateCapturedOpsLocked([&](const Oid& Key, ProjectStore::LogSequenceNumber LSN, const CbObjectView& UpdateOp) -> bool { ZEN_UNUSED(Key, LSN); UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_AddedReferences.emplace_back(Visitor.AsAttachment()); }); return true; }); std::vector AddedAttachments = Oplog->GetCapturedAttachmentsLocked(); m_AddedReferences.insert(m_AddedReferences.end(), AddedAttachments.begin(), AddedAttachments.end()); if (std::vector PendingChunkReferences = Oplog->GetPendingChunkReferencesLocked(); !PendingChunkReferences.empty()) { m_AddedReferences.insert(m_AddedReferences.end(), PendingChunkReferences.begin(), PendingChunkReferences.end()); } } else if (m_Project->LastOplogAccessTime(m_OplogId) > m_OplogAccessTime && ProjectStore::Oplog::ExistsAt(m_OplogBasePath)) { Stopwatch OplogTimer; { Ref Oplog(new ProjectStore::Oplog(m_Project->Log(), m_Project->Identifier, m_OplogId, m_Project->m_CidStore, m_OplogBasePath, std::filesystem::path{}, ProjectStore::Oplog::EMode::kBasicReadOnly)); Oplog->Read(); if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': read oplog '{}/{}' in {}", m_OplogBasePath, m_Project->Identifier, m_OplogId, NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); } OplogTimer.Reset(); Oplog->GetAttachmentsLocked(m_AddedReferences, Ctx.Settings.StoreProjectAttachmentMetaData); } if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': read referenced attachments from oplog '{}/{}' in {}", m_OplogBasePath, m_Project->Identifier, m_OplogId, NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); } } FilterReferences(Ctx, fmt::format("projectstore [LOCKSTATE] '{}'", m_OplogBasePath), m_AddedReferences); } virtual std::span GetUnusedReferences(GcCtx& Ctx, std::span IoCids) override { ZEN_TRACE_CPU("Store::Oplog::GetUnusedReferences"); auto Log = [&Ctx]() { return Ctx.Logger; }; const size_t InitialCount = IoCids.size(); size_t UsedCount = InitialCount; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {} from {}/{}", m_OplogBasePath, UsedCount, InitialCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()), m_Project->Identifier, m_OplogId); }); std::span UnusedReferences = KeepUnusedReferences(m_References, IoCids); UnusedReferences = KeepUnusedReferences(m_AddedReferences, UnusedReferences); UsedCount = IoCids.size() - UnusedReferences.size(); return UnusedReferences; } ProjectStore& m_ProjectStore; Ref m_Project; std::string m_OplogId; std::filesystem::path m_OplogBasePath; bool m_OplogHasUpdateCapture = false; std::vector m_References; std::vector m_AddedReferences; GcClock::TimePoint m_OplogAccessTime; }; std::vector ProjectStore::CreateReferenceCheckers(GcCtx& Ctx) { ZEN_TRACE_CPU("Store::CreateReferenceCheckers"); auto Log = [&Ctx]() { return Ctx.Logger; }; size_t ProjectCount = 0; size_t OplogCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: projectstore [CREATE CHECKERS] '{}': opened {} projects and {} oplogs in {}", m_ProjectBasePath, ProjectCount, OplogCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); DiscoverProjects(); std::vector> Projects; std::vector Checkers; Checkers.emplace_back(new ProjectStoreReferenceChecker(*this)); { RwLock::SharedLockScope Lock(m_ProjectsLock); Projects.reserve(m_Projects.size()); for (auto& Kv : m_Projects) { Projects.push_back(Kv.second); } } ProjectCount += Projects.size(); try { for (const Ref& Project : Projects) { std::vector OpLogs = Project->ScanForOplogs(); Checkers.reserve(Checkers.size() + OpLogs.size()); for (const std::string& OpLogId : OpLogs) { Checkers.emplace_back(new ProjectStoreOplogReferenceChecker(*this, Project, OpLogId)); OplogCount++; } } } catch (const std::exception&) { while (!Checkers.empty()) { delete Checkers.back(); Checkers.pop_back(); } throw; } return Checkers; } std::vector ProjectStore::LockState(GcCtx& Ctx) { ZEN_TRACE_CPU("Store::LockState"); auto Log = [&Ctx]() { return Ctx.Logger; }; ZEN_UNUSED(Log); std::vector Locks; Locks.emplace_back(RwLock::SharedLockScope(m_ProjectsLock)); for (auto& ProjectIt : m_Projects) { std::vector ProjectLocks = ProjectIt.second->GetGcReferencerLocks(); for (auto It = std::make_move_iterator(ProjectLocks.begin()); It != std::make_move_iterator(ProjectLocks.end()); It++) { Locks.emplace_back(std::move(*It)); } } return Locks; } class ProjectStoreOplogReferenceValidator : public GcReferenceValidator { public: ProjectStoreOplogReferenceValidator(ProjectStore& InProjectStore, std::string_view InProject, std::string_view InOplog) : m_ProjectStore(InProjectStore) , m_ProjectId(InProject) , m_OplogId(InOplog) { } virtual ~ProjectStoreOplogReferenceValidator() {} virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}/{}'", m_ProjectId, m_OplogId); } virtual void Validate(GcCtx& Ctx, GcReferenceValidatorStats& Stats) override { ZEN_TRACE_CPU("Store::Validate"); auto Log = [&Ctx]() { return Ctx.Logger; }; ProjectStore::Oplog::ValidationResult Result; Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } std::string Status = Result.IsEmpty() ? "OK" : "Missing data"; ZEN_INFO("GCV2: projectstore [VALIDATE] '{}/{}': Validated in {}. OpCount: {}, MinLSN: {}, MaxLSN: {}, Status: {}", m_ProjectId, m_OplogId, NiceTimeSpanMs(Timer.GetElapsedTimeMs()), Result.OpCount, Result.LSNLow.Number, Result.LSNHigh.Number, Status); }); Ref Oplog; Ref Project = m_ProjectStore.OpenProject(m_ProjectId); if (Project) { { RwLock::SharedLockScope __(Project->m_ProjectLock); if (auto It = Project->m_Oplogs.find(m_OplogId); It != Project->m_Oplogs.end()) { Oplog = It->second; } else { Stopwatch OplogTimer; std::filesystem::path OplogBasePath = Project->BasePathForOplog(m_OplogId); Oplog = Ref(new ProjectStore::Oplog(Project->Log(), Project->Identifier, m_OplogId, Project->m_CidStore, OplogBasePath, std::filesystem::path{}, ProjectStore::Oplog::EMode::kBasicReadOnly)); Oplog->Read(); if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: projectstore [VALIDATE] '{}': read oplog '{}/{}' in {}", OplogBasePath, Project->Identifier, m_OplogId, NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); } if (Ctx.IsCancelledFlag) { return; } } } if (Oplog) { Result = Oplog->Validate(Project->RootDir, Ctx.IsCancelledFlag, nullptr); if (Ctx.IsCancelledFlag) { return; } Stats.CheckedCount = Result.OpCount; Stats.MissingChunks = Result.MissingChunks.size(); Stats.MissingFiles = Result.MissingFiles.size(); Stats.MissingMetas = Result.MissingMetas.size(); Stats.MissingAttachments = Result.MissingAttachments.size(); } if (!Result.IsEmpty()) { ZEN_WARN("GCV2: projectstore [VALIDATE] '{}/{}': Missing data: Files: {}, Chunks: {}, Metas: {}, Attachments: {}", m_ProjectId, m_OplogId, Result.MissingFiles.size(), Result.MissingChunks.size(), Result.MissingMetas.size(), Result.MissingAttachments.size()); } } } ProjectStore& m_ProjectStore; std::string m_ProjectId; std::string m_OplogId; }; std::vector ProjectStore::CreateReferenceValidators(GcCtx& Ctx) { if (Ctx.Settings.SkipCidDelete) { return {}; } auto Log = [&Ctx]() { return Ctx.Logger; }; ZEN_UNUSED(Log); DiscoverProjects(); std::vector> Oplogs; { RwLock::SharedLockScope _(m_ProjectsLock); for (auto& ProjectPair : m_Projects) { ProjectStore::Project& Project = *ProjectPair.second; std::vector OpLogs = Project.ScanForOplogs(); for (const std::string& OplogName : OpLogs) { Oplogs.push_back({Project.Identifier, OplogName}); } } } std::vector Validators; Validators.reserve(Oplogs.size()); for (const std::pair& Oplog : Oplogs) { Validators.push_back(new ProjectStoreOplogReferenceValidator(*this, Oplog.first, Oplog.second)); } return Validators; } ////////////////////////////////////////////////////////////////////////// Oid OpKeyStringAsOid(std::string_view OpKey) { eastl::fixed_vector Buffer; return OpKeyStringAsOid(OpKey, Buffer); } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS namespace testutils { using namespace std::literals; static std::string OidAsString(const Oid& Id) { StringBuilder<25> OidStringBuilder; Id.ToString(OidStringBuilder); return OidStringBuilder.ToString(); } static CbPackage CreateBulkDataOplogPackage(const Oid& Id, const std::span>& Attachments) { CbPackage Package; CbObjectWriter Object; Object << "key"sv << OidAsString(Id); if (!Attachments.empty()) { Object.BeginArray("bulkdata"); for (const auto& Attachment : Attachments) { CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); Object.BeginObject(); Object << "id"sv << Attachment.first; Object << "type"sv << "Standard"sv; Object << "data"sv << Attach; Object.EndObject(); Package.AddAttachment(Attach); } Object.EndArray(); } Package.SetObject(Object.Save()); return Package; }; static CbPackage CreateFilesOplogPackage(const Oid& Id, const std::filesystem::path ProjectRootDir, const std::span>& Attachments) { CbPackage Package; CbObjectWriter Object; Object << "key"sv << OidAsString(Id); if (!Attachments.empty()) { Object.BeginArray("files"); for (const auto& Attachment : Attachments) { std::filesystem::path ServerPath = std::filesystem::relative(Attachment.second, ProjectRootDir); std::filesystem::path ClientPath = ServerPath; // dummy Object.BeginObject(); Object << "id"sv << Attachment.first; Object << "serverpath"sv << ServerPath.string(); Object << "clientpath"sv << ClientPath.string(); Object.EndObject(); } Object.EndArray(); } Package.SetObject(Object.Save()); return Package; }; static std::vector> CreateAttachments( const std::span& Sizes, OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast, uint64_t BlockSize = 0) { std::vector> Result; Result.reserve(Sizes.size()); for (size_t Size : Sizes) { CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(CreateSemiRandomBlob(Size)), OodleCompressor::Mermaid, CompressionLevel, BlockSize); Result.emplace_back(std::pair(Oid::NewOid(), Compressed)); } return Result; } static uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64_t RawOffset) { if (RawOffset > 0) { uint64_t BlockSize = 0; OodleCompressor Compressor; OodleCompressionLevel CompressionLevel; if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) { return 0; } return BlockSize > 0 ? RawOffset % BlockSize : 0; } return 0; } template CbObject BuildChunksRequest(bool SkipData, std::string_view IdName, const std::vector& Chunks, const std::vector>& Ranges, const std::vector& ModTags) { CbObjectWriter Request; Request.BeginObject("Request"sv); { if (SkipData) { Request.AddBool("SkipData"sv, true); } if (!Chunks.empty()) { Request.BeginArray("Chunks"); for (size_t Index = 0; Index < Chunks.size(); Index++) { Request.BeginObject(); { Request << IdName << Chunks[Index]; if (!ModTags.empty()) { Request << "ModTag" << ModTags[Index]; } if (!Ranges.empty()) { Request << "Offset" << Ranges[Index].first; Request << "Size" << Ranges[Index].second; } } Request.EndObject(); } Request.EndArray(); } } Request.EndObject(); return Request.Save(); }; CbObject BuildChunksRequest(bool SkipData, const std::vector& Chunks, const std::vector>& Ranges, const std::vector& ModTags) { return BuildChunksRequest(SkipData, "Oid", Chunks, Ranges, ModTags); } CbObject BuildChunksRequest(bool SkipData, const std::vector& Chunks, const std::vector>& Ranges, const std::vector& ModTags) { return BuildChunksRequest(SkipData, "RawHash", Chunks, Ranges, ModTags); } } // namespace testutils TEST_CASE("project.opkeys") { using namespace std::literals; const std::string_view LongKey = "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"sv; // Not a test per se, this code just exercises the key computation logic to ensure all // edge cases are handled by the bug workaround logic for (int i = 1; i < 300; ++i) { CbObjectWriter Cbo; Cbo << "key"sv << LongKey.substr(0, i); const Oid KeyId = ComputeOpKey(Cbo.Save()); ZEN_UNUSED(KeyId); } { CbObjectWriter Cbo; Cbo << "key"sv << "abcdef"; const Oid KeyId = ComputeOpKey(Cbo.Save()); const Oid CorrectId = Oid::FromHexString( "7a03540e" "ecb0daa9" "00f2949e"); CHECK(KeyId == CorrectId); } { CbObjectWriter Cbo; Cbo << "key"sv << "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"; const Oid KeyId = ComputeOpKey(Cbo.Save()); const Oid CorrectId = Oid::FromHexString( "c5e88c79" "06b7fa38" "7b0d2efd"); CHECK(KeyId == CorrectId); } } TEST_CASE("project.store.create") { using namespace std::literals; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::string_view ProjectName("proj1"sv); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; Ref Project(ProjectStore.NewProject(BasePath / ProjectName, ProjectName, RootDir.string(), EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string())); CHECK(ProjectStore.DeleteProject(ProjectName)); CHECK(!Project->Exists(BasePath)); } TEST_CASE("project.store.lifetimes") { using namespace std::literals; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; Ref Project(ProjectStore.NewProject(BasePath / "proj1"sv, "proj1"sv, RootDir.string(), EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string())); Ref Oplog = Project->NewOplog("oplog1", {}); CHECK(Oplog); std::filesystem::path DeletePath; CHECK(Project->PrepareForDelete(DeletePath)); CHECK(!DeletePath.empty()); CHECK(!Project->OpenOplog("oplog1", /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true)); // Oplog is now invalid, but pointer can still be accessed since we store old oplog pointers CHECK(Oplog->OplogCount() == 0); // Project is still valid since we have a Ref to it CHECK(Project->Identifier == "proj1"sv); } TEST_CASE("project.store.gc") { using namespace std::literals; using namespace testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path Project1RootDir = TempDir.Path() / "game1"; std::filesystem::path Project1FilePath = TempDir.Path() / "game1" / "game.uproject"; { CreateDirectories(Project1FilePath.parent_path()); BasicFile ProjectFile; ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); } std::filesystem::path Project1OplogPath = TempDir.Path() / "game1" / "saves" / "cooked" / ".projectstore"; { CreateDirectories(Project1OplogPath.parent_path()); BasicFile OplogFile; OplogFile.Open(Project1OplogPath, BasicFile::Mode::kTruncate); } std::filesystem::path Project2RootDir = TempDir.Path() / "game2"; std::filesystem::path Project2FilePath = TempDir.Path() / "game2" / "game.uproject"; { CreateDirectories(Project2FilePath.parent_path()); BasicFile ProjectFile; ProjectFile.Open(Project2FilePath, BasicFile::Mode::kTruncate); } std::filesystem::path Project2Oplog1Path = TempDir.Path() / "game1" / "saves" / "cooked" / ".projectstore"; { CreateDirectories(Project2Oplog1Path.parent_path()); BasicFile OplogFile; OplogFile.Open(Project2Oplog1Path, BasicFile::Mode::kTruncate); } std::filesystem::path Project2Oplog2Path = TempDir.Path() / "game2" / "saves" / "cooked" / ".projectstore"; { CreateDirectories(Project2Oplog2Path.parent_path()); BasicFile OplogFile; OplogFile.Open(Project2Oplog2Path, BasicFile::Mode::kTruncate); } { Ref Project1(ProjectStore.NewProject(BasePath / "proj1"sv, "proj1"sv, RootDir.string(), EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); Ref Oplog = Project1->NewOplog("oplog1", Project1OplogPath); CHECK(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{77}))); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{7123, 583, 690, 99}))); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{55, 122}))); } { Ref Project2(ProjectStore.NewProject(BasePath / "proj2"sv, "proj2"sv, RootDir.string(), EngineRootDir.string(), Project2RootDir.string(), Project2FilePath.string())); { Ref Oplog = Project2->NewOplog("oplog2", Project2Oplog1Path); CHECK(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{177}))); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{9123, 383, 590, 96}))); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{535, 221}))); } { Ref Oplog = Project2->NewOplog("oplog3", Project2Oplog2Path); CHECK(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{137}))); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{9723, 683, 594, 98}))); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{531, 271}))); } } { GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24), .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24), .IsDeleteMode = true}; GcResult Result = Gc.CollectGarbage(Settings); CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); CHECK_EQ(21u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); CHECK(ProjectStore.OpenProject("proj1"sv)); CHECK(ProjectStore.OpenProject("proj2"sv)); } { GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24), .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24), .IsDeleteMode = true}; GcResult Result = Gc.CollectGarbage(Settings); CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); CHECK_EQ(21u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); CHECK(ProjectStore.OpenProject("proj1"sv)); CHECK(ProjectStore.OpenProject("proj2"sv)); } RemoveFile(Project1FilePath); { GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24), .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24), .IsDeleteMode = true}; GcResult Result = Gc.CollectGarbage(Settings); CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); CHECK_EQ(21u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); CHECK(ProjectStore.OpenProject("proj1"sv)); CHECK(ProjectStore.OpenProject("proj2"sv)); } { GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24), .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24), .CollectSmallObjects = true, .IsDeleteMode = true}; GcResult Result = Gc.CollectGarbage(Settings); CHECK_EQ(4u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); CHECK_EQ(21u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); CHECK_EQ(7u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); CHECK(!ProjectStore.OpenProject("proj1"sv)); CHECK(ProjectStore.OpenProject("proj2"sv)); } RemoveFile(Project2Oplog1Path); { GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24), .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24), .CollectSmallObjects = true, .IsDeleteMode = true}; GcResult Result = Gc.CollectGarbage(Settings); CHECK_EQ(3u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); CHECK_EQ(14u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); CHECK(!ProjectStore.OpenProject("proj1"sv)); CHECK(ProjectStore.OpenProject("proj2"sv)); } { GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24), .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24), .CollectSmallObjects = true, .IsDeleteMode = true}; GcResult Result = Gc.CollectGarbage(Settings); CHECK_EQ(3u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); CHECK_EQ(14u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); CHECK(!ProjectStore.OpenProject("proj1"sv)); CHECK(ProjectStore.OpenProject("proj2"sv)); } RemoveFile(Project2FilePath); { GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24), .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24), .CollectSmallObjects = true, .IsDeleteMode = true}; GcResult Result = Gc.CollectGarbage(Settings); CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); CHECK_EQ(14u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); CHECK_EQ(14u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); CHECK(!ProjectStore.OpenProject("proj1"sv)); CHECK(!ProjectStore.OpenProject("proj2"sv)); } } TEST_CASE("project.store.gc.prep") { using namespace std::literals; using namespace testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path Project1RootDir = TempDir.Path() / "game1"; std::filesystem::path Project1FilePath = TempDir.Path() / "game1" / "game.uproject"; { CreateDirectories(Project1FilePath.parent_path()); BasicFile ProjectFile; ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); } std::filesystem::path Project1OplogPath = TempDir.Path() / "game1" / "saves" / "cooked" / ".projectstore"; { CreateDirectories(Project1OplogPath.parent_path()); BasicFile OplogFile; OplogFile.Open(Project1OplogPath, BasicFile::Mode::kTruncate); } std::vector> OpAttachments = CreateAttachments(std::initializer_list{7123, 583, 690, 99}); std::vector OpChunkHashes; for (const auto& Chunk : OpAttachments) { OpChunkHashes.push_back(Chunk.second.DecodeRawHash()); } { Ref Project1(ProjectStore.NewProject(BasePath / "proj1"sv, "proj1"sv, RootDir.string(), EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); Ref Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); } { Ref Project1 = ProjectStore.OpenProject("proj1"sv); Project1->DeleteOplog("oplog1"sv); } // Equivalent of a `prep` existance check call for (auto Attachment : OpAttachments) { CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); } { GcSettings Settings = {.CacheExpireTime = GcClock::Now(), .ProjectStoreExpireTime = GcClock::Now(), .CollectSmallObjects = true, .IsDeleteMode = true}; GcResult Result = Gc.CollectGarbage(Settings); } // If a gc comes in between our prep and op write the chunks will be removed for (auto Attachment : OpAttachments) { CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); } { // Make sure the chunks are stored but not the referencing op Ref Project1 = ProjectStore.OpenProject("proj1"sv); Ref Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); Project1->DeleteOplog("oplog1"sv); } { Ref Project1 = ProjectStore.OpenProject("proj1"sv); Ref Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); // Equivalent of a `prep` call with tracking of ops CHECK(Oplog->CheckPendingChunkReferences(OpChunkHashes, std::chrono::hours(1)).empty()); } for (auto Attachment : OpAttachments) { CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); } { GcSettings Settings = {.CacheExpireTime = GcClock::Now(), .ProjectStoreExpireTime = GcClock::Now(), .CollectSmallObjects = true, .IsDeleteMode = true}; GcResult Result = Gc.CollectGarbage(Settings); } // Attachments should now be retained for (auto Attachment : OpAttachments) { CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); } { GcSettings Settings = {.CacheExpireTime = GcClock::Now(), .ProjectStoreExpireTime = GcClock::Now(), .CollectSmallObjects = true, .IsDeleteMode = true}; GcResult Result = Gc.CollectGarbage(Settings); } // Attachments should now be retained across multiple GCs if retain time is still valud for (auto Attachment : OpAttachments) { CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); } { Ref Project1 = ProjectStore.OpenProject("proj1"sv); Ref Oplog = Project1->OpenOplog("oplog1"sv, true, true); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); Oplog->RemovePendingChunkReferences(OpChunkHashes); CHECK(Oplog->GetPendingChunkReferencesLocked().size() == 0); } for (auto Attachment : OpAttachments) { CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); } { Ref Project1 = ProjectStore.OpenProject("proj1"sv); Project1->DeleteOplog("oplog1"sv); } { GcSettings Settings = {.CacheExpireTime = GcClock::Now(), .ProjectStoreExpireTime = GcClock::Now(), .CollectSmallObjects = true, .IsDeleteMode = true}; GcResult Result = Gc.CollectGarbage(Settings); } for (auto Attachment : OpAttachments) { CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); } { Ref Project1 = ProjectStore.OpenProject("proj1"sv); Project1->DeleteOplog("oplog1"sv); } { // Make sure the chunks are stored but not the referencing op Ref Project1 = ProjectStore.OpenProject("proj1"sv); Ref Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); Project1->DeleteOplog("oplog1"sv); } // Caution - putting breakpoints and stepping through this part of the test likely makes it fails due to expiry time of pending // chunks { Ref Project1 = ProjectStore.OpenProject("proj1"sv); Ref Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); CHECK(Oplog->CheckPendingChunkReferences(OpChunkHashes, std::chrono::milliseconds(100)).empty()); } // This pass they should be retained and while the ops are picked up in GC we are blocked from adding our op { GcSettings Settings = {.CacheExpireTime = GcClock::Now(), .ProjectStoreExpireTime = GcClock::Now(), .CollectSmallObjects = true, .IsDeleteMode = true}; GcResult Result = Gc.CollectGarbage(Settings); } for (auto Attachment : OpAttachments) { CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); } Sleep(200); // This pass they should also be retained since our age retention has kept them alive and they will now be picked up and the // retention cleared { GcSettings Settings = {.CacheExpireTime = GcClock::Now(), .ProjectStoreExpireTime = GcClock::Now(), .CollectSmallObjects = true, .IsDeleteMode = true}; GcResult Result = Gc.CollectGarbage(Settings); } for (auto Attachment : OpAttachments) { CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); } // This pass the retention time has expired and the last GC pass cleared the entries { GcSettings Settings = {.CacheExpireTime = GcClock::Now(), .ProjectStoreExpireTime = GcClock::Now(), .CollectSmallObjects = true, .IsDeleteMode = true}; GcResult Result = Gc.CollectGarbage(Settings); } for (auto Attachment : OpAttachments) { CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); } } TEST_CASE("project.store.rpc.getchunks") { using namespace std::literals; using namespace testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"sv; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv; std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv; { CreateDirectories(Project1FilePath.parent_path()); BasicFile ProjectFile; ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); } std::vector OpIds; OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()}); std::unordered_map>, Oid::Hasher> Attachments; Oid FilesOpId = Oid::NewOid(); std::vector> FilesOpIdAttachments; { Ref Project1(ProjectStore.NewProject(BasePath / "proj1"sv, "proj1"sv, RootDir.string(), EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); Ref Oplog = Project1->NewOplog("oplog1"sv, {}); CHECK(Oplog); Attachments[OpIds[0]] = {}; Attachments[OpIds[1]] = CreateAttachments(std::initializer_list{77}); Attachments[OpIds[2]] = CreateAttachments(std::initializer_list{200 * 1024, 314 * 1024, 690, 99}, OodleCompressionLevel::VeryFast, 128 * 1024); Attachments[OpIds[3]] = CreateAttachments(std::initializer_list{55, 122}); for (auto It : Attachments) { Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(It.first, It.second)); } std::filesystem::path UncompressedFilePath = RootDir / "content" / "uncompressed_file"; CreateDirectories(UncompressedFilePath.parent_path()); IoBuffer FileBlob = CreateRandomBlob(81823 * 2); WriteFile(UncompressedFilePath, FileBlob); FilesOpIdAttachments.push_back({Oid::NewOid(), UncompressedFilePath}); Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(FilesOpId, RootDir, FilesOpIdAttachments)); } auto GetChunks = [](zen::ProjectStore& ProjectStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, const CbObject& Cb) { std::vector Requests = ProjectStore.ParseChunksRequests(Project, Oplog, Cb); std::vector Results = Requests.empty() ? std::vector{} : ProjectStore.GetChunks(Project, Oplog, Requests); return ProjectStore.WriteChunksRequestResponse(Project, Oplog, std::move(Requests), std::move(Results)); }; Ref Project1 = ProjectStore.OpenProject("proj1"sv); CHECK(Project1); Ref Oplog1 = Project1->OpenOplog("oplog1"sv, false, true); CHECK(Oplog1); // Invalid request { CbObjectWriter Request; Request.BeginObject("WrongName"sv); Request.EndObject(); CbPackage Response; CHECK_THROWS(GetChunks(ProjectStore, *Project1, *Oplog1, Request.Save())); } // Empty request { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, std::vector{}, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(0, Chunks.Num()); } // Single non-existing chunk by RawHash IoHash NotFoundIoHash = IoHash::Max; { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundIoHash}, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(0, Chunks.Num()); } { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundIoHash}, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(0, Chunks.Num()); } // Single non-existing chunk by Id Oid NotFoundId = Oid::NewOid(); { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundId}, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(0, Chunks.Num()); } { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundId}, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(0, Chunks.Num()); } // Single existing chunk by RawHash { // Fresh fetch IoHash FirstAttachmentHash = Attachments[OpIds[2]][1].second.DecodeRawHash(); uint64_t ResponseModTag = 0; { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {})); CHECK_EQ(1, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); IoHash Id = Chunk["Id"].AsHash(); CHECK_EQ(FirstAttachmentHash, Id); ResponseModTag = Chunk["ModTag"].AsUInt64(); CHECK_NE(0, ResponseModTag); IoHash AttachmentHash = Chunk["RawHash"].AsHash(); const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); CHECK_NE(nullptr, Attachment); CompressedBuffer Buffer = Attachment->AsCompressedBinary(); CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } // Fetch with matching ModTag { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {ResponseModTag})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); IoHash Id = Chunk["Id"].AsHash(); CHECK_EQ(FirstAttachmentHash, Id); CHECK(!Chunk.FindView("ModTag")); CHECK(!Chunk.FindView("RawHash")); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } // Fetch with mismatching ModTag { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)})); CHECK_EQ(1, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); IoHash Id = Chunk["Id"].AsHash(); CHECK_EQ(FirstAttachmentHash, Id); ResponseModTag = Chunk["ModTag"].AsUInt64(); CHECK_NE(0, ResponseModTag); IoHash AttachmentHash = Chunk["RawHash"].AsHash(); const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); CHECK_NE(nullptr, Attachment); CompressedBuffer Buffer = Attachment->AsCompressedBinary(); CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } // Fresh modtime query { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); IoHash Id = Chunk["Id"].AsHash(); CHECK_EQ(FirstAttachmentHash, Id); uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); CHECK_EQ(ResponseModTag, ResponseModTag2); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } // Modtime query with matching ModTag { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {ResponseModTag})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); IoHash Id = Chunk["Id"].AsHash(); CHECK_EQ(FirstAttachmentHash, Id); CHECK(!Chunk.FindView("ModTag")); CHECK(!Chunk.FindView("RawHash")); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } // Modtime query with mismatching ModTag { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); IoHash Id = Chunk["Id"].AsHash(); CHECK_EQ(FirstAttachmentHash, Id); uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); CHECK_EQ(ResponseModTag, ResponseModTag2); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } } // Single existing CID chunk by Id { Oid FirstAttachmentId = Attachments[OpIds[2]][1].first; uint64_t ResponseModTag = 0; { // Full chunk request const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {})); CHECK_EQ(1, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); CHECK_EQ(FirstAttachmentId, Id); ResponseModTag = Chunk["ModTag"].AsUInt64(); CHECK_NE(0, ResponseModTag); IoHash AttachmentHash = Chunk["RawHash"].AsHash(); const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); CHECK_NE(nullptr, Attachment); CompressedBuffer Buffer = Attachment->AsCompressedBinary(); CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } { // Partial chunk request const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{130 * 1024, 8100}}, {})); CHECK_EQ(1, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); CHECK_EQ(FirstAttachmentId, Id); ResponseModTag = Chunk["ModTag"].AsUInt64(); CHECK_NE(0, ResponseModTag); IoHash AttachmentHash = Chunk["FragmentHash"].AsHash(); const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); CHECK_NE(nullptr, Attachment); std::uint64_t FragmentStart = Chunk["FragmentOffset"].AsUInt64(); CompressedBuffer Buffer = Attachment->AsCompressedBinary(); CHECK(FragmentStart <= 130 * 1024); CHECK(FragmentStart + Buffer.DecodeRawSize() >= 130 * 1024 + 8100); auto ResponseDecompressedBuffer = Buffer.Decompress(130 * 1024 - FragmentStart, 8100); auto ExpectedDecompressedBuffer = Attachments[OpIds[2]][1].second.Decompress(130 * 1024, 8100); CHECK(ResponseDecompressedBuffer.AsIoBuffer().GetView().EqualBytes(ExpectedDecompressedBuffer.AsIoBuffer().GetView())); CHECK_EQ(Chunk["RawSize"sv].AsUInt64(), Attachments[OpIds[2]][1].second.DecodeRawSize()); CHECK(!Chunk.FindView("Size")); } { // Fetch with matching ModTag const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); CHECK_EQ(FirstAttachmentId, Id); CHECK(!Chunk.FindView("ModTag")); CHECK(!Chunk.FindView("RawHash")); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } { // Fetch with mismatching ModTag const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)})); CHECK_EQ(1, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); CHECK_EQ(FirstAttachmentId, Id); uint64_t ResponseModTag3 = Chunk["ModTag"].AsUInt64(); CHECK_EQ(ResponseModTag, ResponseModTag3); IoHash AttachmentHash = Chunk["RawHash"].AsHash(); const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); CHECK_NE(nullptr, Attachment); CompressedBuffer Buffer = Attachment->AsCompressedBinary(); CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } // Fresh modtime query { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); CHECK_EQ(FirstAttachmentId, Id); uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); CHECK_EQ(ResponseModTag, ResponseModTag2); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } // Modtime query with matching ModTag { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); CHECK_EQ(FirstAttachmentId, Id); CHECK(!Chunk.FindView("ModTag")); CHECK(!Chunk.FindView("RawHash")); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } // Modtime query with mismatching ModTag { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); CHECK_EQ(FirstAttachmentId, Id); uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); CHECK_EQ(ResponseModTag, ResponseModTag2); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } } // Single existing file chunk by Id { Oid FirstAttachmentId = FilesOpIdAttachments[0].first; uint64_t ResponseModTag = 0; { // Full chunk request const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {})); CHECK_EQ(1, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); CHECK_EQ(FirstAttachmentId, Id); ResponseModTag = Chunk["ModTag"].AsUInt64(); CHECK_NE(0, ResponseModTag); IoHash AttachmentHash = Chunk["Hash"].AsHash(); const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); CHECK_NE(nullptr, Attachment); CompositeBuffer Buffer = Attachment->AsCompositeBinary(); CHECK_EQ(IoHash::HashBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten()), IoHash::HashBuffer(Buffer)); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } { // Partial chunk request const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{81823, 5434}}, {})); CHECK_EQ(1, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); CHECK_EQ(FirstAttachmentId, Id); ResponseModTag = Chunk["ModTag"].AsUInt64(); CHECK_NE(0, ResponseModTag); IoHash AttachmentHash = Chunk["Hash"].AsHash(); const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); CHECK_NE(nullptr, Attachment); CompositeBuffer Buffer = Attachment->AsCompositeBinary(); CHECK_EQ(IoHash::HashBuffer(IoBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten(), 81823, 5434)), IoHash::HashBuffer(Buffer)); CHECK_EQ(Chunk["Size"sv].AsUInt64(), FileSizeFromPath(FilesOpIdAttachments[0].second)); CHECK(!Chunk.FindView("RawSize")); } { // Fetch with matching ModTag const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); CHECK_EQ(FirstAttachmentId, Id); CHECK(!Chunk.FindView("ModTag")); CHECK(!Chunk.FindView("Hash")); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } { // Fetch with mismatching ModTag const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)})); CHECK_EQ(1, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); CHECK_EQ(FirstAttachmentId, Id); uint64_t ResponseModTag3 = Chunk["ModTag"].AsUInt64(); CHECK_EQ(ResponseModTag, ResponseModTag3); IoHash AttachmentHash = Chunk["Hash"].AsHash(); const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); CHECK_NE(nullptr, Attachment); CompositeBuffer Buffer = Attachment->AsCompositeBinary(); CHECK_EQ(IoHash::HashBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten()), IoHash::HashBuffer(Buffer)); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } // Fresh modtime query { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); CHECK_EQ(FirstAttachmentId, Id); uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); CHECK_EQ(ResponseModTag, ResponseModTag2); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } // Modtime query with matching ModTag { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); CHECK_EQ(FirstAttachmentId, Id); CHECK(!Chunk.FindView("ModTag")); CHECK(!Chunk.FindView("Hash")); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } // Modtime query with mismatching ModTag { const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); CHECK_EQ(FirstAttachmentId, Id); uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); CHECK_EQ(ResponseModTag, ResponseModTag2); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } } // Multi RawHash Request { std::vector AttachmentBuffers{Attachments[OpIds[1]][0].second, Attachments[OpIds[2]][0].second, Attachments[OpIds[2]][1].second}; std::vector AttachmentHashes{AttachmentBuffers[0].DecodeRawHash(), AttachmentBuffers[1].DecodeRawHash(), AttachmentBuffers[2].DecodeRawHash()}; std::vector ResponseModTags(3, 0); { // Fresh fetch const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, {})); CHECK_EQ(3, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); for (CbFieldView ChunkView : Chunks) { CbObjectView Chunk = ChunkView.AsObjectView(); IoHash Id = Chunk["Id"].AsHash(); auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); CHECK(It != AttachmentHashes.end()); ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); CHECK_EQ(AttachmentHashes[Index], Id); ResponseModTags[Index] = Chunk["ModTag"].AsUInt64(); CHECK_NE(0, ResponseModTags[Index]); IoHash AttachmentHash = Chunk["RawHash"].AsHash(); const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); CHECK_NE(nullptr, Attachment); CompressedBuffer Buffer = Attachment->AsCompressedBinary(); CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); CHECK(AttachmentBuffers[Index].GetCompressed().Flatten().GetView().EqualBytes(Buffer.GetCompressed().Flatten().GetView())); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } } { // Fetch with matching ModTag const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, ResponseModTags)); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); for (CbFieldView ChunkView : Chunks) { CbObjectView Chunk = ChunkView.AsObjectView(); IoHash Id = Chunk["Id"].AsHash(); auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); CHECK(It != AttachmentHashes.end()); ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); CHECK_EQ(AttachmentHashes[Index], Id); CHECK(!Chunk.FindView("ModTag")); CHECK(!Chunk.FindView("RawHash")); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } } { // Fresh modtime query const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); for (CbFieldView ChunkView : Chunks) { CbObjectView Chunk = ChunkView.AsObjectView(); IoHash Id = Chunk["Id"].AsHash(); auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); CHECK(It != AttachmentHashes.end()); ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); CHECK_EQ(AttachmentHashes[Index], Id); CHECK_EQ(ResponseModTags[Index], Chunk["ModTag"].AsUInt64()); CHECK(!Chunk.FindView("RawHash")); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } } { // Modtime query with matching ModTags const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, ResponseModTags)); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); for (CbFieldView ChunkView : Chunks) { CbObjectView Chunk = ChunkView.AsObjectView(); IoHash Id = Chunk["Id"].AsHash(); auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); CHECK(It != AttachmentHashes.end()); ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); CHECK_EQ(AttachmentHashes[Index], Id); CHECK(!Chunk.FindView("ModTag")); CHECK(!Chunk.FindView("RawHash")); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } } { // Modtime query with mismatching ModTags std::vector MismatchingModTags(ResponseModTags); for (uint64_t& Tag : MismatchingModTags) { Tag++; } const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, MismatchingModTags)); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); for (CbFieldView ChunkView : Chunks) { CbObjectView Chunk = ChunkView.AsObjectView(); IoHash Id = Chunk["Id"].AsHash(); auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); CHECK(It != AttachmentHashes.end()); ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); CHECK_EQ(AttachmentHashes[Index], Id); CHECK(Chunk["ModTag"].AsUInt64() == ResponseModTags[Index]); CHECK(!Chunk.FindView("RawHash")); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } } } // Multi Id Request { std::vector AttachmentBuffers{Attachments[OpIds[1]][0].second, Attachments[OpIds[2]][0].second, Attachments[OpIds[2]][1].second}; std::vector AttachmentHashes{AttachmentBuffers[0].DecodeRawHash(), AttachmentBuffers[1].DecodeRawHash(), AttachmentBuffers[2].DecodeRawHash()}; std::vector AttachedIds{Attachments[OpIds[1]][0].first, Attachments[OpIds[2]][0].first, Attachments[OpIds[2]][1].first}; std::vector ResponseModTags(3, 0); { // Fresh fetch const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, {})); CHECK_EQ(3, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); for (CbFieldView ChunkView : Chunks) { CbObjectView Chunk = ChunkView.AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); CHECK(It != AttachedIds.end()); ptrdiff_t Index = std::distance(AttachedIds.begin(), It); CHECK_EQ(AttachedIds[Index], Id); ResponseModTags[Index] = Chunk["ModTag"].AsUInt64(); CHECK_NE(0, ResponseModTags[Index]); IoHash AttachmentHash = Chunk["RawHash"].AsHash(); const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); CHECK_NE(nullptr, Attachment); CompressedBuffer Buffer = Attachment->AsCompressedBinary(); CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); CHECK(AttachmentBuffers[Index].GetCompressed().Flatten().GetView().EqualBytes(Buffer.GetCompressed().Flatten().GetView())); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } } { // Fetch with matching ModTag const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, ResponseModTags)); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); for (CbFieldView ChunkView : Chunks) { CbObjectView Chunk = ChunkView.AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); CHECK(It != AttachedIds.end()); ptrdiff_t Index = std::distance(AttachedIds.begin(), It); CHECK_EQ(AttachedIds[Index], Id); CHECK(!Chunk.FindView("ModTag")); CHECK(!Chunk.FindView("RawHash")); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } } { // Fresh modtime query const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); for (CbFieldView ChunkView : Chunks) { CbObjectView Chunk = ChunkView.AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); CHECK(It != AttachedIds.end()); ptrdiff_t Index = std::distance(AttachedIds.begin(), It); CHECK_EQ(AttachedIds[Index], Id); CHECK_EQ(ResponseModTags[Index], Chunk["ModTag"].AsUInt64()); CHECK(!Chunk.FindView("RawHash")); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } } { // Modtime query with matching ModTags const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, ResponseModTags)); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); for (CbFieldView ChunkView : Chunks) { CbObjectView Chunk = ChunkView.AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); CHECK(It != AttachedIds.end()); ptrdiff_t Index = std::distance(AttachedIds.begin(), It); CHECK_EQ(AttachedIds[Index], Id); CHECK(!Chunk.FindView("ModTag")); CHECK(!Chunk.FindView("RawHash")); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } } { // Modtime query with mismatching ModTags std::vector MismatchingModTags(ResponseModTags); for (uint64_t& Tag : MismatchingModTags) { Tag++; } const CbPackage& Response = GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, MismatchingModTags)); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); for (CbFieldView ChunkView : Chunks) { CbObjectView Chunk = ChunkView.AsObjectView(); Oid Id = Chunk["Id"].AsObjectId(); auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); CHECK(It != AttachedIds.end()); ptrdiff_t Index = std::distance(AttachedIds.begin(), It); CHECK_EQ(AttachedIds[Index], Id); CHECK(Chunk["ModTag"].AsUInt64() == ResponseModTags[Index]); CHECK(!Chunk.FindView("RawHash")); CHECK(!Chunk.FindView("Size")); CHECK(!Chunk.FindView("RawSize")); } } } } TEST_CASE("project.store.partial.read") { using namespace std::literals; using namespace testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"sv; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv; std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv; { CreateDirectories(Project1FilePath.parent_path()); BasicFile ProjectFile; ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); } std::vector OpIds; OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()}); std::unordered_map>, Oid::Hasher> Attachments; { Ref Project1(ProjectStore.NewProject(BasePath / "proj1"sv, "proj1"sv, RootDir.string(), EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); Ref Oplog = Project1->NewOplog("oplog1"sv, {}); CHECK(Oplog); Attachments[OpIds[0]] = {}; Attachments[OpIds[1]] = CreateAttachments(std::initializer_list{77}); Attachments[OpIds[2]] = CreateAttachments(std::initializer_list{7123, 9583, 690, 99}); Attachments[OpIds[3]] = CreateAttachments(std::initializer_list{55, 122}); for (auto It : Attachments) { Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(It.first, It.second)); } } Ref Project1 = ProjectStore.OpenProject("proj1"sv); CHECK(Project1); Ref Oplog1 = Project1->OpenOplog("oplog1"sv, false, true); CHECK(Oplog1); { uint64_t ModificationTag = 0; auto Result = ProjectStore.GetChunkRange(Log(), *Project1, *Oplog1, Attachments[OpIds[1]][0].first, 0, ~0ull, ZenContentType::kCompressedBinary, &ModificationTag); CHECK_EQ(ProjectStore::GetChunkRangeResult::EError::Ok, Result.Error); IoHash RawHash; uint64_t RawSize; CompressedBuffer Attachment = CompressedBuffer::FromCompressed(Result.Chunk, RawHash, RawSize); CHECK(RawSize == Attachments[OpIds[1]][0].second.DecodeRawSize()); auto Result2 = ProjectStore.GetChunkRange(Log(), *Project1, *Oplog1, Attachments[OpIds[1]][0].first, 0, ~0ull, ZenContentType::kCompressedBinary, &ModificationTag); CHECK_EQ(ProjectStore::GetChunkRangeResult::EError::NotModified, Result2.Error); } { uint64_t FullChunkModificationTag = 0; { auto Result = ProjectStore.GetChunkRange(Log(), *Project1, *Oplog1, Attachments[OpIds[2]][1].first, 0, ~0ull, ZenContentType::kCompressedBinary, &FullChunkModificationTag); CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::Ok); CHECK(Result.Chunk); CHECK(CompressedBuffer::FromCompressedNoValidate(std::move(Result.Chunk)).DecodeRawSize() == Attachments[OpIds[2]][1].second.DecodeRawSize()); } { auto Result = ProjectStore.GetChunkRange(Log(), *Project1, *Oplog1, Attachments[OpIds[2]][1].first, 0, ~0ull, ZenContentType::kCompressedBinary, &FullChunkModificationTag); CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::NotModified); } } { uint64_t PartialChunkModificationTag = 0; { auto Result = ProjectStore.GetChunkRange(Log(), *Project1, *Oplog1, Attachments[OpIds[2]][1].first, 5, 1773, ZenContentType::kCompressedBinary, &PartialChunkModificationTag); CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::Ok); IoHash PartialRawHash; uint64_t PartialRawSize; CompressedBuffer PartialCompressedResult = CompressedBuffer::FromCompressed(Result.Chunk, PartialRawHash, PartialRawSize); CHECK(PartialRawSize >= 1773); uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5); SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed); SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress(); const uint8_t* FullDataPtr = &(reinterpret_cast(FullDecompressed.GetView().GetData())[5]); const uint8_t* PartialDataPtr = reinterpret_cast(PartialDecompressed.GetView().GetData()); CHECK(FullDataPtr[0] == PartialDataPtr[0]); } { auto Result = ProjectStore.GetChunkRange(Log(), *Project1, *Oplog1, Attachments[OpIds[2]][1].first, 0, 1773, ZenContentType::kCompressedBinary, &PartialChunkModificationTag); CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::NotModified); } } } TEST_CASE("project.store.iterateoplog") { using namespace std::literals; using namespace testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"sv; std::filesystem::path EngineRootDir = TempDir.Path() / "enginesv"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"sv; std::filesystem::path ProjectFilePath = TempDir.Path() / "game"sv / "game.uproject"sv; { CreateDirectories(ProjectFilePath.parent_path()); BasicFile ProjectFile; ProjectFile.Open(ProjectFilePath, BasicFile::Mode::kTruncate); } std::filesystem::path ProjectOplogPath = TempDir.Path() / "game"sv / "saves"sv / "cooked"sv / ".projectstore"sv; { CreateDirectories(ProjectOplogPath.parent_path()); BasicFile OplogFile; OplogFile.Open(ProjectOplogPath, BasicFile::Mode::kTruncate); } Ref TestProject(ProjectStore.NewProject(BasePath / "proj"sv, "proj"sv, RootDir.string(), EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string())); Ref Oplog = TestProject->NewOplog("oplog"sv, ProjectOplogPath); CHECK(Oplog); struct TestOidData { Oid KeyAsOidNotOplogId = Oid::NewOid(); std::string Key = KeyAsOidNotOplogId.ToString(); bool bFound = false; }; constexpr int NumTestOids = 4; TestOidData TestOids[NumTestOids]; for (const TestOidData& TestOid : TestOids) { Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(TestOid.KeyAsOidNotOplogId, {})); } int Count = 0; auto ResetTest = [&Count, &TestOids]() { Count = 0; for (TestOidData& TestOid : TestOids) { TestOid.bFound = false; } }; auto IncrementCount = [&Count](CbObjectView /* Op */) { ++Count; }; auto MarkFound = [&TestOids, &Count](ProjectStore::LogSequenceNumber /* LSN */, const Oid& /* InId */, CbObjectView Op) { for (TestOidData& TestOid : TestOids) { if (Op["key"sv].AsString() == TestOid.Key) { TestOid.bFound = true; ++Count; } } }; // Tests of IterateOpLog and IterateOplogWithKey, with various Paging arguments { ResetTest(); Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{}); CHECK(Count == NumTestOids); ResetTest(); Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{}); CHECK(Count == NumTestOids); ResetTest(); Oplog->IterateOplogWithKey(MarkFound); CHECK(Count == NumTestOids); Count = 0; for (int Start = 0; Start < NumTestOids; ++Start) { for (int Size = 0; Size < NumTestOids - Start; ++Size) { ResetTest(); Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{Start, Size}); CHECK(Count == Size); ResetTest(); Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{Start, Size}); CHECK(Count == Size); } // Out of range Size arguments ResetTest(); Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{Start, -1}); CHECK(Count == NumTestOids - Start); ResetTest(); Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{Start, NumTestOids * 2}); CHECK(Count == NumTestOids - Start); } // Out of range Start arguments for (int Size = 0; Size < NumTestOids; ++Size) { ResetTest(); Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{-1, Size}); CHECK(Count == Size); ResetTest(); Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{-1, Size}); CHECK(Count == Size); ResetTest(); Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{NumTestOids, Size}); CHECK(Count == 0); ResetTest(); Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{NumTestOids, Size}); CHECK(Count == 0); } // Out of range Start and Size arguments ResetTest(); Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{-1, -1}); CHECK(Count == NumTestOids); ResetTest(); Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{-1, -1}); CHECK(Count == NumTestOids); ResetTest(); Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{-1, 2 * NumTestOids}); CHECK(Count == NumTestOids); ResetTest(); Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{-1, 2 * NumTestOids}); CHECK(Count == NumTestOids); ResetTest(); Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{NumTestOids, -1}); CHECK(Count == 0); ResetTest(); Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{NumTestOids, -1}); CHECK(Count == 0); ResetTest(); Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{NumTestOids, 2 * NumTestOids}); CHECK(Count == 0); ResetTest(); Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{NumTestOids, 2 * NumTestOids}); CHECK(Count == 0); } } #endif void prj_forcelink() { } } // namespace zen