aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/projectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-02 16:35:22 +0200
committerGitHub Enterprise <[email protected]>2025-10-02 16:35:22 +0200
commit289379b55d19e08c47afb54a363fda9478623b9d (patch)
treeda2eb6aa2f95833c0c7063c6f71dc9576512d7c1 /src/zenstore/projectstore.cpp
parentfix for RPC replay issue (wrong content-type) (#536) (diff)
downloadzen-289379b55d19e08c47afb54a363fda9478623b9d.tar.xz
zen-289379b55d19e08c47afb54a363fda9478623b9d.zip
move projectstore to zenstore (#541)
Diffstat (limited to 'src/zenstore/projectstore.cpp')
-rw-r--r--src/zenstore/projectstore.cpp8098
1 files changed, 8098 insertions, 0 deletions
diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp
new file mode 100644
index 000000000..9a02cc7f0
--- /dev/null
+++ b/src/zenstore/projectstore.cpp
@@ -0,0 +1,8098 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenstore/projectstore.h>
+
+#include <zencore/assertfmt.h>
+#include <zencore/compactbinaryutil.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/except.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/memory/llm.h>
+#include <zencore/scopeguard.h>
+#include <zencore/stream.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zenstore/caslog.h>
+#include <zenstore/cidstore.h>
+#include <zenstore/scrubcontext.h>
+#include <zenutil/parallelwork.h>
+#include <zenutil/referencemetadata.h>
+#include <zenutil/workerpools.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_set.h>
+#include <xxh3.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zenutil/chunkblock.h>
+
+# include <unordered_map>
+#endif // ZEN_WITH_TESTS
+
+namespace zen {
+
+// Returns the shared FLLMTag that this file passes to ZEN_MEMSCOPE.
+// The tag is a function-local static, so it is constructed on first call.
+const FLLMTag&
+GetProjectstoreTag()
+{
+    static FLLMTag ProjectstoreTag("store", FLLMTag("project"));
+    return ProjectstoreTag;
+}
+
+namespace {
+ // Moves Dir out of the way by renaming it to a sibling named "[dropped]<name>(<new oid>)".
+ //
+ // Returns true with OutDeleteDir set to the new location when the rename succeeded.
+ // Returns true with OutDeleteDir untouched when Dir is not (or no longer) a directory.
+ // Returns false when the rename to a free drop name failed 10 times in a row.
+ bool PrepareDirectoryDelete(const std::filesystem::path& Dir, std::filesystem::path& OutDeleteDir)
+ {
+ std::filesystem::path DroppedBucketPath;
+ do
+ {
+ if (!IsDir(Dir))
+ {
+ return true;
+ }
+
+ // Each pass of the outer loop picks a fresh drop name
+ StringBuilder<64> MovedId;
+ Oid::NewOid().ToString(MovedId);
+
+ std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), MovedId);
+ DroppedBucketPath = Dir.parent_path() / DroppedName;
+ if (IsDir(DroppedBucketPath))
+ {
+ // The drop target already exists - try to clear it, otherwise start over with a new name
+ if (!DeleteDirectories(DroppedBucketPath))
+ {
+ ZEN_INFO("Drop directory '{}' for '{}' already exists but could not be removed, attempting different name.",
+ DroppedBucketPath,
+ Dir);
+ continue;
+ }
+ if (IsDir(DroppedBucketPath))
+ {
+ ZEN_INFO("Drop directory '{}' for '{}' still exists after remove, attempting different name.", DroppedBucketPath, Dir);
+ continue;
+ }
+ }
+
+ // Inner loop: retry the rename to the chosen drop name
+ int RenameAttempt = 0;
+ do
+ {
+ std::error_code Ec;
+ RenameDirectory(Dir, DroppedBucketPath, Ec);
+ if (!Ec)
+ {
+ OutDeleteDir = DroppedBucketPath;
+ return true;
+ }
+ if (IsDir(DroppedBucketPath))
+ {
+ // The target appeared in the meantime - leave the retry loop so the outer loop picks a new name
+ ZEN_INFO("Can't rename '{}' to still existing drop directory '{}'. Reason: '{}'. Attempting different name.",
+ Dir,
+ DroppedBucketPath,
+ Ec.message());
+ break;
+ }
+ if (++RenameAttempt == 10)
+ {
+ ZEN_INFO("Can't rename '{}' to drop directory '{}' after {} attempts. Reason: {}.",
+ Dir,
+ DroppedBucketPath,
+ RenameAttempt,
+ Ec.message());
+ return false;
+ }
+ ZEN_INFO("Can't rename '{}' to drop directory '{}', pausing and retrying. Reason: {}.",
+ Dir,
+ DroppedBucketPath,
+ Ec.message());
+ Sleep(100); // presumably milliseconds - TODO confirm against Sleep()'s declaration
+ } while (true);
+
+ } while (true);
+
+ // Not reachable: the loop above only exits through a return
+ return false;
+ }
+
+ // Returns true when CheckPath was last written strictly before ReferencePath.
+ // If either write time cannot be read the function also returns true.
+ bool IsFileOlderThan(const std::filesystem::path& CheckPath, const std::filesystem::path& ReferencePath)
+ {
+     std::error_code TimeEc;
+     const std::filesystem::file_time_type CheckTime = std::filesystem::last_write_time(CheckPath, TimeEc);
+     if (!TimeEc)
+     {
+         const std::filesystem::file_time_type ReferenceTime = std::filesystem::last_write_time(ReferencePath, TimeEc);
+         if (!TimeEc)
+         {
+             return CheckTime < ReferenceTime;
+         }
+     }
+     return true;
+ }
+
+ // Turns a paging request into a [first, second) index range within [0, TotalSize].
+ // Start is clamped to [0, TotalSize]; a negative Count selects everything from Start to TotalSize,
+ // otherwise at most Count entries are selected.
+ std::pair<int32_t, int32_t> GetPagedRange(int32_t TotalSize, const ProjectStore::Oplog::Paging& EntryPaging)
+ {
+     const int32_t First = std::clamp(EntryPaging.Start, 0, TotalSize);
+     if (EntryPaging.Count < 0)
+     {
+         return {First, TotalSize};
+     }
+     const int32_t Remaining = TotalSize - First;
+     return {First, First + std::min<int32_t>(EntryPaging.Count, Remaining)};
+ }
+
+#pragma pack(push)
+#pragma pack(1)
+ // Byte-packed header with a version-dependent body; the static_assert below pins its size to 64 bytes.
+ // Layout: Magic, Version, a 52 byte body (V1 or V2) and a trailing Checksum.
+ struct OplogIndexHeader
+ {
+ // User-provided empty constructor: Magic, Version and Checksum get their default member
+ // initializers, the anonymous union body is left uninitialized.
+ OplogIndexHeader() {}
+
+ // Body layout used with Version1
+ struct BodyV1
+ {
+ uint64_t LogPosition = 0;
+ uint32_t LSNCount = 0;
+ uint64_t KeyCount = 0;
+ uint32_t OpAddressMapCount = 0;
+ uint32_t LatestOpMapCount = 0;
+ uint64_t ChunkMapCount = 0;
+ uint64_t MetaMapCount = 0;
+ uint64_t FileMapCount = 0;
+ };
+
+ // Body layout used with Version2 (the current version)
+ struct BodyV2
+ {
+ uint64_t LogPosition = 0;
+ uint64_t KeyCount = 0;
+ uint32_t OpPayloadIndexCount = 0;
+ uint32_t OpPayloadCount = 0;
+ uint32_t ChunkMapCount = 0;
+ uint32_t MetaMapCount = 0;
+ uint32_t FileMapCount = 0;
+ uint32_t MaxLSN = 0;
+ uint32_t NextOpCoreOffset = 0; // note: Multiple of oplog data alignment!
+ uint32_t Reserved[2] = {0, 0};
+ };
+
+ static constexpr uint32_t ExpectedMagic = 0x7569647a; // bytes 'z','d','i','u' when stored little-endian
+ static constexpr uint32_t Version1 = 1;
+ static constexpr uint32_t Version2 = 2;
+ static constexpr uint32_t CurrentVersion = Version2;
+ static constexpr uint64_t DataAlignment = 8;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+
+ // Which member is meaningful is not recorded here; presumably selected by Version - confirm at the use sites
+ union
+ {
+ BodyV1 V1;
+ BodyV2 V2;
+ };
+
+ uint32_t Checksum = 0;
+
+ // XXH32 over the header starting at Magic and covering everything except the last four bytes
+ // (the Checksum field), using a fixed seed.
+ static uint32_t ComputeChecksum(const OplogIndexHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(OplogIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
+ };
+
+#pragma pack(pop)
+
+ // 4 (Magic) + 4 (Version) + 52 (body) + 4 (Checksum)
+ static_assert(sizeof(OplogIndexHeader) == 64);
+
+ // Produces a 64-bit modification tag for a raw hash by running it through IoHash::Hasher.
+ static std::uint64_t GetModificationTagFromRawHash(const IoHash& Hash)
+ {
+     return IoHash::Hasher{}(Hash);
+ }
+
+ // Produces a modification tag from the modification tick of the file backing FileBuffer.
+ // Returns 0 when the buffer has no file reference or the tick cannot be read.
+ static std::uint64_t GetModificationTagFromModificationTime(IoBuffer FileBuffer)
+ {
+     IoBufferFileReference Reference;
+     if (!FileBuffer.GetFileReference(Reference))
+     {
+         return {};
+     }
+
+     std::error_code TickEc;
+     const uint64_t Tick = GetModificationTickFromHandle(Reference.FileHandle, TickEc);
+     if (TickEc)
+     {
+         return {};
+     }
+     return Tick;
+ }
+
+} // namespace
+
+//////////////////////////////////////////////////////////////////////////
+
+// Computes the Oid key of an op from the serialized bytes of its "key" field.
+// The bytes are hashed to 128 bits and the leading sizeof(Oid) bytes of that hash become the key.
+Oid
+ComputeOpKey(const CbObjectView& Op)
+{
+    using namespace std::literals;
+
+    // Collect the serialized key field into a buffer with 256 bytes of inline capacity
+    eastl::fixed_vector<uint8_t, 256> KeyBytes;
+    Op["key"sv].WriteToStream([&KeyBytes](const void* Data, size_t Size) {
+        const uint8_t* First = reinterpret_cast<const uint8_t*>(Data);
+        KeyBytes.insert(KeyBytes.end(), First, First + Size);
+    });
+
+    // Compatibility note: keys shorter than 240 bytes must keep going through the old,
+    // buggy streaming hash so that keys already stored on disk still match. Longer keys
+    // use the proper one-shot hash. This is a temporary workaround; eventually all key
+    // lengths should use the proper path.
+    XXH3_128 Hash128;
+    if (KeyBytes.size() >= 240)
+    {
+        Hash128 = XXH3_128::HashMemory(KeyBytes.data(), KeyBytes.size());
+    }
+    else
+    {
+        XXH3_128Stream_deprecated LegacyHasher;
+        LegacyHasher.Append(KeyBytes.data(), KeyBytes.size());
+        Hash128 = LegacyHasher.GetHash();
+    }
+
+    Oid Result;
+    memcpy(&Result, Hash128.Hash, sizeof Result);
+    return Result;
+}
+
+struct ProjectStore::OplogStorage : public RefCounted
+{
+ // Stores the owning oplog and the storage directory; no files are touched until Open() is called.
+ OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath)
+ {
+ }
+
+ // Closes the op log and the op blobs file. Exceptions from closing are logged and swallowed
+ // so that the destructor does not throw.
+ ~OplogStorage()
+ {
+ ZEN_DEBUG("oplog '{}/{}': closing oplog storage at {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ m_OplogStoragePath);
+ try
+ {
+ m_Oplog.Close();
+ m_OpBlobs.Close();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("oplog '{}/{}': flushing oplog at '{}' failed. Reason: '{}'",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ m_OplogStoragePath,
+ Ex.what());
+ }
+ }
+
+ // Storage exists when both the op log file and the op blobs file are present under the base path.
+ [[nodiscard]] bool Exists() const { return Exists(m_OplogStoragePath); }
+ [[nodiscard]] static bool Exists(const std::filesystem::path& BasePath)
+ {
+     if (!IsFile(GetLogPath(BasePath)))
+     {
+         return false;
+     }
+     return IsFile(GetBlobsPath(BasePath));
+ }
+
+ // Validity is decided by TCasLogFile<OplogEntry>::IsValid on the op log file only.
+ [[nodiscard]] bool IsValid() const { return IsValid(m_OplogStoragePath); }
+ [[nodiscard]] static bool IsValid(const std::filesystem::path& BasePath)
+ {
+     const std::filesystem::path LogPath = GetLogPath(BasePath);
+     return TCasLogFile<OplogEntry>::IsValid(LogPath);
+ }
+
+ // Removes the op log and op blobs files; removal errors are ignored.
+ void WipeState() const
+ {
+     std::error_code IgnoredEc;
+     RemoveFile(GetLogPath(), IgnoredEc);
+     RemoveFile(GetBlobsPath(), IgnoredEc);
+ }
+
+ // Deletes the whole storage directory; returns the result of DeleteDirectories.
+ static bool Delete(const std::filesystem::path& BasePath) { return DeleteDirectories(BasePath); }
+
+ // Size of the op blobs file as reported by FileSizeFromPath.
+ uint64_t OpBlobsSize() const { return FileSizeFromPath(GetBlobsPath()); }
+
+ // Combined size of the op log file and the op blobs file.
+ // Returns 0 when either of the two files is missing.
+ uint64_t OpsSize() const { return OpsSize(m_OplogStoragePath); }
+ static uint64_t OpsSize(const std::filesystem::path& BasePath)
+ {
+     if (!Exists(BasePath))
+     {
+         return 0;
+     }
+     return FileSizeFromPath(GetLogPath(BasePath)) + FileSizeFromPath(GetBlobsPath(BasePath));
+ }
+
+ // Highest log sequence number currently recorded for this storage.
+ uint32_t MaxLSN() const { return m_MaxLsn; }
+
+ // Sets the max LSN and places the next write position directly after the given op:
+ // the op's byte offset (Offset * m_OpsAlign) plus its size, rounded up to m_OpsAlign.
+ void SetMaxLSNAndNextWriteAddress(uint32_t MaxLSN, const OplogEntryAddress& NextOpFileOffset)
+ {
+ m_MaxLsn.store(MaxLSN);
+ m_NextOpsOffset = RoundUp((NextOpFileOffset.Offset * m_OpsAlign) + NextOpFileOffset.Size, m_OpsAlign);
+ }
+
+ // Sets the max LSN and the next write position; NextOpOffset is given in units of m_OpsAlign.
+ void SetMaxLSNAndNextOpOffset(uint32_t MaxLSN, uint32_t NextOpOffset)
+ {
+ m_MaxLsn.store(MaxLSN);
+ m_NextOpsOffset = NextOpOffset * m_OpsAlign;
+ }
+
+ // Next write position in units of m_OpsAlign, narrowed to 32 bits with gsl::narrow.
+ uint32_t GetNextOpOffset() const { return gsl::narrow<uint32_t>(m_NextOpsOffset / m_OpsAlign); }
+
+ // How Open() should open the storage files.
+ enum EMode
+ {
+ Create, // wipe and recreate the storage directory, truncate both files
+ Read, // open both files read-only
+ Write // open both existing files for writing
+ };
+
+ // Opens the op log and the op blobs file under m_OplogStoragePath.
+ //
+ // Create: deletes and recreates the storage directory and truncates both files.
+ // Write:  opens both files for writing.
+ // Read:   opens both files read-only (also the fallback for any other value).
+ void Open(EMode InMode)
+ {
+     ZEN_MEMSCOPE(GetProjectstoreTag());
+     ZEN_TRACE_CPU("Store::OplogStorage::Open");
+
+     CasLogFile::Mode LogMode   = CasLogFile::Mode::kRead;
+     BasicFile::Mode  BlobsMode = BasicFile::Mode::kRead;
+
+     switch (InMode)
+     {
+         case EMode::Create:
+         {
+             ZEN_DEBUG("oplog '{}/{}': initializing storage at '{}'",
+                       m_OwnerOplog->GetOuterProjectIdentifier(),
+                       m_OwnerOplog->OplogId(),
+                       m_OplogStoragePath);
+
+             DeleteDirectories(m_OplogStoragePath);
+             CreateDirectories(m_OplogStoragePath);
+
+             LogMode   = CasLogFile::Mode::kTruncate;
+             BlobsMode = BasicFile::Mode::kTruncate;
+             break;
+         }
+         case EMode::Write:
+         {
+             LogMode   = CasLogFile::Mode::kWrite;
+             BlobsMode = BasicFile::Mode::kWrite;
+             ZEN_DEBUG("oplog '{}/{}': opening storage at '{}'",
+                       m_OwnerOplog->GetOuterProjectIdentifier(),
+                       m_OwnerOplog->OplogId(),
+                       m_OplogStoragePath);
+             break;
+         }
+         case EMode::Read:
+         {
+             ZEN_DEBUG("oplog '{}/{}': opening read only storage at '{}'",
+                       m_OwnerOplog->GetOuterProjectIdentifier(),
+                       m_OwnerOplog->OplogId(),
+                       m_OplogStoragePath);
+             break;
+         }
+         default:
+             break;
+     }
+
+     m_Oplog.Open(GetLogPath(m_OplogStoragePath), LogMode);
+     m_Oplog.Initialize();
+
+     m_OpBlobs.Open(GetBlobsPath(m_OplogStoragePath), BlobsMode);
+
+     // The alignment must be a power of two and the write position must sit on an aligned boundary
+     ZEN_ASSERT(IsPow2(m_OpsAlign));
+     ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1)));
+ }
+
+ // Returns the payload bytes of LogEntry read through OpBlobsBuffer.
+ // When the file buffer can expose the whole payload as a view, the result wraps that view
+ // without copying; otherwise the payload is read into a freshly allocated buffer.
+ IoBuffer GetOpBuffer(BasicFileBuffer& OpBlobsBuffer, const OplogEntry& LogEntry) const
+ {
+     ZEN_MEMSCOPE(GetProjectstoreTag());
+
+     const uint64_t   BlobOffset = LogEntry.OpCoreAddress.Offset * m_OpsAlign;
+     const MemoryView Cached     = OpBlobsBuffer.MakeView(LogEntry.OpCoreAddress.Size, BlobOffset);
+
+     if (Cached.GetSize() != LogEntry.OpCoreAddress.Size)
+     {
+         // View came back short - fall back to an explicit read into our own buffer
+         IoBuffer Copy(LogEntry.OpCoreAddress.Size);
+         OpBlobsBuffer.Read((void*)Copy.Data(), LogEntry.OpCoreAddress.Size, BlobOffset);
+         return Copy;
+     }
+
+     return IoBuffer(IoBuffer::Wrap, Cached.GetData(), Cached.GetSize());
+ }
+
+ // Sums the payload sizes of Addresses, each rounded up to m_OpsAlign.
+ uint64_t GetEffectiveBlobsSize(std::span<const Oplog::OplogPayload> Addresses) const
+ {
+     uint64_t Total = 0;
+     for (size_t Index = 0; Index < Addresses.size(); ++Index)
+     {
+         const Oplog::OplogPayload& Payload = Addresses[Index];
+         Total += RoundUp(Payload.Address.Size, m_OpsAlign);
+     }
+     return Total;
+ }
+
+ // Rewrites the storage so that it only contains the ops whose LSNs are listed in LSNs.
+ //
+ // The retained ops are copied, in blob file order, into a scratch log file and a temporary
+ // blobs file inside the storage directory. Unless DryRun is set the scratch files then replace
+ // the live files and the max LSN / next write offset are updated to match.
+ //
+ // LSNs       - sequence numbers of the ops to keep.
+ // Callback   - invoked once per retained op with its key, old LSN, new LSN and new address.
+ // RetainLSNs - keep the existing LSNs; otherwise ops are renumbered 1..N in blob file order.
+ // DryRun     - build the scratch files and invoke Callback, but leave the live files untouched.
+ //
+ // Throws std::system_error when the temporary blobs file cannot be created or when moving
+ // the scratch files into place fails.
+ void Compact(std::span<const ProjectStore::LogSequenceNumber> LSNs,
+              std::function<void(const Oid&                       OpKeyHash,
+                                 ProjectStore::LogSequenceNumber  OldLSN,
+                                 ProjectStore::LogSequenceNumber  NewLSN,
+                                 const OplogEntryAddress&         NewAddress)>&& Callback,
+              bool RetainLSNs,
+              bool DryRun)
+ {
+     ZEN_MEMSCOPE(GetProjectstoreTag());
+     ZEN_TRACE_CPU("Store::OplogStorage::Compact");
+
+     ZEN_INFO("oplog '{}/{}': compacting at '{}'",
+              m_OwnerOplog->GetOuterProjectIdentifier(),
+              m_OwnerOplog->OplogId(),
+              m_OplogStoragePath);
+
+     Stopwatch Timer;
+
+     // Scratch log file with a unique name inside the storage directory
+     StringBuilder<64> OplogName;
+     Oid::NewOid().ToString(OplogName);
+
+     std::filesystem::path OplogPath = m_OplogStoragePath / OplogName.c_str();
+
+     std::error_code         Ec;
+     TCasLogFile<OplogEntry> Oplog;
+     Oplog.Open(OplogPath, CasLogFile::Mode::kTruncate);
+     (void)Oplog.Initialize();
+
+     TemporaryFile OpBlobs;
+     OpBlobs.CreateTemporary(m_OplogStoragePath, Ec);
+     if (Ec)
+     {
+         throw std::system_error(Ec, fmt::format("Failed to create temp file for op blob at '{}'", m_OplogStoragePath));
+     }
+
+     try
+     {
+         std::vector<OplogEntry> Ops;
+         Ops.reserve(LSNs.size());
+
+         // Maps each requested LSN to its slot in Ops; (size_t)-1 marks "not seen in the log yet"
+         ProjectStore::LsnMap<size_t> LSNToIndex;
+         LSNToIndex.reserve(LSNs.size());
+         for (ProjectStore::LogSequenceNumber LSN : LSNs)
+         {
+             LSNToIndex[LSN] = (size_t)-1;
+         }
+
+         RwLock::ExclusiveLockScope Lock(m_RwLock);
+
+         const uint64_t SkipEntryCount = 0;
+         m_Oplog.Replay(
+             [&](const OplogEntry& LogEntry) {
+                 if (auto It = LSNToIndex.find(LogEntry.OpLsn); It != LSNToIndex.end())
+                 {
+                     if (It->second != (size_t)-1)
+                     {
+                         // A later log record for the same LSN replaces the earlier one
+                         Ops[It->second] = LogEntry;
+                     }
+                     else
+                     {
+                         LSNToIndex[LogEntry.OpLsn] = Ops.size();
+                         Ops.push_back(LogEntry);
+                     }
+                 }
+             },
+             SkipEntryCount);
+
+         // Copy in blob file order so the old blobs file is read front to back
+         std::sort(Ops.begin(), Ops.end(), [&](const OplogEntry& Lhs, const OplogEntry& Rhs) {
+             return Lhs.OpCoreAddress.Offset < Rhs.OpCoreAddress.Offset;
+         });
+
+         std::vector<ProjectStore::LogSequenceNumber> OldLSNs;
+         OldLSNs.reserve(Ops.size());
+
+         uint64_t OpWriteOffset = 0;
+         uint32_t MaxLSN        = 0;
+         {
+             BasicFileBuffer OldBlobsBuffer(m_OpBlobs, 65536);
+             BasicFileWriter NewOpBlobsBuffer(OpBlobs, 65536);
+
+             for (OplogEntry& LogEntry : Ops)
+             {
+                 OldLSNs.push_back(LogEntry.OpLsn);
+
+                 IoBuffer OpBuffer = GetOpBuffer(OldBlobsBuffer, LogEntry);
+                 if (RetainLSNs)
+                 {
+                     MaxLSN = Max(MaxLSN, LogEntry.OpLsn.Number);
+                 }
+                 else
+                 {
+                     LogEntry.OpLsn = ProjectStore::LogSequenceNumber(++MaxLSN);
+                 }
+                 LogEntry.OpCoreAddress.Offset = gsl::narrow<uint32_t>(OpWriteOffset / m_OpsAlign);
+                 NewOpBlobsBuffer.Write(OpBuffer.GetData(), LogEntry.OpCoreAddress.Size, OpWriteOffset);
+                 OpWriteOffset = RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign);
+             }
+             Oplog.Append(Ops);
+         }
+
+         uint64_t OldOpLogSize   = m_Oplog.GetLogSize();
+         uint64_t OldOpBlobsSize = m_OpBlobs.FileSize();
+
+         if (!DryRun)
+         {
+             m_Oplog.Close();
+             m_OpBlobs.Close();
+
+             Oplog.Close();
+             RenameFile(OplogPath, GetLogPath(), Ec);
+             if (Ec)
+             {
+                 throw std::system_error(Ec,
+                                         fmt::format("Oplog::Compact failed to rename temporary oplog file from '{}' to '{}'",
+                                                     OplogPath,
+                                                     GetLogPath()));
+             }
+             OpBlobs.MoveTemporaryIntoPlace(GetBlobsPath(), Ec);
+             if (Ec)
+             {
+                 // We failed late - clean everything up as best we can.
+                 // Cleanup uses its own error code so the exception reports the original failure.
+                 std::error_code CleanupEc;
+                 RemoveFile(OpBlobs.GetPath(), CleanupEc);
+                 RemoveFile(GetLogPath(), CleanupEc);
+                 RemoveFile(GetBlobsPath(), CleanupEc);
+                 throw std::system_error(
+                     Ec,
+                     fmt::format("Oplog::Compact failed to rename temporary oplog blob storage file from '{}' to '{}'",
+                                 OpBlobs.GetPath(),
+                                 GetBlobsPath()));
+             }
+
+             m_Oplog.Open(GetLogPath(), CasLogFile::Mode::kWrite);
+             m_Oplog.Initialize();
+
+             m_OpBlobs.Open(GetBlobsPath(), BasicFile::Mode::kWrite);
+
+             m_MaxLsn.store(MaxLSN);
+             m_NextOpsOffset.store(OpWriteOffset);
+         }
+         else
+         {
+             // Dry run: the scratch log is never moved into place, so discard it (best effort)
+             Oplog.Close();
+             std::error_code CleanupEc;
+             RemoveFile(OplogPath, CleanupEc);
+         }
+
+         for (size_t Index = 0; Index < Ops.size(); Index++)
+         {
+             const OplogEntry& LogEntry = Ops[Index];
+             Callback(LogEntry.OpKeyHash, OldLSNs[Index], LogEntry.OpLsn, LogEntry.OpCoreAddress);
+         }
+
+         ZEN_INFO("oplog '{}/{}': compact completed in {} - Max LSN# {}, New size: {}, old size {}.",
+                  m_OwnerOplog->GetOuterProjectIdentifier(),
+                  m_OwnerOplog->OplogId(),
+                  NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+                  m_MaxLsn.load(),
+                  NiceBytes(m_Oplog.GetLogSize() + m_OpBlobs.FileSize()),
+                  NiceBytes(OldOpLogSize + OldOpBlobsSize));
+     }
+     catch (const std::exception& /*Ex*/)
+     {
+         // Best-effort removal of both scratch files. The scratch log no longer exists under
+         // OplogPath once it has been renamed into place, in which case the removal is a no-op.
+         std::error_code CleanupEc;
+         RemoveFile(OpBlobs.GetPath(), CleanupEc);
+         RemoveFile(OplogPath, CleanupEc);
+         throw;
+     }
+ }
+
+ // Path of the op log file ("ops.zlog") inside the given storage directory.
+ static std::filesystem::path GetLogPath(const std::filesystem::path& OplogStoragePath)
+ {
+ using namespace std::literals;
+ return OplogStoragePath / "ops.zlog"sv;
+ }
+
+ // Path of the op blobs file ("ops.zops") inside the given storage directory.
+ static std::filesystem::path GetBlobsPath(const std::filesystem::path& OplogStoragePath)
+ {
+ using namespace std::literals;
+ return OplogStoragePath / "ops.zops"sv;
+ }
+
+ // Convenience overloads using this storage's own directory
+ std::filesystem::path GetLogPath() const { return GetLogPath(m_OplogStoragePath); }
+
+ std::filesystem::path GetBlobsPath() const { return GetBlobsPath(m_OplogStoragePath); }
+
+ // Replays the op log (skipping the first SkipEntryCount records) and collects the latest
+ // record per op key into OutOpLogEntries. Records that fail the sanity checks below are
+ // skipped and counted in OutInvalidEntries.
+ //
+ // OutOpLogEntries   - appended to; a later record for a known key overwrites the earlier one in place.
+ // OutInvalidEntries - incremented (not reset) for every rejected record.
+ void ReadOplogEntriesFromLog(std::vector<OplogEntry>& OutOpLogEntries, uint64_t& OutInvalidEntries, uint64_t SkipEntryCount)
+ {
+ ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("Store::OplogStorage::ReadOplogEntriesFromLog");
+
+ // Nothing new in the log beyond what the caller asked to skip
+ if (m_Oplog.GetLogCount() == SkipEntryCount)
+ {
+ return;
+ }
+
+ // NOTE(review): Timer is not read anywhere in this function
+ Stopwatch Timer;
+
+ // Used to reject records whose payload would lie beyond the end of the blobs file
+ uint64_t OpsBlockSize = m_OpBlobs.FileSize();
+
+ {
+ // Op key -> index into OutOpLogEntries of the most recent record for that key
+ tsl::robin_map<Oid, size_t, Oid::Hasher> LatestKeys;
+ if (m_Oplog.GetLogCount() > SkipEntryCount)
+ {
+ LatestKeys.reserve(m_Oplog.GetLogCount() - SkipEntryCount);
+ }
+
+ m_Oplog.Replay(
+ [&](const OplogEntry& LogEntry) {
+ if (LogEntry.IsTombstone())
+ {
+ // A tombstone is only accepted for a key seen earlier in this replay
+ if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It == LatestKeys.end())
+ {
+ ZEN_SCOPED_WARN("found tombstone referencing unknown key {}", LogEntry.OpKeyHash);
+ ++OutInvalidEntries;
+ return;
+ }
+ }
+ else if (LogEntry.OpCoreAddress.Size == 0)
+ {
+ ZEN_SCOPED_WARN("skipping zero size op {}", LogEntry.OpKeyHash);
+ ++OutInvalidEntries;
+ return;
+ }
+ else if (LogEntry.OpLsn.Number == 0)
+ {
+ ZEN_SCOPED_WARN("skipping zero lsn op {}", LogEntry.OpKeyHash);
+ ++OutInvalidEntries;
+ return;
+ }
+
+ // Bounds check against the blobs file (also applied to accepted tombstones)
+ const uint64_t OpFileOffset = LogEntry.OpCoreAddress.Offset * m_OpsAlign;
+ if ((OpFileOffset + LogEntry.OpCoreAddress.Size) > OpsBlockSize)
+ {
+ ZEN_SCOPED_WARN("skipping out of bounds op {}", LogEntry.OpKeyHash);
+ ++OutInvalidEntries;
+ return;
+ }
+
+ if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It != LatestKeys.end())
+ {
+ // Known key: the newer record replaces the stored one
+ OplogEntry& Entry = OutOpLogEntries[It->second];
+
+ if (LogEntry.IsTombstone() && Entry.IsTombstone())
+ {
+ ZEN_SCOPED_WARN("found double tombstone - {}", LogEntry.OpKeyHash);
+ }
+
+ Entry = LogEntry;
+ }
+ else
+ {
+ // First record for this key
+ const size_t OpIndex = OutOpLogEntries.size();
+ LatestKeys[LogEntry.OpKeyHash] = OpIndex;
+ OutOpLogEntries.push_back(LogEntry);
+ }
+ },
+ SkipEntryCount);
+ }
+ }
+
+ // Replays the latest record of every op key (skipping the first SkipEntryCount log records),
+ // verifies each op payload and hands valid ops to Handler. Afterwards the max LSN and the
+ // next write offset are raised to cover every op that was handed to Handler.
+ //
+ // Ops are visited in blob file order. Ops with a bad checksum, invalid compact binary or a
+ // size mismatch are logged and skipped; tombstones are only counted.
+ void ReplayLog(std::function<void(CbObjectView, const OplogEntry&)>&& Handler, uint64_t SkipEntryCount = 0)
+ {
+ ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("Store::OplogStorage::ReplayLog");
+
+ // Nothing new in the log beyond what the caller asked to skip
+ if (m_Oplog.GetLogCount() == SkipEntryCount)
+ {
+ return;
+ }
+
+ Stopwatch Timer;
+
+ std::vector<OplogEntry> OpLogEntries;
+ uint64_t InvalidEntries = 0;
+
+ ReadOplogEntriesFromLog(OpLogEntries, InvalidEntries, SkipEntryCount);
+
+ uint64_t TombstoneEntries = 0;
+
+ BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
+
+ // Start from the current values so a partial replay never lowers them
+ uint32_t MaxOpLsn = m_MaxLsn;
+ uint64_t NextOpFileOffset = m_NextOpsOffset;
+
+ // Visit ops in blob file order
+ std::sort(OpLogEntries.begin(), OpLogEntries.end(), [&](const OplogEntry& Lhs, const OplogEntry& Rhs) {
+ return Lhs.OpCoreAddress.Offset < Rhs.OpCoreAddress.Offset;
+ });
+
+ for (const OplogEntry& LogEntry : OpLogEntries)
+ {
+ if (LogEntry.IsTombstone())
+ {
+ TombstoneEntries++;
+ }
+ else
+ {
+ IoBuffer OpBuffer = GetOpBuffer(OpBlobsBuffer, LogEntry);
+
+ // Verify checksum, ignore op data if incorrect
+
+ // The stored checksum is the low 32 bits of XXH3_64bits over the payload
+ const uint32_t ExpectedOpCoreHash = LogEntry.OpCoreHash;
+ const uint32_t OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.GetData(), LogEntry.OpCoreAddress.Size) & 0xffffFFFF);
+
+ if (OpCoreHash != ExpectedOpCoreHash)
+ {
+ ZEN_WARN("oplog '{}/{}': skipping bad checksum op - {}. Expected: {}, found: {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ LogEntry.OpKeyHash,
+ ExpectedOpCoreHash,
+ OpCoreHash);
+ }
+ else if (CbValidateError Err = ValidateCompactBinary(OpBuffer.GetView(), CbValidateMode::Default);
+ Err != CbValidateError::None)
+ {
+ ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Error: '{}'",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ LogEntry.OpKeyHash,
+ ToString(Err));
+ }
+ else
+ {
+ // The object must span exactly the stored payload size
+ CbObjectView OpView(OpBuffer.GetData());
+ if (OpView.GetSize() != OpBuffer.GetSize())
+ {
+ ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ LogEntry.OpKeyHash,
+ OpView.GetSize(),
+ OpBuffer.GetSize());
+ }
+ else
+ {
+ // Only ops that reach the handler contribute to the max LSN and next write offset
+ Handler(OpView, LogEntry);
+ MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn.Number);
+ const uint64_t EntryNextOpFileOffset =
+ RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign);
+ NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset);
+ }
+ }
+ }
+ }
+
+ m_MaxLsn = MaxOpLsn;
+ m_NextOpsOffset = NextOpFileOffset;
+
+ ZEN_INFO("oplog '{}/{}': replay from '{}' completed in {} - Max LSN# {}, Next offset: {}, {} tombstones, {} invalid entries",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ m_OplogStoragePath,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ m_MaxLsn.load(),
+ m_NextOpsOffset.load(),
+ TombstoneEntries,
+ InvalidEntries);
+ }
+
+ // Reads the payloads selected by Order (indices into Entries) from the op blobs file,
+ // validates each one as compact binary and passes valid ops to Handler together with their LSN.
+ // Ops that fail validation or whose object size differs from the stored size are logged and skipped.
+ void ReplayLogEntries(const std::span<const Oplog::OplogPayload>                   Entries,
+                       const std::span<const Oplog::PayloadIndex>                   Order,
+                       std::function<void(LogSequenceNumber Lsn, CbObjectView)>&&   Handler)
+ {
+     ZEN_MEMSCOPE(GetProjectstoreTag());
+     ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries");
+
+     // Validate-and-dispatch step shared by the zero-copy path and the copying path below
+     auto DispatchOp = [&](const Oplog::OplogPayload& Payload, MemoryView OpData) {
+         const CbValidateError Error = ValidateCompactBinary(OpData, CbValidateMode::Default);
+         if (Error != CbValidateError::None)
+         {
+             ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}",
+                      m_OwnerOplog->GetOuterProjectIdentifier(),
+                      m_OwnerOplog->OplogId(),
+                      Payload.Lsn.Number,
+                      ToString(Error));
+             return;
+         }
+
+         CbObjectView OpView(OpData.GetData());
+         if (OpView.GetSize() != OpData.GetSize())
+         {
+             ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
+                      m_OwnerOplog->GetOuterProjectIdentifier(),
+                      m_OwnerOplog->OplogId(),
+                      Payload.Lsn.Number,
+                      OpView.GetSize(),
+                      OpData.GetSize());
+             return;
+         }
+
+         Handler(Payload.Lsn, OpView);
+     };
+
+     BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
+
+     for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order)
+     {
+         const Oplog::OplogPayload& Payload    = Entries[EntryOffset];
+         const uint64_t             BlobOffset = Payload.Address.Offset * m_OpsAlign;
+
+         const MemoryView Cached = OpBlobsBuffer.MakeView(Payload.Address.Size, BlobOffset);
+         if (Cached.GetSize() == Payload.Address.Size)
+         {
+             // Whole payload is available as a view into the file buffer
+             DispatchOp(Payload, Cached);
+             continue;
+         }
+
+         // View came back short - read the payload into a buffer of our own
+         IoBuffer OpBuffer(Payload.Address.Size);
+         OpBlobsBuffer.Read((void*)OpBuffer.Data(), Payload.Address.Size, BlobOffset);
+         DispatchOp(Payload, OpBuffer.GetView());
+     }
+ }
+
+ // Reads Entry.Size bytes at Entry.Offset * m_OpsAlign from the op blobs file and returns
+ // them wrapped as a CbObject. The payload is not validated here.
+ CbObject GetOp(const OplogEntryAddress& Entry)
+ {
+     ZEN_MEMSCOPE(GetProjectstoreTag());
+     ZEN_TRACE_CPU("Store::OplogStorage::GetOp");
+
+     const uint64_t BlobOffset = Entry.Offset * m_OpsAlign;
+
+     IoBuffer Payload(Entry.Size);
+     m_OpBlobs.Read((void*)Payload.Data(), Entry.Size, BlobOffset);
+
+     SharedBuffer Shared(std::move(Payload));
+     return CbObject(std::move(Shared));
+ }
+
+ // Precomputed input for AppendOp/AppendOps, produced by GetAppendOpData.
+ struct AppendOpData
+ {
+ MemoryView Buffer; // view of the op's bytes; does not own them
+ uint32_t OpCoreHash; // low 32 bits of XXH3_64bits over Buffer
+ Oid KeyHash; // result of ComputeOpKey for the op
+ };
+
+ // Prepares an op for appending: captures a view of its bytes, the 32-bit payload checksum
+ // (low half of XXH3_64bits) and the op key. The returned view refers to Core's data.
+ static OplogStorage::AppendOpData GetAppendOpData(const CbObjectView& Core)
+ {
+     ZEN_MEMSCOPE(GetProjectstoreTag());
+     using namespace std::literals;
+
+     AppendOpData Result;
+     Result.Buffer = Core.GetView();
+
+     const uint64_t WriteSize = Result.Buffer.GetSize();
+     const uint64_t FullHash  = XXH3_64bits(Result.Buffer.GetData(), WriteSize);
+     Result.OpCoreHash        = uint32_t(FullHash & 0xffffFFFF);
+
+     ZEN_ASSERT(WriteSize != 0);
+
+     Result.KeyHash = ComputeOpKey(Core);
+     return Result;
+ }
+
+ // Appends one op: reserves an LSN and an aligned region of the blobs file under the lock,
+ // then writes the payload and the log record outside the lock.
+ //
+ // Returns the log record that was appended.
+ // Throws std::runtime_error when the 32-bit LSN counter wraps around to zero.
+ OplogEntry AppendOp(const AppendOpData& OpData)
+ {
+ ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("Store::OplogStorage::AppendOp");
+
+ uint64_t WriteSize = OpData.Buffer.GetSize();
+
+ // Only the reservation of LSN and write offset happens under the lock
+ RwLock::ExclusiveLockScope Lock(m_RwLock);
+ const uint64_t WriteOffset = m_NextOpsOffset;
+ const LogSequenceNumber OpLsn(++m_MaxLsn);
+ if (!OpLsn.Number)
+ {
+ // The counter wrapped to zero; zero is not usable as an LSN
+ ZEN_ERROR("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId());
+ throw std::runtime_error(fmt::format("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId()));
+ }
+ m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign);
+ Lock.ReleaseNow();
+
+ ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign));
+
+ // Offset is stored in units of m_OpsAlign.
+ // NOTE(review): narrow_cast and uint32_t(WriteSize) truncate silently if the values exceed 32 bits
+ OplogEntry Entry = {.OpLsn = OpLsn,
+ .OpCoreAddress = {.Offset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign), .Size = uint32_t(WriteSize)},
+ .OpCoreHash = OpData.OpCoreHash,
+ .OpKeyHash = OpData.KeyHash};
+
+ // Payload first, then the log record that refers to it
+ m_OpBlobs.Write(OpData.Buffer.GetData(), WriteSize, WriteOffset);
+ m_Oplog.Append(Entry);
+
+ return Entry;
+ }
+
+ // Appends a batch of ops: reserves consecutive LSNs and one contiguous, aligned region of the
+ // blobs file under the lock, then writes all payloads with a single write and appends all
+ // log records in one call.
+ //
+ // Returns the appended log records, in the same order as Ops.
+ // Throws std::runtime_error when the 32-bit LSN counter wraps around to zero.
+ std::vector<OplogEntry> AppendOps(std::span<const AppendOpData> Ops)
+ {
+ ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("Store::OplogStorage::AppendOps");
+
+ size_t OpCount = Ops.size();
+ // Per op: first = offset relative to WriteStart, second = payload size
+ std::vector<std::pair<uint64_t, uint64_t>> OffsetAndSizes;
+ std::vector<LogSequenceNumber> OpLsns;
+ OffsetAndSizes.resize(OpCount);
+ OpLsns.resize(OpCount);
+
+ for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
+ {
+ OffsetAndSizes[OpIndex].second = Ops[OpIndex].Buffer.GetSize();
+ }
+
+ uint64_t WriteStart = 0;
+ uint64_t WriteLength = 0;
+ {
+ // Only the reservation of LSNs and the write region happens under the lock
+ RwLock::ExclusiveLockScope Lock(m_RwLock);
+ WriteStart = m_NextOpsOffset;
+ ZEN_ASSERT(IsMultipleOf(WriteStart, m_OpsAlign));
+ uint64_t WriteOffset = WriteStart;
+ for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
+ {
+ OffsetAndSizes[OpIndex].first = WriteOffset - WriteStart;
+ OpLsns[OpIndex] = LogSequenceNumber(++m_MaxLsn);
+ if (!OpLsns[OpIndex])
+ {
+ ZEN_ERROR("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId());
+ throw std::runtime_error(fmt::format("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId()));
+ }
+ WriteOffset = RoundUp(WriteOffset + OffsetAndSizes[OpIndex].second, m_OpsAlign);
+ }
+ WriteLength = WriteOffset - WriteStart;
+ // WriteOffset is already aligned here, so this RoundUp leaves it unchanged
+ m_NextOpsOffset = RoundUp(WriteOffset, m_OpsAlign);
+ }
+
+ // Staging buffer for the whole region.
+ // NOTE(review): alignment gaps between ops are not written explicitly; what ends up on disk
+ // there depends on how IoBuffer initializes its memory - confirm
+ IoBuffer WriteBuffer(WriteLength);
+
+ std::vector<OplogEntry> Entries;
+ Entries.resize(OpCount);
+ for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
+ {
+ MutableMemoryView WriteBufferView = WriteBuffer.GetMutableView().RightChop(OffsetAndSizes[OpIndex].first);
+ WriteBufferView.CopyFrom(Ops[OpIndex].Buffer);
+ Entries[OpIndex] = {
+ .OpLsn = OpLsns[OpIndex],
+ .OpCoreAddress = {.Offset = gsl::narrow_cast<uint32_t>((WriteStart + OffsetAndSizes[OpIndex].first) / m_OpsAlign),
+ .Size = uint32_t(OffsetAndSizes[OpIndex].second)},
+ .OpCoreHash = Ops[OpIndex].OpCoreHash,
+ .OpKeyHash = Ops[OpIndex].KeyHash};
+ }
+
+ // Payloads first, then the log records that refer to them
+ m_OpBlobs.Write(WriteBuffer.GetData(), WriteBuffer.GetSize(), WriteStart);
+ m_Oplog.Append(Entries);
+
+ return Entries;
+ }
+
+ // Appends a tombstone record for KeyHash to the op log. No payload is written and
+ // no LSN is reserved.
+ void AppendTombstone(Oid KeyHash)
+ {
+ OplogEntry Entry = {.OpKeyHash = KeyHash};
+ Entry.MakeTombstone();
+
+ m_Oplog.Append(Entry);
+ }
+
+ // Flushes the op log and the op blobs file.
+ void Flush()
+ {
+ m_Oplog.Flush();
+ m_OpBlobs.Flush();
+ }
+ // Number of records in the op log as reported by the log file.
+ uint64_t LogCount() const { return m_Oplog.GetLogCount(); }
+
+ // Logger of the owning oplog (presumably what the ZEN_* log macros in this struct pick up - confirm).
+ LoggerRef Log() { return m_OwnerOplog->Log(); }
+
+private:
+ ProjectStore::Oplog* m_OwnerOplog; // owning oplog; used for identifiers in log messages and for Log()
+ std::filesystem::path m_OplogStoragePath; // directory holding ops.zlog and ops.zops
+ mutable RwLock m_RwLock; // taken exclusively by Compact, AppendOp and AppendOps
+ TCasLogFile<OplogEntry> m_Oplog; // op log file (one OplogEntry per record)
+ BasicFile m_OpBlobs; // op payload file
+ std::atomic<uint64_t> m_NextOpsOffset{0}; // byte offset in m_OpBlobs where the next payload is written
+ uint64_t m_OpsAlign = 32; // payload alignment in bytes; stored offsets are in units of this
+ std::atomic<uint32_t> m_MaxLsn{0}; // highest LSN handed out or seen so far
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+// Creates the oplog object and opens its storage under BasePath.
+//
+// In kBasicReadOnly mode the storage is opened read-only as-is. Otherwise existing storage is
+// opened for writing; if it is missing, or present but invalid (in which case its files and the
+// meta file are removed first), it is created from scratch.
+ProjectStore::Oplog::Oplog(const LoggerRef& InLog,
+ std::string_view ProjectIdentifier,
+ std::string_view Id,
+ CidStore& Store,
+ const std::filesystem::path& BasePath,
+ const std::filesystem::path& MarkerPath,
+ EMode State)
+: m_Log(InLog)
+, m_OuterProjectId(ProjectIdentifier)
+, m_OplogId(Id)
+, m_CidStore(Store)
+, m_BasePath(BasePath)
+, m_MarkerPath(MarkerPath)
+, m_TempPath(m_BasePath / std::string_view("temp"))
+, m_MetaPath(m_BasePath / std::string_view("ops.meta"))
+, m_Mode(State)
+, m_MetaValid(false)
+, m_PendingPrepOpAttachmentsRetainEnd(GcClock::Now())
+{
+ ZEN_MEMSCOPE(GetProjectstoreTag());
+
+ using namespace std::literals;
+
+ m_Storage = new OplogStorage(this, m_BasePath);
+ OplogStorage::EMode StorageMode = OplogStorage::EMode::Write;
+ if (m_Mode == EMode::kBasicReadOnly)
+ {
+ StorageMode = OplogStorage::EMode::Read;
+ }
+ else
+ {
+ bool StoreExists = m_Storage->Exists();
+ if (StoreExists)
+ {
+ if (!m_Storage->IsValid())
+ {
+ // Existing but invalid log: drop the storage files and the meta file and start over
+ ZEN_WARN("Invalid oplog found at '{}'. Wiping state for oplog.", m_BasePath);
+ m_Storage->WipeState();
+ std::error_code DummyEc;
+ RemoveFile(m_MetaPath, DummyEc);
+ StoreExists = false;
+ }
+ }
+ if (!StoreExists)
+ {
+ StorageMode = OplogStorage::EMode::Create;
+ }
+ }
+ m_Storage->Open(StorageMode);
+ // The meta file only counts as valid when it is not older than the op blobs file
+ // (a missing or unreadable file also makes it invalid)
+ m_MetaValid = !IsFileOlderThan(m_MetaPath, m_Storage->GetBlobsPath());
+
+ CleanDirectory(m_TempPath, /*ForceRemoveReadOnlyFiles*/ false);
+}
+
+// Flushes the oplog on destruction if storage is still attached.
+// Flush() itself only does work in kFull mode.
+ProjectStore::Oplog::~Oplog()
+{
+ if (m_Storage)
+ {
+ Flush();
+ }
+}
+
+// Flushes storage to disk and refreshes derived state. Does nothing unless the oplog is in
+// kFull mode and storage is attached.
+//
+// After flushing storage, a meta file that is no longer valid is removed, and a new index
+// snapshot is written when the log has grown since the last snapshot or the current
+// snapshot is a legacy one.
+void
+ProjectStore::Oplog::Flush()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    ZEN_TRACE_CPU("Oplog::Flush");
+
+    if (m_Mode != EMode::kFull)
+    {
+        return;
+    }
+
+    RwLock::SharedLockScope SharedLock(m_OplogLock);
+
+    if (!m_Storage)
+    {
+        return;
+    }
+
+    m_Storage->Flush();
+
+    if (!m_MetaValid)
+    {
+        std::error_code IgnoredEc;
+        RemoveFile(m_MetaPath, IgnoredEc);
+    }
+
+    const uint64_t CurrentLogCount = m_Storage->LogCount();
+    const bool     LogHasGrown     = (m_LogFlushPosition != CurrentLogCount);
+    if (LogHasGrown || m_IsLegacySnapshot)
+    {
+        WriteIndexSnapshot();
+    }
+}
+
+// Builds the LSN -> payload index map on first use; later calls are no-ops while the map exists.
+// Payload slots with a zero LSN or a zero size are left out of the map.
+// The unnamed lock parameter requires the caller to hold an exclusive lock scope.
+void
+ProjectStore::Oplog::RefreshLsnToPayloadOffsetMap(RwLock::ExclusiveLockScope&)
+{
+    if (m_LsnToPayloadOffsetMap)
+    {
+        return;
+    }
+
+    m_LsnToPayloadOffsetMap = std::make_unique<LsnMap<PayloadIndex>>();
+    m_LsnToPayloadOffsetMap->reserve(m_OpLogPayloads.size());
+
+    for (uint32_t Slot = 0; Slot < m_OpLogPayloads.size(); Slot++)
+    {
+        const OplogPayload& Candidate = m_OpLogPayloads[Slot];
+        if (Candidate.Lsn.Number == 0 || Candidate.Address.Size == 0)
+        {
+            continue;
+        }
+        m_LsnToPayloadOffsetMap->insert_or_assign(Candidate.Lsn, PayloadIndex(Slot));
+    }
+}
+
+// Checks every op in the oplog and, when the scrub context asks for recovery, removes the
+// bad ones from the in-memory index and records tombstones for them.
+//
+// An op is bad when the key recomputed from its data differs from the key it is indexed under,
+// or when any of its attachments is flagged bad by the scrub context or missing from the CID
+// store. Each bad op is recorded once, however many of its attachments are bad.
+//
+// Requires the oplog to be in kFull mode.
+void
+ProjectStore::Oplog::Scrub(ScrubContext& Ctx)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    ZEN_ASSERT(m_Mode == EMode::kFull);
+
+    std::vector<std::pair<LogSequenceNumber, Oid>> BadEntries;
+
+    using namespace std::literals;
+
+    IterateOplogWithKey([&](LogSequenceNumber Lsn, const Oid& Key, CbObjectView Op) {
+        {
+            const Oid KeyHash = ComputeOpKey(Op);
+            if (KeyHash != Key)
+            {
+                BadEntries.push_back({Lsn, Key});
+                ZEN_WARN("Scrub: oplog data does not match information from index (op:{} != index:{})", KeyHash, Key);
+                return;
+            }
+        }
+
+        // TODO: Should we really delete an Op because it points to a missing or malformed Cid chunk?
+
+        // A return inside the attachment visitor only ends that single visit, so a flag is
+        // used to remember the verdict and to skip checking the op's remaining attachments.
+        bool HasBadAttachment = false;
+        Op.IterateAttachments([&](CbFieldView Visitor) {
+            if (HasBadAttachment)
+            {
+                return;
+            }
+            const IoHash Cid = Visitor.AsAttachment();
+            if (Ctx.IsBadCid(Cid))
+            {
+                // oplog entry references a CAS chunk which has been flagged as bad
+                HasBadAttachment = true;
+                return;
+            }
+            if (!m_CidStore.ContainsChunk(Cid))
+            {
+                // oplog entry references a CAS chunk which is not present
+                HasBadAttachment = true;
+                return;
+            }
+        });
+
+        if (HasBadAttachment)
+        {
+            BadEntries.push_back({Lsn, Key});
+        }
+    });
+
+    if (BadEntries.empty())
+    {
+        return;
+    }
+
+    if (!Ctx.RunRecovery())
+    {
+        ZEN_WARN("oplog '{}/{}': scrubbing found {} bad ops in oplog @ '{}' but no cleanup will be performed",
+                 m_OuterProjectId,
+                 m_OplogId,
+                 BadEntries.size(),
+                 m_BasePath);
+        return;
+    }
+
+    ZEN_WARN("oplog '{}/{}': scrubbing found {} bad ops in oplog @ '{}', these will be removed from the index",
+             m_OuterProjectId,
+             m_OplogId,
+             BadEntries.size(),
+             m_BasePath);
+
+    // Actually perform some clean-up
+    RwLock::ExclusiveLockScope Lock(m_OplogLock);
+
+    RefreshLsnToPayloadOffsetMap(Lock);
+
+    for (const auto& BadEntry : BadEntries)
+    {
+        const LogSequenceNumber BadLsn = BadEntry.first;
+        const Oid&              BadKey = BadEntry.second;
+
+        // Drop the key from the index only if it still points at the bad LSN
+        if (auto It = m_OpToPayloadOffsetMap.find(BadKey); It != m_OpToPayloadOffsetMap.end())
+        {
+            OplogPayload& Payload = m_OpLogPayloads[It->second];
+            if (Payload.Lsn == BadLsn)
+            {
+                m_OpToPayloadOffsetMap.erase(It);
+                m_Storage->AppendTombstone(BadKey);
+            }
+        }
+
+        // Blank out the payload slot of the bad LSN and forget the LSN
+        if (auto LsnIt = m_LsnToPayloadOffsetMap->find(BadLsn); LsnIt != m_LsnToPayloadOffsetMap->end())
+        {
+            const PayloadIndex LsnPayloadOffset = LsnIt->second;
+            OplogPayload&      LsnPayload       = m_OpLogPayloads[LsnPayloadOffset];
+            LsnPayload                          = {.Lsn = LogSequenceNumber(0), .Address = {.Offset = 0, .Size = 0}};
+        }
+        m_LsnToPayloadOffsetMap->erase(BadLsn);
+    }
+
+    // BadEntries is known to be non-empty here, so the meta file no longer matches the index
+    m_MetaValid = false;
+}
+
+// Returns the on-disk size of an oplog stored at BasePath: the size reported by
+// OplogStorage::OpsSize() plus the sizes of the state file ("oplog.zcb"), the meta file
+// ("ops.meta") and the index snapshot ("ops.zidx"), each counted only if it exists.
+uint64_t
+ProjectStore::Oplog::TotalSize(const std::filesystem::path& BasePath)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    using namespace std::literals;
+
+    uint64_t Size = OplogStorage::OpsSize(BasePath);
+
+    for (const std::string_view FileName : {"oplog.zcb"sv, "ops.meta"sv, "ops.zidx"sv})
+    {
+        const std::filesystem::path FilePath = BasePath / FileName;
+        if (IsFile(FilePath))
+        {
+            Size += FileSizeFromPath(FilePath);
+        }
+    }
+
+    return Size;
+}
+
+// Returns the on-disk size of this oplog, i.e. TotalSize() evaluated for m_BasePath.
+uint64_t
+ProjectStore::Oplog::TotalSize() const
+{
+    const uint64_t SizeOnDisk = TotalSize(m_BasePath);
+    return SizeOnDisk;
+}
+
+// Drops all in-memory lookup tables and releases the storage backend.
+// Runs under an exclusive lock on m_OplogLock.
+void
+ProjectStore::Oplog::ResetState()
+{
+    RwLock::ExclusiveLockScope ExclusiveLock(m_OplogLock);
+
+    // Op key / LSN -> payload bookkeeping
+    m_OpToPayloadOffsetMap.clear();
+    m_OpLogPayloads.clear();
+    m_LsnToPayloadOffsetMap.reset();
+
+    // Chunk, meta and file id lookups
+    m_ChunkMap.clear();
+    m_MetaMap.clear();
+    m_FileMap.clear();
+
+    // Release the storage last
+    m_Storage = {};
+}
+
+// Clears all in-memory state (capture bookkeeping, pending prep attachments, lookup tables),
+// releases the storage and then asks PrepareDirectoryDelete() to prepare m_BasePath for removal.
+//
+// OutRemoveDirectory is passed straight through to PrepareDirectoryDelete().
+// Returns the result of PrepareDirectoryDelete().
+bool
+ProjectStore::Oplog::PrepareForDelete(std::filesystem::path& OutRemoveDirectory)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    RwLock::ExclusiveLockScope ExclusiveLock(m_OplogLock);
+
+    // Capture state and pending prep operations
+    m_UpdateCaptureRefCounter = 0;
+    m_CapturedOps.reset();
+    m_CapturedAttachments.reset();
+    m_PendingPrepOpAttachments.clear();
+
+    // Lookup tables
+    m_ChunkMap.clear();
+    m_MetaMap.clear();
+    m_FileMap.clear();
+    m_OpToPayloadOffsetMap.clear();
+    m_OpLogPayloads.clear();
+    m_LsnToPayloadOffsetMap.reset();
+
+    // The storage is released before the directory is touched
+    m_Storage = {};
+
+    return PrepareDirectoryDelete(m_BasePath, OutRemoveDirectory);
+}
+
+// An oplog is considered to exist at BasePath when its state file ("oplog.zcb") is present there.
+bool
+ProjectStore::Oplog::ExistsAt(const std::filesystem::path& BasePath)
+{
+    using namespace std::literals;
+
+    return IsFile(BasePath / "oplog.zcb"sv);
+}
+
+// Returns whether the state file for this oplog exists in m_BasePath (see ExistsAt()).
+bool
+ProjectStore::Oplog::Exists() const
+{
+    const bool StateFilePresent = ExistsAt(m_BasePath);
+    return StateFilePresent;
+}
+
+// Loads the oplog state from disk.
+//
+//   1. Reads the state file to pick up the GC marker path. If the state file exists but yields
+//      an object of size 0 the read is abandoned.
+//   2. Removes the meta file if it has been marked as invalid.
+//   3. Loads the index snapshot.
+//   4. Processes the op log, passing m_LogFlushPosition to the storage (presumably so that only
+//      entries the snapshot does not cover are visited - TODO confirm against OplogStorage):
+//        * EMode::kFull: replays the ops, rebuilding the chunk/meta/file maps along the way, and
+//          rewrites the snapshot if it is behind the log or in the legacy format.
+//        * otherwise: only the op key -> payload bookkeeping is rebuilt.
+void
+ProjectStore::Oplog::Read()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    using namespace std::literals;
+    ZEN_TRACE_CPU("Oplog::Read");
+    ZEN_LOG_SCOPE("Oplog::Read '{}'", m_OplogId);
+
+    ZEN_DEBUG("oplog '{}/{}': reading from '{}'. State: {}",
+              m_OuterProjectId,
+              m_OplogId,
+              m_BasePath,
+              m_Mode == EMode::kFull ? "Full" : "BasicNoLookups");
+
+    std::optional<CbObject> Config = ReadStateFile(m_BasePath, [this]() { return Log(); });
+    if (Config.has_value())
+    {
+        if (Config.value().GetSize() == 0)
+        {
+            // Invalid config file
+            return;
+        }
+        m_MarkerPath = Config.value()["gcpath"sv].AsU8String();
+    }
+
+    if (!m_MetaValid)
+    {
+        // Removal is best effort, the error code is deliberately ignored
+        std::error_code DummyEc;
+        RemoveFile(m_MetaPath, DummyEc);
+    }
+
+    ZEN_ASSERT(!m_LsnToPayloadOffsetMap);
+
+    ReadIndexSnapshot();
+
+    // Appends the payload of an op and makes it the latest payload for the op's key.
+    // Shared by both modes below.
+    auto RegisterOp = [this](const OplogEntry& OpEntry) {
+        const PayloadIndex PayloadOffset(m_OpLogPayloads.size());
+
+        m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, PayloadOffset);
+        m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress});
+    };
+
+    if (m_Mode == EMode::kFull)
+    {
+        m_Storage->ReplayLog(
+            [&](CbObjectView Op, const OplogEntry& OpEntry) {
+                const OplogEntryMapping OpMapping = GetMapping(Op);
+
+                // Update chunk id maps
+                for (const ChunkMapping& Chunk : OpMapping.Chunks)
+                {
+                    m_ChunkMap.insert_or_assign(Chunk.Id, Chunk.Hash);
+                }
+
+                for (const FileMapping& File : OpMapping.Files)
+                {
+                    // Files with a hash are also reachable as chunks; for those the server path
+                    // is not kept in the file map.
+                    const bool HasHash = (File.Hash != IoHash::Zero);
+                    if (HasHash)
+                    {
+                        m_ChunkMap.insert_or_assign(File.Id, File.Hash);
+                    }
+                    m_FileMap.insert_or_assign(
+                        File.Id,
+                        FileMapEntry{.ServerPath = HasHash ? std::string() : File.ServerPath, .ClientPath = File.ClientPath});
+                }
+
+                for (const ChunkMapping& Meta : OpMapping.Meta)
+                {
+                    m_MetaMap.insert_or_assign(Meta.Id, Meta.Hash);
+                }
+
+                RegisterOp(OpEntry);
+            },
+            m_LogFlushPosition);
+
+        if (m_Storage->LogCount() != m_LogFlushPosition || m_IsLegacySnapshot)
+        {
+            WriteIndexSnapshot();
+        }
+    }
+    else
+    {
+        std::vector<OplogEntry> OpLogEntries;
+        // Initialized so that the value is well defined even if the storage only increments it
+        uint64_t InvalidEntries = 0;
+        m_Storage->ReadOplogEntriesFromLog(OpLogEntries, InvalidEntries, m_LogFlushPosition);
+        for (const OplogEntry& OpEntry : OpLogEntries)
+        {
+            RegisterOp(OpEntry);
+        }
+    }
+}
+
+// Persists the oplog configuration to the state file ("oplog.zcb") in m_BasePath.
+// The configuration is a compact binary object with a single "gcpath" field that holds
+// the marker path. Must not be called on an oplog opened in EMode::kBasicReadOnly.
+void
+ProjectStore::Oplog::Write()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_ASSERT(m_Mode != EMode::kBasicReadOnly);
+
+    using namespace std::literals;
+
+    // Build the config object
+    CbObjectWriter Cfg;
+    Cfg << "gcpath"sv << PathToUtf8(m_MarkerPath);
+
+    // Serialize it to memory
+    BinaryWriter Mem;
+    Cfg.Save(Mem);
+
+    const std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv;
+
+    ZEN_DEBUG("oplog '{}/{}': persisting config to '{}'", m_OuterProjectId, m_OplogId, StateFilePath);
+
+    TemporaryFile::SafeWriteFile(StateFilePath, Mem.GetView());
+}
+
+// Changes the marker path of the oplog and persists the configuration.
+//
+// Nothing happens when MarkerPath equals the current marker path. Otherwise the new path
+// is stored in m_MarkerPath *before* Write() is called, since Write() serializes
+// m_MarkerPath; without the assignment the old path would be written back unchanged.
+void
+ProjectStore::Oplog::Update(const std::filesystem::path& MarkerPath)
+{
+    if (m_MarkerPath == MarkerPath)
+    {
+        return;
+    }
+    m_MarkerPath = MarkerPath;
+    Write();
+}
+
+// Replaces the oplog with a fresh, empty one.
+//
+// Under an exclusive lock the storage is released and PrepareDirectoryDelete() is asked to
+// prepare m_BasePath for removal:
+//   * on failure the storage is attached again (opened for write if it exists, created
+//     otherwise) and false is returned, the in-memory tables are left untouched;
+//   * on success all lookup tables are cleared, new storage is created, the temp directory
+//     is cleaned and the state file is written.
+// The old content is deleted from disk after the lock has been released.
+//
+// Requires EMode::kFull. Returns true when the oplog was reset.
+bool
+ProjectStore::Oplog::Reset()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_ASSERT(m_Mode == EMode::kFull);
+    std::filesystem::path StaleDir;
+
+    {
+        RwLock::ExclusiveLockScope ExclusiveLock(m_OplogLock);
+
+        // Let go of the storage before the directory is touched
+        m_Storage = {};
+
+        const bool DirectoryPrepared = PrepareDirectoryDelete(m_BasePath, StaleDir);
+        if (DirectoryPrepared)
+        {
+            m_ChunkMap.clear();
+            m_MetaMap.clear();
+            m_FileMap.clear();
+            m_OpToPayloadOffsetMap.clear();
+            m_OpLogPayloads.clear();
+            m_LsnToPayloadOffsetMap.reset();
+
+            m_Storage = new OplogStorage(this, m_BasePath);
+            m_Storage->Open(OplogStorage::EMode::Create);
+            m_MetaValid = false;
+
+            CleanDirectory(m_TempPath, /*ForceRemoveReadOnlyFiles*/ false);
+            Write();
+        }
+        else
+        {
+            // Could not get the directory out of the way, attach to what is there
+            m_Storage = new OplogStorage(this, m_BasePath);
+
+            const bool                StoreExists = m_Storage->Exists();
+            const OplogStorage::EMode OpenMode    = StoreExists ? OplogStorage::EMode::Write : OplogStorage::EMode::Create;
+            m_Storage->Open(OpenMode);
+
+            m_MetaValid = !IsFileOlderThan(m_MetaPath, m_Storage->GetBlobsPath());
+            return false;
+        }
+    }
+
+    // Erase content on disk
+    if (!StaleDir.empty())
+    {
+        OplogStorage::Delete(StaleDir);
+    }
+    return true;
+}
+
+// Tells whether the oplog is idle enough to be unloaded. That is the case when
+//   * the index snapshot covers the whole log (an unflushed log likely means the oplog is active),
+//   * no oplog prep operation is in flight, and
+//   * GC capture is not enabled for the oplog.
+bool
+ProjectStore::Oplog::CanUnload()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    RwLock::SharedLockScope SharedLock(m_OplogLock);
+
+    const bool IsFlushed       = (m_Storage->LogCount() == m_LogFlushPosition);
+    const bool HasPendingPrep  = !m_PendingPrepOpAttachments.empty();
+    const bool IsCaptureActive = (m_UpdateCaptureRefCounter > 0);
+
+    return IsFlushed && !HasPendingPrep && !IsCaptureActive;
+}
+
+// Reads and validates the oplog state file ("oplog.zcb") located in BasePath.
+//
+// Returns
+//   * an empty optional when the file does not exist (treated as a legacy store),
+//   * a default constructed CbObject when the content fails compact binary validation,
+//   * the loaded object otherwise.
+//
+// Log is not called directly in this body; NOTE(review): presumably the ZEN_ERROR/ZEN_INFO
+// macros pick it up by name - confirm before renaming the parameter.
+std::optional<CbObject>
+ProjectStore::Oplog::ReadStateFile(const std::filesystem::path& BasePath, std::function<LoggerRef()>&& Log)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Oplog::ReadStateFile");
+    using namespace std::literals;
+
+    const std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv;
+    if (!IsFile(StateFilePath))
+    {
+        ZEN_INFO("config for oplog not found at '{}'. Assuming legacy store", StateFilePath);
+        return std::nullopt;
+    }
+
+    BasicFile Blob;
+    Blob.Open(StateFilePath, BasicFile::Mode::kRead);
+
+    IoBuffer              Obj             = Blob.ReadAll();
+    const MemoryView      ObjView         = MemoryView(Obj.Data(), Obj.Size());
+    const CbValidateError ValidationError = ValidateCompactBinary(ObjView, CbValidateMode::All);
+
+    if (ValidationError == CbValidateError::None)
+    {
+        return LoadCompactBinaryObject(Obj);
+    }
+
+    ZEN_ERROR("validation error {} hit for oplog config at '{}'", ToString(ValidationError), StateFilePath);
+    return CbObject();
+}
+
+// Checks that everything the ops of this oplog refer to is available.
+//
+// First all ops are visited to collect their key, attachments and chunk/meta/file mappings.
+// Then, per op, every chunk, meta entry, hashed file and attachment is looked up in the Cid
+// store, and files without a hash are checked on disk relative to ProjectRootDir. Whatever is
+// missing is recorded in the result together with the key hash of the op; ops with at least
+// one missing entry are also listed in Result.OpKeys.
+//
+// The per-op checks are scheduled on OptionalWorkerPool when one is given and run inline
+// otherwise. No further ops are started once IsCancelledFlag is set. An empty result is
+// returned if the storage was released while the checks were running.
+ProjectStore::Oplog::ValidationResult
+ProjectStore::Oplog::Validate(const std::filesystem::path& ProjectRootDir,
+                              std::atomic_bool&            IsCancelledFlag,
+                              WorkerThreadPool*            OptionalWorkerPool)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    using namespace std::literals;
+
+    ValidationResult Result;
+
+    // Phase 1: gather what each op references
+    const size_t OpCount = OplogCount();
+
+    std::vector<Oid>                 KeyHashes;
+    std::vector<std::string>         Keys;
+    std::vector<std::vector<IoHash>> Attachments;
+    std::vector<OplogEntryMapping>   Mappings;
+
+    KeyHashes.reserve(OpCount);
+    Keys.reserve(OpCount);
+    Mappings.reserve(OpCount);
+
+    IterateOplogWithKey([&](LogSequenceNumber LSN, const Oid& Key, CbObjectView OpView) {
+        Result.LSNLow  = Min(Result.LSNLow, LSN);
+        Result.LSNHigh = Max(Result.LSNHigh, LSN);
+
+        KeyHashes.push_back(Key);
+        Keys.push_back(std::string(OpView["key"sv].AsString()));
+
+        std::vector<IoHash>& OpAttachments = Attachments.emplace_back();
+        OpView.IterateAttachments([&OpAttachments](CbFieldView Attachment) { OpAttachments.push_back(Attachment.AsAttachment()); });
+
+        Mappings.push_back(GetMapping(OpView));
+    });
+
+    Result.OpCount = gsl::narrow<uint32_t>(Keys.size());
+
+    // Phase 2: check the references. Result is shared between workers, hence the lock.
+    RwLock ResultLock;
+
+    auto ValidateOne = [&](uint32_t OpIndex) {
+        const Oid&               KeyHash = KeyHashes[OpIndex];
+        const OplogEntryMapping& Mapping = Mappings[OpIndex];
+
+        bool HasMissingEntries = false;
+
+        // Runs Record under the result lock and remembers that this op is incomplete
+        auto ReportMissing = [&](auto&& Record) {
+            ResultLock.WithExclusiveLock(Record);
+            HasMissingEntries = true;
+        };
+
+        for (const ChunkMapping& Chunk : Mapping.Chunks)
+        {
+            if (!m_CidStore.ContainsChunk(Chunk.Hash))
+            {
+                ReportMissing([&]() { Result.MissingChunks.push_back({KeyHash, Chunk}); });
+            }
+        }
+
+        for (const ChunkMapping& Meta : Mapping.Meta)
+        {
+            if (!m_CidStore.ContainsChunk(Meta.Hash))
+            {
+                ReportMissing([&]() { Result.MissingMetas.push_back({KeyHash, Meta}); });
+            }
+        }
+
+        for (const FileMapping& File : Mapping.Files)
+        {
+            // Files without a hash live on disk below the project root, the rest in the Cid store
+            const bool IsAvailable =
+                (File.Hash == IoHash::Zero) ? IsFile(ProjectRootDir / File.ServerPath) : m_CidStore.ContainsChunk(File.Hash);
+            if (!IsAvailable)
+            {
+                ReportMissing([&]() { Result.MissingFiles.push_back({KeyHash, File}); });
+            }
+        }
+
+        for (const IoHash& Attachment : Attachments[OpIndex])
+        {
+            if (!m_CidStore.ContainsChunk(Attachment))
+            {
+                ReportMissing([&]() { Result.MissingAttachments.push_back({KeyHash, Attachment}); });
+            }
+        }
+
+        if (HasMissingEntries)
+        {
+            const std::string& Key = Keys[OpIndex];
+            ResultLock.WithExclusiveLock([&]() { Result.OpKeys.push_back({KeyHash, Key}); });
+        }
+    };
+
+    std::atomic<bool> AbortFlag;
+    std::atomic<bool> PauseFlag;
+    ParallelWork      Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
+    try
+    {
+        for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++)
+        {
+            if (AbortFlag)
+            {
+                break;
+            }
+
+            if (!OptionalWorkerPool)
+            {
+                ValidateOne(OpIndex);
+                continue;
+            }
+
+            Work.ScheduleWork(*OptionalWorkerPool, [&ValidateOne, Index = OpIndex](std::atomic<bool>& AbortFlag) {
+                ZEN_MEMSCOPE(GetProjectstoreTag());
+                if (!AbortFlag)
+                {
+                    ValidateOne(Index);
+                }
+            });
+        }
+    }
+    catch (const std::exception& Ex)
+    {
+        AbortFlag.store(true);
+        ZEN_WARN("Failed validating oplogs in {}. Reason: '{}'", m_BasePath, Ex.what());
+    }
+    Work.Wait();
+
+    {
+        // The references were checked without holding the oplog lock, so the oplog may have
+        // been deleted in the meantime. In that case the findings are meaningless.
+        RwLock::SharedLockScope SharedLock(m_OplogLock);
+        if (!m_Storage)
+        {
+            Result = {};
+        }
+    }
+
+    return Result;
+}
+/*
+ * Writes the in-memory lookup tables to the index snapshot file "ops.zidx" in m_BasePath.
+ *
+ * The snapshot is first written to a temporary file created in the same directory and then
+ * moved into place. File layout, every section except the last starting on an
+ * OplogIndexHeader::DataAlignment boundary:
+ *
+ *   1. OplogIndexHeader (V2 fields, checksum computed over the header)
+ *   2. keys: op keys, chunk map keys, meta map keys, file map keys - in that order
+ *   3. payload index per op key
+ *   4. all op payloads (m_OpLogPayloads)
+ *   5. chunk map hashes
+ *   6. meta map hashes
+ *   7. file path lengths, two per file map entry (server path, client path)
+ *   8. the path bytes themselves, packed back to back
+ *
+ * On success m_LogFlushPosition is set to the log count sampled at the start and
+ * m_IsLegacySnapshot is cleared. Failures that surface as std::exception are logged as a
+ * warning and not propagated.
+ *
+ * Requires EMode::kFull. No lock is taken in this function.
+ * NOTE(review): callers appear to be responsible for synchronization - confirm.
+ */
+void
+ProjectStore::Oplog::WriteIndexSnapshot()
+{
+ ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("Oplog::WriteIndexSnapshot");
+
+ ZEN_ASSERT(m_Mode == EMode::kFull);
+
+ ZEN_DEBUG("oplog '{}/{}': write store snapshot at '{}'", m_OuterProjectId, m_OplogId, m_BasePath);
+ uint64_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] { // NOTE(review): presumably runs at scope exit, i.e. also after a failed write (EntryCount is still 0 then) - confirm
+ ZEN_INFO("oplog '{}/{}': wrote store snapshot for '{}' containing {} entries in {}",
+ m_OuterProjectId,
+ m_OplogId,
+ m_BasePath,
+ EntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ namespace fs = std::filesystem;
+
+ const fs::path IndexPath = m_BasePath / "ops.zidx";
+ try
+ {
+ std::vector<Oid> Keys;
+ std::vector<PayloadIndex> OpPayloadOffsets;
+ std::vector<IoHash> ChunkMapEntries;
+ std::vector<IoHash> MetaMapEntries;
+ std::vector<uint32_t> FilePathLengths;
+ std::vector<std::string> FilePaths;
+ uint64_t IndexLogPosition = 0;
+ {
+ IndexLogPosition = m_Storage->LogCount(); // the log count this snapshot is recorded against
+ Keys.reserve(m_OpToPayloadOffsetMap.size() + m_ChunkMap.size() + m_MetaMap.size() + m_FileMap.size()); // one shared key array for all four maps
+ OpPayloadOffsets.reserve(m_OpToPayloadOffsetMap.size());
+ for (const auto& Kv : m_OpToPayloadOffsetMap)
+ {
+ Keys.push_back(Kv.first);
+ OpPayloadOffsets.push_back(Kv.second);
+ }
+
+ ChunkMapEntries.reserve(m_ChunkMap.size());
+ for (const auto& It : m_ChunkMap)
+ {
+ Keys.push_back(It.first);
+ ChunkMapEntries.push_back(It.second);
+ }
+
+ MetaMapEntries.reserve(m_MetaMap.size());
+ for (const auto& It : m_MetaMap)
+ {
+ Keys.push_back(It.first);
+ MetaMapEntries.push_back(It.second);
+ }
+
+ FilePathLengths.reserve(m_FileMap.size() * 2); // two entries per file: server path and client path
+ FilePaths.reserve(m_FileMap.size() * 2);
+ for (const auto& It : m_FileMap)
+ {
+ Keys.push_back(It.first);
+ FilePathLengths.push_back(gsl::narrow<uint32_t>(It.second.ServerPath.length()));
+ FilePathLengths.push_back(gsl::narrow<uint32_t>(It.second.ClientPath.length()));
+ FilePaths.push_back(It.second.ServerPath);
+ FilePaths.push_back(It.second.ClientPath);
+ }
+ }
+
+ TemporaryFile ObjectIndexFile;
+ std::error_code Ec;
+ ObjectIndexFile.CreateTemporary(IndexPath.parent_path(), Ec); // temporary file lives next to the final index file
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to create temp file for index snapshot at '{}'", IndexPath));
+ }
+
+ {
+ BasicFileWriter IndexFile(ObjectIndexFile, 65536u);
+
+ OplogIndexHeader Header;
+ Header.V2 = {.LogPosition = IndexLogPosition,
+ .KeyCount = gsl::narrow<uint64_t>(Keys.size()),
+ .OpPayloadIndexCount = gsl::narrow<uint32_t>(OpPayloadOffsets.size()),
+ .OpPayloadCount = gsl::narrow<uint32_t>(m_OpLogPayloads.size()),
+ .ChunkMapCount = gsl::narrow<uint32_t>(ChunkMapEntries.size()),
+ .MetaMapCount = gsl::narrow<uint32_t>(MetaMapEntries.size()),
+ .FileMapCount = gsl::narrow<uint32_t>(FilePathLengths.size() / 2),
+ .MaxLSN = m_Storage->MaxLSN(),
+ .NextOpCoreOffset = m_Storage->GetNextOpOffset()};
+
+ Header.Checksum = OplogIndexHeader::ComputeChecksum(Header); // computed after all V2 fields have been filled in
+
+ uint64_t Offset = 0;
+ IndexFile.Write(&Header, sizeof(OplogIndexHeader), Offset);
+ Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); // each section below is followed by the same alignment step
+
+ IndexFile.Write(Keys.data(), Keys.size() * sizeof(Oid), Offset);
+ Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment);
+
+ IndexFile.Write(OpPayloadOffsets.data(), OpPayloadOffsets.size() * sizeof(PayloadIndex), Offset);
+ Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment);
+
+ IndexFile.Write(m_OpLogPayloads.data(), m_OpLogPayloads.size() * sizeof(OplogPayload), Offset);
+ Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment);
+
+ IndexFile.Write(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset);
+ Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment);
+
+ IndexFile.Write(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset);
+ Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment);
+
+ IndexFile.Write(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset);
+ Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment);
+
+ for (const auto& FilePath : FilePaths)
+ {
+ IndexFile.Write(FilePath.c_str(), FilePath.length(), Offset); // exactly length() bytes, so no terminator is stored
+ Offset += FilePath.length(); // paths are packed back to back without alignment
+ }
+ }
+
+ ObjectIndexFile.Flush();
+ ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec,
+ fmt::format("Snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'",
+ ObjectIndexFile.GetPath(),
+ IndexPath,
+ Ec.message()));
+ }
+ EntryCount = m_OpLogPayloads.size();
+ m_LogFlushPosition = IndexLogPosition; // bookkeeping is only updated once the file is in place
+ m_IsLegacySnapshot = false;
+ }
+ catch (const std::exception& Err) // failure is reported but not propagated to the caller
+ {
+ ZEN_WARN("oplog '{}/{}': snapshot FAILED, reason: '{}'", m_OuterProjectId, m_OplogId, Err.what());
+ }
+}
+/*
+ * Loads the index snapshot file "ops.zidx" from m_BasePath into the in-memory lookup tables.
+ * Does nothing if the file does not exist.
+ *
+ * The header is checked for the expected magic and for a matching checksum; on a mismatch a
+ * warning is logged and nothing is loaded. Two format versions are understood:
+ *
+ *   Version1 (legacy): LSNs, keys, op addresses, latest-op LSNs, then chunk map, meta map and
+ *     file map. The sum of the map counts must equal the key count. MaxLSN and the last op
+ *     address are derived from the entries and m_IsLegacySnapshot is set.
+ *   Version2: keys, payload indexes, op payloads, then chunk map, meta map and file map.
+ *     MaxLSN and the next op offset are taken from the header.
+ *
+ * In both versions the chunk, meta and file maps are only loaded in EMode::kFull.
+ * On success m_LogFlushPosition is set to the log position stored in the header.
+ *
+ * If a std::exception is thrown while reading, all tables touched here are cleared,
+ * m_LogFlushPosition is reset to 0 and an error is logged.
+ *
+ * NOTE(review): counts taken from the header are used to index Keys and LSNEntries without
+ * range checks - a corrupt file with a valid header checksum may need extra validation.
+ */
+void
+ProjectStore::Oplog::ReadIndexSnapshot()
+{
+ ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("Oplog::ReadIndexSnapshot");
+
+ const std::filesystem::path IndexPath = m_BasePath / "ops.zidx";
+ if (IsFile(IndexPath))
+ {
+ uint64_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_DEBUG("oplog '{}/{}': index read from '{}' containing {} entries in {}",
+ m_OuterProjectId,
+ m_OplogId,
+ IndexPath,
+ EntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ try
+ {
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t Size = ObjectIndexFile.FileSize();
+ if (Size >= sizeof(OplogIndexHeader)) // a file too small to hold a header is ignored without a warning
+ {
+ OplogIndexHeader Header;
+ uint64_t Offset = 0;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+ Offset += sizeof(OplogIndexHeader);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); // every section starts on a DataAlignment boundary
+
+ if (Header.Magic == OplogIndexHeader::ExpectedMagic)
+ {
+ uint32_t Checksum = OplogIndexHeader::ComputeChecksum(Header);
+ if (Header.Checksum != Checksum)
+ {
+ ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Checksum mismatch. Expected: {}, Found: {}",
+ m_OuterProjectId,
+ m_OplogId,
+ IndexPath,
+ Header.Checksum,
+ Checksum);
+ return;
+ }
+
+ if (Header.Version == OplogIndexHeader::Version1) // legacy format
+ {
+ uint32_t MaxLSN = 0;
+ OplogEntryAddress LastOpAddress{.Offset = 0, .Size = 0};
+
+ if (Header.V1.LatestOpMapCount + Header.V1.ChunkMapCount + Header.V1.MetaMapCount + Header.V1.FileMapCount !=
+ Header.V1.KeyCount)
+ {
+ ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Key count mismatch. Expected: {}, Found: {}",
+ m_OuterProjectId,
+ m_OplogId,
+ IndexPath,
+ Header.V1.LatestOpMapCount + Header.V1.ChunkMapCount + Header.V1.MetaMapCount + Header.V1.FileMapCount,
+ Header.V1.KeyCount);
+ return;
+ }
+
+ std::vector<uint32_t> LSNEntries(Header.V1.LSNCount);
+ ObjectIndexFile.Read(LSNEntries.data(), LSNEntries.size() * sizeof(uint32_t), Offset);
+ Offset += LSNEntries.size() * sizeof(uint32_t);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ size_t LSNOffset = 0;
+
+ std::vector<Oid> Keys(Header.V1.KeyCount);
+ ObjectIndexFile.Read(Keys.data(), Keys.size() * sizeof(Oid), Offset);
+ Offset += Keys.size() * sizeof(Oid);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ size_t KeyOffset = 0; // running cursor into Keys, shared by all maps read below
+
+ {
+ std::vector<OplogEntryAddress> AddressMapEntries(Header.V1.OpAddressMapCount);
+ ObjectIndexFile.Read(AddressMapEntries.data(), AddressMapEntries.size() * sizeof(OplogEntryAddress), Offset);
+ Offset += AddressMapEntries.size() * sizeof(OplogEntryAddress);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ tsl::robin_map<uint32_t, PayloadIndex> LsnToPayloadOffset;
+ LsnToPayloadOffset.reserve(AddressMapEntries.size());
+
+ m_OpLogPayloads.reserve(AddressMapEntries.size());
+ for (const OplogEntryAddress& Address : AddressMapEntries)
+ {
+ const uint32_t LSN = LSNEntries[LSNOffset++]; // NOTE(review): LSNCount is not checked against OpAddressMapCount before indexing
+ LsnToPayloadOffset.insert_or_assign(LSN, PayloadIndex(m_OpLogPayloads.size()));
+
+ m_OpLogPayloads.push_back({.Lsn = LogSequenceNumber(LSN), .Address = Address});
+ if (Address.Offset > LastOpAddress.Offset) // track the op with the highest offset
+ {
+ LastOpAddress = Address;
+ }
+ MaxLSN = Max(MaxLSN, LSN);
+ }
+
+ {
+ std::vector<uint32_t> LatestOpMapEntries(Header.V1.LatestOpMapCount);
+ ObjectIndexFile.Read(LatestOpMapEntries.data(), LatestOpMapEntries.size() * sizeof(uint32_t), Offset);
+ Offset += LatestOpMapEntries.size() * sizeof(uint32_t);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ m_OpToPayloadOffsetMap.reserve(LatestOpMapEntries.size());
+ for (uint32_t Lsn : LatestOpMapEntries)
+ {
+ if (auto It = LsnToPayloadOffset.find(Lsn); It != LsnToPayloadOffset.end())
+ {
+ m_OpToPayloadOffsetMap.insert_or_assign(Keys[KeyOffset++], It->second); // NOTE(review): KeyOffset only advances when the LSN is known - confirm this matches the legacy writer
+ MaxLSN = Max(MaxLSN, Lsn);
+ }
+ }
+ }
+ }
+ if (m_Mode == EMode::kFull) // chunk/meta/file maps are only loaded in full mode
+ {
+ {
+ std::vector<IoHash> ChunkMapEntries(Header.V1.ChunkMapCount);
+ ObjectIndexFile.Read(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset);
+ Offset += ChunkMapEntries.size() * sizeof(IoHash);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ m_ChunkMap.reserve(ChunkMapEntries.size());
+ for (const IoHash& ChunkId : ChunkMapEntries)
+ {
+ m_ChunkMap.insert_or_assign(Keys[KeyOffset++], ChunkId);
+ }
+ }
+ {
+ std::vector<IoHash> MetaMapEntries(Header.V1.MetaMapCount);
+ ObjectIndexFile.Read(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset);
+ Offset += MetaMapEntries.size() * sizeof(IoHash);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ m_MetaMap.reserve(MetaMapEntries.size());
+ for (const IoHash& ChunkId : MetaMapEntries)
+ {
+ m_MetaMap.insert_or_assign(Keys[KeyOffset++], ChunkId);
+ }
+ }
+ {
+ m_FileMap.reserve(Header.V1.FileMapCount);
+
+ std::vector<uint32_t> FilePathLengths(Header.V1.FileMapCount * 2); // two lengths per entry: server path, client path
+ ObjectIndexFile.Read(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset);
+ Offset += FilePathLengths.size() * sizeof(uint32_t);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ BasicFileBuffer IndexFile(ObjectIndexFile, 65536);
+ auto ReadString([&IndexFile, &Offset](uint32_t Length) -> std::string { // reads Length bytes at Offset and advances Offset; throws on a short read
+ MemoryView StringData = IndexFile.MakeView(Length, Offset);
+ if (StringData.GetSize() != Length)
+ {
+ throw std::runtime_error(fmt::format("Invalid format. Expected to read {} bytes but got {}",
+ Length,
+ uint32_t(StringData.GetSize())));
+ }
+ Offset += Length;
+ return std::string((const char*)StringData.GetData(), Length);
+ });
+ for (uint64_t FileLengthOffset = 0; FileLengthOffset < FilePathLengths.size();)
+ {
+ std::string ServerPath = ReadString(FilePathLengths[FileLengthOffset++]);
+ std::string ClientPath = ReadString(FilePathLengths[FileLengthOffset++]);
+ m_FileMap.insert_or_assign(
+ Keys[KeyOffset++],
+ FileMapEntry{.ServerPath = std::move(ServerPath), .ClientPath = std::move(ClientPath)});
+ }
+ }
+ }
+ m_LogFlushPosition = Header.V1.LogPosition;
+ m_Storage->SetMaxLSNAndNextWriteAddress(MaxLSN, LastOpAddress);
+ EntryCount = Header.V1.LSNCount;
+ m_IsLegacySnapshot = true; // Flush() and Read() rewrite the snapshot when this flag is set
+ }
+ else if (Header.Version == OplogIndexHeader::Version2) // current format, as produced by WriteIndexSnapshot()
+ {
+ std::vector<Oid> Keys(Header.V2.KeyCount);
+
+ ObjectIndexFile.Read(Keys.data(), Keys.size() * sizeof(Oid), Offset);
+ Offset += Keys.size() * sizeof(Oid);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ std::vector<PayloadIndex> OpPayloadOffsets(Header.V2.OpPayloadIndexCount);
+ ObjectIndexFile.Read(OpPayloadOffsets.data(), OpPayloadOffsets.size() * sizeof(PayloadIndex), Offset);
+ Offset += OpPayloadOffsets.size() * sizeof(PayloadIndex);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ m_OpLogPayloads.resize(Header.V2.OpPayloadCount); // payloads are read straight into the member vector
+ ObjectIndexFile.Read(m_OpLogPayloads.data(), m_OpLogPayloads.size() * sizeof(OplogPayload), Offset);
+ Offset += m_OpLogPayloads.size() * sizeof(OplogPayload);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ size_t KeyOffset = 0; // running cursor into Keys, shared by all maps read below
+
+ m_OpToPayloadOffsetMap.reserve(Header.V2.OpPayloadIndexCount);
+ for (const PayloadIndex PayloadOffset : OpPayloadOffsets)
+ {
+ const Oid& Key = Keys[KeyOffset++]; // NOTE(review): unlike Version1, KeyCount is not checked against the sum of the map counts
+ ZEN_ASSERT_SLOW(PayloadOffset.Index < Header.V2.OpPayloadCount); // NOTE(review): presumably only active in some build configurations - confirm
+ m_OpToPayloadOffsetMap.insert({Key, PayloadOffset});
+ }
+
+ if (m_Mode == EMode::kFull) // chunk/meta/file maps are only loaded in full mode
+ {
+ {
+ std::vector<IoHash> ChunkMapEntries(Header.V2.ChunkMapCount);
+ ObjectIndexFile.Read(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset);
+ Offset += ChunkMapEntries.size() * sizeof(IoHash);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ m_ChunkMap.reserve(ChunkMapEntries.size());
+ for (const IoHash& ChunkId : ChunkMapEntries)
+ {
+ m_ChunkMap.insert_or_assign(Keys[KeyOffset++], ChunkId);
+ }
+ }
+ {
+ std::vector<IoHash> MetaMapEntries(Header.V2.MetaMapCount);
+ ObjectIndexFile.Read(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset);
+ Offset += MetaMapEntries.size() * sizeof(IoHash);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ m_MetaMap.reserve(MetaMapEntries.size());
+ for (const IoHash& ChunkId : MetaMapEntries)
+ {
+ m_MetaMap.insert_or_assign(Keys[KeyOffset++], ChunkId);
+ }
+ }
+ {
+ m_FileMap.reserve(Header.V2.FileMapCount);
+
+ std::vector<uint32_t> FilePathLengths(Header.V2.FileMapCount * 2); // two lengths per entry: server path, client path
+ ObjectIndexFile.Read(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset);
+ Offset += FilePathLengths.size() * sizeof(uint32_t);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ BasicFileBuffer IndexFile(ObjectIndexFile, 65536);
+ auto ReadString([&IndexFile, &Offset](uint32_t Length) -> std::string { // reads Length bytes at Offset and advances Offset; throws on a short read
+ MemoryView StringData = IndexFile.MakeView(Length, Offset);
+ if (StringData.GetSize() != Length)
+ {
+ throw std::runtime_error(fmt::format("Invalid format. Expected to read {} bytes but got {}",
+ Length,
+ uint32_t(StringData.GetSize())));
+ }
+ Offset += Length;
+ return std::string((const char*)StringData.GetData(), Length);
+ });
+ for (uint64_t FileLengthOffset = 0; FileLengthOffset < FilePathLengths.size();)
+ {
+ std::string ServerPath = ReadString(FilePathLengths[FileLengthOffset++]);
+ std::string ClientPath = ReadString(FilePathLengths[FileLengthOffset++]);
+ m_FileMap.insert_or_assign(
+ Keys[KeyOffset++],
+ FileMapEntry{.ServerPath = std::move(ServerPath), .ClientPath = std::move(ClientPath)});
+ }
+ }
+ }
+ m_LogFlushPosition = Header.V2.LogPosition;
+ m_Storage->SetMaxLSNAndNextOpOffset(Header.V2.MaxLSN, Header.V2.NextOpCoreOffset);
+ EntryCount = Header.V2.OpPayloadCount;
+ m_IsLegacySnapshot = false;
+ }
+ else
+ {
+ ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Unsupported version number {}",
+ m_OuterProjectId,
+ m_OplogId,
+ IndexPath,
+ Header.Version);
+ return;
+ }
+ }
+ else
+ {
+ ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'", m_OuterProjectId, m_OplogId, IndexPath);
+ }
+ }
+ }
+ catch (const std::exception& Ex) // discard whatever was loaded before the failure
+ {
+ m_OpToPayloadOffsetMap.clear();
+ m_OpLogPayloads.clear();
+ m_ChunkMap.clear();
+ m_MetaMap.clear();
+ m_FileMap.clear();
+ m_LogFlushPosition = 0; // presumably makes the caller process the log from its start - TODO confirm against OplogStorage
+ ZEN_ERROR("oplog '{}/{}': failed reading index snapshot from '{}'. Reason: '{}'",
+ m_OuterProjectId,
+ m_OplogId,
+ IndexPath,
+ Ex.what());
+ }
+ }
+}
+
+// Locking wrapper: takes a shared lock on m_OplogLock and returns GetUnusedSpacePercentLocked().
+uint32_t
+ProjectStore::Oplog::GetUnusedSpacePercent() const
+{
+    RwLock::SharedLockScope SharedLock(m_OplogLock);
+
+    const uint32_t UnusedPercent = GetUnusedSpacePercentLocked();
+    return UnusedPercent;
+}
+
+// Returns how much of the op blob storage is not accounted for by the current payloads,
+// as an integer percentage (rounded down) of the actual blob size.
+// Returns 0 when the blob storage is empty or when the effective size is not smaller than
+// the actual size. Takes no lock itself (GetUnusedSpacePercent() is the locking variant).
+uint32_t
+ProjectStore::Oplog::GetUnusedSpacePercentLocked() const
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    const uint64_t ActualBlobsSize = m_Storage->OpBlobsSize();
+    if (ActualBlobsSize == 0)
+    {
+        return 0;
+    }
+
+    const uint64_t EffectiveBlobsSize = m_Storage->GetEffectiveBlobsSize(m_OpLogPayloads);
+    if (EffectiveBlobsSize >= ActualBlobsSize)
+    {
+        return 0;
+    }
+
+    const uint64_t UnusedBytes = ActualBlobsSize - EffectiveBlobsSize;
+    return gsl::narrow<uint32_t>((100 * UnusedBytes) / ActualBlobsSize);
+}
+
+// Locking wrapper: takes an exclusive lock on m_OplogLock and forwards all arguments
+// to the Compact() overload that expects the lock to be held.
+void
+ProjectStore::Oplog::Compact(bool DryRun, bool RetainLSNs, std::string_view LogPrefix)
+{
+    RwLock::ExclusiveLockScope ExclusiveLock(m_OplogLock);
+    Compact(ExclusiveLock, DryRun, RetainLSNs, LogPrefix);
+}
+
+// Compacts the op storage so that only the latest op for every key is kept.
+//
+// The LSNs of the latest ops are handed to the storage, which reports the new LSN and
+// address of every op it keeps; from those a fresh key -> payload lookup is built.
+// Unless DryRun is set, the fresh tables replace the current ones, the LSN lookup is
+// dropped and a new index snapshot is written.
+//
+// The unnamed lock parameter is not used by the body; it only makes callers hand in an
+// exclusive lock scope. RetainLSNs and DryRun are passed through to the storage.
+// LogPrefix is prepended to the summary log line. Requires EMode::kFull.
+void
+ProjectStore::Oplog::Compact(RwLock::ExclusiveLockScope&, bool DryRun, bool RetainLSNs, std::string_view LogPrefix)
+{
+    ZEN_ASSERT(m_Mode == EMode::kFull);
+
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    Stopwatch Timer;
+
+    // LSNs of the ops that are currently the latest for their key
+    std::vector<LogSequenceNumber> LatestLSNs;
+    LatestLSNs.reserve(m_OpLogPayloads.size());
+    for (const auto& KeyAndIndex : m_OpToPayloadOffsetMap)
+    {
+        LatestLSNs.push_back(m_OpLogPayloads[KeyAndIndex.second].Lsn);
+    }
+
+    // Tables describing the storage after compaction
+    std::vector<OplogPayload> CompactedPayloads;
+    CompactedPayloads.reserve(LatestLSNs.size());
+
+    OidMap<PayloadIndex> CompactedKeyLookup;
+    CompactedKeyLookup.reserve(LatestLSNs.size());
+
+    const uint64_t PreSize = TotalSize();
+
+    m_Storage->Compact(
+        LatestLSNs,
+        [&](const Oid& OpKeyHash, LogSequenceNumber, LogSequenceNumber NewLsn, const OplogEntryAddress& NewAddress) {
+            const PayloadIndex NewIndex(CompactedPayloads.size());
+            CompactedKeyLookup.insert({OpKeyHash, NewIndex});
+            CompactedPayloads.push_back({.Lsn = NewLsn, .Address = NewAddress});
+        },
+        RetainLSNs,
+        /*DryRun*/ DryRun);
+
+    if (!DryRun)
+    {
+        m_OpToPayloadOffsetMap.swap(CompactedKeyLookup);
+        m_OpLogPayloads.swap(CompactedPayloads);
+        m_LsnToPayloadOffsetMap.reset();
+        WriteIndexSnapshot();
+    }
+
+    const uint64_t PostSize  = TotalSize();
+    uint64_t       FreedSize = 0;
+    if (PreSize > PostSize)
+    {
+        FreedSize = PreSize - PostSize;
+    }
+
+    ZEN_INFO("{} oplog '{}/{}': Compacted in {}. New size: {}, freeing: {}",
+             LogPrefix,
+             m_OuterProjectId,
+             m_OplogId,
+             NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+             NiceBytes(PostSize),
+             NiceBytes(FreedSize));
+}
+
+// Looks up the chunk with the given raw hash in the Cid store and returns
+// whatever FindChunkByCid() yields. Requires EMode::kFull.
+IoBuffer
+ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash)
+{
+    ZEN_ASSERT(m_Mode == EMode::kFull);
+
+    IoBuffer Chunk = m_CidStore.FindChunkByCid(RawHash);
+    return Chunk;
+}
+
+// Iterates the chunks identified by RawHashes through the Cid store and forwards each one to
+// AsyncCallback together with its index in RawHashes.
+//
+// When IncludeModTag is set the modification tag derived from the chunk's raw hash is passed
+// along, otherwise 0. OptionalWorkerPool and LargeSizeLimit are handed to the Cid store
+// unchanged. Returns what the Cid store iteration returns. Requires EMode::kFull.
+bool
+ProjectStore::Oplog::IterateChunks(std::span<IoHash>                                                                       RawHashes,
+                                   bool                                                                                    IncludeModTag,
+                                   const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback,
+                                   WorkerThreadPool*                                                                       OptionalWorkerPool,
+                                   uint64_t                                                                                LargeSizeLimit)
+{
+    ZEN_ASSERT(m_Mode == EMode::kFull);
+
+    auto ForwardChunk = [&](size_t Index, const IoBuffer& Payload) {
+        uint64_t ModTag = 0;
+        if (IncludeModTag)
+        {
+            ModTag = GetModificationTagFromRawHash(RawHashes[Index]);
+        }
+        return AsyncCallback(Index, Payload, ModTag);
+    };
+
+    return m_CidStore.IterateChunks(RawHashes, ForwardChunk, OptionalWorkerPool, LargeSizeLimit);
+}
+
+// Iterates the payloads for a set of chunk ids and hands each one to AsyncCallback.
+//
+// Every id is first resolved under the shared oplog lock:
+//  - ids present in the chunk map or the meta map are served from the CID store by content hash
+//  - ids present in the file map are read from disk at ProjectRootDir / ServerPath
+//  - ids present in none of the maps are skipped without invoking the callback
+//
+// The Index given to the callback is the position of the id in ChunkIds. ModTag is 0 unless
+// IncludeModTag is set, in which case it is derived from the raw hash (CID chunks) or from the
+// file payload (file chunks).
+//
+// When OptionalWorkerPool is non-null the file reads are scheduled on the pool, so the callback
+// can be invoked from the scheduled work items. Otherwise everything runs on the calling thread.
+//
+// Returns false if the callback returned false for any chunk, or if scheduling/iteration threw in
+// the pooled path. Returns true otherwise. Asserts that the oplog is in full mode.
+bool
+ProjectStore::Oplog::IterateChunks(const std::filesystem::path&                                                       ProjectRootDir,
+                                   std::span<Oid>                                                                      ChunkIds,
+                                   bool                                                                                IncludeModTag,
+                                   const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback,
+                                   WorkerThreadPool*                                                                   OptionalWorkerPool,
+                                   uint64_t                                                                            LargeSizeLimit)
+{
+    ZEN_ASSERT(m_Mode == EMode::kFull);
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    // Parallel arrays: XxxIndexes[i] is the position in ChunkIds of the i:th resolved entry
+    std::vector<size_t>                CidChunkIndexes;
+    std::vector<IoHash>                CidChunkHashes;
+    std::vector<size_t>                FileChunkIndexes;
+    std::vector<std::filesystem::path> FileChunkPaths;
+    {
+        RwLock::SharedLockScope OplogLock(m_OplogLock);
+        for (size_t ChunkIndex = 0; ChunkIndex < ChunkIds.size(); ChunkIndex++)
+        {
+            const Oid& ChunkId = ChunkIds[ChunkIndex];
+            if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end())
+            {
+                CidChunkIndexes.push_back(ChunkIndex);
+                CidChunkHashes.push_back(ChunkIt->second);
+            }
+            else if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end())
+            {
+                // ChunkIt is the end iterator of m_ChunkMap in this branch, the hash has to come from the meta map entry
+                CidChunkIndexes.push_back(ChunkIndex);
+                CidChunkHashes.push_back(MetaIt->second);
+            }
+            else if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end())
+            {
+                FileChunkIndexes.push_back(ChunkIndex);
+                FileChunkPaths.emplace_back(ProjectRootDir / FileIt->second.ServerPath);
+            }
+        }
+    }
+    if (OptionalWorkerPool)
+    {
+        std::atomic<bool> AbortFlag{false};
+        std::atomic<bool> PauseFlag{false};
+        ParallelWork      Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
+        try
+        {
+            for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++)
+            {
+                if (AbortFlag)
+                {
+                    break;
+                }
+                Work.ScheduleWork(
+                    *OptionalWorkerPool,
+                    [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback](
+                        std::atomic<bool>& AbortFlag) {
+                        if (AbortFlag)
+                        {
+                            return;
+                        }
+                        size_t                       FileChunkIndex = FileChunkIndexes[ChunkIndex];
+                        const std::filesystem::path& FilePath       = FileChunkPaths[ChunkIndex];
+                        try
+                        {
+                            IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath);
+                            if (!Payload)
+                            {
+                                ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[FileChunkIndex], FilePath);
+                            }
+
+                            // NOTE(review): unlike the non-pooled path below, the callback is still invoked when the
+                            // file could not be read (empty payload) - confirm which behaviour callers expect
+                            if (!AsyncCallback(FileChunkIndex,
+                                               Payload,
+                                               IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0))
+                            {
+                                AbortFlag.store(true);
+                            }
+                        }
+                        catch (const std::exception& Ex)
+                        {
+                            ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'",
+                                     m_OuterProjectId,
+                                     m_OplogId,
+                                     FileChunkIndex,
+                                     FilePath,
+                                     Ex.what());
+                        }
+                    });
+            }
+
+            if (!CidChunkHashes.empty() && !AbortFlag)
+            {
+                m_CidStore.IterateChunks(
+                    CidChunkHashes,
+                    [&](size_t Index, const IoBuffer& Payload) {
+                        if (AbortFlag)
+                        {
+                            return false;
+                        }
+                        size_t CidChunkIndex = CidChunkIndexes[Index];
+                        if (!AsyncCallback(CidChunkIndex,
+                                           Payload,
+                                           IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0))
+                        {
+                            // Record the abort so pending file work stops and the caller is told the iteration was cut short
+                            AbortFlag.store(true);
+                            return false;
+                        }
+                        return true;
+                    },
+                    OptionalWorkerPool,
+                    LargeSizeLimit);
+            }
+        }
+        catch (const std::exception& Ex)
+        {
+            AbortFlag.store(true);
+            ZEN_WARN("Failed iterating oplog chunks in {}. Reason: '{}'", m_BasePath, Ex.what());
+        }
+
+        // Scheduled work captures locals by reference, so it must be drained before they go out of scope
+        Work.Wait();
+
+        return !AbortFlag;
+    }
+    else
+    {
+        if (!CidChunkHashes.empty())
+        {
+            // Tracks whether the callback asked to stop so the request is honoured after the CID pass as well
+            std::atomic<bool> CidAborted{false};
+            m_CidStore.IterateChunks(
+                CidChunkHashes,
+                [&](size_t Index, const IoBuffer& Payload) {
+                    size_t CidChunkIndex = CidChunkIndexes[Index];
+                    if (!AsyncCallback(CidChunkIndex, Payload, IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0))
+                    {
+                        CidAborted.store(true);
+                        return false;
+                    }
+                    return true;
+                },
+                OptionalWorkerPool,
+                LargeSizeLimit);
+            if (CidAborted)
+            {
+                return false;
+            }
+        }
+
+        for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++)
+        {
+            size_t                       FileChunkIndex = FileChunkIndexes[ChunkIndex];
+            const std::filesystem::path& FilePath       = FileChunkPaths[ChunkIndex];
+            IoBuffer                     Payload        = IoBufferBuilder::MakeFromFile(FilePath);
+            if (Payload)
+            {
+                bool Result = AsyncCallback(FileChunkIndex, Payload, IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0);
+                if (!Result)
+                {
+                    return false;
+                }
+            }
+        }
+    }
+    return true;
+}
+
+// Resolves a single chunk id to its payload.
+//
+// The id is looked up in the chunk map, then the file map, then the meta map. Chunk and meta
+// entries are fetched from the CID store by hash, file entries are read from
+// ProjectRootDir / ServerPath. The oplog lock is released before any payload is fetched.
+//
+// If OptOutModificationTag is non-null it is written only when a payload was found.
+// Returns an empty buffer when the oplog has no storage, the id is unknown or the fetch failed.
+IoBuffer
+ProjectStore::Oplog::FindChunk(const std::filesystem::path& ProjectRootDir, const Oid& ChunkId, uint64_t* OptOutModificationTag)
+{
+    ZEN_ASSERT(m_Mode == EMode::kFull);
+
+    // Shared by the chunk map and meta map paths: fetch by content hash and report the hash based tag
+    auto FetchFromCidStore = [&](const IoHash& Hash) -> IoBuffer {
+        IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
+        if (Payload && OptOutModificationTag != nullptr)
+        {
+            *OptOutModificationTag = GetModificationTagFromRawHash(Hash);
+        }
+        return Payload;
+    };
+
+    RwLock::SharedLockScope OplogLock(m_OplogLock);
+    if (!m_Storage)
+    {
+        return IoBuffer{};
+    }
+
+    const auto ChunkIt = m_ChunkMap.find(ChunkId);
+    if (ChunkIt != m_ChunkMap.end())
+    {
+        const IoHash ChunkHash = ChunkIt->second;
+        OplogLock.ReleaseNow();
+        return FetchFromCidStore(ChunkHash);
+    }
+
+    const auto FileIt = m_FileMap.find(ChunkId);
+    if (FileIt != m_FileMap.end())
+    {
+        std::filesystem::path FilePath = ProjectRootDir / FileIt->second.ServerPath;
+        OplogLock.ReleaseNow();
+
+        IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath);
+        if (Payload)
+        {
+            if (OptOutModificationTag != nullptr)
+            {
+                *OptOutModificationTag = GetModificationTagFromModificationTime(Payload);
+            }
+        }
+        else
+        {
+            ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkId, FilePath);
+        }
+        return Payload;
+    }
+
+    const auto MetaIt = m_MetaMap.find(ChunkId);
+    if (MetaIt != m_MetaMap.end())
+    {
+        const IoHash MetaHash = MetaIt->second;
+        OplogLock.ReleaseNow();
+        return FetchFromCidStore(MetaHash);
+    }
+
+    return {};
+}
+
+// Builds a ChunkInfo entry for every id in the file map and the chunk map (in that order).
+//
+// The ids are snapshotted under the shared oplog lock. The sizes are filled in afterwards via
+// FindChunk, outside of that lock scope. Entries whose payload cannot be found keep their
+// initial ChunkSize. Returns an empty vector if the oplog has no storage.
+std::vector<ProjectStore::Oplog::ChunkInfo>
+ProjectStore::Oplog::GetAllChunksInfo(const std::filesystem::path& ProjectRootDir)
+{
+    ZEN_ASSERT(m_Mode == EMode::kFull);
+
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    std::vector<ChunkInfo> Infos;
+
+    // Pass 1: collect ids only
+    {
+        RwLock::SharedLockScope OplogLock(m_OplogLock);
+
+        if (m_Storage)
+        {
+            Infos.reserve(m_FileMap.size() + m_ChunkMap.size());
+
+            for (const auto& FileEntry : m_FileMap)
+            {
+                Infos.push_back({.ChunkId = FileEntry.first});
+            }
+
+            for (const auto& ChunkEntry : m_ChunkMap)
+            {
+                Infos.push_back({.ChunkId = ChunkEntry.first});
+            }
+        }
+    }
+
+    // Pass 2: resolve sizes
+    for (ChunkInfo& Entry : Infos)
+    {
+        IoBuffer Payload = FindChunk(ProjectRootDir, Entry.ChunkId, nullptr);
+        if (Payload)
+        {
+            Entry.ChunkSize = Payload.GetSize();
+        }
+    }
+
+    return Infos;
+}
+
+// Invokes Fn(ChunkId, Hash) for every entry in the chunk map while holding the shared oplog lock.
+// Does nothing if the oplog has no storage.
+void
+ProjectStore::Oplog::IterateChunkMap(std::function<void(const Oid&, const IoHash&)>&& Fn)
+{
+    ZEN_ASSERT(m_Mode == EMode::kFull);
+
+    RwLock::SharedLockScope OplogLock(m_OplogLock);
+    if (m_Storage)
+    {
+        for (const auto& [ChunkId, ChunkHash] : m_ChunkMap)
+        {
+            Fn(ChunkId, ChunkHash);
+        }
+    }
+}
+
+// Invokes Fn(FileId, ServerPath, ClientPath) for every entry in the file map while holding the
+// shared oplog lock. Does nothing if the oplog has no storage.
+void
+ProjectStore::Oplog::IterateFileMap(
+    std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn)
+{
+    ZEN_ASSERT(m_Mode == EMode::kFull);
+
+    RwLock::SharedLockScope OplogLock(m_OplogLock);
+    if (m_Storage)
+    {
+        for (const auto& [FileId, Entry] : m_FileMap)
+        {
+            Fn(FileId, Entry.ServerPath, Entry.ClientPath);
+        }
+    }
+}
+
+// Takes the shared oplog lock and forwards to IterateOplogLocked.
+void
+ProjectStore::Oplog::IterateOplog(std::function<void(CbObjectView)>&& Handler, const Paging& EntryPaging)
+{
+    RwLock::SharedLockScope OplogLock(m_OplogLock);
+    IterateOplogLocked(std::move(Handler), EntryPaging);
+}
+
+// Selects the page of entries described by EntryPaging from the key -> payload map and returns
+// their payload indexes ordered by ascending storage offset.
+//
+// If OutOptionalReverseKeyMap is non-null it receives a payload index -> op key entry for every
+// selected op. Returns an empty vector when the paged range is empty.
+std::vector<ProjectStore::Oplog::PayloadIndex>
+ProjectStore::Oplog::GetSortedOpPayloadRangeLocked(const Paging&                                            EntryPaging,
+                                                   tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher>* OutOptionalReverseKeyMap)
+{
+    const std::pair<int32_t, int32_t> Range = GetPagedRange(int32_t(m_OpToPayloadOffsetMap.size()), EntryPaging);
+    if (Range.first == Range.second)
+    {
+        return {};
+    }
+
+    const int32_t SelectedCount = Range.second - Range.first;
+
+    const auto First = std::next(m_OpToPayloadOffsetMap.cbegin(), Range.first);
+    const auto Last  = std::next(First, SelectedCount);
+
+    std::vector<PayloadIndex> SortedPayloads;
+    SortedPayloads.reserve(SelectedCount);
+
+    for (auto EntryIt = First; EntryIt != Last; ++EntryIt)
+    {
+        const PayloadIndex Payload = EntryIt->second;
+        if (OutOptionalReverseKeyMap != nullptr)
+        {
+            OutOptionalReverseKeyMap->insert_or_assign(Payload, EntryIt->first);
+        }
+        SortedPayloads.push_back(Payload);
+    }
+
+    // Order by position in the backing storage
+    const auto ByStorageOffset = [this](const PayloadIndex Lhs, const PayloadIndex Rhs) {
+        return m_OpLogPayloads[Lhs].Address.Offset < m_OpLogPayloads[Rhs].Address.Offset;
+    };
+    std::sort(SortedPayloads.begin(), SortedPayloads.end(), ByStorageOffset);
+
+    return SortedPayloads;
+}
+
+// Replays the ops selected by EntryPaging from storage, in ascending storage offset order, and
+// passes each op object to Handler. The caller is expected to hold the oplog lock.
+// Does nothing if the oplog has no storage or the selection is empty.
+void
+ProjectStore::Oplog::IterateOplogLocked(std::function<void(CbObjectView)>&& Handler, const Paging& EntryPaging)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Store::Oplog::IterateOplogLocked");
+    if (!m_Storage)
+    {
+        return;
+    }
+
+    std::vector<PayloadIndex> ReplayOrder = GetSortedOpPayloadRangeLocked(EntryPaging, nullptr);
+    if (ReplayOrder.empty())
+    {
+        return;
+    }
+
+    m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber, CbObjectView Op) { Handler(Op); });
+}
+
+// Convenience overload: iterates with a default-constructed Paging.
+void
+ProjectStore::Oplog::IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Handler)
+{
+    const Paging DefaultPaging{};
+    IterateOplogWithKey(std::move(Handler), DefaultPaging);
+}
+
+// Replays the ops selected by EntryPaging under the shared oplog lock and passes each op to
+// Handler together with its sequence number and op key.
+//
+// The key for each replayed op is taken from the reverse map at the same position in the replay
+// order as the entry being replayed.
+// Does nothing if the oplog has no storage or the selection is empty.
+void
+ProjectStore::Oplog::IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Handler,
+                                         const Paging&                                                      EntryPaging)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    // Declared ahead of the lock so they are destroyed after it has been released
+    tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher> KeyByPayload;
+    std::vector<PayloadIndex>                               ReplayOrder;
+
+    RwLock::SharedLockScope OplogLock(m_OplogLock);
+    if (!m_Storage)
+    {
+        return;
+    }
+
+    ReplayOrder = GetSortedOpPayloadRangeLocked(EntryPaging, &KeyByPayload);
+    if (ReplayOrder.empty())
+    {
+        return;
+    }
+
+    uint32_t Cursor = 0;
+    m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, CbObjectView Op) {
+        const PayloadIndex CurrentPayload = ReplayOrder[Cursor];
+        Handler(Lsn, KeyByPayload.at(CurrentPayload), Op);
+        ++Cursor;
+    });
+}
+
+static constexpr uint32_t OplogMetaDataExpectedMagic = 0x6f'74'6d'62;  // bytes 'o','t','m','b' ("otmb")
+
+// Appends the attachment hashes referenced by the ops in the oplog to OutAttachments.
+//
+// When StoreMetaDataOnDisk is set and the cached metadata file is flagged valid, the attachments
+// are read from that file and no ops are replayed. Otherwise every op is replayed and its
+// attachments collected. In that case, if StoreMetaDataOnDisk is set, the per-op keys and
+// attachment counts are written back to the metadata file and the valid flag is updated to
+// reflect whether the write succeeded.
+// Does nothing if the oplog has no storage.
+void
+ProjectStore::Oplog::GetAttachmentsLocked(std::vector<IoHash>& OutAttachments, bool StoreMetaDataOnDisk)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Store::Oplog::GetAttachmentsLocked");
+    if (!m_Storage)
+    {
+        return;
+    }
+
+    // Fast path: use the cached metadata file if there is a usable one
+    if (StoreMetaDataOnDisk && m_MetaValid)
+    {
+        IoBuffer CachedMetaData = IoBufferBuilder::MakeFromFile(m_MetaPath);
+        if (CachedMetaData)
+        {
+            ZEN_TRACE_CPU("Store::Oplog::GetAttachmentsFromMetaData");
+            const bool LoadedFromCache = GetAttachmentsFromMetaData<Oid, IoHash>(
+                CachedMetaData,
+                OplogMetaDataExpectedMagic,
+                [&](std::span<const Oid> Keys, std::span<const uint32_t> AttachmentCounts, std::span<const IoHash> Attachments) {
+                    ZEN_UNUSED(Keys);
+                    ZEN_UNUSED(AttachmentCounts);
+                    OutAttachments.insert(OutAttachments.end(), Attachments.begin(), Attachments.end());
+                });
+            if (LoadedFromCache)
+            {
+                return;
+            }
+        }
+    }
+
+    // Slow path: replay every op and gather its attachments
+    std::vector<Oid>      OpKeys;
+    std::vector<uint32_t> PerOpAttachmentCounts;
+    const size_t          FirstNewAttachment = OutAttachments.size();
+
+    IterateOplogLocked(
+        [&](CbObjectView Op) {
+            const size_t CountBeforeOp = OutAttachments.size();
+            Op.IterateAttachments([&](CbFieldView Visitor) { OutAttachments.emplace_back(Visitor.AsAttachment()); });
+            if (StoreMetaDataOnDisk)
+            {
+                OpKeys.push_back(ComputeOpKey(Op));
+                PerOpAttachmentCounts.push_back(gsl::narrow<uint32_t>(OutAttachments.size() - CountBeforeOp));
+            }
+        },
+        Paging{});
+
+    if (!StoreMetaDataOnDisk)
+    {
+        return;
+    }
+
+    // Only the attachments added by this call go into the metadata file
+    const std::span<const IoHash> NewAttachments(OutAttachments.data() + FirstNewAttachment, OutAttachments.size() - FirstNewAttachment);
+
+    IoBuffer MetaPayload =
+        BuildReferenceMetaData<Oid>(OplogMetaDataExpectedMagic, OpKeys, PerOpAttachmentCounts, NewAttachments).Flatten().AsIoBuffer();
+
+    const std::filesystem::path MetaPath = m_MetaPath;
+    std::error_code             Ec;
+    TemporaryFile::SafeWriteFile(MetaPath, MetaPayload.GetView(), Ec);
+    if (!Ec)
+    {
+        m_MetaValid = true;
+    }
+    else
+    {
+        m_MetaValid = false;
+        ZEN_WARN("oplog '{}/{}': unable to set meta data meta path: '{}'. Reason: '{}'",
+                 m_OuterProjectId,
+                 m_OplogId,
+                 MetaPath,
+                 Ec.message());
+    }
+}
+
+// Returns the number of entries in the key -> payload map, or 0 if the oplog has no storage.
+size_t
+ProjectStore::Oplog::GetOplogEntryCount() const
+{
+    RwLock::SharedLockScope OplogLock(m_OplogLock);
+    if (m_Storage)
+    {
+        return m_OpToPayloadOffsetMap.size();
+    }
+    return 0;
+}
+
+// Returns the sequence number recorded for the op registered under Key.
+// Returns a default-constructed LogSequenceNumber if the oplog has no storage or the key is unknown.
+ProjectStore::LogSequenceNumber
+ProjectStore::Oplog::GetOpIndexByKey(const Oid& Key)
+{
+    RwLock::SharedLockScope OplogLock(m_OplogLock);
+    if (m_Storage)
+    {
+        const auto EntryIt = m_OpToPayloadOffsetMap.find(Key);
+        if (EntryIt != m_OpToPayloadOffsetMap.end())
+        {
+            return m_OpLogPayloads[EntryIt->second].Lsn;
+        }
+    }
+    return {};
+}
+
+// Loads the op registered under Key from storage.
+// Returns an empty optional if the oplog has no storage or the key is unknown.
+std::optional<CbObject>
+ProjectStore::Oplog::GetOpByKey(const Oid& Key)
+{
+    RwLock::SharedLockScope OplogLock(m_OplogLock);
+    if (m_Storage)
+    {
+        const auto EntryIt = m_OpToPayloadOffsetMap.find(Key);
+        if (EntryIt != m_OpToPayloadOffsetMap.end())
+        {
+            return m_Storage->GetOp(m_OpLogPayloads[EntryIt->second].Address);
+        }
+    }
+
+    return {};
+}
+
+// Loads the op with the given sequence number from storage.
+//
+// A first attempt is made under the shared lock using the sequence number -> payload map if it
+// already exists. On a miss the exclusive lock is taken, the map is refreshed and the lookup is
+// repeated. Returns an empty optional if the oplog has no storage or the sequence number is
+// still unknown after the refresh.
+std::optional<CbObject>
+ProjectStore::Oplog::GetOpByIndex(ProjectStore::LogSequenceNumber Index)
+{
+    {
+        RwLock::SharedLockScope SharedLock(m_OplogLock);
+        if (!m_Storage)
+        {
+            return {};
+        }
+
+        if (m_LsnToPayloadOffsetMap)
+        {
+            const auto CachedIt = m_LsnToPayloadOffsetMap->find(Index);
+            if (CachedIt != m_LsnToPayloadOffsetMap->end())
+            {
+                return m_Storage->GetOp(m_OpLogPayloads[CachedIt->second].Address);
+            }
+        }
+    }
+
+    // The storage has to be re-checked, the shared lock was dropped above
+    RwLock::ExclusiveLockScope ExclusiveLock(m_OplogLock);
+    if (!m_Storage)
+    {
+        return {};
+    }
+
+    RefreshLsnToPayloadOffsetMap(ExclusiveLock);
+
+    const auto RefreshedIt = m_LsnToPayloadOffsetMap->find(Index);
+    if (RefreshedIt == m_LsnToPayloadOffsetMap->end())
+    {
+        return {};
+    }
+    return m_Storage->GetOp(m_OpLogPayloads[RefreshedIt->second].Address);
+}
+
+// Appends AttachmentHashes to the captured attachment list under the exclusive oplog lock.
+// Does nothing when no capture list exists.
+void
+ProjectStore::Oplog::CaptureAddedAttachments(std::span<const IoHash> AttachmentHashes)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    m_OplogLock.WithExclusiveLock([this, AttachmentHashes]() {
+        if (!m_CapturedAttachments)
+        {
+            return;
+        }
+        std::vector<IoHash>& Captured = *m_CapturedAttachments;
+        Captured.reserve(Captured.size() + AttachmentHashes.size());
+        Captured.insert(Captured.end(), AttachmentHashes.begin(), AttachmentHashes.end());
+    });
+}
+
+// Increments the update capture reference count under the exclusive oplog lock.
+// The capture lists for ops and attachments are created when the count goes from zero to one.
+void
+ProjectStore::Oplog::EnableUpdateCapture()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    m_OplogLock.WithExclusiveLock([&]() {
+        const bool IsFirstCapture = (m_UpdateCaptureRefCounter == 0);
+        if (IsFirstCapture)
+        {
+            ZEN_ASSERT(!m_CapturedOps);
+            ZEN_ASSERT(!m_CapturedAttachments);
+            m_CapturedOps         = std::make_unique<std::vector<Oid>>();
+            m_CapturedAttachments = std::make_unique<std::vector<IoHash>>();
+        }
+        else
+        {
+            ZEN_ASSERT(m_CapturedOps);
+            ZEN_ASSERT(m_CapturedAttachments);
+        }
+        ++m_UpdateCaptureRefCounter;
+    });
+}
+
+// Decrements the update capture reference count under the exclusive oplog lock.
+// The capture lists are released when the count reaches zero.
+void
+ProjectStore::Oplog::DisableUpdateCapture()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    m_OplogLock.WithExclusiveLock([&]() {
+        ZEN_ASSERT(m_CapturedOps);
+        ZEN_ASSERT(m_CapturedAttachments);
+        ZEN_ASSERT(m_UpdateCaptureRefCounter > 0);
+
+        --m_UpdateCaptureRefCounter;
+        if (m_UpdateCaptureRefCounter != 0)
+        {
+            return;
+        }
+
+        m_CapturedOps.reset();
+        m_CapturedAttachments.reset();
+    });
+}
+
+// Invokes Callback(Key, LSN, Op) for every captured op key that is still present in the
+// key -> payload map, loading the op from storage. Does nothing when there is no capture list or
+// the oplog has no storage.
+//
+// NOTE(review): the bool returned by Callback is ignored, iteration always visits every captured
+// op - confirm whether a false result is meant to stop the iteration.
+void
+ProjectStore::Oplog::IterateCapturedOpsLocked(
+    std::function<bool(const Oid& Key, ProjectStore::LogSequenceNumber LSN, const CbObjectView& UpdateOp)>&& Callback)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    if (!m_CapturedOps || !m_Storage)
+    {
+        return;
+    }
+
+    for (const Oid& CapturedKey : *m_CapturedOps)
+    {
+        const auto EntryIt = m_OpToPayloadOffsetMap.find(CapturedKey);
+        if (EntryIt == m_OpToPayloadOffsetMap.end())
+        {
+            continue;
+        }
+        const OplogPayload& Payload = m_OpLogPayloads[EntryIt->second];
+        Callback(CapturedKey, Payload.Lsn, m_Storage->GetOp(Payload.Address));
+    }
+}
+
+// Returns a copy of the captured attachment hashes, or an empty vector when no capture list exists.
+std::vector<IoHash>
+ProjectStore::Oplog::GetCapturedAttachmentsLocked()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    if (!m_CapturedAttachments)
+    {
+        return {};
+    }
+    return *m_CapturedAttachments;
+}
+
+// Registers ChunkHashes as pending attachment references and reports which of them are not
+// present in the CID store.
+//
+// Under the exclusive oplog lock: the pending set is cleared first if its retain deadline has
+// passed, then the hashes are added and the deadline is pushed out to now + RetainTime if that is
+// later than the current deadline. The CID store is queried after the lock has been released.
+std::vector<IoHash>
+ProjectStore::Oplog::CheckPendingChunkReferences(std::span<const IoHash> ChunkHashes, const GcClock::Duration& RetainTime)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    m_OplogLock.WithExclusiveLock([&]() {
+        const GcClock::TimePoint Now = GcClock::Now();
+        if (m_PendingPrepOpAttachmentsRetainEnd < Now)
+        {
+            m_PendingPrepOpAttachments.clear();
+        }
+        m_PendingPrepOpAttachments.insert(ChunkHashes.begin(), ChunkHashes.end());
+
+        const GcClock::TimePoint Deadline = Now + RetainTime;
+        if (m_PendingPrepOpAttachmentsRetainEnd < Deadline)
+        {
+            m_PendingPrepOpAttachmentsRetainEnd = Deadline;
+        }
+    });
+
+    std::vector<IoHash> Missing;
+    Missing.reserve(ChunkHashes.size());
+    for (const IoHash& Hash : ChunkHashes)
+    {
+        if (m_CidStore.ContainsChunk(Hash))
+        {
+            continue;
+        }
+        Missing.push_back(Hash);
+    }
+
+    return Missing;
+}
+
+// Removes ChunkHashes from the pending attachment reference set under the exclusive oplog lock.
+// If the retain deadline has already passed the whole set is cleared instead.
+void
+ProjectStore::Oplog::RemovePendingChunkReferences(std::span<const IoHash> ChunkHashes)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    m_OplogLock.WithExclusiveLock([&]() {
+        const GcClock::TimePoint Now = GcClock::Now();
+        if (m_PendingPrepOpAttachmentsRetainEnd < Now)
+        {
+            m_PendingPrepOpAttachments.clear();
+            return;
+        }
+
+        for (const IoHash& Hash : ChunkHashes)
+        {
+            m_PendingPrepOpAttachments.erase(Hash);
+        }
+    });
+}
+
+// Returns a copy of the pending attachment reference set.
+// The copy is taken first; the set itself is cleared afterwards if its retain deadline has passed,
+// so an expired set is still returned by the call that clears it.
+std::vector<IoHash>
+ProjectStore::Oplog::GetPendingChunkReferencesLocked()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    std::vector<IoHash> Pending(m_PendingPrepOpAttachments.begin(), m_PendingPrepOpAttachments.end());
+
+    const GcClock::TimePoint Now = GcClock::Now();
+    if (m_PendingPrepOpAttachmentsRetainEnd < Now)
+    {
+        m_PendingPrepOpAttachments.clear();
+    }
+    return Pending;
+}
+
+// Registers a file entry under FileId. The lock scope argument is unnamed and not used in the body.
+//
+// A non-zero Hash is recorded in the chunk map and the stored file entry gets an empty server
+// path. With a zero Hash the server path is stored instead. The client path is always stored.
+// An existing entry for FileId is replaced.
+void
+ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&,
+                                    const Oid&       FileId,
+                                    const IoHash&    Hash,
+                                    std::string_view ServerPath,
+                                    std::string_view ClientPath)
+{
+    const bool HasContentHash = (Hash != IoHash::Zero);
+
+    FileMapEntry NewEntry;
+    if (HasContentHash)
+    {
+        m_ChunkMap.insert_or_assign(FileId, Hash);
+    }
+    else
+    {
+        NewEntry.ServerPath = ServerPath;
+    }
+    NewEntry.ClientPath = ClientPath;
+
+    m_FileMap[FileId] = std::move(NewEntry);
+}
+
+// Records ChunkId -> Hash in the chunk map, replacing any existing entry.
+// The lock scope argument is unnamed and not used in the body.
+void
+ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, const Oid& ChunkId, const IoHash& Hash)
+{
+    m_ChunkMap.insert_or_assign(ChunkId, Hash);
+}
+
+// Records ChunkId -> Hash in the meta map, replacing any existing entry.
+// The lock scope argument is unnamed and not used in the body.
+void
+ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, const Oid& ChunkId, const IoHash& Hash)
+{
+    m_MetaMap.insert_or_assign(ChunkId, Hash);
+}
+
+// Extracts the chunk, file and meta mappings declared by an op object.
+//
+// Recognized top level fields:
+//  - "package"     : object with "id"/"data"                              -> Chunks
+//  - "bulkdata"    : array of objects with "id"/"data"                    -> Chunks
+//  - "packagedata" : array of objects with "id"/"data"                    -> Chunks
+//  - "files"       : array of objects with "id"/"data"/"serverpath"/"clientpath" -> Files
+//  - "meta"        : array of objects with "id"/"data" (and "name", used for logging) -> Meta
+// Any other field is ignored.
+//
+// A file entry is skipped with a warning if it has neither a server path nor a data hash, or if
+// it has no client path.
+ProjectStore::Oplog::OplogEntryMapping
+ProjectStore::Oplog::GetMapping(CbObjectView Core)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    using namespace std::literals;
+
+    // The "id"/"data" pair is read the same way for every chunk-like entry
+    const auto ReadChunkMapping = [](CbObjectView Obj) {
+        Oid    Id   = Obj["id"sv].AsObjectId();
+        IoHash Hash = Obj["data"sv].AsBinaryAttachment();
+        return ChunkMapping{Id, Hash};
+    };
+
+    OplogEntryMapping Mapping;
+
+    for (CbFieldView Field : Core)
+    {
+        const std::string_view FieldName = Field.GetName();
+
+        if (FieldName == "package"sv)
+        {
+            const ChunkMapping Package = ReadChunkMapping(Field.AsObjectView());
+            Mapping.Chunks.emplace_back(Package);
+            ZEN_DEBUG("oplog {}/{}: package data {} -> {}", m_OuterProjectId, m_OplogId, Package.Id, Package.Hash);
+        }
+        else if (FieldName == "bulkdata"sv)
+        {
+            CbArrayView BulkDataArray = Field.AsArrayView();
+            for (CbFieldView& Entry : BulkDataArray)
+            {
+                const ChunkMapping BulkData = ReadChunkMapping(Entry.AsObjectView());
+                Mapping.Chunks.emplace_back(BulkData);
+                ZEN_DEBUG("oplog {}/{}: bulkdata {} -> {}", m_OuterProjectId, m_OplogId, BulkData.Id, BulkData.Hash);
+            }
+        }
+        else if (FieldName == "packagedata"sv)
+        {
+            CbArrayView PackageDataArray = Field.AsArrayView();
+            for (CbFieldView& Entry : PackageDataArray)
+            {
+                const ChunkMapping PackageData = ReadChunkMapping(Entry.AsObjectView());
+                Mapping.Chunks.emplace_back(PackageData);
+                ZEN_DEBUG("oplog {}/{}: package {} -> {}", m_OuterProjectId, m_OplogId, PackageData.Id, PackageData.Hash);
+            }
+        }
+        else if (FieldName == "files"sv)
+        {
+            CbArrayView FilesArray = Field.AsArrayView();
+            Mapping.Files.reserve(FilesArray.Num());
+            for (CbFieldView& Entry : FilesArray)
+            {
+                CbObjectView FileObj = Entry.AsObjectView();
+
+                std::string_view ServerPath = FileObj["serverpath"sv].AsString();
+                std::string_view ClientPath = FileObj["clientpath"sv].AsString();
+                Oid              Id         = FileObj["id"sv].AsObjectId();
+                IoHash           Hash       = FileObj["data"sv].AsBinaryAttachment();
+
+                const bool HasNoSource = ServerPath.empty() && Hash == IoHash::Zero;
+                if (HasNoSource)
+                {
+                    ZEN_WARN("oplog {}/{}: invalid file for entry '{}', missing both 'serverpath' and 'data' fields",
+                             m_OuterProjectId,
+                             m_OplogId,
+                             Id);
+                    continue;
+                }
+                if (ClientPath.empty())
+                {
+                    ZEN_WARN("oplog {}/{}: invalid file for entry '{}', missing 'clientpath' field", m_OuterProjectId, m_OplogId, Id);
+                    continue;
+                }
+
+                Mapping.Files.emplace_back(FileMapping{Id, Hash, std::string(ServerPath), std::string(ClientPath)});
+                ZEN_DEBUG("oplog {}/{}: file {} -> {}, ServerPath: {}, ClientPath: {}",
+                          m_OuterProjectId,
+                          m_OplogId,
+                          Id,
+                          Hash,
+                          ServerPath,
+                          ClientPath);
+            }
+        }
+        else if (FieldName == "meta"sv)
+        {
+            CbArrayView MetaArray = Field.AsArrayView();
+            Mapping.Meta.reserve(MetaArray.Num());
+            for (CbFieldView& Entry : MetaArray)
+            {
+                CbObjectView       MetaObj = Entry.AsObjectView();
+                const ChunkMapping Meta    = ReadChunkMapping(MetaObj);
+                Mapping.Meta.emplace_back(Meta);
+                auto NameString = MetaObj["name"sv].AsString();
+                ZEN_DEBUG("oplog {}/{}: meta data ({}) {} -> {}", m_OuterProjectId, m_OplogId, NameString, Meta.Id, Meta.Hash);
+            }
+        }
+    }
+
+    return Mapping;
+}
+
+// Applies an appended op to the in-memory lookup structures and returns its sequence number.
+//
+// The chunk, file and meta mappings of the op are merged into the respective maps, the op is
+// appended to the payload list and indexed by key. The sequence number -> payload map is only
+// updated if it currently exists. The caller passes in the exclusive oplog lock scope.
+ProjectStore::LogSequenceNumber
+ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
+                                        const OplogEntryMapping&    OpMapping,
+                                        const OplogEntry&           OpEntry)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    // The update is assumed to be purely in-memory for now, which makes holding the exclusive lock
+    // for its duration acceptable. Concurrent updates may be worth supporting in the longer term.
+
+    using namespace std::literals;
+
+    // Merge the id maps
+    for (const ChunkMapping& ChunkEntry : OpMapping.Chunks)
+    {
+        AddChunkMapping(OplogLock, ChunkEntry.Id, ChunkEntry.Hash);
+    }
+
+    for (const FileMapping& FileEntry : OpMapping.Files)
+    {
+        AddFileMapping(OplogLock, FileEntry.Id, FileEntry.Hash, FileEntry.ServerPath, FileEntry.ClientPath);
+    }
+
+    for (const ChunkMapping& MetaEntry : OpMapping.Meta)
+    {
+        AddMetaMapping(OplogLock, MetaEntry.Id, MetaEntry.Hash);
+    }
+
+    // The new payload goes at the end of the list, so its index is the current size
+    const PayloadIndex NewPayloadIndex(m_OpLogPayloads.size());
+    m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, NewPayloadIndex);
+    m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress});
+
+    if (m_LsnToPayloadOffsetMap)
+    {
+        m_LsnToPayloadOffsetMap->insert_or_assign(OpEntry.OpLsn, NewPayloadIndex);
+    }
+
+    return OpEntry.OpLsn;
+}
+
+// Appends the op object of OpPackage to the oplog and then stores the package attachments in the
+// CID store. Returns the sequence number of the new entry.
+//
+// If appending the op yields an invalid sequence number the attachments are not stored and that
+// value is returned as is. Every attachment is asserted to be a compressed binary.
+ProjectStore::LogSequenceNumber
+ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
+{
+    ZEN_ASSERT(m_Mode == EMode::kFull);
+
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntry");
+
+    const CbObject&                       Core    = OpPackage.GetObject();
+    const ProjectStore::LogSequenceNumber EntryId = AppendNewOplogEntry(Core);
+    if (!EntryId)
+    {
+        // The oplog is gone, nothing more to do for this op
+        return EntryId;
+    }
+
+    // Attachments are written after the op so that GC never sees attachments that nothing references
+
+    uint64_t TotalRawBytes = 0;
+    uint64_t NewRawBytes   = 0;
+
+    auto Attachments = OpPackage.GetAttachments();
+
+    if (!Attachments.empty())
+    {
+        const size_t AttachmentCount = Attachments.size();
+
+        std::vector<IoBuffer> CompressedBuffers;
+        std::vector<IoHash>   RawHashes;
+        std::vector<uint64_t> RawSizes;
+
+        CompressedBuffers.reserve(AttachmentCount);
+        RawHashes.reserve(AttachmentCount);
+        RawSizes.reserve(AttachmentCount);
+
+        for (const auto& Attachment : Attachments)
+        {
+            ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+            const CompressedBuffer& Compressed = Attachment.AsCompressedBinary();
+            const uint64_t          RawSize    = Compressed.DecodeRawSize();
+            CompressedBuffers.push_back(Compressed.GetCompressed().Flatten().AsIoBuffer());
+            RawHashes.push_back(Attachment.GetHash());
+            RawSizes.push_back(RawSize);
+            TotalRawBytes += RawSize;
+        }
+
+        std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(CompressedBuffers, RawHashes);
+        for (size_t ResultIndex = 0; ResultIndex < InsertResults.size(); ++ResultIndex)
+        {
+            if (!InsertResults[ResultIndex].New)
+            {
+                continue;
+            }
+            NewRawBytes += RawSizes[ResultIndex];
+        }
+    }
+
+    ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId.Number, NiceBytes(NewRawBytes), NiceBytes(TotalRawBytes));
+
+    return EntryId;
+}
+
+// Returns a reference to the current storage object, copied while holding the shared oplog lock.
+RefPtr<ProjectStore::OplogStorage>
+ProjectStore::Oplog::GetStorage()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    RefPtr<OplogStorage> Snapshot;
+    {
+        RwLock::SharedLockScope OplogLock(m_OplogLock);
+        Snapshot = m_Storage;
+    }
+    return Snapshot;
+}
+
+// Appends a single op object to the oplog and returns its sequence number.
+//
+// The op is written to storage first. The in-memory structures are then updated under the
+// exclusive oplog lock, where the op key is also added to the capture list (if one exists) and the
+// cached metadata is flagged invalid.
+// Returns a default-constructed LogSequenceNumber if the oplog has no storage.
+ProjectStore::LogSequenceNumber
+ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core)
+{
+    ZEN_ASSERT(m_Mode == EMode::kFull);
+
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntry");
+
+    using namespace std::literals;
+
+    RefPtr<OplogStorage> Storage = GetStorage();
+    if (!Storage)
+    {
+        return {};
+    }
+
+    // Everything that can be prepared without the oplog lock is done up front
+    OplogEntryMapping          OpMapping  = GetMapping(Core);
+    OplogStorage::AppendOpData AppendData = OplogStorage::GetAppendOpData(Core);
+
+    const OplogEntry AppendedEntry = Storage->AppendOp(AppendData);
+
+    RwLock::ExclusiveLockScope            OplogLock(m_OplogLock);
+    const ProjectStore::LogSequenceNumber Lsn = RegisterOplogEntry(OplogLock, OpMapping, AppendedEntry);
+    if (m_CapturedOps)
+    {
+        m_CapturedOps->push_back(AppendData.KeyHash);
+    }
+    m_MetaValid = false;
+
+    return Lsn;
+}
+
+// Appends a batch of op objects to the oplog and returns one sequence number per input op, in
+// input order.
+//
+// The ops are written to storage in one call. The in-memory structures are then updated for all
+// of them under a single exclusive oplog lock, where the op keys are also added to the capture
+// list (if one exists) and the cached metadata is flagged invalid.
+// If the oplog has no storage, a vector of default-constructed sequence numbers (one per op) is
+// returned.
+std::vector<ProjectStore::LogSequenceNumber>
+ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores)
+{
+    ZEN_ASSERT(m_Mode == EMode::kFull);
+
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntries");
+
+    using namespace std::literals;
+
+    RefPtr<OplogStorage> Storage = GetStorage();
+    if (!Storage)
+    {
+        return std::vector<ProjectStore::LogSequenceNumber>(Cores.size(), LogSequenceNumber{});
+    }
+
+    size_t                                  OpCount = Cores.size();
+    std::vector<OplogEntryMapping>          Mappings;
+    std::vector<OplogStorage::AppendOpData> OpDatas;
+    Mappings.resize(OpCount);
+    OpDatas.resize(OpCount);
+
+    for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
+    {
+        const CbObjectView& Core = Cores[OpIndex];
+        OpDatas[OpIndex]         = OplogStorage::GetAppendOpData(Core);
+        Mappings[OpIndex]        = GetMapping(Core);
+    }
+
+    std::vector<OplogEntry> OpEntries = Storage->AppendOps(OpDatas);
+
+    std::vector<ProjectStore::LogSequenceNumber> EntryIds;
+    EntryIds.resize(OpCount);
+    {
+        RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
+        if (m_CapturedOps)
+        {
+            m_CapturedOps->reserve(m_CapturedOps->size() + OpCount);
+        }
+        for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
+        {
+            EntryIds[OpIndex] = RegisterOplogEntry(OplogLock, Mappings[OpIndex], OpEntries[OpIndex]);
+            if (m_CapturedOps)
+            {
+                m_CapturedOps->push_back(OpDatas[OpIndex].KeyHash);
+            }
+        }
+        // Invalidate the cached metadata while the exclusive lock is still held, the same way the
+        // single entry AppendNewOplogEntry(CbObjectView) does, so it cannot be observed as valid
+        // together with the newly registered ops
+        m_MetaValid = false;
+    }
+    return EntryIds;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Binds the project to its owning store, its CID store and its storage directory.
+// The access time table starts out with a single entry for the empty key, stamped with the
+// current tick count.
+ProjectStore::Project::Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath)
+: m_ProjectStore(PrjStore)
+, m_CidStore(Store)
+, m_OplogStoragePath(BasePath)
+, m_LastAccessTimes({std::make_pair(std::string{}, GcClock::TickCount())})
+{
+}
+
+// Persists the access times on destruction, unless the storage path is empty.
+ProjectStore::Project::~Project()
+{
+    // An empty storage path means the project was explicitly deleted, in which case nothing is written
+    const bool WasDeleted = m_OplogStoragePath.empty();
+    if (WasDeleted)
+    {
+        return;
+    }
+    WriteAccessTimes();
+}
+
+// Returns true if BasePath contains a "Project.zcb" file.
+bool
+ProjectStore::Project::Exists(const std::filesystem::path& BasePath)
+{
+    const std::filesystem::path ProjectStateFilePath = BasePath / "Project.zcb";
+    return IsFile(ProjectStateFilePath);
+}
+
+// Loads the project configuration from "Project.zcb" in the storage directory and then reads the
+// access times.
+//
+// The file content is validated as compact binary. On a validation error an error is logged and
+// the configuration members are left untouched. The access times are read in either case.
+void
+ProjectStore::Project::Read()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Project::Read");
+
+    using namespace std::literals;
+
+    std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv;
+
+    ZEN_DEBUG("project '{}': reading config from '{}'", Identifier, ProjectStateFilePath);
+
+    BasicFile Blob;
+    Blob.Open(ProjectStateFilePath, BasicFile::Mode::kRead);
+
+    IoBuffer        Obj             = Blob.ReadAll();
+    CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All);
+
+    if (ValidationError != CbValidateError::None)
+    {
+        ZEN_ERROR("validation error {} hit for '{}'", ToString(ValidationError), ProjectStateFilePath);
+    }
+    else
+    {
+        CbObject Cfg = LoadCompactBinaryObject(Obj);
+
+        // Every setting is stored as a UTF-8 string and round-tripped through std::filesystem::path
+        Identifier      = std::filesystem::path(Cfg["id"sv].AsU8String()).string();
+        RootDir         = std::filesystem::path(Cfg["root"sv].AsU8String()).string();
+        ProjectRootDir  = std::filesystem::path(Cfg["project"sv].AsU8String()).string();
+        EngineRootDir   = std::filesystem::path(Cfg["engine"sv].AsU8String()).string();
+        ProjectFilePath = std::filesystem::path(Cfg["projectfile"sv].AsU8String()).string();
+    }
+
+    ReadAccessTimes();
+}
+
+// Serializes the project configuration as a compact binary object and writes it to "Project.zcb"
+// in the storage directory, creating the directory first.
+void
+ProjectStore::Project::Write()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Project::Write");
+
+    using namespace std::literals;
+
+    CbObjectWriter Cfg;
+    Cfg << "id"sv << Identifier;
+    Cfg << "root"sv << PathToUtf8(RootDir);
+    Cfg << "project"sv << PathToUtf8(ProjectRootDir);
+    Cfg << "engine"sv << PathToUtf8(EngineRootDir);
+    Cfg << "projectfile"sv << PathToUtf8(ProjectFilePath);
+
+    BinaryWriter Serialized;
+    Cfg.Save(Serialized);
+
+    CreateDirectories(m_OplogStoragePath);
+
+    const std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv;
+
+    ZEN_INFO("project '{}': persisting config to '{}'", Identifier, ProjectStateFilePath);
+
+    TemporaryFile::SafeWriteFile(ProjectStateFilePath, Serialized.GetView());
+}
+
+// Loads the persisted access times from "AccessTimes.zcb" in the storage directory and merges
+// them into the access time table. Does nothing if the file does not exist.
+//
+// Two layouts are read:
+//  - current: a "count" field plus parallel "ids" and "ticks" arrays
+//  - legacy : a "lastaccess" array of objects with "id" and "tick" fields
+//
+// The file content is validated as compact binary; on a validation error a warning is logged and
+// nothing is merged. Since validation only covers the structure, the "ids" and "ticks" arrays may
+// still disagree in length - ids without a matching tick are ignored with a warning.
+void
+ProjectStore::Project::ReadAccessTimes()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    using namespace std::literals;
+
+    std::filesystem::path ProjectAccessTimesFilePath = m_OplogStoragePath / "AccessTimes.zcb"sv;
+    if (!IsFile(ProjectAccessTimesFilePath))
+    {
+        return;
+    }
+
+    ZEN_DEBUG("project '{}': reading access times '{}'", Identifier, ProjectAccessTimesFilePath);
+
+    BasicFile Blob;
+    Blob.Open(ProjectAccessTimesFilePath, BasicFile::Mode::kRead);
+
+    IoBuffer        Obj             = Blob.ReadAll();
+    CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All);
+
+    if (ValidationError == CbValidateError::None)
+    {
+        CbObject Reader = LoadCompactBinaryObject(Obj);
+
+        uint64_t Count = Reader["count"sv].AsUInt64(0);
+        if (Count > 0)
+        {
+            CbArrayView TicksArray = Reader["ticks"sv].AsArrayView();
+
+            // Size the buffer from the array that is actually present rather than from the
+            // "count" field, which comes straight from the file and may not match
+            std::vector<uint64_t> Ticks;
+            Ticks.reserve(TicksArray.Num());
+            for (CbFieldView& TickView : TicksArray)
+            {
+                Ticks.emplace_back(TickView.AsUInt64());
+            }
+
+            CbArrayView IdArray = Reader["ids"sv].AsArrayView();
+            size_t      Index   = 0;
+
+            for (CbFieldView& IdView : IdArray)
+            {
+                if (Index >= Ticks.size())
+                {
+                    // More ids than ticks, the remaining ids have no access time to pair with
+                    ZEN_WARN("project '{}': access times in '{}' has more ids than ticks, ignoring ids from index {}",
+                             Identifier,
+                             ProjectAccessTimesFilePath,
+                             Index);
+                    break;
+                }
+                std::string_view Id = IdView.AsString();
+                m_LastAccessTimes.insert_or_assign(std::string(Id), Ticks[Index++]);
+            }
+        }
+
+        ////// Legacy format read
+        {
+            CbArrayView LastAccessTimes = Reader["lastaccess"sv].AsArrayView();
+            for (CbFieldView& Entry : LastAccessTimes)
+            {
+                CbObjectView     AccessTime = Entry.AsObjectView();
+                std::string_view Id         = AccessTime["id"sv].AsString();
+                GcClock::Tick    AccessTick = AccessTime["tick"sv].AsUInt64();
+                m_LastAccessTimes.insert_or_assign(std::string(Id), AccessTick);
+            }
+        }
+    }
+    else
+    {
+        ZEN_WARN("project '{}': validation error {} hit for '{}'", Identifier, ToString(ValidationError), ProjectAccessTimesFilePath);
+    }
+}
+
+// Serializes the access time table as a compact binary object ("count" plus parallel "ids" and
+// "ticks" arrays) and writes it to "AccessTimes.zcb" in the storage directory.
+//
+// The table is read under the shared access times lock, which is released before any file system
+// work is done. Failures while writing are logged as warnings and not propagated.
+void
+ProjectStore::Project::WriteAccessTimes()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    using namespace std::literals;
+
+    // The lock is taken before the writer is sized so that the table size is not read unguarded
+    RwLock::SharedLockScope AccessTimesLock(m_LastAccessTimesLock);
+
+    CbObjectWriter Writer(32 + (m_LastAccessTimes.size() * 16));
+
+    Writer.AddInteger("count", gsl::narrow<uint64_t>(m_LastAccessTimes.size()));
+
+    // Both arrays are produced from the same unmodified table, so entries line up by position
+    Writer.BeginArray("ids");
+    for (const auto& It : m_LastAccessTimes)
+    {
+        Writer << It.first;
+    }
+    Writer.EndArray();
+
+    Writer.BeginArray("ticks");
+    for (const auto& It : m_LastAccessTimes)
+    {
+        Writer << gsl::narrow<uint64_t>(It.second);
+    }
+    Writer.EndArray();
+
+    AccessTimesLock.ReleaseNow();
+
+    CbObject Data = Writer.Save();
+
+    try
+    {
+        CreateDirectories(m_OplogStoragePath);
+
+        std::filesystem::path ProjectAccessTimesFilePath = m_OplogStoragePath / "AccessTimes.zcb"sv;
+
+        ZEN_DEBUG("project '{}': persisting access times for '{}'", Identifier, ProjectAccessTimesFilePath);
+
+        TemporaryFile::SafeWriteFile(ProjectAccessTimesFilePath, Data.GetView());
+    }
+    catch (const std::exception& Err)
+    {
+        ZEN_WARN("project '{}': writing access times FAILED, reason: '{}'", Identifier, Err.what());
+    }
+}
+
+// Returns the logger of the owning project store.
+LoggerRef
+ProjectStore::Project::Log() const
+{
+    LoggerRef StoreLog = m_ProjectStore->Log();
+    return StoreLog;
+}
+
+// Returns the directory used for the given oplog: the project storage path joined with OplogId.
+std::filesystem::path
+ProjectStore::Project::BasePathForOplog(std::string_view OplogId) const
+{
+    std::filesystem::path OplogBasePath = m_OplogStoragePath / OplogId;
+    return OplogBasePath;
+}
+
+// Creates a new oplog with the given id, registers it in m_Oplogs and writes its
+// initial state via Oplog::Write().
+//
+// OplogId    - id of the oplog, also used as directory name under the project path
+// MarkerPath - forwarded to the Oplog constructor
+//
+// Returns the new oplog, or an empty Ref if a std::exception was thrown while
+// creating it. The failure is logged and any entry registered under the id is removed.
+Ref<ProjectStore::Oplog>
+ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    RwLock::ExclusiveLockScope _(m_ProjectLock);
+
+    std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
+
+    try
+    {
+        Stopwatch Timer;
+
+        Ref<Oplog> NewLog(new Oplog(Log(), Identifier, OplogId, m_CidStore, OplogBasePath, MarkerPath, Oplog::EMode::kFull));
+        m_Oplogs.insert_or_assign(std::string{OplogId}, NewLog);
+
+        NewLog->Write();
+
+        ZEN_INFO("oplog '{}/{}': created oplog at '{}' in {}",
+                 Identifier,
+                 OplogId,
+                 OplogBasePath,
+                 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+
+        if (m_CapturedOplogs)
+        {
+            m_CapturedOplogs->push_back(std::string(OplogId));
+        }
+
+        return NewLog;
+    }
+    catch (const std::exception& Ex)
+    {
+        // Report the reason instead of failing silently
+        ZEN_WARN("oplog '{}/{}': failed to create oplog at '{}': {}", Identifier, OplogId, OplogBasePath, Ex.what());
+
+        // In case of failure we need to ensure there's no half constructed entry around.
+        // The entry is registered with insert_or_assign before Write(), so it has to be
+        // removed explicitly here.
+        m_Oplogs.erase(std::string{OplogId});
+
+        return {};
+    }
+}
+
+// Returns an oplog for reading without registering it in m_Oplogs.
+//
+// If the oplog does not exist on disk an empty Ref is returned. If it is already
+// registered in m_Oplogs that instance is returned, otherwise a new instance is
+// created in kBasicReadOnly mode and read from disk.
+Ref<ProjectStore::Oplog>
+ProjectStore::Project::ReadOplog(std::string_view OplogId)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Store::OpenOplog");
+
+    const std::filesystem::path LogDir = BasePathForOplog(OplogId);
+
+    RwLock::SharedLockScope ProjectLock(m_ProjectLock);
+
+    const auto LoadedIt = m_Oplogs.find(std::string{OplogId});
+
+    // Regardless of whether the oplog is loaded, it has to be present on disk
+    if (!Oplog::ExistsAt(LogDir))
+    {
+        return {};
+    }
+
+    if (LoadedIt != m_Oplogs.end())
+    {
+        return LoadedIt->second;
+    }
+
+    Stopwatch Timer;
+
+    Ref<Oplog> ReadOnlyLog(new Oplog(m_ProjectStore->Log(),
+                                     Identifier,
+                                     OplogId,
+                                     m_CidStore,
+                                     LogDir,
+                                     std::filesystem::path{},
+                                     Oplog::EMode::kBasicReadOnly));
+
+    ReadOnlyLog->Read();
+    ProjectLock.ReleaseNow();
+
+    ZEN_INFO("oplog '{}/{}': read oplog at '{}' in {}", Identifier, OplogId, LogDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+
+    return ReadOnlyLog;
+}
+
+// Returns the oplog with the given id, loading it from disk in kFull mode and
+// registering it in m_Oplogs if it is not already loaded.
+//
+// OplogId          - id of the oplog
+// AllowCompact     - when true, a freshly opened oplog is compacted if its unused
+//                    space is at or above 50 percent
+// VerifyPathOnDisk - when true, an already loaded oplog is checked against disk; if
+//                    its directory is gone the stale entry is removed and a reopen
+//                    is attempted
+//
+// Returns an empty Ref if the oplog does not exist on disk or if opening (or the
+// initial compaction) throws a std::exception; the failure is logged in that case.
+Ref<ProjectStore::Oplog>
+ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bool VerifyPathOnDisk)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Store::OpenOplog");
+
+    {
+        RwLock::SharedLockScope ProjectLock(m_ProjectLock);
+
+        auto OplogIt = m_Oplogs.find(std::string(OplogId));
+
+        if (OplogIt != m_Oplogs.end())
+        {
+            bool ReOpen = false;
+
+            if (VerifyPathOnDisk)
+            {
+                std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
+
+                if (!Oplog::ExistsAt(OplogBasePath))
+                {
+                    // Somebody deleted the oplog on disk behind our back
+                    ProjectLock.ReleaseNow();
+                    std::filesystem::path DeletePath;
+                    if (!RemoveOplog(OplogId, DeletePath))
+                    {
+                        ZEN_WARN("Failed to clean up deleted oplog {}/{} at '{}'", Identifier, OplogId, OplogBasePath);
+                    }
+
+                    // OplogIt must not be used past this point, RemoveOplog may have erased it
+                    ReOpen = true;
+                }
+            }
+
+            if (!ReOpen)
+            {
+                return OplogIt->second;
+            }
+        }
+    }
+
+    std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
+
+    RwLock::ExclusiveLockScope Lock(m_ProjectLock);
+    if (auto It = m_Oplogs.find(std::string{OplogId}); It != m_Oplogs.end())
+    {
+        return It->second;
+    }
+    if (!Oplog::ExistsAt(OplogBasePath))
+    {
+        return {};
+    }
+
+    Stopwatch  Timer;
+    Ref<Oplog> ExistingLog;
+
+    try
+    {
+        ExistingLog = Ref<Oplog>(new Oplog(m_ProjectStore->Log(),
+                                           Identifier,
+                                           OplogId,
+                                           m_CidStore,
+                                           OplogBasePath,
+                                           std::filesystem::path{},
+                                           Oplog::EMode::kFull));
+
+        m_Oplogs.insert_or_assign(std::string{OplogId}, ExistingLog);
+        ExistingLog->Read();
+    }
+    catch (const std::exception& Ex)
+    {
+        // The exclusive lock is still held here, so the map can be modified directly
+        ZEN_WARN("oplog '{}/{}': failed to open oplog at '{}': {}", Identifier, OplogId, OplogBasePath, Ex.what());
+        m_Oplogs.erase(std::string{OplogId});
+        return {};
+    }
+
+    Lock.ReleaseNow();
+
+    ZEN_INFO("oplog '{}/{}': opened oplog at '{}' in {}",
+             Identifier,
+             OplogId,
+             OplogBasePath,
+             NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+
+    if (AllowCompact)
+    {
+        try
+        {
+            const uint32_t CompactUnusedThreshold = 50;
+            ExistingLog->CompactIfUnusedExceeds(/*DryRun*/ false,
+                                                CompactUnusedThreshold,
+                                                fmt::format("Compact on initial open of oplog {}/{}: ", Identifier, OplogId));
+        }
+        catch (const std::exception& Ex)
+        {
+            ZEN_WARN("oplog '{}/{}': failed to open oplog at '{}': {}", Identifier, OplogId, OplogBasePath, Ex.what());
+
+            // The project lock was released above, it has to be taken again before
+            // the failed oplog is removed from the map
+            RwLock::ExclusiveLockScope EraseLock(m_ProjectLock);
+            m_Oplogs.erase(std::string{OplogId});
+            return {};
+        }
+    }
+
+    return ExistingLog;
+}
+
+// Compacts the oplog if the percentage of unused space is at or above
+// CompactUnusedThreshold. Does nothing when the oplog has no storage attached.
+void
+ProjectStore::Oplog::CompactIfUnusedExceeds(bool DryRun, uint32_t CompactUnusedThreshold, std::string_view LogPrefix)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    RwLock::ExclusiveLockScope ExclusiveLock(m_OplogLock);
+    if (m_Storage)
+    {
+        const uint32_t UnusedPercent = GetUnusedSpacePercentLocked();
+        if (UnusedPercent >= CompactUnusedThreshold)
+        {
+            // If we have less than 16 miln entries left of our LSN range, allow renumbering of LSNs
+            const bool RetainLSNs = m_Storage->MaxLSN() <= 0xff000000ul;
+            Compact(ExclusiveLock, DryRun, RetainLSNs, LogPrefix);
+        }
+    }
+}
+
+// Removes a loaded oplog from m_Oplogs if the oplog reports that it can be unloaded.
+// Returns true only when an entry was actually removed.
+bool
+ProjectStore::Project::TryUnloadOplog(std::string_view OplogId)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    RwLock::ExclusiveLockScope _(m_ProjectLock);
+
+    const auto LoadedIt = m_Oplogs.find(std::string(OplogId));
+    if (LoadedIt == m_Oplogs.end())
+    {
+        return false;
+    }
+
+    if (!LoadedIt->second->CanUnload())
+    {
+        return false;
+    }
+
+    m_Oplogs.erase(LoadedIt);
+    return true;
+}
+
+// Detaches an oplog from this project and prepares its directory for deletion.
+//
+// A loaded oplog is asked to prepare itself for deletion and is then dropped from
+// m_Oplogs. An oplog that is not loaded but exists on disk gets its directory
+// prepared for deletion directly. OutDeletePath is filled in by the respective
+// prepare call. On success the last-access entry for the oplog is removed too.
+//
+// Returns false if preparing the deletion failed, true otherwise.
+bool
+ProjectStore::Project::RemoveOplog(std::string_view OplogId, std::filesystem::path& OutDeletePath)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    {
+        RwLock::ExclusiveLockScope _(m_ProjectLock);
+
+        const auto LoadedIt = m_Oplogs.find(std::string(OplogId));
+        if (LoadedIt != m_Oplogs.end())
+        {
+            if (!LoadedIt->second->PrepareForDelete(OutDeletePath))
+            {
+                return false;
+            }
+            m_Oplogs.erase(LoadedIt);
+        }
+        else
+        {
+            const std::filesystem::path OplogDir = BasePathForOplog(OplogId);
+
+            if (Oplog::ExistsAt(OplogDir) && !PrepareDirectoryDelete(OplogDir, OutDeletePath))
+            {
+                return false;
+            }
+        }
+    }
+    m_LastAccessTimesLock.WithExclusiveLock([&]() { m_LastAccessTimes.erase(std::string(OplogId)); });
+    return true;
+}
+
+// Removes the oplog from the project and deletes its content on disk.
+// Returns false if the removal or the on-disk delete failed.
+bool
+ProjectStore::Project::DeleteOplog(std::string_view OplogId)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    std::filesystem::path PendingDeletePath;
+    if (!RemoveOplog(OplogId, PendingDeletePath))
+    {
+        return false;
+    }
+
+    // Nothing was moved aside, so there is nothing to erase on disk
+    if (PendingDeletePath.empty())
+    {
+        return true;
+    }
+
+    // Erase content on disk
+    if (OplogStorage::Delete(PendingDeletePath))
+    {
+        return true;
+    }
+
+    ZEN_WARN("oplog '{}/{}': failed to remove old oplog path '{}'", Identifier, OplogId, PendingDeletePath);
+    return false;
+}
+
+// Lists the ids of all oplogs found on disk for this project.
+//
+// Every sub directory of the project's storage path for which Oplog::ExistsAt()
+// is true counts as an oplog; directories whose name starts with "[dropped]" are
+// skipped. The ids are the UTF-8 directory names.
+std::vector<std::string>
+ProjectStore::Project::ScanForOplogs() const
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    RwLock::SharedLockScope _(m_ProjectLock);
+
+    std::vector<std::string> Oplogs;
+    if (Project::Exists(m_OplogStoragePath))
+    {
+        DirectoryContent DirContent;
+        GetDirectoryContent(m_OplogStoragePath, DirectoryContentFlags::IncludeDirs, DirContent);
+        Oplogs.reserve(DirContent.Directories.size());
+        for (const std::filesystem::path& DirPath : DirContent.Directories)
+        {
+            std::string DirName = PathToUtf8(DirPath.filename());
+            if (DirName.starts_with("[dropped]"))
+            {
+                continue;
+            }
+            if (Oplog::ExistsAt(DirPath))
+            {
+                // Reuse the UTF-8 name; path::string() would convert using the native
+                // narrow encoding, which is not guaranteed to be UTF-8
+                Oplogs.push_back(std::move(DirName));
+            }
+        }
+    }
+
+    return Oplogs;
+}
+
+// Invokes Fn for every loaded oplog while holding the project lock in shared mode.
+// The lock scope is handed to the callback together with a const oplog reference.
+void
+ProjectStore::Project::IterateOplogs(std::function<void(const RwLock::SharedLockScope&, const Oplog&)>&& Fn) const
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    RwLock::SharedLockScope ProjectLock(m_ProjectLock);
+
+    for (auto It = m_Oplogs.begin(); It != m_Oplogs.end(); ++It)
+    {
+        Fn(ProjectLock, *It->second);
+    }
+}
+
+// Invokes Fn for every loaded oplog while holding the project lock in shared mode.
+// The lock scope is handed to the callback together with a mutable oplog reference.
+void
+ProjectStore::Project::IterateOplogs(std::function<void(const RwLock::SharedLockScope&, Oplog&)>&& Fn)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    RwLock::SharedLockScope ProjectLock(m_ProjectLock);
+
+    for (auto It = m_Oplogs.begin(); It != m_Oplogs.end(); ++It)
+    {
+        Fn(ProjectLock, *It->second);
+    }
+}
+
+// Flushes every loaded oplog and then persists the access times.
+void
+ProjectStore::Project::Flush()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Project::Flush");
+
+    // Oplogs that have not been loaded have nothing pending, so only loaded ones are visited
+    auto FlushOplog = [&](const RwLock::SharedLockScope&, Oplog& Ops) { Ops.Flush(); };
+    IterateOplogs(FlushOplog);
+
+    WriteAccessTimes();
+}
+
+// Scrubs all oplogs of this project. Every oplog found on disk is opened first so
+// that the iteration over loaded oplogs covers them; oplogs that IsExpired() reports
+// as expired for GcClock::TimePoint::min() are skipped.
+void
+ProjectStore::Project::Scrub(ScrubContext& Ctx)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    // Scrubbing needs to check all existing oplogs
+    for (const std::string& OnDiskId : ScanForOplogs())
+    {
+        OpenOplog(OnDiskId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true);
+    }
+
+    IterateOplogs([&](const RwLock::SharedLockScope&, Oplog& Ops) {
+        const bool Expired = IsExpired(GcClock::TimePoint::min(), Ops);
+        if (Expired)
+        {
+            return;
+        }
+        Ops.Scrub(Ctx);
+    });
+}
+
+// Sums the sizes of the project level files ("AccessTimes.zcb" and "Project.zcb")
+// that exist directly under BasePath.
+uint64_t
+ProjectStore::Project::TotalSize(const std::filesystem::path& BasePath)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    using namespace std::literals;
+
+    uint64_t Total = 0;
+    for (std::string_view FileName : {"AccessTimes.zcb"sv, "Project.zcb"sv})
+    {
+        const std::filesystem::path FilePath = BasePath / FileName;
+        if (IsFile(FilePath))
+        {
+            Total += FileSizeFromPath(FilePath);
+        }
+    }
+
+    return Total;
+}
+
+// Returns the size of the project level files plus the size of every oplog found on disk.
+uint64_t
+ProjectStore::Project::TotalSize() const
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    uint64_t Total = TotalSize(m_OplogStoragePath);
+
+    for (const std::string& OnDiskId : ScanForOplogs())
+    {
+        Total += Oplog::TotalSize(BasePathForOplog(OnDiskId));
+    }
+
+    return Total;
+}
+
+// Resets and drops all loaded oplogs, then prepares the project directory for
+// deletion. On success the storage path of the project is cleared.
+bool
+ProjectStore::Project::PrepareForDelete(std::filesystem::path& OutDeletePath)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    RwLock::ExclusiveLockScope _(m_ProjectLock);
+
+    for (auto It = m_Oplogs.begin(); It != m_Oplogs.end(); ++It)
+    {
+        It->second->ResetState();
+    }
+    m_Oplogs.clear();
+
+    if (!PrepareDirectoryDelete(m_OplogStoragePath, OutDeletePath))
+    {
+        return false;
+    }
+
+    m_OplogStoragePath.clear();
+    return true;
+}
+
+// Increments the capture reference count. The first user allocates the list that
+// records the ids of newly created oplogs.
+void
+ProjectStore::Project::EnableUpdateCapture()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    m_ProjectLock.WithExclusiveLock([&]() {
+        const bool IsFirstUser = (m_UpdateCaptureRefCounter == 0);
+        if (!IsFirstUser)
+        {
+            ZEN_ASSERT(m_CapturedOplogs);
+        }
+        else
+        {
+            ZEN_ASSERT(!m_CapturedOplogs);
+            m_CapturedOplogs = std::make_unique<std::vector<std::string>>();
+        }
+        m_UpdateCaptureRefCounter++;
+    });
+}
+
+// Decrements the capture reference count and releases the captured oplog list
+// once the count reaches zero.
+void
+ProjectStore::Project::DisableUpdateCapture()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    m_ProjectLock.WithExclusiveLock([&]() {
+        ZEN_ASSERT(m_CapturedOplogs);
+        ZEN_ASSERT(m_UpdateCaptureRefCounter > 0);
+
+        m_UpdateCaptureRefCounter--;
+
+        const bool IsLastUser = (m_UpdateCaptureRefCounter == 0);
+        if (IsLastUser)
+        {
+            m_CapturedOplogs.reset();
+        }
+    });
+}
+
+// Returns a copy of the captured oplog ids, or an empty list when capture is not
+// enabled. No lock is taken inside this function.
+std::vector<std::string>
+ProjectStore::Project::GetCapturedOplogsLocked()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    if (!m_CapturedOplogs)
+    {
+        return {};
+    }
+    return *m_CapturedOplogs;
+}
+
+// Collects the locks GC needs for this project: the project lock in shared mode
+// followed by the GC referencer lock of every loaded oplog.
+std::vector<RwLock::SharedLockScope>
+ProjectStore::Project::GetGcReferencerLocks()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    // The project lock has to be held before m_Oplogs is inspected
+    RwLock::SharedLockScope ProjectLock(m_ProjectLock);
+
+    std::vector<RwLock::SharedLockScope> Locks;
+    // Reserve before the first insertion so the vector is allocated exactly once
+    Locks.reserve(1 + m_Oplogs.size());
+    Locks.emplace_back(std::move(ProjectLock));
+    for (auto& Kv : m_Oplogs)
+    {
+        Locks.emplace_back(Kv.second->GetGcReferencerLock());
+    }
+    return Locks;
+}
+
+// Decides whether an entry of this project has expired.
+//
+// EntryName  - key into m_LastAccessTimes; an empty name is reported as "project"
+//              in diagnostics, anything else as "oplog"
+// MarkerPath - optional marker file; while it exists the entry never expires
+// ExpireTime - the entry is expired if its last access tick is at or before this time
+//
+// If the marker file can not be checked the entry is treated as not expired.
+// Entries without a recorded access time are not expired.
+bool
+ProjectStore::Project::IsExpired(const std::string&           EntryName,
+                                 const std::filesystem::path& MarkerPath,
+                                 const GcClock::TimePoint     ExpireTime) const
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    if (!MarkerPath.empty())
+    {
+        std::error_code Ec;
+        const bool      MarkerExists = IsFile(MarkerPath, Ec);
+        // The error has to be examined independently of the result; a failed check
+        // does not report the file as existing
+        if (Ec)
+        {
+            ZEN_WARN("{} '{}{}{}', Failed to check expiry via marker file '{}', assuming not expired",
+                     EntryName.empty() ? "project"sv : "oplog"sv,
+                     Identifier,
+                     EntryName.empty() ? ""sv : "/"sv,
+                     EntryName,
+                     MarkerPath.string());
+            return false;
+        }
+        if (MarkerExists)
+        {
+            return false;
+        }
+    }
+
+    const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
+
+    RwLock::SharedLockScope _(m_LastAccessTimesLock);
+    if (auto It = m_LastAccessTimes.find(EntryName); It != m_LastAccessTimes.end())
+    {
+        if (It->second <= ExpireTicks)
+        {
+            return true;
+        }
+    }
+    return false;
+}
+
+// Expiry check for the project itself, using the project file as marker.
+bool
+ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime) const
+{
+    // The project's own access time is stored under the empty entry name
+    const std::string ProjectEntryName;
+    return IsExpired(ProjectEntryName, ProjectFilePath, ExpireTime);
+}
+
+// Expiry check for a loaded oplog, using the oplog's id and marker path.
+bool
+ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog) const
+{
+    const std::string&           EntryName = Oplog.OplogId();
+    const std::filesystem::path& Marker    = Oplog.MarkerPath();
+    return IsExpired(EntryName, Marker, ExpireTime);
+}
+
+// Returns true if the oplog has a recorded access time strictly later than TouchTime.
+// Oplogs without a recorded access time are reported as not touched.
+bool
+ProjectStore::Project::IsOplogTouchedSince(const GcClock::TimePoint TouchTime, std::string_view Oplog) const
+{
+    const GcClock::Tick TouchTicks = TouchTime.time_since_epoch().count();
+
+    // Read-only lookup; a shared lock is sufficient and matches the other readers
+    // of m_LastAccessTimes
+    RwLock::SharedLockScope _(m_LastAccessTimesLock);
+    if (auto It = m_LastAccessTimes.find(std::string(Oplog)); It != m_LastAccessTimes.end())
+    {
+        if (It->second > TouchTicks)
+        {
+            return true;
+        }
+    }
+    return false;
+}
+
+// Expiry check for an oplog identified by id.
+//
+// A loaded oplog is checked via its in-memory state. Otherwise the oplog state file
+// is read from disk to obtain the "gcpath" marker path (empty if the state file
+// could not be read) and the check is made with that.
+bool
+ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime, std::string_view OplogId) const
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    using namespace std::literals;
+
+    {
+        RwLock::SharedLockScope Lock(m_ProjectLock);
+        auto                    OplogIt = m_Oplogs.find(std::string(OplogId));
+
+        if (OplogIt != m_Oplogs.end())
+        {
+            // Hold our own reference; the iterator must not be touched once the lock
+            // is released since the map may change underneath it
+            Ref<Oplog> LoadedOplog = OplogIt->second;
+            Lock.ReleaseNow();
+            return IsExpired(ExpireTime, *LoadedOplog);
+        }
+    }
+
+    std::filesystem::path   OplogBasePath = BasePathForOplog(OplogId);
+    std::optional<CbObject> OplogConfig   = Oplog::ReadStateFile(OplogBasePath, [this]() { return Log(); });
+    std::filesystem::path   MarkerPath    = OplogConfig.has_value() ? OplogConfig.value()["gcpath"sv].AsU8String() : std::u8string();
+    return IsExpired(std::string(OplogId), MarkerPath, ExpireTime);
+}
+
+// Records the current tick as the last access time of the project itself, which
+// is stored under the empty entry name.
+void
+ProjectStore::Project::TouchProject()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    m_LastAccessTimesLock.WithExclusiveLock([&]() { m_LastAccessTimes.insert_or_assign(std::string(), GcClock::TickCount()); });
+}
+
+// Records the current tick as the last access time of the given oplog.
+// The oplog id must not be empty.
+void
+ProjectStore::Project::TouchOplog(std::string_view Oplog)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_ASSERT(!Oplog.empty());
+
+    m_LastAccessTimesLock.WithExclusiveLock([&]() { m_LastAccessTimes.insert_or_assign(std::string(Oplog), GcClock::TickCount()); });
+}
+
+// Returns the recorded last access time of the oplog, or GcClock::TimePoint::min()
+// if no access time has been recorded for it.
+GcClock::TimePoint
+ProjectStore::Project::LastOplogAccessTime(std::string_view Oplog) const
+{
+    RwLock::SharedLockScope _(m_LastAccessTimesLock);
+
+    const auto AccessIt = m_LastAccessTimes.find(std::string(Oplog));
+    if (AccessIt == m_LastAccessTimes.end())
+    {
+        return GcClock::TimePoint::min();
+    }
+    return GcClock::TimePointFromTick(AccessIt->second);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Creates a project store rooted at BasePath and registers it with the GC manager
+// as storage, referencer and reference locker.
+ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, const Configuration& Config)
+: m_Log(logging::Get("project"))
+, m_Gc(Gc)
+, m_CidStore(Store)
+, m_ProjectBasePath(std::move(BasePath))  // BasePath is taken by value, move it instead of copying again
+, m_Config(Config)
+, m_DiskWriteBlocker(Gc.GetDiskWriteBlocker())
+{
+    ZEN_INFO("initializing project store at '{}'", m_ProjectBasePath);
+    // m_Log.set_level(spdlog::level::debug);
+    m_Gc.AddGcStorage(this);
+    m_Gc.AddGcReferencer(*this);
+    m_Gc.AddGcReferenceLocker(*this);
+}
+
+// Unregisters the store from the GC manager.
+ProjectStore::~ProjectStore()
+{
+    ZEN_INFO("closing project store at '{}'", m_ProjectBasePath);
+
+    // Unregister in the reverse order of the registrations made by the constructor
+    m_Gc.RemoveGcReferenceLocker(*this);
+    m_Gc.RemoveGcReferencer(*this);
+    m_Gc.RemoveGcStorage(this);
+}
+
+// Builds the directory path for the given project id by appending it to the
+// base path of the store.
+std::filesystem::path
+ProjectStore::BasePathForProject(std::string_view ProjectId)
+{
+    std::filesystem::path ProjectDir = m_ProjectBasePath;
+    ProjectDir /= ProjectId;
+    return ProjectDir;
+}
+
+// Attempts to open every project directory found under the base path of the store.
+// Directories whose name starts with "[dropped]" are ignored.
+void
+ProjectStore::DiscoverProjects()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    if (!IsDir(m_ProjectBasePath))
+    {
+        return;
+    }
+
+    DirectoryContent BaseContent;
+    GetDirectoryContent(m_ProjectBasePath, DirectoryContentFlags::IncludeDirs, BaseContent);
+
+    for (const std::filesystem::path& ProjectDir : BaseContent.Directories)
+    {
+        const std::string ProjectName = PathToUtf8(ProjectDir.filename());
+        if (!ProjectName.starts_with("[dropped]"))
+        {
+            OpenProject(ProjectName);
+        }
+    }
+}
+
+// Invokes Fn for every project currently registered, while holding the projects
+// lock in shared mode.
+void
+ProjectStore::IterateProjects(std::function<void(Project& Prj)>&& Fn)
+{
+    RwLock::SharedLockScope _(m_ProjectsLock);
+
+    for (auto It = m_Projects.begin(); It != m_Projects.end(); ++It)
+    {
+        Project* Prj = It->second.Get();
+        Fn(*Prj);
+    }
+}
+
+// Flushes all registered projects, scheduling one work item per project on the
+// small burst worker pool, and waits for all of them to finish.
+//
+// An exception thrown by a single project flush is logged and does not affect the
+// other projects. An exception thrown while scheduling sets the abort flag and is
+// logged as well.
+void
+ProjectStore::Flush()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Store::Flush");
+
+    ZEN_INFO("flushing project store at '{}'", m_ProjectBasePath);
+
+    // Snapshot the projects so that the projects lock is not held while flushing
+    std::vector<Ref<Project>> Snapshot;
+    {
+        RwLock::SharedLockScope _(m_ProjectsLock);
+        Snapshot.reserve(m_Projects.size());
+
+        for (auto It = m_Projects.begin(); It != m_Projects.end(); ++It)
+        {
+            Snapshot.push_back(It->second);
+        }
+    }
+
+    WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst);
+    std::atomic<bool> AbortFlag;
+    std::atomic<bool> PauseFlag;
+    ParallelWork      Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
+    try
+    {
+        for (const Ref<Project>& Prj : Snapshot)
+        {
+            auto FlushOne = [this, Prj](std::atomic<bool>&) {
+                try
+                {
+                    Prj->Flush();
+                }
+                catch (const std::exception& Ex)
+                {
+                    ZEN_WARN("Exception while flushing project {}: {}", Prj->Identifier, Ex.what());
+                }
+            };
+            Work.ScheduleWork(WorkerPool, std::move(FlushOne), 0);
+        }
+    }
+    catch (const std::exception& Ex)
+    {
+        AbortFlag.store(true);
+        ZEN_WARN("Failed projects in {}. Reason: '{}'", m_ProjectBasePath, Ex.what());
+    }
+
+    Work.Wait();
+}
+
+// Scrubs every project in the store. Projects on disk are discovered first; projects
+// that IsExpired() reports as expired for GcClock::TimePoint::min() are left out.
+void
+ProjectStore::ScrubStorage(ScrubContext& Ctx)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_INFO("scrubbing '{}'", m_ProjectBasePath);
+
+    DiscoverProjects();
+
+    // Collect the candidates under the lock, scrub them after it has been released
+    std::vector<Ref<Project>> Candidates;
+    {
+        RwLock::SharedLockScope Lock(m_ProjectsLock);
+        Candidates.reserve(m_Projects.size());
+
+        for (auto It = m_Projects.begin(); It != m_Projects.end(); ++It)
+        {
+            if (!It->second->IsExpired(GcClock::TimePoint::min()))
+            {
+                Candidates.push_back(It->second);
+            }
+        }
+    }
+
+    for (const Ref<Project>& Candidate : Candidates)
+    {
+        Candidate->Scrub(Ctx);
+    }
+}
+
+// Computes the disk size of the store by walking the directory tree: for every
+// directory under the base path that contains a "Project.zcb" file, the project
+// level files and every sub directory (as an oplog) are added up.
+GcStorageSize
+ProjectStore::StorageSize() const
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Store::StorageSize");
+
+    using namespace std::literals;
+
+    GcStorageSize Result;
+    if (!IsDir(m_ProjectBasePath))
+    {
+        return Result;
+    }
+
+    DirectoryContent ProjectDirs;
+    GetDirectoryContent(m_ProjectBasePath, DirectoryContentFlags::IncludeDirs, ProjectDirs);
+
+    for (const std::filesystem::path& ProjectDir : ProjectDirs.Directories)
+    {
+        const std::filesystem::path StateFile = ProjectDir / "Project.zcb"sv;
+        if (!IsFile(StateFile))
+        {
+            continue;
+        }
+
+        Result.DiskSize += Project::TotalSize(ProjectDir);
+
+        DirectoryContent OplogDirs;
+        GetDirectoryContent(ProjectDir, DirectoryContentFlags::IncludeDirs, OplogDirs);
+        for (const std::filesystem::path& OplogDir : OplogDirs.Directories)
+        {
+            Result.DiskSize += Oplog::TotalSize(OplogDir);
+        }
+    }
+    return Result;
+}
+
+// Returns the project with the given id, loading it from disk if needed.
+//
+// The lookup is first made under a shared lock and repeated under the exclusive
+// lock before the project is read from disk. An empty Ref is returned if the
+// project does not exist on disk or if reading it throws a std::exception, in
+// which case the failure is logged and the entry is removed again.
+Ref<ProjectStore::Project>
+ProjectStore::OpenProject(std::string_view ProjectId)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Store::OpenProject");
+
+    {
+        RwLock::SharedLockScope _(m_ProjectsLock);
+        const auto              KnownIt = m_Projects.find(std::string{ProjectId});
+        if (KnownIt != m_Projects.end())
+        {
+            return KnownIt->second;
+        }
+    }
+
+    RwLock::ExclusiveLockScope _(m_ProjectsLock);
+
+    // Somebody else may have opened the project while we were not holding a lock
+    const auto KnownIt = m_Projects.find(std::string{ProjectId});
+    if (KnownIt != m_Projects.end())
+    {
+        return KnownIt->second;
+    }
+
+    std::filesystem::path BasePath = BasePathForProject(ProjectId);
+
+    if (!Project::Exists(BasePath))
+    {
+        return {};
+    }
+
+    try
+    {
+        ZEN_INFO("project '{}': opening project at '{}'", ProjectId, BasePath);
+
+        Ref<Project>& Opened =
+            m_Projects
+                .try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath)))
+                .first->second;
+        Opened->Identifier = ProjectId;
+        Opened->Read();
+        return Opened;
+    }
+    catch (const std::exception& e)
+    {
+        ZEN_WARN("project '{}': failed to open at {} ({})", ProjectId, BasePath, e.what());
+        m_Projects.erase(std::string{ProjectId});
+    }
+
+    return {};
+}
+
+// Registers a project under ProjectId, assigns the given directories to it and
+// writes the project state. If update capture is active the id is recorded in the
+// captured projects list.
+Ref<ProjectStore::Project>
+ProjectStore::NewProject(const std::filesystem::path& BasePath,
+                         std::string_view             ProjectId,
+                         const std::filesystem::path& RootDir,
+                         const std::filesystem::path& EngineRootDir,
+                         const std::filesystem::path& ProjectRootDir,
+                         const std::filesystem::path& ProjectFilePath)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Store::NewProject");
+
+    RwLock::ExclusiveLockScope _(m_ProjectsLock);
+
+    ZEN_INFO("project '{}': creating project at '{}'", ProjectId, BasePath);
+
+    auto Emplaced =
+        m_Projects.try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath)));
+    Ref<Project>& Created = Emplaced.first->second;
+
+    Created->Identifier      = ProjectId;
+    Created->RootDir         = RootDir;
+    Created->EngineRootDir   = EngineRootDir;
+    Created->ProjectRootDir  = ProjectRootDir;
+    Created->ProjectFilePath = ProjectFilePath;
+    Created->Write();
+
+    if (m_CapturedProjects)
+    {
+        m_CapturedProjects->push_back(std::string(ProjectId));
+    }
+
+    return Created;
+}
+
+// Replaces the directories of a registered project and writes the project state.
+// Returns false if no project with the given id is registered.
+bool
+ProjectStore::UpdateProject(std::string_view             ProjectId,
+                            const std::filesystem::path& RootDir,
+                            const std::filesystem::path& EngineRootDir,
+                            const std::filesystem::path& ProjectRootDir,
+                            const std::filesystem::path& ProjectFilePath)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Store::UpdateProject");
+
+    RwLock::ExclusiveLockScope ProjectsLock(m_ProjectsLock);
+
+    const auto KnownIt = m_Projects.find(std::string{ProjectId});
+    if (KnownIt == m_Projects.end())
+    {
+        return false;
+    }
+
+    Ref<ProjectStore::Project> Target = KnownIt->second;
+
+    Target->RootDir         = RootDir;
+    Target->EngineRootDir   = EngineRootDir;
+    Target->ProjectRootDir  = ProjectRootDir;
+    Target->ProjectFilePath = ProjectFilePath;
+    Target->Write();
+
+    ZEN_INFO("project '{}': updated", ProjectId);
+
+    return true;
+}
+
+// Unregisters a project after it has been prepared for deletion.
+// Returns true if the project is not registered at all, false only if preparing
+// the deletion failed.
+bool
+ProjectStore::RemoveProject(std::string_view ProjectId, std::filesystem::path& OutDeletePath)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    RwLock::ExclusiveLockScope ProjectsLock(m_ProjectsLock);
+
+    const auto KnownIt = m_Projects.find(std::string{ProjectId});
+    if (KnownIt == m_Projects.end())
+    {
+        return true;
+    }
+
+    if (!KnownIt->second->PrepareForDelete(OutDeletePath))
+    {
+        return false;
+    }
+
+    m_Projects.erase(KnownIt);
+    return true;
+}
+
+// Unregisters a project and deletes its directory on disk.
+// Returns false if the removal or the on-disk delete failed.
+bool
+ProjectStore::DeleteProject(std::string_view ProjectId)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Store::DeleteProject");
+
+    ZEN_INFO("project '{}': deleting", ProjectId);
+
+    std::filesystem::path PendingDeletePath;
+    if (!RemoveProject(ProjectId, PendingDeletePath))
+    {
+        return false;
+    }
+
+    // An empty path means there is nothing to delete on disk
+    if (!PendingDeletePath.empty() && !DeleteDirectories(PendingDeletePath))
+    {
+        ZEN_WARN("project '{}': failed to remove old project path '{}'", ProjectId, PendingDeletePath);
+        return false;
+    }
+
+    return true;
+}
+
+// Returns whether a project exists at the path derived from the given id.
+bool
+ProjectStore::Exists(std::string_view ProjectId)
+{
+    const std::filesystem::path ProjectDir = BasePathForProject(ProjectId);
+    return Project::Exists(ProjectDir);
+}
+
+// Builds a compact binary array with one object per registered project, after
+// discovering the projects present on disk.
+//
+// Each object carries the fields "Id", "RootDir", "ProjectRootDir", "EngineRootDir"
+// and "ProjectFilePath"; all paths are emitted as UTF-8.
+CbArray
+ProjectStore::GetProjectsList()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("Store::GetProjectsList");
+
+    using namespace std::literals;
+
+    DiscoverProjects();
+
+    CbWriter Response;
+    Response.BeginArray();
+
+    IterateProjects([&Response](Project& Prj) {
+        Response.BeginObject();
+        Response << "Id"sv << Prj.Identifier;
+        // Use the same UTF-8 conversion as the other path fields; path::string()
+        // would convert using the native narrow encoding instead
+        Response << "RootDir"sv << PathToUtf8(Prj.RootDir);
+        Response << "ProjectRootDir"sv << PathToUtf8(Prj.ProjectRootDir);
+        Response << "EngineRootDir"sv << PathToUtf8(Prj.EngineRootDir);
+        Response << "ProjectFilePath"sv << PathToUtf8(Prj.ProjectFilePath);
+        Response.EndObject();
+    });
+    Response.EndArray();
+    return Response.Save().AsArray();
+}
+
+// Builds a compact binary object with a "files" array describing every entry of the
+// oplog's file map.
+//
+// InLog            - logger used for diagnostics emitted by this function
+// Project, Oplog   - the project and oplog to query
+// WantedFieldNames - subset of "id", "clientpath", "serverpath", "rawsize" and "size"
+//                    to include; an empty set selects all fields
+//
+// "size" and "rawsize" are left out for entries whose payload could not be resolved.
+// For compressed payloads "rawsize" comes from the compressed header, for other
+// payloads both values equal the payload size.
+CbObject
+ProjectStore::GetProjectFiles(LoggerRef InLog, Project& Project, Oplog& Oplog, const std::unordered_set<std::string>& WantedFieldNames)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("ProjectStore::GetProjectFiles");
+
+    auto Log = [&InLog]() { return InLog; };
+
+    using namespace std::literals;
+
+    const bool WantsAllFields = WantedFieldNames.empty();
+
+    const bool WantsIdField         = WantsAllFields || WantedFieldNames.contains("id");
+    const bool WantsClientPathField = WantsAllFields || WantedFieldNames.contains("clientpath");
+    const bool WantsServerPathField = WantsAllFields || WantedFieldNames.contains("serverpath");
+    const bool WantsRawSizeField    = WantsAllFields || WantedFieldNames.contains("rawsize");
+    const bool WantsSizeField       = WantsAllFields || WantedFieldNames.contains("size");
+
+    // Marks a size that has not been resolved; such values are not emitted
+    const uint64_t UnknownSize = (uint64_t)-1;
+
+    std::vector<Oid>         Ids;
+    std::vector<std::string> ServerPaths;
+    std::vector<std::string> ClientPaths;
+    std::vector<uint64_t>    Sizes;
+    std::vector<uint64_t>    RawSizes;
+
+    size_t Count = 0;
+    Oplog.IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) {
+        // Ids are needed for the size lookup even when the id field itself is not wanted
+        if (WantsIdField || WantsRawSizeField || WantsSizeField)
+        {
+            Ids.push_back(Id);
+        }
+        if (WantsServerPathField)
+        {
+            ServerPaths.push_back(std::string(ServerPath));
+        }
+        if (WantsClientPathField)
+        {
+            ClientPaths.push_back(std::string(ClientPath));
+        }
+        Count++;
+    });
+    if (WantsRawSizeField || WantsSizeField)
+    {
+        if (WantsSizeField)
+        {
+            Sizes.resize(Ids.size(), UnknownSize);
+        }
+        if (WantsRawSizeField)
+        {
+            RawSizes.resize(Ids.size(), UnknownSize);
+        }
+
+        Oplog.IterateChunks(
+            Project.RootDir,
+            Ids,
+            false,
+            [&](size_t Index, const IoBuffer& Payload, uint64_t /*ModTag*/) {
+                try
+                {
+                    if (!Payload)
+                    {
+                        ZEN_WARN("oplog '{}/{}': failed getting payload for project file info for id {}.",
+                                 Project.Identifier,
+                                 Oplog.OplogId(),
+                                 Ids[Index]);
+                        return true;
+                    }
+
+                    if (Payload.GetContentType() != ZenContentType::kCompressedBinary)
+                    {
+                        // Uncompressed payload, raw size and stored size are the same
+                        if (WantsSizeField)
+                        {
+                            Sizes[Index] = Payload.GetSize();
+                        }
+                        if (WantsRawSizeField)
+                        {
+                            RawSizes[Index] = Payload.GetSize();
+                        }
+                        return true;
+                    }
+
+                    if (!WantsRawSizeField)
+                    {
+                        // Only the stored size is wanted, no need to look at the header
+                        if (WantsSizeField)
+                        {
+                            Sizes[Index] = Payload.GetSize();
+                        }
+                        return true;
+                    }
+
+                    IoHash _;
+                    if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index]))
+                    {
+                        if (WantsSizeField)
+                        {
+                            Sizes[Index] = Payload.GetSize();
+                        }
+                    }
+                    else
+                    {
+                        ZEN_WARN("oplog '{}/{}': payload for project file info for id {} is not a valid compressed binary.",
+                                 Project.Identifier,
+                                 Oplog.OplogId(),
+                                 Ids[Index]);
+                    }
+                }
+                catch (const std::exception& Ex)
+                {
+                    ZEN_WARN("oplog '{}/{}': failed getting project file info for id {}. Reason: '{}'",
+                             Project.Identifier,
+                             Oplog.OplogId(),
+                             Ids[Index],
+                             Ex.what());
+                }
+                return true;
+            },
+            &GetSmallWorkerPool(EWorkloadType::Burst),
+            256u * 1024u);
+    }
+
+    CbObjectWriter Response(64u + Count * ((WantsIdField ? (5 + sizeof(Oid::OidBits)) : 0) + (WantsServerPathField ? 64 : 0) +
+                                           (WantsClientPathField ? 64 : 0) + (WantsSizeField ? 16 : 0) + (WantsRawSizeField ? 16 : 0)));
+    Response.BeginArray("files"sv);
+    for (size_t Index = 0; Index < Count; Index++)
+    {
+        Response.BeginObject();
+        if (WantsIdField)
+        {
+            Response << "id"sv << Ids[Index];
+        }
+        if (WantsServerPathField)
+        {
+            Response << "serverpath"sv << ServerPaths[Index];
+        }
+        if (WantsClientPathField)
+        {
+            Response << "clientpath"sv << ClientPaths[Index];
+        }
+        if (WantsSizeField && Sizes[Index] != UnknownSize)
+        {
+            Response << "size"sv << Sizes[Index];
+        }
+        if (WantsRawSizeField && RawSizes[Index] != UnknownSize)
+        {
+            Response << "rawsize"sv << RawSizes[Index];
+        }
+        Response.EndObject();
+    }
+    Response.EndArray();
+
+    return Response.Save();
+}
+
+// Builds a compact binary object with a "chunkinfos" array describing every entry of
+// the oplog's chunk map.
+//
+// InLog            - logger used for diagnostics emitted by this function
+// Project, Oplog   - the project and oplog to query
+// WantedFieldNames - subset of "id", "rawhash", "rawsize" and "size" to include; an
+//                    empty set selects all fields
+//
+// "size" and "rawsize" are left out for entries whose payload could not be resolved.
+// For compressed payloads "rawsize" comes from the compressed header, for other
+// payloads both values equal the payload size.
+CbObject
+ProjectStore::GetProjectChunkInfos(LoggerRef InLog, Project& Project, Oplog& Oplog, const std::unordered_set<std::string>& WantedFieldNames)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("ProjectStore::GetProjectChunkInfos");
+
+    auto Log = [&InLog]() { return InLog; };
+
+    using namespace std::literals;
+
+    const bool WantsAllFields = WantedFieldNames.empty();
+
+    const bool WantsIdField      = WantsAllFields || WantedFieldNames.contains("id");
+    const bool WantsRawHashField = WantsAllFields || WantedFieldNames.contains("rawhash");
+    const bool WantsRawSizeField = WantsAllFields || WantedFieldNames.contains("rawsize");
+    const bool WantsSizeField    = WantsAllFields || WantedFieldNames.contains("size");
+
+    const bool WantsAnySize = WantsRawSizeField || WantsSizeField;
+    // Ids are used by the diagnostics of the size lookup, so they have to be collected
+    // whenever sizes are resolved, not only when the id field is part of the response
+    const bool NeedsIds    = WantsIdField || WantsAnySize;
+    const bool NeedsHashes = WantsRawHashField || WantsAnySize;
+
+    // Marks a size that has not been resolved; such values are not emitted
+    const uint64_t UnknownSize = (uint64_t)-1;
+
+    std::vector<Oid>      Ids;
+    std::vector<IoHash>   Hashes;
+    std::vector<uint64_t> RawSizes;
+    std::vector<uint64_t> Sizes;
+
+    size_t Count          = 0;
+    size_t EstimatedCount = Oplog.OplogCount();
+    if (NeedsIds)
+    {
+        Ids.reserve(EstimatedCount);
+    }
+    if (NeedsHashes)
+    {
+        Hashes.reserve(EstimatedCount);
+    }
+    Oplog.IterateChunkMap([&](const Oid& Id, const IoHash& Hash) {
+        if (NeedsIds)
+        {
+            Ids.push_back(Id);
+        }
+        if (NeedsHashes)
+        {
+            Hashes.push_back(Hash);
+        }
+        Count++;
+    });
+
+    if (WantsAnySize)
+    {
+        if (WantsRawSizeField)
+        {
+            RawSizes.resize(Hashes.size(), UnknownSize);
+        }
+        if (WantsSizeField)
+        {
+            Sizes.resize(Hashes.size(), UnknownSize);
+        }
+
+        WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst);  // GetSyncWorkerPool();
+        (void)Oplog.IterateChunks(
+            Hashes,
+            false,
+            [&](size_t Index, const IoBuffer& Payload, uint64_t /*ModTag*/) -> bool {
+                try
+                {
+                    if (Payload)
+                    {
+                        if (Payload.GetContentType() == ZenContentType::kCompressedBinary)
+                        {
+                            if (WantsRawSizeField)
+                            {
+                                ZEN_ASSERT_SLOW(RawSizes[Index] == UnknownSize);
+                                IoHash _;
+                                if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index]))
+                                {
+                                    if (WantsSizeField)
+                                    {
+                                        ZEN_ASSERT_SLOW(Sizes[Index] == UnknownSize);
+                                        Sizes[Index] = Payload.GetSize();
+                                    }
+                                }
+                                else
+                                {
+                                    ZEN_WARN("oplog '{}/{}': payload for project chunk for id {} is not a valid compressed binary.",
+                                             Project.Identifier,
+                                             Oplog.OplogId(),
+                                             Ids[Index]);
+                                }
+                            }
+                            else if (WantsSizeField)
+                            {
+                                ZEN_ASSERT_SLOW(Sizes[Index] == UnknownSize);
+                                Sizes[Index] = Payload.GetSize();
+                            }
+                        }
+                        else
+                        {
+                            // Uncompressed payload, raw size and stored size are the same
+                            if (WantsSizeField)
+                            {
+                                ZEN_ASSERT_SLOW(Sizes[Index] == UnknownSize);
+                                Sizes[Index] = Payload.GetSize();
+                            }
+                            if (WantsRawSizeField)
+                            {
+                                // Check the slot that is about to be written; Sizes may be
+                                // empty or already assigned at this point
+                                ZEN_ASSERT_SLOW(RawSizes[Index] == UnknownSize);
+                                RawSizes[Index] = Payload.GetSize();
+                            }
+                        }
+                    }
+                    else
+                    {
+                        ZEN_WARN("oplog '{}/{}': failed getting payload for chunk for id {}",
+                                 Project.Identifier,
+                                 Oplog.OplogId(),
+                                 Ids[Index]);
+                    }
+                }
+                catch (const std::exception& Ex)
+                {
+                    ZEN_WARN("oplog '{}/{}': failed getting project chunk info for id {}. Reason: '{}'",
+                             Project.Identifier,
+                             Oplog.OplogId(),
+                             Ids[Index],
+                             Ex.what());
+                }
+                return true;
+            },
+            &WorkerPool,
+            256u * 1024u);
+    }
+
+    CbObjectWriter Response(64u + Count * ((WantsIdField ? (5 + sizeof(Oid::OidBits)) : 0) +
+                                           (WantsRawHashField ? (10 + sizeof(IoHash::Hash)) : 0) + (WantsSizeField ? 16 : 0) +
+                                           (WantsRawSizeField ? 16 : 0)));
+    Response.BeginArray("chunkinfos"sv);
+
+    for (size_t Index = 0; Index < Count; Index++)
+    {
+        Response.BeginObject();
+        if (WantsIdField)
+        {
+            Response << "id"sv << Ids[Index];
+        }
+        if (WantsRawHashField)
+        {
+            Response << "rawhash"sv << Hashes[Index];
+        }
+        if (WantsSizeField && Sizes[Index] != UnknownSize)
+        {
+            Response << "size"sv << Sizes[Index];
+        }
+        if (WantsRawSizeField && RawSizes[Index] != UnknownSize)
+        {
+            Response << "rawsize"sv << RawSizes[Index];
+        }
+        Response.EndObject();
+    }
+    Response.EndArray();
+
+    return Response.Save();
+}
+
+// Returns an object with a single "size" field for the given chunk, or an empty
+// object if the chunk could not be found.
+//
+// For chunks with compressed binary content type the reported size is the raw size
+// from the compressed header; a std::runtime_error is thrown if that header is not
+// valid. For all other chunks the size of the stored payload is reported.
+CbObject
+ProjectStore::GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, const Oid& ChunkId)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("ProjectStore::GetChunkInfo");
+
+    using namespace std::literals;
+
+    auto Log = [&InLog]() { return InLog; };
+
+    IoBuffer Payload = Oplog.FindChunk(Project.RootDir, ChunkId, nullptr);
+    if (!Payload)
+    {
+        return {};
+    }
+
+    uint64_t ReportedSize = Payload.GetSize();
+
+    const bool HasCompressedType = Payload.GetContentType() == ZenContentType::kCompressedBinary;
+    if (HasCompressedType)
+    {
+        IoHash   RawHash;
+        uint64_t RawSize;
+        if (!CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize))
+        {
+            throw std::runtime_error(
+                fmt::format("Chunk info request for malformed chunk id '{}/{}'/'{}'", Project.Identifier, Oplog.OplogId(), ChunkId));
+        }
+        ReportedSize = RawSize;
+    }
+
+    CbObjectWriter Response;
+    Response << "size"sv << ReportedSize;
+    return Response.Save();
+}
+
+// Produces the payload for a full or partial read of `Chunk`.
+//
+// Chunk      - payload to read from; it is consumed (moved from) by this call.
+// Offset     - start of the requested range, in raw (uncompressed) bytes.
+// Size       - length of the requested range; ~0ull means "everything from Offset to the end".
+// AcceptType - when kBinary and the payload is compressed, the result is decompressed;
+//              otherwise compressed payloads are returned in compressed form.
+//
+// Result fields:
+//  - Error is Ok on success, MalformedContent when a payload flagged as compressed cannot be
+//    parsed, and OutOfRange when the clamped range is empty.
+//  - RawSize is 0 for a full-range result and the total raw/stored size for a partial result.
+//  - RawHash is only assigned for compressed payloads.
+//
+// The requested range is clamped to the available data. The clamp is done without computing
+// `Offset + Size`, since that sum can wrap around for large client supplied sizes and would
+// let an out-of-bounds range through.
+static ProjectStore::GetChunkRangeResult
+ExtractRange(IoBuffer&& Chunk, uint64_t Offset, uint64_t Size, ZenContentType AcceptType)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    constexpr uint64_t ToEndOfChunk = ~(0ull);
+
+    // Keep the caller's value so error messages report what was actually asked for
+    const uint64_t RequestedSize = Size;
+
+    // Overflow-safe clamp of the requested size to what is available after Offset
+    const auto ClampToAvailable = [Offset, RequestedSize](uint64_t TotalSize) -> uint64_t {
+        if (Offset >= TotalSize)
+        {
+            return 0;
+        }
+        const uint64_t Available = TotalSize - Offset;
+        return (RequestedSize < Available) ? RequestedSize : Available;
+    };
+
+    ProjectStore::GetChunkRangeResult Result;
+
+    Result.ContentType = Chunk.GetContentType();
+
+    if (Result.ContentType == ZenContentType::kCompressedBinary)
+    {
+        IoHash           RawHash;
+        uint64_t         RawSize;
+        CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize);
+        if (!Compressed)
+        {
+            Result.Error            = ProjectStore::GetChunkRangeResult::EError::MalformedContent;
+            Result.ErrorDescription = fmt::format("Malformed payload, not a compressed buffer");
+            return Result;
+        }
+
+        const bool IsFullRange = (Offset == 0) && ((Size == ToEndOfChunk) || (Size == RawSize));
+
+        if (IsFullRange)
+        {
+            if (AcceptType == ZenContentType::kBinary)
+            {
+                Result.Chunk       = Compressed.DecompressToComposite();
+                Result.ContentType = ZenContentType::kBinary;
+            }
+            else
+            {
+                Result.Chunk = Compressed.GetCompressed();
+            }
+            Result.RawSize = 0;
+        }
+        else
+        {
+            Size = ClampToAvailable(RawSize);
+
+            if (Size == 0)
+            {
+                Result.Error = ProjectStore::GetChunkRangeResult::EError::OutOfRange;
+                Result.ErrorDescription =
+                    fmt::format("Chunk request for range outside of compressed chunk. Offset: {}, Size: {}, ChunkSize: {}",
+                                Offset,
+                                RequestedSize,
+                                RawSize);
+                return Result;
+            }
+
+            if (AcceptType == ZenContentType::kBinary)
+            {
+                Result.Chunk       = CompositeBuffer(Compressed.Decompress(Offset, Size));
+                Result.ContentType = ZenContentType::kBinary;
+            }
+            else
+            {
+                // Value will be a range of compressed blocks that covers the requested range
+                // The client will have to compensate for any offsets that do not land on an even block size multiple
+                Result.Chunk = Compressed.GetRange(Offset, Size).GetCompressed();
+            }
+            Result.RawSize = RawSize;
+        }
+        Result.RawHash = RawHash;
+    }
+    else
+    {
+        const uint64_t ChunkSize = Chunk.GetSize();
+
+        const bool IsFullRange = (Offset == 0) && ((Size == ToEndOfChunk) || (Size == ChunkSize));
+        if (IsFullRange)
+        {
+            Result.Chunk   = CompositeBuffer(SharedBuffer(std::move(Chunk)));
+            Result.RawSize = 0;
+        }
+        else
+        {
+            Size = ClampToAvailable(ChunkSize);
+
+            if (Size == 0)
+            {
+                Result.Error = ProjectStore::GetChunkRangeResult::EError::OutOfRange;
+                Result.ErrorDescription =
+                    fmt::format("Chunk request for range outside of chunk. Offset: {}, Size: {}, ChunkSize: {}",
+                                Offset,
+                                RequestedSize,
+                                ChunkSize);
+                return Result;
+            }
+
+            Result.Chunk   = CompositeBuffer(SharedBuffer(IoBuffer(std::move(Chunk), Offset, Size)));
+            Result.RawSize = ChunkSize;
+        }
+    }
+    Result.Error = ProjectStore::GetChunkRangeResult::EError::Ok;
+    return Result;
+}
+
+// Fetches a chunk by id from the oplog and extracts the requested range from it.
+//
+// OptionalInOutModificationTag may be null. When provided, its incoming value is remembered,
+// the pointer is handed to Oplog::FindChunk, and if the value is unchanged after the lookup
+// the call returns NotModified without extracting any data.
+// Returns NotFound (with a description) when the lookup yields no payload; otherwise the
+// result of ExtractRange for the given Offset/Size/AcceptType.
+ProjectStore::GetChunkRangeResult
+ProjectStore::GetChunkRange(LoggerRef       InLog,
+                            Project&        Project,
+                            Oplog&          Oplog,
+                            const Oid&      ChunkId,
+                            uint64_t        Offset,
+                            uint64_t        Size,
+                            ZenContentType  AcceptType,
+                            uint64_t*       OptionalInOutModificationTag)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    ZEN_TRACE_CPU("ProjectStore::GetChunkRange");
+
+    auto Log = [&InLog]() { return InLog; };
+
+    const bool     CallerTracksTag = (OptionalInOutModificationTag != nullptr);
+    const uint64_t PreviousTag     = CallerTracksTag ? *OptionalInOutModificationTag : 0;
+
+    IoBuffer Payload = Oplog.FindChunk(Project.RootDir, ChunkId, OptionalInOutModificationTag);
+    if (!Payload)
+    {
+        return GetChunkRangeResult{.Error            = GetChunkRangeResult::EError::NotFound,
+                                   .ErrorDescription = fmt::format("Chunk range request for chunk {}/{}/{} failed, payload not found",
+                                                                   Project.Identifier,
+                                                                   Oplog.OplogId(),
+                                                                   ChunkId)};
+    }
+
+    // Tag unchanged by the lookup: the caller already has the current content
+    if (CallerTracksTag && (*OptionalInOutModificationTag == PreviousTag))
+    {
+        return {.Error = GetChunkRangeResult::EError::NotModified};
+    }
+
+    return ExtractRange(std::move(Payload), Offset, Size, AcceptType);
+}
+
+// Fetches a chunk by content hash from the CID store and tags it as compressed binary.
+// Project and Oplog are accepted for interface symmetry but are not used.
+// Returns an empty buffer when the hash is not present in the store.
+IoBuffer
+ProjectStore::GetChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("ProjectStore::GetChunk");
+    ZEN_UNUSED(Project, Oplog);
+
+    IoBuffer Payload = m_CidStore.FindChunkByCid(ChunkHash);
+    if (Payload)
+    {
+        Payload.SetContentType(ZenContentType::kCompressedBinary);
+        return Payload;
+    }
+
+    return {};
+}
+
+// Fetches a chunk by object id, resolving the project and oplog from their identifiers.
+// The oplog is opened with AllowCompact and VerifyPathOnDisk both disabled.
+// Returns an empty buffer when the project or the oplog cannot be opened; otherwise
+// whatever Oplog::FindChunk returns for ChunkId.
+IoBuffer
+ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const Oid& ChunkId)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("ProjectStore::GetChunk");
+
+    Ref<Project> OwningProject = OpenProject(ProjectId);
+    if (OwningProject)
+    {
+        Ref<Oplog> OwningOplog = OwningProject->OpenOplog(OplogId, /*AllowCompact */ false, /*VerifyPathOnDisk*/ false);
+        if (OwningOplog)
+        {
+            return OwningOplog->FindChunk(OwningProject->RootDir, ChunkId, /*OptOutModificationTag*/ nullptr);
+        }
+    }
+
+    return {};
+}
+
+// Fetches a chunk by content hash after checking that the named project and oplog can be opened.
+// The oplog is opened with AllowCompact and VerifyPathOnDisk both disabled; it is only used as
+// an existence check, the payload itself comes from the CID store.
+// Returns an empty buffer when the project or the oplog cannot be opened.
+// NOTE(review): unlike the (Project&, Oplog&, IoHash) overload this does not set the content
+// type on the returned buffer - confirm whether callers rely on that difference.
+IoBuffer
+ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const IoHash& Cid)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("ProjectStore::GetChunk");
+
+    Ref<Project> OwningProject = OpenProject(ProjectId);
+    if (OwningProject)
+    {
+        Ref<Oplog> OwningOplog = OwningProject->OpenOplog(OplogId, /*AllowCompact */ false, /*VerifyPathOnDisk*/ false);
+        if (OwningOplog)
+        {
+            return m_CidStore.FindChunkByCid(Cid);
+        }
+    }
+
+    return {};
+}
+
+// Stores a compressed chunk in the CID store on behalf of the given oplog.
+//
+// ChunkHash - the raw hash the caller claims the payload has.
+// Chunk     - the payload, expected to be a valid compressed buffer.
+//
+// The payload is parsed as a compressed buffer and its raw hash must equal ChunkHash;
+// otherwise std::runtime_error is thrown. A payload that does not parse at all is rejected
+// the same way - the parse result is checked explicitly because the out-parameters cannot be
+// relied upon when parsing fails.
+// On success the hash is reported to the oplog via CaptureAddedAttachments before the chunk
+// is added to the CID store.
+// Returns true when the CID store reports the chunk as newly added.
+bool
+ProjectStore::PutChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash, IoBuffer&& Chunk)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("ProjectStore::PutChunk");
+
+    IoHash           RawHash = IoHash::Zero;
+    uint64_t         RawSize = 0;
+    CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize);
+    if (!Compressed || RawHash != ChunkHash)
+    {
+        throw std::runtime_error(
+            fmt::format("Chunk request for invalid payload format for chunk {}/{}/{}", Project.Identifier, Oplog.OplogId(), ChunkHash));
+    }
+
+    Oplog.CaptureAddedAttachments(std::vector<IoHash>{ChunkHash});
+    CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk, ChunkHash);
+    return Result.New;
+}
+
+// Resolves a batch of chunk requests against the oplog / CID store.
+//
+// Requests must not be empty. The returned vector has one entry per request, in request order.
+// For every chunk that is found the entry gets Exists = true and its ModTag set; the payload
+// is only stored (and made owned) when the request does not have SkipData set.
+//
+// A single request is resolved with direct lookups. Multiple requests are split into
+// hash-addressed and id-addressed groups, each resolved through Oplog::IterateChunks using
+// the small burst worker pool.
+std::vector<ProjectStore::ChunkResult>
+ProjectStore::GetChunks(Project& Project, Oplog& Oplog, std::span<const ChunkRequest> Requests)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    ZEN_TRACE_CPU("ProjectStore::GetChunks");
+
+    ZEN_ASSERT(!Requests.empty());
+
+    const size_t                           RequestCount = Requests.size();
+    std::vector<ProjectStore::ChunkResult> Results(RequestCount);
+
+    if (RequestCount <= 1)
+    {
+        // Single request: plain lookups, no need to involve the worker pool
+        const ChunkRequest& OnlyRequest = Requests.front();
+        ChunkResult&        OnlyResult  = Results.front();
+
+        IoBuffer Payload;
+        uint64_t ModTag = 0;
+        if (OnlyRequest.Id.index() == 0)
+        {
+            const IoHash& ChunkHash = std::get<IoHash>(OnlyRequest.Id);
+            Payload                 = m_CidStore.FindChunkByCid(ChunkHash);
+            if (Payload)
+            {
+                ModTag = GetModificationTagFromRawHash(ChunkHash);
+            }
+        }
+        else
+        {
+            const Oid& ChunkId = std::get<Oid>(OnlyRequest.Id);
+            Payload            = Oplog.FindChunk(Project.RootDir, ChunkId, &ModTag);
+        }
+
+        if (Payload)
+        {
+            OnlyResult.Exists = true;
+            OnlyResult.ModTag = ModTag;
+            if (!OnlyRequest.SkipData)
+            {
+                OnlyResult.ChunkBuffer = std::move(Payload);
+                OnlyResult.ChunkBuffer.MakeOwned();
+            }
+        }
+        return Results;
+    }
+
+    // Partition the requests by addressing mode, remembering which result slot each belongs to
+    std::vector<IoHash> HashKeys;
+    std::vector<size_t> HashSlots;
+    std::vector<Oid>    IdKeys;
+    std::vector<size_t> IdSlots;
+
+    HashKeys.reserve(RequestCount);
+    HashSlots.reserve(RequestCount);
+    IdKeys.reserve(RequestCount);
+    IdSlots.reserve(RequestCount);
+
+    for (size_t Slot = 0; Slot < Requests.size(); Slot++)
+    {
+        const ChunkRequest& Request = Requests[Slot];
+        if (Request.Id.index() == 0)
+        {
+            HashKeys.push_back(std::get<IoHash>(Request.Id));
+            HashSlots.push_back(Slot);
+        }
+        else
+        {
+            IdKeys.push_back(std::get<Oid>(Request.Id));
+            IdSlots.push_back(Slot);
+        }
+    }
+
+    // Shared by both iterations below; each invocation only touches its own result slot
+    const auto RecordPayload = [&Requests, &Results](const std::vector<size_t>& Slots,
+                                                     size_t                     Index,
+                                                     const IoBuffer&            Payload,
+                                                     uint64_t                   ModTag) {
+        if (!Payload)
+        {
+            return;
+        }
+        const size_t Slot   = Slots[Index];
+        ChunkResult& Result = Results[Slot];
+        Result.Exists       = true;
+        if (!Requests[Slot].SkipData)
+        {
+            // Payload is a const reference, so this is a copy of the buffer handle
+            Result.ChunkBuffer = Payload;
+            Result.ChunkBuffer.MakeOwned();
+        }
+        Result.ModTag = ModTag;
+    };
+
+    WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst);
+
+    if (!HashKeys.empty())
+    {
+        Oplog.IterateChunks(
+            HashKeys,
+            true,
+            [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool {
+                RecordPayload(HashSlots, Index, Payload, ModTag);
+                return true;
+            },
+            &WorkerPool,
+            8u * 1024);
+    }
+
+    if (!IdSlots.empty())
+    {
+        Oplog.IterateChunks(
+            Project.RootDir,
+            IdKeys,
+            true,
+            [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool {
+                RecordPayload(IdSlots, Index, Payload, ModTag);
+                return true;
+            },
+            &WorkerPool,
+            8u * 1024);
+    }
+
+    return Results;
+}
+
+// Parses a "getchunks" RPC request object into a list of chunk requests.
+//
+// Two request shapes are accepted:
+//  - Current: a "Request" object holding an optional "SkipData" bool (default false, applied
+//    to every chunk) and a "Chunks" array of objects. Each chunk object may carry "Offset"
+//    (default 0), "Size" (default (uint64_t)-1), an integer "ModTag", and must carry either
+//    a "RawHash" hash or an "Oid" object id; "RawHash" wins when both are present.
+//  - Legacy: a top level "chunks" array of raw hashes, each requesting a full chunk.
+//
+// Throws std::runtime_error when neither shape is present, or when a chunk object in the
+// current shape has no identifier. Project and Oplog are only used to build error messages.
+std::vector<ProjectStore::ChunkRequest>
+ProjectStore::ParseChunksRequests(Project& Project, Oplog& Oplog, const CbObject& Cb)
+{
+    ZEN_TRACE_CPU("Store::Rpc::getchunks");
+
+    using namespace std::literals;
+
+    std::vector<ChunkRequest> Requests;
+
+    if (auto RequestFieldView = Cb["Request"sv]; RequestFieldView.IsObject())
+    {
+        CbObjectView RequestView = RequestFieldView.AsObjectView();
+        bool         SkipData    = RequestView["SkipData"].AsBool(false);
+        CbArrayView  ChunksArray = RequestView["Chunks"sv].AsArrayView();
+
+        size_t RequestCount = ChunksArray.Num();
+        if (RequestCount > 0)
+        {
+            Requests.reserve(RequestCount);
+            for (CbFieldView FieldView : ChunksArray)
+            {
+                CbObjectView ChunkObject = FieldView.AsObjectView();
+                ChunkRequest Parsed      = {.Offset   = ChunkObject["Offset"sv].AsUInt64(0),
+                                            .Size     = ChunkObject["Size"sv].AsUInt64((uint64_t)-1),
+                                            .SkipData = SkipData};
+                if (CbFieldView InputModificationTagView = ChunkObject.FindView("ModTag"); InputModificationTagView.IsInteger())
+                {
+                    Parsed.ModTag = InputModificationTagView.AsUInt64();
+                }
+                if (CbFieldView RawHashView = ChunkObject.FindView("RawHash"sv); RawHashView.IsHash())
+                {
+                    const IoHash ChunkHash = RawHashView.AsHash();
+                    Parsed.Id              = ChunkHash;
+                }
+                else if (CbFieldView IdView = ChunkObject.FindView("Oid"sv); IdView.IsObjectId())
+                {
+                    const Oid ChunkId = IdView.AsObjectId();
+                    Parsed.Id         = ChunkId;
+                }
+                else
+                {
+                    throw std::runtime_error(
+                        fmt::format("oplog '{}/{}': malformed getchunks rpc request object, chunk request has no identifier",
+                                    Project.Identifier,
+                                    Oplog.OplogId()));
+                }
+                Requests.emplace_back(std::move(Parsed));
+            }
+        }
+    }
+    else if (CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); ChunksArray)
+    {
+        // Legacy full chunks only by rawhash
+
+        Requests.reserve(ChunksArray.Num());
+        for (CbFieldView FieldView : ChunksArray)
+        {
+            Requests.push_back(ProjectStore::ChunkRequest{.Id = FieldView.AsHash()});
+        }
+    }
+    else
+    {
+        throw std::runtime_error(fmt::format("oplog '{}/{}': malformed getchunks rpc request object", Project.Identifier, Oplog.OplogId()));
+    }
+    return Requests;
+}
+
+// Builds the response package for a "getchunks" request.
+//
+// The package object holds a "Chunks" array with one entry per request whose result has
+// Exists set; requests for chunks that were not found produce no entry.
+// Each entry carries:
+//  - "Id": the request identifier (hash or object id, matching how it was requested).
+//  - "ModTag" and payload information, only when the request had no ModTag or its ModTag
+//    differs from the result's ModTag.
+//  - Payload information (only when the request does not have SkipData set), produced by
+//    ExtractRange and added to the package as an attachment:
+//      * full compressed chunk   -> "RawHash", attachment keyed by the raw hash
+//      * partial compressed range -> "FragmentHash", "FragmentOffset", "RawSize"
+//      * uncompressed payload     -> "Hash" (derived from id, offset and size) and, for
+//                                    partial ranges, "Size"
+//  - "Error": a description when the range could not be extracted or the compression
+//    parameters of a partial range could not be read; a warning is logged as well.
+//
+// Result.ChunkBuffer is moved from while building the response.
+// NOTE(review): Results is indexed with the Requests index, so both vectors are assumed to be
+// the same length - confirm that all callers guarantee this.
+CbPackage
+ProjectStore::WriteChunksRequestResponse(Project& Project,
+ Oplog& Oplog,
+ std::vector<ChunkRequest>&& Requests,
+ std::vector<ChunkResult>&& Results)
+{
+ using namespace std::literals;
+
+ CbPackage ResponsePackage;
+
+ CbObjectWriter ResponseWriter(32 + Requests.size() * 64u);
+ ResponseWriter.BeginArray("Chunks"sv);
+ {
+ for (size_t Index = 0; Index < Requests.size(); Index++)
+ {
+ const ChunkRequest& Request = Requests[Index];
+ ChunkResult& Result = Results[Index];
+ if (Result.Exists)
+ {
+ ResponseWriter.BeginObject();
+ {
+ if (Request.Id.index() == 0)
+ {
+ const IoHash& RawHash = std::get<IoHash>(Request.Id);
+ ResponseWriter.AddHash("Id", RawHash);
+ }
+ else
+ {
+ const Oid& Id = std::get<Oid>(Request.Id);
+ ResponseWriter.AddObjectId("Id", Id);
+ }
+ // Only describe/send the chunk when the client has no tag or its tag is out of date
+ if (!Request.ModTag.has_value() || Request.ModTag.value() != Result.ModTag)
+ {
+ ResponseWriter.AddInteger("ModTag", Result.ModTag);
+ if (!Request.SkipData)
+ {
+ auto ExtractRangeResult = ExtractRange(std::move(Result.ChunkBuffer),
+ Request.Offset,
+ Request.Size,
+ ZenContentType::kCompressedBinary);
+ if (ExtractRangeResult.Error == GetChunkRangeResult::EError::Ok)
+ {
+ if (ExtractRangeResult.ContentType == ZenContentType::kCompressedBinary)
+ {
+ ZEN_ASSERT(ExtractRangeResult.RawHash != IoHash::Zero);
+ CompressedBuffer CompressedValue =
+ CompressedBuffer::FromCompressedNoValidate(std::move(ExtractRangeResult.Chunk));
+ ZEN_ASSERT(CompressedValue);
+
+ // ExtractRange reports RawSize == 0 for full-range results, so non-zero means a partial range
+ if (ExtractRangeResult.RawSize != 0)
+ {
+ // This really could use some thought so we don't send the same data if we get a request for
+ // multiple ranges from the same chunk block
+
+ uint64_t FragmentRawOffset = 0;
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ uint64_t BlockSize = 0;
+ if (CompressedValue.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+ {
+ if (BlockSize > 0)
+ {
+ // Round the requested offset down to the start of its compression block
+ FragmentRawOffset = (Request.Offset / BlockSize) * BlockSize;
+ }
+ else
+ {
+ FragmentRawOffset = Request.Offset;
+ }
+ uint64_t FragmentRawLength = CompressedValue.DecodeRawSize();
+
+ // The fragment is identified by a hash over (chunk raw hash, fragment offset, fragment length)
+ IoHashStream FragmentHashStream;
+ FragmentHashStream.Append(ExtractRangeResult.RawHash.Hash,
+ sizeof(ExtractRangeResult.RawHash.Hash));
+ FragmentHashStream.Append(&FragmentRawOffset, sizeof(FragmentRawOffset));
+ FragmentHashStream.Append(&FragmentRawLength, sizeof(FragmentRawLength));
+ IoHash FragmentHash = FragmentHashStream.GetHash();
+
+ ResponseWriter.AddHash("FragmentHash", FragmentHash);
+ ResponseWriter.AddInteger("FragmentOffset", FragmentRawOffset);
+ ResponseWriter.AddInteger("RawSize", ExtractRangeResult.RawSize);
+ ResponsePackage.AddAttachment(CbAttachment(CompressedValue, FragmentHash));
+ }
+ else
+ {
+ std::string ErrorString = "Failed to get compression parameters from partial compressed buffer";
+ ResponseWriter.AddString("Error", ErrorString);
+ ZEN_WARN("oplog '{}/{}': {}", Project.Identifier, Oplog.OplogId(), ErrorString);
+ }
+ }
+ else
+ {
+ ResponseWriter.AddHash("RawHash"sv, ExtractRangeResult.RawHash);
+ ResponsePackage.AddAttachment(CbAttachment(std::move(CompressedValue), ExtractRangeResult.RawHash));
+ }
+ }
+ else
+ {
+ // Uncompressed payload: this path requires the request to be addressed by object id
+ IoHashStream HashStream;
+ ZEN_ASSERT(Request.Id.index() == 1);
+ const Oid& Id = std::get<Oid>(Request.Id);
+ HashStream.Append(Id.OidBits, sizeof(Id.OidBits));
+ HashStream.Append(&Request.Offset, sizeof(Request.Offset));
+ HashStream.Append(&Request.Size, sizeof(Request.Size));
+ IoHash Hash = HashStream.GetHash();
+
+ ResponseWriter.AddHash("Hash"sv, Hash);
+ if (ExtractRangeResult.RawSize != 0)
+ {
+ ResponseWriter.AddInteger("Size", ExtractRangeResult.RawSize);
+ }
+ ResponsePackage.AddAttachment(CbAttachment(std::move(ExtractRangeResult.Chunk), Hash));
+ }
+ }
+ else
+ {
+ std::string ErrorString =
+ fmt::format("Failed fetching chunk range ({})", ExtractRangeResult.ErrorDescription);
+ ResponseWriter.AddString("Error", ErrorString);
+ ZEN_WARN("oplog '{}/{}': {}", Project.Identifier, Oplog.OplogId(), ErrorString);
+ }
+ }
+ }
+ }
+ ResponseWriter.EndObject();
+ }
+ }
+ }
+ ResponseWriter.EndArray();
+ ResponsePackage.SetObject(ResponseWriter.Save());
+
+ return ResponsePackage;
+}
+
+// Reports whether disk writes are currently permitted.
+// Writes are always allowed when no disk write blocker is installed; otherwise the
+// decision is delegated to the blocker.
+bool
+ProjectStore::AreDiskWritesAllowed() const
+{
+    if (m_DiskWriteBlocker == nullptr)
+    {
+        return true;
+    }
+    return m_DiskWriteBlocker->AreDiskWritesAllowed();
+}
+
+// Starts (or joins) capturing of project identifiers.
+// Capture is reference counted: the first enable allocates the capture list, later enables
+// only bump the counter. Runs under the exclusive projects lock.
+void
+ProjectStore::EnableUpdateCapture()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    m_ProjectsLock.WithExclusiveLock([&]() {
+        if (m_UpdateCaptureRefCounter != 0)
+        {
+            // Somebody else already enabled capture, the list must exist
+            ZEN_ASSERT(m_CapturedProjects);
+        }
+        else
+        {
+            ZEN_ASSERT(!m_CapturedProjects);
+            m_CapturedProjects = std::make_unique<std::vector<std::string>>();
+        }
+        m_UpdateCaptureRefCounter++;
+    });
+}
+
+// Releases one reference on project capture.
+// The capture list is destroyed when the last reference goes away. Must be paired with a
+// preceding EnableUpdateCapture (asserted). Runs under the exclusive projects lock.
+void
+ProjectStore::DisableUpdateCapture()
+{
+    m_ProjectsLock.WithExclusiveLock([&]() {
+        ZEN_ASSERT(m_CapturedProjects);
+        ZEN_ASSERT(m_UpdateCaptureRefCounter > 0);
+
+        m_UpdateCaptureRefCounter--;
+        const bool IsLastReference = (m_UpdateCaptureRefCounter == 0);
+        if (IsLastReference)
+        {
+            m_CapturedProjects.reset();
+        }
+    });
+}
+
+// Returns a copy of the captured project identifiers, or an empty list when capture is
+// not active. No lock is taken here.
+// NOTE(review): the "Locked" suffix suggests the caller is expected to hold the projects
+// lock - confirm against callers.
+std::vector<std::string>
+ProjectStore::GetCapturedProjectsLocked()
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+    if (!m_CapturedProjects)
+    {
+        return {};
+    }
+    return *m_CapturedProjects;
+}
+
+// Name of this store as it is reported to the garbage collector; includes the project base path.
+std::string
+ProjectStore::GetGcName(GcCtx&)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    const std::string BasePathText = m_ProjectBasePath.string();
+    return fmt::format("projectstore: '{}'", BasePathText);
+}
+
+// GC compaction step for the project store.
+//
+// Created by ProjectStore::RemoveExpiredData with the on-disk folders of the oplogs and
+// projects that were removed from the store. CompactStore then
+//  1. deletes those folders (delete mode only), and
+//  2. compacts every oplog that its project has flagged for compaction (in non-delete mode
+//     the compaction is invoked as a dry run).
+// Freed bytes are accumulated into GcCompactStoreStats::RemovedDisk.
+class ProjectStoreGcStoreCompactor : public GcStoreCompactor
+{
+public:
+    // Takes ownership of both path lists; BasePath is only used for log output and GetGcName.
+    ProjectStoreGcStoreCompactor(ProjectStore&                        ProjectStore,
+                                 const std::filesystem::path&         BasePath,
+                                 std::vector<std::filesystem::path>&& OplogPathsToRemove,
+                                 std::vector<std::filesystem::path>&& ProjectPathsToRemove)
+    : m_ProjectStore(ProjectStore)
+    , m_BasePath(BasePath)
+    , m_OplogPathsToRemove(std::move(OplogPathsToRemove))
+    , m_ProjectPathsToRemove(std::move(ProjectPathsToRemove))
+    {
+    }
+
+    // Removes the pending folders and compacts flagged oplogs; see class comment.
+    // The pending path lists are cleared at the end regardless of mode.
+    virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>&) override
+    {
+        ZEN_TRACE_CPU("Store::CompactStore");
+        ZEN_MEMSCOPE(GetProjectstoreTag());
+
+        auto Log = [&Ctx]() { return Ctx.Logger; };
+
+        Stopwatch  Timer;
+        const auto _ = MakeGuard([&] {
+            if (!Ctx.Settings.Verbose)
+            {
+                return;
+            }
+            ZEN_INFO("GCV2: projectstore [COMPACT] '{}': RemovedDisk: {} in {}",
+                     m_BasePath,
+                     NiceBytes(Stats.RemovedDisk),
+                     NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+        });
+
+        size_t CompactOplogCount = 0;
+        if (Ctx.Settings.IsDeleteMode)
+        {
+            for (const std::filesystem::path& OplogPath : m_OplogPathsToRemove)
+            {
+                // Measure before deleting so the freed size can be reported
+                uint64_t OplogSize = ProjectStore::Oplog::TotalSize(OplogPath);
+                if (DeleteDirectories(OplogPath))
+                {
+                    ZEN_DEBUG("GCV2: projectstore [COMPACT] '{}': removed oplog folder '{}', removed {}",
+                              m_BasePath,
+                              OplogPath,
+                              NiceBytes(OplogSize));
+                    Stats.RemovedDisk += OplogSize;
+                }
+                else
+                {
+                    ZEN_WARN("GCV2: projectstore [COMPACT] '{}': Failed to remove oplog folder '{}'", m_BasePath, OplogPath);
+                }
+            }
+
+            for (const std::filesystem::path& ProjectPath : m_ProjectPathsToRemove)
+            {
+                uint64_t ProjectSize = ProjectStore::Project::TotalSize(ProjectPath);
+                if (DeleteDirectories(ProjectPath))
+                {
+                    ZEN_DEBUG("GCV2: projectstore [COMPACT] '{}': removed project folder '{}', removed {}",
+                              m_BasePath,
+                              ProjectPath,
+                              NiceBytes(ProjectSize));
+                    Stats.RemovedDisk += ProjectSize;
+                }
+                else
+                {
+                    ZEN_WARN("GCV2: projectstore [COMPACT] '{}': Failed to remove project folder '{}'", m_BasePath, ProjectPath);
+                }
+            }
+        }
+
+        // NOTE(review): m_Projects is iterated without taking the projects lock here -
+        // presumably the GC guarantees exclusive access at this stage; confirm.
+        // Iterate by const reference so the key/Ref pair is not copied for every project.
+        for (const auto& ProjectIt : m_ProjectStore.m_Projects)
+        {
+            Ref<ProjectStore::Project> Project         = ProjectIt.second;
+            std::vector<std::string>   OplogsToCompact = Project->GetOplogsToCompact();
+            CompactOplogCount += OplogsToCompact.size();
+            for (const std::string& OplogId : OplogsToCompact)
+            {
+                Ref<ProjectStore::Oplog> OpLog;
+                {
+                    RwLock::SharedLockScope __(Project->m_ProjectLock);
+                    if (auto OpIt = Project->m_Oplogs.find(OplogId); OpIt != Project->m_Oplogs.end())
+                    {
+                        OpLog = OpIt->second;
+                    }
+                    else
+                    {
+                        // Oplog is not loaded; open a temporary instance just for compaction
+                        Stopwatch             OplogTimer;
+                        std::filesystem::path OplogBasePath = Project->BasePathForOplog(OplogId);
+                        OpLog                               = new ProjectStore::Oplog(
+                            Project->Log(),
+                            Project->Identifier,
+                            OplogId,
+                            Project->m_CidStore,
+                            OplogBasePath,
+                            std::filesystem::path{},
+                            ProjectStore::Oplog::EMode::kFull);  // We need it to be a full read so we can write a new index snapshot
+                        OpLog->Read();
+                        if (Ctx.Settings.Verbose)
+                        {
+                            ZEN_INFO("GCV2: projectstore [COMPACT] '{}': read oplog '{}/{}' at '{}' in {}",
+                                     m_BasePath,
+                                     Project->Identifier,
+                                     OplogId,
+                                     OplogBasePath,
+                                     NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs()));
+                        }
+                    }
+
+                    if (OpLog)
+                    {
+                        const uint64_t PreSize = OpLog->TotalSize();
+
+                        // First argument is true (dry run) when GC is not in delete mode
+                        OpLog->Compact(!Ctx.Settings.IsDeleteMode,
+                                       /*RetainLSNs*/ true,
+                                       fmt::format("GCV2: projectstore [COMPACT] '{}': ", m_BasePath));
+
+                        const uint64_t PostSize = OpLog->TotalSize();
+
+                        const uint64_t FreedSize = (PreSize > PostSize) ? (PreSize - PostSize) : 0;
+
+                        Stats.RemovedDisk += FreedSize;
+                    }
+                }
+            }
+        }
+
+        if (!Ctx.Settings.IsDeleteMode)
+        {
+            ZEN_DEBUG("GCV2: projectstore [COMPACT] '{}': Skipped deleting of {} oplogs and {} projects, skipped compacting {} oplogs",
+                      m_BasePath,
+                      m_OplogPathsToRemove.size(),
+                      m_ProjectPathsToRemove.size(),
+                      CompactOplogCount);
+        }
+
+        m_ProjectPathsToRemove.clear();
+        m_OplogPathsToRemove.clear();
+    }
+
+    // Name reported to the garbage collector; includes the store base path.
+    virtual std::string GetGcName(GcCtx&) override { return fmt::format("projectstore: '{}'", m_BasePath.string()); }
+
+private:
+    ProjectStore&                      m_ProjectStore;
+    std::filesystem::path              m_BasePath;
+    std::vector<std::filesystem::path> m_OplogPathsToRemove;    // oplog folders pending deletion
+    std::vector<std::filesystem::path> m_ProjectPathsToRemove;  // project folders pending deletion
+};
+
+// GC step that finds expired projects and oplogs and detaches them from the store.
+//
+// - Every known project (after DiscoverProjects) is checked against
+//   Ctx.Settings.ProjectStoreExpireTime; expired projects are set aside.
+// - For the remaining projects each oplog on disk is checked; expired oplogs are collected
+//   and oplogs that have not been touched for 15 minutes are unloaded if possible.
+// - In delete mode, expired oplogs and projects are removed from the store. The folders they
+//   leave behind are handed to the returned compactor, which deletes them later.
+//
+// Stats: CheckedCount counts inspected projects and oplogs, FoundCount counts everything
+// found expired, DeletedCount counts only items that were actually removed.
+// Returns a newly allocated compactor; the caller takes ownership.
+GcStoreCompactor*
+ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
+{
+    ZEN_TRACE_CPU("Store::RemoveExpiredData");
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    auto Log = [&Ctx]() { return Ctx.Logger; };
+
+    Stopwatch  Timer;
+    const auto _ = MakeGuard([&] {
+        if (!Ctx.Settings.Verbose)
+        {
+            return;
+        }
+        ZEN_INFO("GCV2: projectstore [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {} in {}",
+                 m_ProjectBasePath,
+                 Stats.CheckedCount,
+                 Stats.FoundCount,
+                 Stats.DeletedCount,
+                 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+    });
+
+    std::vector<std::filesystem::path> OplogPathsToRemove;
+    std::vector<std::filesystem::path> ProjectPathsToRemove;
+
+    std::vector<Ref<Project>> ExpiredProjects;
+    std::vector<Ref<Project>> Projects;
+
+    DiscoverProjects();
+
+    {
+        // Snapshot the project list so the lock is not held during the slower work below
+        RwLock::SharedLockScope Lock(m_ProjectsLock);
+        for (auto& Kv : m_Projects)
+        {
+            Stats.CheckedCount++;
+            if (Kv.second->IsExpired(Ctx.Settings.ProjectStoreExpireTime))
+            {
+                ExpiredProjects.push_back(Kv.second);
+                continue;
+            }
+            Projects.push_back(Kv.second);
+        }
+    }
+
+    size_t ExpiredOplogCount = 0;
+    for (const Ref<Project>& Project : Projects)
+    {
+        if (Ctx.IsCancelledFlag)
+        {
+            break;
+        }
+
+        std::vector<std::string> ExpiredOplogs;
+
+        std::vector<std::string> OpLogs = Project->ScanForOplogs();
+        for (const std::string& OplogId : OpLogs)
+        {
+            Stats.CheckedCount++;
+            if (Project->IsExpired(Ctx.Settings.ProjectStoreExpireTime, OplogId))
+            {
+                ExpiredOplogs.push_back(OplogId);
+            }
+            else if (!Project->IsOplogTouchedSince(GcClock::Now() - std::chrono::minutes(15), OplogId))
+            {
+                // Not expired but idle: release the in-memory oplog
+                if (Project->TryUnloadOplog(OplogId))
+                {
+                    ZEN_INFO("GCV2: projectstore [REMOVE EXPIRED] '{}': Unloaded oplog {}/{} due to inactivity",
+                             m_ProjectBasePath,
+                             Project->Identifier,
+                             OplogId);
+                }
+            }
+        }
+
+        ExpiredOplogCount += ExpiredOplogs.size();
+        if (Ctx.Settings.IsDeleteMode)
+        {
+            for (const std::string& OplogId : ExpiredOplogs)
+            {
+                std::filesystem::path RemovePath;
+                if (Project->RemoveOplog(OplogId, RemovePath))
+                {
+                    if (!RemovePath.empty())
+                    {
+                        OplogPathsToRemove.push_back(RemovePath);
+                    }
+                    Stats.DeletedCount++;
+                }
+            }
+            Project->Flush();
+        }
+    }
+
+    if (Ctx.Settings.IsDeleteMode)
+    {
+        for (const Ref<Project>& Project : ExpiredProjects)
+        {
+            std::string ProjectId = Project->Identifier;
+
+            // Re-check: the project may have been accessed since the snapshot was taken
+            if (!Project->IsExpired(Ctx.Settings.ProjectStoreExpireTime))
+            {
+                ZEN_DEBUG(
+                    "GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. Project no longer "
+                    "expired.",
+                    m_ProjectBasePath,
+                    ProjectId);
+                continue;
+            }
+
+            std::filesystem::path RemovePath;
+            bool                  Success = RemoveProject(ProjectId, RemovePath);
+            if (!Success)
+            {
+                ZEN_DEBUG(
+                    "GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. Project folder is locked.",
+                    m_ProjectBasePath,
+                    ProjectId);
+                continue;
+            }
+            if (!RemovePath.empty())
+            {
+                ProjectPathsToRemove.push_back(RemovePath);
+            }
+
+            // Count only projects that were really removed; skipped ones must not be reported as deleted
+            Stats.DeletedCount++;
+        }
+    }
+
+    size_t ExpiredProjectCount = ExpiredProjects.size();
+    Stats.FoundCount += ExpiredOplogCount + ExpiredProjectCount;
+    return new ProjectStoreGcStoreCompactor(*this, m_ProjectBasePath, std::move(OplogPathsToRemove), std::move(ProjectPathsToRemove));
+}
+
+// GC reference checker covering projects and oplogs that appear while a GC pass is running.
+//
+// Project capture is enabled on the store for the lifetime of this object. During
+// UpdateLockedState the attachments of all oplogs belonging to captured projects, and of all
+// captured oplogs in any project, are gathered into m_References. GetUnusedReferences then
+// filters candidate hashes against that list.
+class ProjectStoreReferenceChecker : public GcReferenceChecker
+{
+public:
+ // Enables project capture on the store; balanced by the destructor.
+ ProjectStoreReferenceChecker(ProjectStore& InProjectStore) : m_ProjectStore(InProjectStore) { m_ProjectStore.EnableUpdateCapture(); }
+
+ // Disables project capture. Exceptions are logged rather than propagated out of the destructor.
+ virtual ~ProjectStoreReferenceChecker()
+ {
+ try
+ {
+ m_ProjectStore.DisableUpdateCapture();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~ProjectStoreReferenceChecker threw exception: '{}'", Ex.what());
+ }
+ }
+
+ virtual std::string GetGcName(GcCtx&) override { return "projectstore"; }
+ // Nothing to precache; all work for this checker happens in UpdateLockedState.
+ virtual void PreCache(GcCtx&) override {}
+
+ // Collects references from oplogs that were added since capture was enabled.
+ // NOTE(review): m_Projects and m_Oplogs are read here without taking any lock, and only
+ // "...Locked" accessors are used - presumably the GC holds the relevant locks while this
+ // runs; confirm against the GC driver.
+ virtual void UpdateLockedState(GcCtx& Ctx) override
+ {
+ ZEN_TRACE_CPU("Store::UpdateLockedState");
+
+ auto Log = [&Ctx]() { return Ctx.Logger; };
+
+ Stopwatch Timer;
+
+ // Declared before the guard below so it is still alive when the guard logs its size
+ std::vector<Ref<ProjectStore::Oplog>> AddedOplogs;
+
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} in {} new oplogs",
+ "projectstore",
+ m_References.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ AddedOplogs.size());
+ });
+
+ // Every oplog of a newly captured project counts as added
+ std::vector<std::string> AddedProjects = m_ProjectStore.GetCapturedProjectsLocked();
+ for (const std::string& AddedProject : AddedProjects)
+ {
+ if (auto It = m_ProjectStore.m_Projects.find(AddedProject); It != m_ProjectStore.m_Projects.end())
+ {
+ ProjectStore::Project& Project = *It->second;
+ for (auto& OplogPair : Project.m_Oplogs)
+ {
+ Ref<ProjectStore::Oplog> Oplog = OplogPair.second;
+ AddedOplogs.push_back(Oplog);
+ }
+ }
+ }
+ // Plus oplogs that each project captured individually
+ for (auto& ProjectPair : m_ProjectStore.m_Projects)
+ {
+ ProjectStore::Project& Project = *ProjectPair.second;
+
+ std::vector<std::string> AddedOplogNames(Project.GetCapturedOplogsLocked());
+ for (const std::string& OplogName : AddedOplogNames)
+ {
+ if (auto It = Project.m_Oplogs.find(OplogName); It != Project.m_Oplogs.end())
+ {
+ Ref<ProjectStore::Oplog> Oplog = It->second;
+ AddedOplogs.push_back(Oplog);
+ }
+ }
+ }
+
+ for (const Ref<ProjectStore::Oplog>& Oplog : AddedOplogs)
+ {
+ size_t BaseReferenceCount = m_References.size();
+
+ Stopwatch InnerTimer;
+ const auto __ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} from {}",
+ Oplog->m_BasePath,
+ m_References.size() - BaseReferenceCount,
+ NiceTimeSpanMs(InnerTimer.GetElapsedTimeMs()),
+ Oplog->OplogId());
+ });
+
+ Oplog->GetAttachmentsLocked(m_References, Ctx.Settings.StoreProjectAttachmentMetaData);
+ if (std::vector<IoHash> PendingChunkReferences = Oplog->GetPendingChunkReferencesLocked(); !PendingChunkReferences.empty())
+ {
+ m_References.insert(m_References.end(), PendingChunkReferences.begin(), PendingChunkReferences.end());
+ }
+ }
+
+ FilterReferences(Ctx, fmt::format("projectstore [LOCKSTATE] '{}'", "projectstore"), m_References);
+ }
+
+ // Returns the subset of IoCids that is not present in the references gathered by
+ // UpdateLockedState, as produced by KeepUnusedReferences.
+ virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override
+ {
+ ZEN_TRACE_CPU("Store::GetUnusedReferences");
+
+ auto Log = [&Ctx]() { return Ctx.Logger; };
+
+ size_t InitialCount = IoCids.size();
+ size_t UsedCount = InitialCount;
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}",
+ "projectstore",
+ UsedCount,
+ InitialCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_References, IoCids);
+ UsedCount = IoCids.size() - UnusedReferences.size();
+ return UnusedReferences;
+ }
+
+private:
+ ProjectStore& m_ProjectStore;
+ // Attachment hashes referenced by oplogs discovered in UpdateLockedState
+ std::vector<IoHash> m_References;
+};
+
+class ProjectStoreOplogReferenceChecker : public GcReferenceChecker
+{
+public:
+ // Creates a reference checker for a single oplog, identified by name within InProject.
+ // Enables update capture on the project; the destructor disables it again.
+ ProjectStoreOplogReferenceChecker(ProjectStore& InProjectStore, Ref<ProjectStore::Project> InProject, std::string_view InOplog)
+ : m_ProjectStore(InProjectStore)
+ , m_Project(InProject)
+ , m_OplogId(InOplog)
+ {
+ m_Project->EnableUpdateCapture();
+ }
+
+ // Disables update capture on the project and, if PreCache enabled it, on the oplog too.
+ // The oplog is looked up again by name under the shared project lock; if it is no longer
+ // in the project's oplog map, nothing is disabled on it.
+ // Exceptions are logged rather than propagated out of the destructor.
+ virtual ~ProjectStoreOplogReferenceChecker()
+ {
+ try
+ {
+ m_Project->DisableUpdateCapture();
+
+ if (m_OplogHasUpdateCapture)
+ {
+ RwLock::SharedLockScope _(m_Project->m_ProjectLock);
+ if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end())
+ {
+ Ref<ProjectStore::Oplog> Oplog = It->second;
+ Oplog->DisableUpdateCapture();
+ m_OplogHasUpdateCapture = false;
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~ProjectStoreOplogReferenceChecker threw exception: '{}'", Ex.what());
+ }
+ }
+
+ // Name reported to the garbage collector: "oplog: '<project id>/<oplog id>'".
+ virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}/{}'", m_Project->Identifier, m_OplogId); }
+
+ // Gathers the attachment references of this oplog ahead of the locked GC phase.
+ //
+ // If the oplog is loaded in the project, update capture is enabled on it so later changes
+ // can be picked up in UpdateLockedState. If it is not loaded but exists on disk, a
+ // temporary read-only instance is opened instead. If neither applies the function returns
+ // without collecting anything.
+ // Oplogs not touched for 30 minutes whose unused space is at least 25 percent are
+ // flagged on the project for compaction.
+ // The oplog access time is recorded so UpdateLockedState can detect later access.
+ // Note that the early returns (oplog missing, GC cancelled) also skip the final
+ // FilterReferences call.
+ virtual void PreCache(GcCtx& Ctx) override
+ {
+ ZEN_TRACE_CPU("Store::Oplog::PreCache");
+
+ auto Log = [&Ctx]() { return Ctx.Logger; };
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': precached {} references in {} from {}/{}",
+ m_OplogBasePath,
+ m_References.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ m_Project->Identifier,
+ m_OplogId);
+ });
+
+ m_OplogBasePath = m_Project->BasePathForOplog(m_OplogId);
+ {
+ Ref<ProjectStore::Oplog> Oplog;
+
+ // Shared project lock is held for the rest of this scope
+ RwLock::SharedLockScope __(m_Project->m_ProjectLock);
+ if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end())
+ {
+ Oplog = It->second;
+ Oplog->EnableUpdateCapture();
+ m_OplogHasUpdateCapture = true;
+ }
+ else if (ProjectStore::Oplog::ExistsAt(m_OplogBasePath))
+ {
+ // Not loaded: read a temporary instance from disk just to enumerate attachments
+ Stopwatch OplogTimer;
+ Oplog = new ProjectStore::Oplog(m_Project->Log(),
+ m_Project->Identifier,
+ m_OplogId,
+ m_Project->m_CidStore,
+ m_OplogBasePath,
+ std::filesystem::path{},
+ ProjectStore::Oplog::EMode::kBasicReadOnly);
+ Oplog->Read();
+ if (Ctx.Settings.Verbose)
+ {
+ ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': read oplog '{}/{}' in {}",
+ m_OplogBasePath,
+ m_Project->Identifier,
+ m_OplogId,
+ NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs()));
+ }
+ }
+ else
+ {
+ return;
+ }
+
+ // Shared oplog lock is held while reading attachments below
+ RwLock::SharedLockScope ___(Oplog->m_OplogLock);
+ if (Ctx.IsCancelledFlag)
+ {
+ return;
+ }
+
+ GcClock::TimePoint CompactExpireTime = GcClock::Now() - std::chrono::minutes(30);
+ if (!m_Project->IsOplogTouchedSince(CompactExpireTime, m_OplogId))
+ {
+ const uint32_t CompactUnusedThreshold = 25;
+ if (Oplog->GetUnusedSpacePercent() >= CompactUnusedThreshold)
+ {
+ m_Project->AddOplogToCompact(m_OplogId);
+ }
+ }
+
+ Oplog->GetAttachmentsLocked(m_References, Ctx.Settings.StoreProjectAttachmentMetaData);
+ m_OplogAccessTime = m_Project->LastOplogAccessTime(m_OplogId);
+ }
+ FilterReferences(Ctx, fmt::format("projectstore [PRECACHE] '{}'", m_OplogBasePath), m_References);
+ }
+
+ // Collects references that were added to this oplog after PreCache ran, into m_AddedReferences.
+ //
+ // If the oplog is loaded: attachments of captured ops, captured attachments and pending
+ // chunk references are gathered. Otherwise, if the oplog was accessed after PreCache
+ // recorded its access time and it exists on disk, it is re-read from disk and all of its
+ // attachments are gathered.
+ // NOTE(review): m_Oplogs is read here without taking the project lock (unlike PreCache) and
+ // only "...Locked" accessors are used - presumably the GC holds the relevant locks while
+ // this runs; confirm against the GC driver.
+ virtual void UpdateLockedState(GcCtx& Ctx) override
+ {
+ ZEN_TRACE_CPU("Store::Oplog::UpdateLockedState");
+
+ auto Log = [&Ctx]() { return Ctx.Logger; };
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} from {}/{}",
+ m_OplogBasePath,
+ m_AddedReferences.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ m_Project->Identifier,
+ m_OplogId);
+ });
+
+ if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end())
+ {
+ Ref<ProjectStore::Oplog> Oplog = It->second;
+ // Attachments referenced by ops captured since update capture was enabled
+ Oplog->IterateCapturedOpsLocked([&](const Oid& Key, ProjectStore::LogSequenceNumber LSN, const CbObjectView& UpdateOp) -> bool {
+ ZEN_UNUSED(Key, LSN);
+ UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_AddedReferences.emplace_back(Visitor.AsAttachment()); });
+ return true;
+ });
+ std::vector<IoHash> AddedAttachments = Oplog->GetCapturedAttachmentsLocked();
+ m_AddedReferences.insert(m_AddedReferences.end(), AddedAttachments.begin(), AddedAttachments.end());
+ if (std::vector<IoHash> PendingChunkReferences = Oplog->GetPendingChunkReferencesLocked(); !PendingChunkReferences.empty())
+ {
+ m_AddedReferences.insert(m_AddedReferences.end(), PendingChunkReferences.begin(), PendingChunkReferences.end());
+ }
+ }
+ else if (m_Project->LastOplogAccessTime(m_OplogId) > m_OplogAccessTime && ProjectStore::Oplog::ExistsAt(m_OplogBasePath))
+ {
+ // Oplog is not loaded but was accessed after PreCache: re-read it from disk
+ Stopwatch OplogTimer;
+ {
+ Ref<ProjectStore::Oplog> Oplog(new ProjectStore::Oplog(m_Project->Log(),
+ m_Project->Identifier,
+ m_OplogId,
+ m_Project->m_CidStore,
+ m_OplogBasePath,
+ std::filesystem::path{},
+ ProjectStore::Oplog::EMode::kBasicReadOnly));
+ Oplog->Read();
+ if (Ctx.Settings.Verbose)
+ {
+ ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': read oplog '{}/{}' in {}",
+ m_OplogBasePath,
+ m_Project->Identifier,
+ m_OplogId,
+ NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs()));
+ }
+
+ // Restart the timer so the second log line only covers attachment gathering
+ OplogTimer.Reset();
+
+ Oplog->GetAttachmentsLocked(m_AddedReferences, Ctx.Settings.StoreProjectAttachmentMetaData);
+ }
+ if (Ctx.Settings.Verbose)
+ {
+ ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': read referenced attachments from oplog '{}/{}' in {}",
+ m_OplogBasePath,
+ m_Project->Identifier,
+ m_OplogId,
+ NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs()));
+ }
+ }
+ FilterReferences(Ctx, fmt::format("projectstore [LOCKSTATE] '{}'", m_OplogBasePath), m_AddedReferences);
+ }
+
+ // Narrows IoCids down to the hashes that appear in neither m_References nor m_AddedReferences
+ // and returns the remaining span. In verbose mode the number of hashes filtered out is logged
+ // when the function exits.
+ virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override
+ {
+     ZEN_TRACE_CPU("Store::Oplog::GetUnusedReferences");
+
+     auto Log = [&Ctx]() { return Ctx.Logger; };
+
+     const size_t CandidateCount   = IoCids.size();
+     size_t       FilteredOutCount = CandidateCount;
+
+     Stopwatch Timer;
+     const auto _ = MakeGuard([&] {
+         if (Ctx.Settings.Verbose)
+         {
+             ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {} from {}/{}",
+                      m_OplogBasePath,
+                      FilteredOutCount,
+                      CandidateCount,
+                      NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+                      m_Project->Identifier,
+                      m_OplogId);
+         }
+     });
+
+     // First drop what m_References covers, then what m_AddedReferences covers.
+     std::span<IoHash> StillUnused = KeepUnusedReferences(m_References, IoCids);
+     StillUnused                   = KeepUnusedReferences(m_AddedReferences, StillUnused);
+
+     FilteredOutCount = IoCids.size() - StillUnused.size();
+     return StillUnused;
+ }
+
+ ProjectStore& m_ProjectStore;
+ Ref<ProjectStore::Project> m_Project;
+ std::string m_OplogId;
+ std::filesystem::path m_OplogBasePath;
+ bool m_OplogHasUpdateCapture = false;
+ std::vector<IoHash> m_References;
+ std::vector<IoHash> m_AddedReferences;
+ GcClock::TimePoint m_OplogAccessTime;
+};
+
+// Builds the GC reference checkers for the project store: one ProjectStoreReferenceChecker for
+// the store itself plus one ProjectStoreOplogReferenceChecker for every oplog reported by
+// Project::ScanForOplogs() of each known project.
+//
+// The returned checkers are raw owning pointers handed over to the caller. Everything that can
+// throw after the first checker has been allocated runs inside the try block, so checkers created
+// so far are deleted before the exception is passed on.
+std::vector<GcReferenceChecker*>
+ProjectStore::CreateReferenceCheckers(GcCtx& Ctx)
+{
+    ZEN_TRACE_CPU("Store::CreateReferenceCheckers");
+
+    auto Log = [&Ctx]() { return Ctx.Logger; };
+
+    size_t ProjectCount = 0;
+    size_t OplogCount   = 0;
+
+    Stopwatch Timer;
+    const auto _ = MakeGuard([&] {
+        if (!Ctx.Settings.Verbose)
+        {
+            return;
+        }
+        ZEN_INFO("GCV2: projectstore [CREATE CHECKERS] '{}': opened {} projects and {} oplogs in {}",
+                 m_ProjectBasePath,
+                 ProjectCount,
+                 OplogCount,
+                 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+    });
+
+    DiscoverProjects();
+
+    std::vector<GcReferenceChecker*> Checkers;
+    try
+    {
+        // Reserve before allocating so that emplace_back cannot throw and leak the new checker.
+        Checkers.reserve(1);
+        Checkers.emplace_back(new ProjectStoreReferenceChecker(*this));
+
+        // Snapshot the project list under the lock; the oplog scan below runs without it.
+        std::vector<Ref<ProjectStore::Project>> Projects;
+        {
+            RwLock::SharedLockScope Lock(m_ProjectsLock);
+            Projects.reserve(m_Projects.size());
+
+            for (auto& Kv : m_Projects)
+            {
+                Projects.push_back(Kv.second);
+            }
+        }
+        ProjectCount += Projects.size();
+
+        for (const Ref<ProjectStore::Project>& Project : Projects)
+        {
+            std::vector<std::string> OpLogs = Project->ScanForOplogs();
+            Checkers.reserve(Checkers.size() + OpLogs.size());
+            for (const std::string& OpLogId : OpLogs)
+            {
+                Checkers.emplace_back(new ProjectStoreOplogReferenceChecker(*this, Project, OpLogId));
+                OplogCount++;
+            }
+        }
+    }
+    catch (const std::exception&)
+    {
+        // Ownership has not been handed over yet: release what was created, then rethrow.
+        while (!Checkers.empty())
+        {
+            delete Checkers.back();
+            Checkers.pop_back();
+        }
+        throw;
+    }
+
+    return Checkers;
+}
+
+// Acquires a shared lock on m_ProjectsLock and then appends the locks returned by
+// GetGcReferencerLocks() of every project in m_Projects. All lock scope objects are returned to
+// the caller in acquisition order.
+std::vector<RwLock::SharedLockScope>
+ProjectStore::LockState(GcCtx& Ctx)
+{
+    ZEN_TRACE_CPU("Store::LockState");
+
+    auto Log = [&Ctx]() { return Ctx.Logger; };
+
+    std::vector<RwLock::SharedLockScope> Locks;
+    Locks.emplace_back(RwLock::SharedLockScope(m_ProjectsLock));
+    for (auto& ProjectEntry : m_Projects)
+    {
+        std::vector<RwLock::SharedLockScope> ProjectLocks = ProjectEntry.second->GetGcReferencerLocks();
+        for (RwLock::SharedLockScope& ProjectLock : ProjectLocks)
+        {
+            Locks.emplace_back(std::move(ProjectLock));
+        }
+    }
+    return Locks;
+}
+
+// GC reference validator for a single oplog, addressed by project id and oplog id.
+// Validate() resolves the oplog (the instance already open in the project, or a read-only copy
+// loaded from disk), runs Oplog::Validate() on it and reports the findings through Stats.
+class ProjectStoreOplogReferenceValidator : public GcReferenceValidator
+{
+public:
+    ProjectStoreOplogReferenceValidator(ProjectStore& InProjectStore, std::string_view InProject, std::string_view InOplog)
+    : m_ProjectStore(InProjectStore)
+    , m_ProjectId(InProject)
+    , m_OplogId(InOplog)
+    {
+    }
+
+    virtual ~ProjectStoreOplogReferenceValidator() {}
+
+    // Display name of this validator: oplog: '<project id>/<oplog id>'.
+    virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}/{}'", m_ProjectId, m_OplogId); }
+
+    // Validates the oplog and fills Stats with the op count and the number of missing chunks,
+    // files, metas and attachments. Returns without touching Stats when the project cannot be
+    // opened or when Ctx.IsCancelledFlag is set along the way. Missing data is logged as a warning.
+    virtual void Validate(GcCtx& Ctx, GcReferenceValidatorStats& Stats) override
+    {
+        ZEN_TRACE_CPU("Store::Validate");
+
+        auto Log = [&Ctx]() { return Ctx.Logger; };
+
+        ProjectStore::Oplog::ValidationResult Result;
+
+        Stopwatch Timer;
+        const auto _ = MakeGuard([&] {
+            if (Ctx.Settings.Verbose)
+            {
+                std::string Status = Result.IsEmpty() ? "OK" : "Missing data";
+                ZEN_INFO("GCV2: projectstore [VALIDATE] '{}/{}': Validated in {}. OpCount: {}, MinLSN: {}, MaxLSN: {}, Status: {}",
+                         m_ProjectId,
+                         m_OplogId,
+                         NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+                         Result.OpCount,
+                         Result.LSNLow.Number,
+                         Result.LSNHigh.Number,
+                         Status);
+            }
+        });
+
+        Ref<ProjectStore::Oplog>   Oplog;
+        Ref<ProjectStore::Project> Project = m_ProjectStore.OpenProject(m_ProjectId);
+        if (!Project)
+        {
+            return;
+        }
+
+        {
+            RwLock::SharedLockScope __(Project->m_ProjectLock);
+            const auto OpenOplogIt = Project->m_Oplogs.find(m_OplogId);
+            if (OpenOplogIt != Project->m_Oplogs.end())
+            {
+                // Already open in the project: validate that instance.
+                Oplog = OpenOplogIt->second;
+            }
+            else
+            {
+                // Not open: load a read-only copy from the oplog's base path.
+                Stopwatch OplogTimer;
+
+                std::filesystem::path OplogBasePath = Project->BasePathForOplog(m_OplogId);
+                Oplog = Ref<ProjectStore::Oplog>(new ProjectStore::Oplog(Project->Log(),
+                                                                         Project->Identifier,
+                                                                         m_OplogId,
+                                                                         Project->m_CidStore,
+                                                                         OplogBasePath,
+                                                                         std::filesystem::path{},
+                                                                         ProjectStore::Oplog::EMode::kBasicReadOnly));
+                Oplog->Read();
+                if (Ctx.Settings.Verbose)
+                {
+                    ZEN_INFO("GCV2: projectstore [VALIDATE] '{}': read oplog '{}/{}' in {}",
+                             OplogBasePath,
+                             Project->Identifier,
+                             m_OplogId,
+                             NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs()));
+                }
+
+                if (Ctx.IsCancelledFlag)
+                {
+                    return;
+                }
+            }
+        }
+
+        if (Oplog)
+        {
+            Result = Oplog->Validate(Project->RootDir, Ctx.IsCancelledFlag, nullptr);
+            if (Ctx.IsCancelledFlag)
+            {
+                return;
+            }
+            Stats.CheckedCount       = Result.OpCount;
+            Stats.MissingChunks      = Result.MissingChunks.size();
+            Stats.MissingFiles       = Result.MissingFiles.size();
+            Stats.MissingMetas       = Result.MissingMetas.size();
+            Stats.MissingAttachments = Result.MissingAttachments.size();
+        }
+
+        if (Result.IsEmpty())
+        {
+            return;
+        }
+
+        ZEN_WARN("GCV2: projectstore [VALIDATE] '{}/{}': Missing data: Files: {}, Chunks: {}, Metas: {}, Attachments: {}",
+                 m_ProjectId,
+                 m_OplogId,
+                 Result.MissingFiles.size(),
+                 Result.MissingChunks.size(),
+                 Result.MissingMetas.size(),
+                 Result.MissingAttachments.size());
+    }
+
+    ProjectStore& m_ProjectStore;
+    std::string   m_ProjectId;
+    std::string   m_OplogId;
+};
+
+// Creates one ProjectStoreOplogReferenceValidator per oplog reported by ScanForOplogs() of every
+// known project. Returns an empty list when Ctx.Settings.SkipCidDelete is set.
+// The returned validators are raw pointers allocated with new.
+std::vector<GcReferenceValidator*>
+ProjectStore::CreateReferenceValidators(GcCtx& Ctx)
+{
+    if (Ctx.Settings.SkipCidDelete)
+    {
+        return {};
+    }
+
+    auto Log = [&Ctx]() { return Ctx.Logger; };
+
+    DiscoverProjects();
+
+    // (project id, oplog id) pairs, gathered while the project map is locked.
+    std::vector<std::pair<std::string, std::string>> OplogKeys;
+    {
+        RwLock::SharedLockScope _(m_ProjectsLock);
+        for (auto& ProjectEntry : m_Projects)
+        {
+            ProjectStore::Project&   Project    = *ProjectEntry.second;
+            std::vector<std::string> OplogNames = Project.ScanForOplogs();
+            for (const std::string& OplogName : OplogNames)
+            {
+                OplogKeys.emplace_back(Project.Identifier, OplogName);
+            }
+        }
+    }
+
+    std::vector<GcReferenceValidator*> Validators;
+    Validators.reserve(OplogKeys.size());
+    for (const auto& [ProjectId, OplogId] : OplogKeys)
+    {
+        Validators.push_back(new ProjectStoreOplogReferenceValidator(*this, ProjectId, OplogId));
+    }
+
+    return Validators;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Convenience overload: forwards to the two-argument OpKeyStringAsOid with a local
+// eastl::fixed_vector<uint8_t, 512> as the buffer argument.
+Oid
+OpKeyStringAsOid(std::string_view OpKey)
+{
+    eastl::fixed_vector<uint8_t, 512> ScratchBuffer;
+    return OpKeyStringAsOid(OpKey, ScratchBuffer);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+namespace testutils {
+ using namespace std::literals;
+
+ // Renders Id through Oid::ToString() and returns the text as a std::string.
+ static std::string OidAsString(const Oid& Id)
+ {
+     StringBuilder<25> Text;
+     Id.ToString(Text);
+     return Text.ToString();
+ }
+
+ // Builds an oplog package whose object has a "key" field (Id as text) and, when Attachments is
+ // not empty, a "bulkdata" array with one {id, type, data} entry per attachment. Each attachment
+ // is also added to the package itself.
+ static CbPackage CreateBulkDataOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
+ {
+     CbPackage      OpPackage;
+     CbObjectWriter OpWriter;
+     OpWriter << "key"sv << OidAsString(Id);
+     if (!Attachments.empty())
+     {
+         OpWriter.BeginArray("bulkdata");
+         for (const auto& [ChunkId, Payload] : Attachments)
+         {
+             CbAttachment Attach(Payload, Payload.DecodeRawHash());
+             OpWriter.BeginObject();
+             OpWriter << "id"sv << ChunkId;
+             OpWriter << "type"sv
+                      << "Standard"sv;
+             OpWriter << "data"sv << Attach;
+             OpWriter.EndObject();
+
+             OpPackage.AddAttachment(Attach);
+         }
+         OpWriter.EndArray();
+     }
+     OpPackage.SetObject(OpWriter.Save());
+     return OpPackage;
+ }
+
+ // Builds an oplog package whose object has a "key" field (Id as text) and, when Attachments is
+ // not empty, a "files" array with one {id, serverpath, clientpath} entry per file. Both path
+ // fields carry the file path relative to ProjectRootDir.
+ static CbPackage CreateFilesOplogPackage(const Oid& Id,
+                                          const std::filesystem::path ProjectRootDir,
+                                          const std::span<const std::pair<Oid, std::filesystem::path>>& Attachments)
+ {
+     CbPackage      OpPackage;
+     CbObjectWriter OpWriter;
+     OpWriter << "key"sv << OidAsString(Id);
+     if (!Attachments.empty())
+     {
+         OpWriter.BeginArray("files");
+         for (const auto& [FileId, FilePath] : Attachments)
+         {
+             const std::filesystem::path  ServerPath = std::filesystem::relative(FilePath, ProjectRootDir);
+             const std::filesystem::path& ClientPath = ServerPath;  // dummy: client path mirrors the server path
+             OpWriter.BeginObject();
+             OpWriter << "id"sv << FileId;
+             OpWriter << "serverpath"sv << ServerPath.string();
+             OpWriter << "clientpath"sv << ClientPath.string();
+             OpWriter.EndObject();
+         }
+         OpWriter.EndArray();
+     }
+     OpPackage.SetObject(OpWriter.Save());
+     return OpPackage;
+ }
+
+ // Creates one compressed attachment per entry in Sizes. Each attachment is a blob from
+ // CreateSemiRandomBlob(Size), compressed with OodleCompressor::Mermaid at CompressionLevel and
+ // BlockSize, and paired with a freshly generated Oid.
+ static std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(
+     const std::span<const size_t>& Sizes,
+     OodleCompressionLevel          CompressionLevel = OodleCompressionLevel::VeryFast,
+     uint64_t                       BlockSize        = 0)
+ {
+     std::vector<std::pair<Oid, CompressedBuffer>> Attachments;
+     Attachments.reserve(Sizes.size());
+     for (const size_t RawSize : Sizes)
+     {
+         // Compress first, then draw the id, matching the original evaluation order.
+         CompressedBuffer Payload =
+             CompressedBuffer::Compress(SharedBuffer(CreateSemiRandomBlob(RawSize)), OodleCompressor::Mermaid, CompressionLevel, BlockSize);
+         Attachments.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Payload));
+     }
+     return Attachments;
+ }
+
+ // Returns RawOffset modulo the block size that Buffer reports through
+ // TryGetCompressParameters(). Returns 0 when RawOffset is 0, when the parameters cannot be
+ // read, or when the reported block size is 0.
+ static uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64_t RawOffset)
+ {
+     if (RawOffset == 0)
+     {
+         return 0;
+     }
+
+     uint64_t              BlockSize = 0;
+     OodleCompressor       Compressor;
+     OodleCompressionLevel CompressionLevel;
+     if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+     {
+         return 0;
+     }
+
+     if (BlockSize == 0)
+     {
+         return 0;
+     }
+     return RawOffset % BlockSize;
+ }
+
+ // Builds a request object of the form { Request: { [SkipData], [Chunks: [...]] } }.
+ // Every chunk entry carries the chunk id under the field name IdName, plus "ModTag" when ModTags
+ // is not empty and "Offset"/"Size" when Ranges is not empty. Non-empty ModTags and Ranges are
+ // indexed with the chunk index, so they need at least as many entries as Chunks.
+ template<typename ChunkType>
+ CbObject BuildChunksRequest(bool SkipData,
+                             std::string_view IdName,
+                             const std::vector<ChunkType>& Chunks,
+                             const std::vector<std::pair<size_t, size_t>>& Ranges,
+                             const std::vector<uint64_t>& ModTags)
+ {
+     const bool HasModTags = !ModTags.empty();
+     const bool HasRanges  = !Ranges.empty();
+
+     CbObjectWriter Writer;
+     Writer.BeginObject("Request"sv);
+     if (SkipData)
+     {
+         Writer.AddBool("SkipData"sv, true);
+     }
+     if (!Chunks.empty())
+     {
+         Writer.BeginArray("Chunks");
+         for (size_t ChunkIndex = 0; ChunkIndex < Chunks.size(); ChunkIndex++)
+         {
+             Writer.BeginObject();
+             Writer << IdName << Chunks[ChunkIndex];
+             if (HasModTags)
+             {
+                 Writer << "ModTag" << ModTags[ChunkIndex];
+             }
+             if (HasRanges)
+             {
+                 Writer << "Offset" << Ranges[ChunkIndex].first;
+                 Writer << "Size" << Ranges[ChunkIndex].second;
+             }
+             Writer.EndObject();
+         }
+         Writer.EndArray();
+     }
+     Writer.EndObject();
+     return Writer.Save();
+ }
+
+ // Oid flavour of BuildChunksRequest: chunk ids are written under the field name "Oid".
+ CbObject BuildChunksRequest(bool SkipData,
+                             const std::vector<Oid>& Chunks,
+                             const std::vector<std::pair<size_t, size_t>>& Ranges,
+                             const std::vector<uint64_t>& ModTags)
+ {
+     const std::string_view IdFieldName = "Oid";
+     return BuildChunksRequest<Oid>(SkipData, IdFieldName, Chunks, Ranges, ModTags);
+ }
+
+ // IoHash flavour of BuildChunksRequest: chunk ids are written under the field name "RawHash".
+ CbObject BuildChunksRequest(bool SkipData,
+                             const std::vector<IoHash>& Chunks,
+                             const std::vector<std::pair<size_t, size_t>>& Ranges,
+                             const std::vector<uint64_t>& ModTags)
+ {
+     const std::string_view IdFieldName = "RawHash";
+     return BuildChunksRequest<IoHash>(SkipData, IdFieldName, Chunks, Ranges, ModTags);
+ }
+
+} // namespace testutils
+
+TEST_CASE("project.opkeys") // Exercises ComputeOpKey for many key lengths and pins two expected ids
+{
+ using namespace std::literals;
+
+ const std::string_view LongKey = // 300 characters: three runs of 100 digits
+ "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+ "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+ "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"sv;
+
+ // Not a test per se, this code just exercises the key computation logic to ensure all
+ // edge cases are handled by the bug workaround logic
+
+ for (int i = 1; i < 300; ++i) // every prefix length from 1 to 299
+ {
+ CbObjectWriter Cbo;
+ Cbo << "key"sv << LongKey.substr(0, i);
+
+ const Oid KeyId = ComputeOpKey(Cbo.Save()); // result is not checked, the call only has to complete
+ }
+
+ { // short key with a pinned expected id
+ CbObjectWriter Cbo;
+ Cbo << "key"sv
+ << "abcdef";
+
+ const Oid KeyId = ComputeOpKey(Cbo.Save());
+ const Oid CorrectId = Oid::FromHexString(
+ "7a03540e"
+ "ecb0daa9"
+ "00f2949e");
+
+ CHECK(KeyId == CorrectId);
+ }
+
+ { // 300 character key with a pinned expected id
+ CbObjectWriter Cbo;
+ Cbo << "key"sv
+ << "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+ "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+ "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890";
+
+ const Oid KeyId = ComputeOpKey(Cbo.Save());
+ const Oid CorrectId = Oid::FromHexString(
+ "c5e88c79"
+ "06b7fa38"
+ "7b0d2efd");
+
+ CHECK(KeyId == CorrectId);
+ }
+}
+
+TEST_CASE("project.store.create") // Creates a project, deletes it again and checks that it no longer exists
+{
+ using namespace std::literals;
+
+ ScopedTemporaryDirectory TempDir; // all paths below live under this directory
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ std::string_view ProjectName("proj1"sv);
+ std::filesystem::path BasePath = TempDir.Path() / "projectstore";
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
+ std::filesystem::path RootDir = TempDir.Path() / "root";
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
+ std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
+ std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
+
+ Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / ProjectName,
+ ProjectName,
+ RootDir.string(),
+ EngineRootDir.string(),
+ ProjectRootDir.string(),
+ ProjectFilePath.string()));
+ CHECK(ProjectStore.DeleteProject(ProjectName)); // deletion has to report success
+ CHECK(!Project->Exists(BasePath)); // the Ref is still held, but Exists() must now be false
+}
+
+TEST_CASE("project.store.lifetimes") // Checks that held Refs to a project and its oplog stay usable after PrepareForDelete
+{
+ using namespace std::literals;
+
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ std::filesystem::path BasePath = TempDir.Path() / "projectstore";
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
+ std::filesystem::path RootDir = TempDir.Path() / "root";
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
+ std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
+ std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
+
+ Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ ProjectRootDir.string(),
+ ProjectFilePath.string()));
+ Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog1", {});
+ CHECK(Oplog);
+
+ std::filesystem::path DeletePath; // filled in by PrepareForDelete
+ CHECK(Project->PrepareForDelete(DeletePath));
+ CHECK(!DeletePath.empty());
+ CHECK(!Project->OpenOplog("oplog1", /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true)); // the oplog can no longer be opened
+ // Oplog is now invalid, but pointer can still be accessed since we store old oplog pointers
+ CHECK(Oplog->OplogCount() == 0);
+ // Project is still valid since we have a Ref to it
+ CHECK(Project->Identifier == "proj1"sv);
+}
+
+TEST_CASE("project.store.gc") // Runs GC passes over two projects while their marker files are removed step by step
+{
+ using namespace std::literals;
+ using namespace testutils;
+
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ std::filesystem::path BasePath = TempDir.Path() / "projectstore";
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
+ std::filesystem::path RootDir = TempDir.Path() / "root";
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
+
+ std::filesystem::path Project1RootDir = TempDir.Path() / "game1";
+ std::filesystem::path Project1FilePath = TempDir.Path() / "game1" / "game.uproject";
+ { // create an empty project file for proj1
+ CreateDirectories(Project1FilePath.parent_path());
+ BasicFile ProjectFile;
+ ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate);
+ }
+ std::filesystem::path Project1OplogPath = TempDir.Path() / "game1" / "saves" / "cooked" / ".projectstore";
+ { // create an empty marker file for proj1's oplog
+ CreateDirectories(Project1OplogPath.parent_path());
+ BasicFile OplogFile;
+ OplogFile.Open(Project1OplogPath, BasicFile::Mode::kTruncate);
+ }
+
+ std::filesystem::path Project2RootDir = TempDir.Path() / "game2";
+ std::filesystem::path Project2FilePath = TempDir.Path() / "game2" / "game.uproject";
+ { // create an empty project file for proj2
+ CreateDirectories(Project2FilePath.parent_path());
+ BasicFile ProjectFile;
+ ProjectFile.Open(Project2FilePath, BasicFile::Mode::kTruncate);
+ }
+ std::filesystem::path Project2Oplog1Path = TempDir.Path() / "game1" / "saves" / "cooked" / ".projectstore"; // NOTE(review): same file as Project1OplogPath (under "game1") - confirm this is intended
+ {
+ CreateDirectories(Project2Oplog1Path.parent_path());
+ BasicFile OplogFile;
+ OplogFile.Open(Project2Oplog1Path, BasicFile::Mode::kTruncate);
+ }
+ std::filesystem::path Project2Oplog2Path = TempDir.Path() / "game2" / "saves" / "cooked" / ".projectstore";
+ {
+ CreateDirectories(Project2Oplog2Path.parent_path());
+ BasicFile OplogFile;
+ OplogFile.Open(Project2Oplog2Path, BasicFile::Mode::kTruncate);
+ }
+
+ { // proj1: one oplog ("oplog1") with four ops carrying 0 + 1 + 4 + 2 = 7 attachments
+ Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ Project1RootDir.string(),
+ Project1FilePath.string()));
+ Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1", Project1OplogPath);
+ CHECK(Oplog);
+
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {}));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77})));
+ Oplog->AppendNewOplogEntry(
+ CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99})));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122})));
+ }
+
+ { // proj2: two oplogs ("oplog2", "oplog3"), again 7 attachments each, 21 attachments in total
+ Ref<ProjectStore::Project> Project2(ProjectStore.NewProject(BasePath / "proj2"sv,
+ "proj2"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ Project2RootDir.string(),
+ Project2FilePath.string()));
+ {
+ Ref<ProjectStore::Oplog> Oplog = Project2->NewOplog("oplog2", Project2Oplog1Path);
+ CHECK(Oplog);
+
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {}));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{177})));
+ Oplog->AppendNewOplogEntry(
+ CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9123, 383, 590, 96})));
+ Oplog->AppendNewOplogEntry(
+ CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{535, 221})));
+ }
+ {
+ Ref<ProjectStore::Oplog> Oplog = Project2->NewOplog("oplog3", Project2Oplog2Path);
+ CHECK(Oplog);
+
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {}));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{137})));
+ Oplog->AppendNewOplogEntry(
+ CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9723, 683, 594, 98})));
+ Oplog->AppendNewOplogEntry(
+ CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{531, 271})));
+ }
+ }
+
+ { // pass 1: expire cutoff 24h in the past, all marker files present - nothing may be deleted
+ GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24),
+ .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24),
+ .IsDeleteMode = true};
+ GcResult Result = Gc.CollectGarbage(Settings);
+ CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(21u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK(ProjectStore.OpenProject("proj1"sv));
+ CHECK(ProjectStore.OpenProject("proj2"sv));
+ }
+
+ { // pass 2: expire cutoff 24h in the future, all marker files present - still nothing may be deleted
+ GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24),
+ .IsDeleteMode = true};
+ GcResult Result = Gc.CollectGarbage(Settings);
+ CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(21u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK(ProjectStore.OpenProject("proj1"sv));
+ CHECK(ProjectStore.OpenProject("proj2"sv));
+ }
+
+ RemoveFile(Project1FilePath); // remove proj1's project file from disk
+
+ { // pass 3: cutoff in the past again - proj1 is expected to survive although its project file is gone
+ GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24),
+ .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24),
+ .IsDeleteMode = true};
+ GcResult Result = Gc.CollectGarbage(Settings);
+ CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(21u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK(ProjectStore.OpenProject("proj1"sv));
+ CHECK(ProjectStore.OpenProject("proj2"sv));
+ }
+
+ { // pass 4: cutoff in the future - proj1 and 7 of the 21 chunks are expected to be removed
+ GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true};
+ GcResult Result = Gc.CollectGarbage(Settings);
+ CHECK_EQ(4u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(21u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(7u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK(!ProjectStore.OpenProject("proj1"sv));
+ CHECK(ProjectStore.OpenProject("proj2"sv));
+ }
+
+ RemoveFile(Project2Oplog1Path); // remove the marker file that was passed to NewOplog for "oplog2"
+ { // pass 5: cutoff in the past - the remaining 14 chunks are checked, nothing is deleted
+ GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24),
+ .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true};
+ GcResult Result = Gc.CollectGarbage(Settings);
+ CHECK_EQ(3u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(14u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK(!ProjectStore.OpenProject("proj1"sv));
+ CHECK(ProjectStore.OpenProject("proj2"sv));
+ }
+
+ { // pass 6: cutoff in the future - proj2 still has its project file, nothing is deleted
+ GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true};
+ GcResult Result = Gc.CollectGarbage(Settings);
+ CHECK_EQ(3u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(14u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK(!ProjectStore.OpenProject("proj1"sv));
+ CHECK(ProjectStore.OpenProject("proj2"sv));
+ }
+
+ RemoveFile(Project2FilePath); // remove proj2's project file from disk
+ { // pass 7: cutoff in the future - proj2 and all 14 remaining chunks are expected to be removed
+ GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24),
+ .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true};
+ GcResult Result = Gc.CollectGarbage(Settings);
+ CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount);
+ CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount);
+ CHECK_EQ(14u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount);
+ CHECK_EQ(14u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount);
+ CHECK(!ProjectStore.OpenProject("proj1"sv));
+ CHECK(!ProjectStore.OpenProject("proj2"sv));
+ }
+}
+
+TEST_CASE("project.store.gc.prep") // Checks how pending chunk references ("prep") keep attachments alive across GC passes
+{
+ using namespace std::literals;
+ using namespace testutils;
+
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ std::filesystem::path BasePath = TempDir.Path() / "projectstore";
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
+ std::filesystem::path RootDir = TempDir.Path() / "root";
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
+
+ std::filesystem::path Project1RootDir = TempDir.Path() / "game1";
+ std::filesystem::path Project1FilePath = TempDir.Path() / "game1" / "game.uproject";
+ { // create an empty project file
+ CreateDirectories(Project1FilePath.parent_path());
+ BasicFile ProjectFile;
+ ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate);
+ }
+ std::filesystem::path Project1OplogPath = TempDir.Path() / "game1" / "saves" / "cooked" / ".projectstore";
+ { // create an empty oplog marker file
+ CreateDirectories(Project1OplogPath.parent_path());
+ BasicFile OplogFile;
+ OplogFile.Open(Project1OplogPath, BasicFile::Mode::kTruncate);
+ }
+
+ std::vector<std::pair<Oid, CompressedBuffer>> OpAttachments = CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99});
+ std::vector<IoHash> OpChunkHashes; // raw hashes of the four attachments above
+ for (const auto& Chunk : OpAttachments)
+ {
+ OpChunkHashes.push_back(Chunk.second.DecodeRawHash());
+ }
+
+ { // write one op with the attachments ...
+ Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ Project1RootDir.string(),
+ Project1FilePath.string()));
+ Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath);
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments));
+ }
+ { // ... then delete the oplog that references them
+ Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
+ Project1->DeleteOplog("oplog1"sv);
+ }
+
+ // Equivalent of a `prep` existence check call
+ for (auto Attachment : OpAttachments)
+ {
+ CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ }
+
+ {
+ GcSettings Settings = {.CacheExpireTime = GcClock::Now(),
+ .ProjectStoreExpireTime = GcClock::Now(),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true};
+ GcResult Result = Gc.CollectGarbage(Settings);
+ }
+
+ // If a gc comes in between our prep and op write the chunks will be removed
+ for (auto Attachment : OpAttachments)
+ {
+ CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ }
+
+ {
+ // Make sure the chunks are stored but not the referencing op
+ Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
+ Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath);
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments));
+ Project1->DeleteOplog("oplog1"sv);
+ }
+ {
+ Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
+ Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath);
+
+ // Equivalent of a `prep` call with tracking of ops
+ CHECK(Oplog->CheckPendingChunkReferences(OpChunkHashes, std::chrono::hours(1)).empty()); // one hour retention; an empty result is expected
+ }
+
+ for (auto Attachment : OpAttachments)
+ {
+ CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ }
+
+ {
+ GcSettings Settings = {.CacheExpireTime = GcClock::Now(),
+ .ProjectStoreExpireTime = GcClock::Now(),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true};
+ GcResult Result = Gc.CollectGarbage(Settings);
+ }
+
+ // Attachments should now be retained
+ for (auto Attachment : OpAttachments)
+ {
+ CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ }
+
+ {
+ GcSettings Settings = {.CacheExpireTime = GcClock::Now(),
+ .ProjectStoreExpireTime = GcClock::Now(),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true};
+ GcResult Result = Gc.CollectGarbage(Settings);
+ }
+
+ // Attachments should now be retained across multiple GCs if retain time is still valid
+ for (auto Attachment : OpAttachments)
+ {
+ CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ }
+
+ { // write the op for real and drop the pending references again
+ Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
+ Ref<ProjectStore::Oplog> Oplog = Project1->OpenOplog("oplog1"sv, true, true);
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments));
+ Oplog->RemovePendingChunkReferences(OpChunkHashes);
+ CHECK(Oplog->GetPendingChunkReferencesLocked().size() == 0);
+ }
+ for (auto Attachment : OpAttachments)
+ {
+ CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ }
+ {
+ Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
+ Project1->DeleteOplog("oplog1"sv);
+ }
+
+ {
+ GcSettings Settings = {.CacheExpireTime = GcClock::Now(),
+ .ProjectStoreExpireTime = GcClock::Now(),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true};
+ GcResult Result = Gc.CollectGarbage(Settings);
+ }
+
+ for (auto Attachment : OpAttachments) // oplog deleted and no pending references left: the chunks are expected to be gone
+ {
+ CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ }
+
+ {
+ Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
+ Project1->DeleteOplog("oplog1"sv);
+ }
+ {
+ // Make sure the chunks are stored but not the referencing op
+ Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
+ Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath);
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments));
+ Project1->DeleteOplog("oplog1"sv);
+ }
+
+ // Caution - putting breakpoints and stepping through this part of the test will likely make it fail due to the expiry time of
+ // pending chunks
+ {
+ Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
+ Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath);
+
+ CHECK(Oplog->CheckPendingChunkReferences(OpChunkHashes, std::chrono::milliseconds(100)).empty()); // short 100 ms retention this time
+ }
+
+ // This pass they should be retained and while the ops are picked up in GC we are blocked from adding our op
+ {
+ GcSettings Settings = {.CacheExpireTime = GcClock::Now(),
+ .ProjectStoreExpireTime = GcClock::Now(),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true};
+ GcResult Result = Gc.CollectGarbage(Settings);
+ }
+ for (auto Attachment : OpAttachments)
+ {
+ CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ }
+
+ Sleep(200); // wait longer than the 100 ms retention requested above
+ // This pass they should also be retained since our age retention has kept them alive and they will now be picked up and the
+ // retention cleared
+ {
+ GcSettings Settings = {.CacheExpireTime = GcClock::Now(),
+ .ProjectStoreExpireTime = GcClock::Now(),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true};
+ GcResult Result = Gc.CollectGarbage(Settings);
+ }
+ for (auto Attachment : OpAttachments)
+ {
+ CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ }
+
+ // This pass the retention time has expired and the last GC pass cleared the entries
+ {
+ GcSettings Settings = {.CacheExpireTime = GcClock::Now(),
+ .ProjectStoreExpireTime = GcClock::Now(),
+ .CollectSmallObjects = true,
+ .IsDeleteMode = true};
+ GcResult Result = Gc.CollectGarbage(Settings);
+ }
+
+ for (auto Attachment : OpAttachments)
+ {
+ CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ }
+}
+
+// Exercises the chunk request path end to end: a compact binary request is parsed with ParseChunksRequests, resolved with
+// GetChunks and serialized with WriteChunksRequestResponse. Chunks are addressed by raw hash or by Oid, for compressed
+// bulk-data attachments and for one plain file, covering full fetches, partial (offset/size) fetches, ModTag-conditional
+// fetches and SkipData (no payload) queries.
+//
+// Preconditions that are dereferenced or used as an index afterwards use REQUIRE so a failure stops the test case instead of
+// crashing; pure expectations use CHECK.
+TEST_CASE("project.store.rpc.getchunks")
+{
+ using namespace std::literals;
+ using namespace testutils;
+
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
+ std::filesystem::path RootDir = TempDir.Path() / "root"sv;
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
+
+ std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv;
+ std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv;
+ {
+ // Create an empty project file on disk
+ CreateDirectories(Project1FilePath.parent_path());
+ BasicFile ProjectFile;
+ ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate);
+ }
+
+ // Test fixture: four bulk-data ops (keyed by op id) plus one op referencing a plain file on disk
+ std::vector<Oid> OpIds;
+ OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()});
+ std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments;
+ Oid FilesOpId = Oid::NewOid();
+ std::vector<std::pair<Oid, std::filesystem::path>> FilesOpIdAttachments;
+ {
+ Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ Project1RootDir.string(),
+ Project1FilePath.string()));
+ Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, {});
+ // Oplog is dereferenced below, so a failure here must stop the test
+ REQUIRE(Oplog);
+ Attachments[OpIds[0]] = {};
+ Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77});
+ Attachments[OpIds[2]] =
+ CreateAttachments(std::initializer_list<size_t>{200 * 1024, 314 * 1024, 690, 99}, OodleCompressionLevel::VeryFast, 128 * 1024);
+ Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122});
+ for (auto It : Attachments)
+ {
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(It.first, It.second));
+ }
+
+ std::filesystem::path UncompressedFilePath = RootDir / "content" / "uncompressed_file";
+ CreateDirectories(UncompressedFilePath.parent_path());
+ IoBuffer FileBlob = CreateRandomBlob(81823 * 2);
+ WriteFile(UncompressedFilePath, FileBlob);
+ FilesOpIdAttachments.push_back({Oid::NewOid(), UncompressedFilePath});
+ Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(FilesOpId, RootDir, FilesOpIdAttachments));
+ }
+
+ // Runs a request object through parse -> fetch -> response serialization. GetChunks is only invoked when the parsed request
+ // list is non-empty.
+ auto GetChunks = [](zen::ProjectStore& ProjectStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, const CbObject& Cb) {
+ std::vector<ProjectStore::ChunkRequest> Requests = ProjectStore.ParseChunksRequests(Project, Oplog, Cb);
+ std::vector<ProjectStore::ChunkResult> Results =
+ Requests.empty() ? std::vector<ProjectStore::ChunkResult>{} : ProjectStore.GetChunks(Project, Oplog, Requests);
+ return ProjectStore.WriteChunksRequestResponse(Project, Oplog, std::move(Requests), std::move(Results));
+ };
+
+ // Both handles are dereferenced by every request below, so they are hard requirements
+ Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
+ REQUIRE(Project1);
+ Ref<ProjectStore::Oplog> Oplog1 = Project1->OpenOplog("oplog1"sv, false, true);
+ REQUIRE(Oplog1);
+ // Invalid request - the request object does not have the expected layout and must be rejected with an exception
+ {
+ CbObjectWriter Request;
+ Request.BeginObject("WrongName"sv);
+ Request.EndObject();
+ CHECK_THROWS(GetChunks(ProjectStore, *Project1, *Oplog1, Request.Save()));
+ }
+
+ // Empty request
+ {
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, std::vector<IoHash>{}, {}, {}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(0, Chunks.Num());
+ }
+ // Single non-existing chunk by RawHash - unknown chunks produce no entry in the response
+ IoHash NotFoundIoHash = IoHash::Max;
+ {
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundIoHash}, {}, {}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(0, Chunks.Num());
+ }
+ {
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundIoHash}, {}, {}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(0, Chunks.Num());
+ }
+ // Single non-existing chunk by Id
+ Oid NotFoundId = Oid::NewOid();
+ {
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundId}, {}, {}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(0, Chunks.Num());
+ }
+ {
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundId}, {}, {}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(0, Chunks.Num());
+ }
+ // Single existing chunk by RawHash
+ {
+ // Fresh fetch
+ IoHash FirstAttachmentHash = Attachments[OpIds[2]][1].second.DecodeRawHash();
+ uint64_t ResponseModTag = 0;
+ {
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {}));
+
+ CHECK_EQ(1, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ IoHash Id = Chunk["Id"].AsHash();
+ CHECK_EQ(FirstAttachmentHash, Id);
+ ResponseModTag = Chunk["ModTag"].AsUInt64();
+ CHECK_NE(0, ResponseModTag);
+ IoHash AttachmentHash = Chunk["RawHash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ REQUIRE_NE(nullptr, Attachment);
+ CompressedBuffer Buffer = Attachment->AsCompressedBinary();
+ CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash());
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // Fetch with matching ModTag - only the Id is echoed back, no payload
+ {
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {ResponseModTag}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ IoHash Id = Chunk["Id"].AsHash();
+ CHECK_EQ(FirstAttachmentHash, Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // Fetch with mismatching ModTag - behaves like a fresh fetch
+ {
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)}));
+
+ CHECK_EQ(1, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ IoHash Id = Chunk["Id"].AsHash();
+ CHECK_EQ(FirstAttachmentHash, Id);
+ ResponseModTag = Chunk["ModTag"].AsUInt64();
+ CHECK_NE(0, ResponseModTag);
+ IoHash AttachmentHash = Chunk["RawHash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ REQUIRE_NE(nullptr, Attachment);
+ CompressedBuffer Buffer = Attachment->AsCompressedBinary();
+ CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash());
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // Fresh ModTag query (SkipData) - ModTag is reported without any payload
+ {
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ IoHash Id = Chunk["Id"].AsHash();
+ CHECK_EQ(FirstAttachmentHash, Id);
+ uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64();
+ CHECK_EQ(ResponseModTag, ResponseModTag2);
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // ModTag query (SkipData) with matching ModTag
+ {
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {ResponseModTag}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ IoHash Id = Chunk["Id"].AsHash();
+ CHECK_EQ(FirstAttachmentHash, Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // ModTag query (SkipData) with mismatching ModTag
+ {
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ IoHash Id = Chunk["Id"].AsHash();
+ CHECK_EQ(FirstAttachmentHash, Id);
+ uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64();
+ CHECK_EQ(ResponseModTag, ResponseModTag2);
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ // Single existing CID chunk by Id
+ {
+ Oid FirstAttachmentId = Attachments[OpIds[2]][1].first;
+ uint64_t ResponseModTag = 0;
+ {
+ // Full chunk request
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {}));
+ CHECK_EQ(1, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ ResponseModTag = Chunk["ModTag"].AsUInt64();
+ CHECK_NE(0, ResponseModTag);
+ IoHash AttachmentHash = Chunk["RawHash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ REQUIRE_NE(nullptr, Attachment);
+ CompressedBuffer Buffer = Attachment->AsCompressedBinary();
+ CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash());
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ {
+ // Partial chunk request - the returned fragment must cover the requested range [130 KiB, 130 KiB + 8100) and the
+ // response reports the full RawSize of the chunk
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{130 * 1024, 8100}}, {}));
+ CHECK_EQ(1, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ ResponseModTag = Chunk["ModTag"].AsUInt64();
+ CHECK_NE(0, ResponseModTag);
+ IoHash AttachmentHash = Chunk["FragmentHash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ REQUIRE_NE(nullptr, Attachment);
+ std::uint64_t FragmentStart = Chunk["FragmentOffset"].AsUInt64();
+ CompressedBuffer Buffer = Attachment->AsCompressedBinary();
+ CHECK(FragmentStart <= 130 * 1024);
+ CHECK(FragmentStart + Buffer.DecodeRawSize() >= 130 * 1024 + 8100);
+ auto ResponseDecompressedBuffer = Buffer.Decompress(130 * 1024 - FragmentStart, 8100);
+ auto ExpectedDecompressedBuffer = Attachments[OpIds[2]][1].second.Decompress(130 * 1024, 8100);
+ CHECK(ResponseDecompressedBuffer.AsIoBuffer().GetView().EqualBytes(ExpectedDecompressedBuffer.AsIoBuffer().GetView()));
+ CHECK_EQ(Chunk["RawSize"sv].AsUInt64(), Attachments[OpIds[2]][1].second.DecodeRawSize());
+ CHECK(!Chunk.FindView("Size"));
+ }
+ {
+ // Fetch with matching ModTag
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ {
+ // Fetch with mismatching ModTag
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}));
+ CHECK_EQ(1, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ uint64_t ResponseModTag3 = Chunk["ModTag"].AsUInt64();
+ CHECK_EQ(ResponseModTag, ResponseModTag3);
+ IoHash AttachmentHash = Chunk["RawHash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ REQUIRE_NE(nullptr, Attachment);
+ CompressedBuffer Buffer = Attachment->AsCompressedBinary();
+ CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash());
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // Fresh ModTag query (SkipData)
+ {
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64();
+ CHECK_EQ(ResponseModTag, ResponseModTag2);
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // ModTag query (SkipData) with matching ModTag
+ {
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // ModTag query (SkipData) with mismatching ModTag
+ {
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64();
+ CHECK_EQ(ResponseModTag, ResponseModTag2);
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+
+ // Single existing file chunk by Id - file chunks are returned uncompressed and identified by "Hash" rather than "RawHash"
+ {
+ Oid FirstAttachmentId = FilesOpIdAttachments[0].first;
+ uint64_t ResponseModTag = 0;
+ {
+ // Full chunk request
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {}));
+ CHECK_EQ(1, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ ResponseModTag = Chunk["ModTag"].AsUInt64();
+ CHECK_NE(0, ResponseModTag);
+ IoHash AttachmentHash = Chunk["Hash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ REQUIRE_NE(nullptr, Attachment);
+ CompositeBuffer Buffer = Attachment->AsCompositeBinary();
+ CHECK_EQ(IoHash::HashBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten()), IoHash::HashBuffer(Buffer));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ {
+ // Partial chunk request - payload is exactly the requested byte range and "Size" reports the full file size
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{81823, 5434}}, {}));
+ CHECK_EQ(1, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ ResponseModTag = Chunk["ModTag"].AsUInt64();
+ CHECK_NE(0, ResponseModTag);
+
+ IoHash AttachmentHash = Chunk["Hash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ REQUIRE_NE(nullptr, Attachment);
+ CompositeBuffer Buffer = Attachment->AsCompositeBinary();
+ CHECK_EQ(IoHash::HashBuffer(IoBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten(), 81823, 5434)),
+ IoHash::HashBuffer(Buffer));
+ CHECK_EQ(Chunk["Size"sv].AsUInt64(), FileSizeFromPath(FilesOpIdAttachments[0].second));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ {
+ // Fetch with matching ModTag
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("Hash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ {
+ // Fetch with mismatching ModTag
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}));
+ CHECK_EQ(1, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ uint64_t ResponseModTag3 = Chunk["ModTag"].AsUInt64();
+ CHECK_EQ(ResponseModTag, ResponseModTag3);
+ IoHash AttachmentHash = Chunk["Hash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ REQUIRE_NE(nullptr, Attachment);
+ CompositeBuffer Buffer = Attachment->AsCompositeBinary();
+ CHECK_EQ(IoHash::HashBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten()), IoHash::HashBuffer(Buffer));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // Fresh ModTag query (SkipData)
+ {
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64();
+ CHECK_EQ(ResponseModTag, ResponseModTag2);
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // ModTag query (SkipData) with matching ModTag
+ {
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("Hash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // ModTag query (SkipData) with mismatching ModTag
+ {
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}));
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64();
+ CHECK_EQ(ResponseModTag, ResponseModTag2);
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+
+ // Multi RawHash Request - response entries are matched back to the request by Id, no ordering is assumed
+ {
+ std::vector<CompressedBuffer> AttachmentBuffers{Attachments[OpIds[1]][0].second,
+ Attachments[OpIds[2]][0].second,
+ Attachments[OpIds[2]][1].second};
+ std::vector<IoHash> AttachmentHashes{AttachmentBuffers[0].DecodeRawHash(),
+ AttachmentBuffers[1].DecodeRawHash(),
+ AttachmentBuffers[2].DecodeRawHash()};
+ std::vector<uint64_t> ResponseModTags(3, 0);
+ {
+ // Fresh fetch
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, {}));
+
+ CHECK_EQ(3, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ IoHash Id = Chunk["Id"].AsHash();
+
+ auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id);
+ // Index is used to subscript the fixture vectors, so an unknown Id must stop the test
+ REQUIRE(It != AttachmentHashes.end());
+ ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It);
+ CHECK_EQ(AttachmentHashes[Index], Id);
+ ResponseModTags[Index] = Chunk["ModTag"].AsUInt64();
+ CHECK_NE(0, ResponseModTags[Index]);
+ IoHash AttachmentHash = Chunk["RawHash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ REQUIRE_NE(nullptr, Attachment);
+ CompressedBuffer Buffer = Attachment->AsCompressedBinary();
+ CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash());
+ CHECK(AttachmentBuffers[Index].GetCompressed().Flatten().GetView().EqualBytes(Buffer.GetCompressed().Flatten().GetView()));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ {
+ // Fetch with matching ModTag
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, ResponseModTags));
+
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ IoHash Id = Chunk["Id"].AsHash();
+
+ auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id);
+ REQUIRE(It != AttachmentHashes.end());
+ ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It);
+ CHECK_EQ(AttachmentHashes[Index], Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ {
+ // Fresh ModTag query (SkipData)
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, {}));
+
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ IoHash Id = Chunk["Id"].AsHash();
+
+ auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id);
+ REQUIRE(It != AttachmentHashes.end());
+ ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It);
+ CHECK_EQ(AttachmentHashes[Index], Id);
+ CHECK_EQ(ResponseModTags[Index], Chunk["ModTag"].AsUInt64());
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ {
+ // ModTag query (SkipData) with matching ModTags
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, ResponseModTags));
+
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ IoHash Id = Chunk["Id"].AsHash();
+
+ auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id);
+ REQUIRE(It != AttachmentHashes.end());
+ ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It);
+ CHECK_EQ(AttachmentHashes[Index], Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ {
+ // ModTag query (SkipData) with mismatching ModTags
+ std::vector<uint64_t> MismatchingModTags(ResponseModTags);
+ for (uint64_t& Tag : MismatchingModTags)
+ {
+ Tag++;
+ }
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, MismatchingModTags));
+
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ IoHash Id = Chunk["Id"].AsHash();
+
+ auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id);
+ REQUIRE(It != AttachmentHashes.end());
+ ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It);
+ CHECK_EQ(AttachmentHashes[Index], Id);
+ CHECK(Chunk["ModTag"].AsUInt64() == ResponseModTags[Index]);
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ }
+ // Multi Id Request - same scenarios as the multi RawHash request but addressing the chunks by Oid
+ {
+ std::vector<CompressedBuffer> AttachmentBuffers{Attachments[OpIds[1]][0].second,
+ Attachments[OpIds[2]][0].second,
+ Attachments[OpIds[2]][1].second};
+ std::vector<Oid> AttachedIds{Attachments[OpIds[1]][0].first, Attachments[OpIds[2]][0].first, Attachments[OpIds[2]][1].first};
+ std::vector<uint64_t> ResponseModTags(3, 0);
+ {
+ // Fresh fetch
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, {}));
+
+ CHECK_EQ(3, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ Oid Id = Chunk["Id"].AsObjectId();
+
+ auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id);
+ // Index is used to subscript the fixture vectors, so an unknown Id must stop the test
+ REQUIRE(It != AttachedIds.end());
+ ptrdiff_t Index = std::distance(AttachedIds.begin(), It);
+ CHECK_EQ(AttachedIds[Index], Id);
+ ResponseModTags[Index] = Chunk["ModTag"].AsUInt64();
+ CHECK_NE(0, ResponseModTags[Index]);
+ IoHash AttachmentHash = Chunk["RawHash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ REQUIRE_NE(nullptr, Attachment);
+ CompressedBuffer Buffer = Attachment->AsCompressedBinary();
+ CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash());
+ CHECK(AttachmentBuffers[Index].GetCompressed().Flatten().GetView().EqualBytes(Buffer.GetCompressed().Flatten().GetView()));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ {
+ // Fetch with matching ModTag
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, ResponseModTags));
+
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ Oid Id = Chunk["Id"].AsObjectId();
+
+ auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id);
+ REQUIRE(It != AttachedIds.end());
+ ptrdiff_t Index = std::distance(AttachedIds.begin(), It);
+ CHECK_EQ(AttachedIds[Index], Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ {
+ // Fresh ModTag query (SkipData)
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, {}));
+
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ Oid Id = Chunk["Id"].AsObjectId();
+
+ auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id);
+ REQUIRE(It != AttachedIds.end());
+ ptrdiff_t Index = std::distance(AttachedIds.begin(), It);
+ CHECK_EQ(AttachedIds[Index], Id);
+ CHECK_EQ(ResponseModTags[Index], Chunk["ModTag"].AsUInt64());
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ {
+ // ModTag query (SkipData) with matching ModTags
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, ResponseModTags));
+
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ Oid Id = Chunk["Id"].AsObjectId();
+
+ auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id);
+ REQUIRE(It != AttachedIds.end());
+ ptrdiff_t Index = std::distance(AttachedIds.begin(), It);
+ CHECK_EQ(AttachedIds[Index], Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ {
+ // ModTag query (SkipData) with mismatching ModTags
+ std::vector<uint64_t> MismatchingModTags(ResponseModTags);
+ for (uint64_t& Tag : MismatchingModTags)
+ {
+ Tag++;
+ }
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, MismatchingModTags));
+
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ Oid Id = Chunk["Id"].AsObjectId();
+
+ auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id);
+ REQUIRE(It != AttachedIds.end());
+ ptrdiff_t Index = std::distance(AttachedIds.begin(), It);
+ CHECK_EQ(AttachedIds[Index], Id);
+ CHECK(Chunk["ModTag"].AsUInt64() == ResponseModTags[Index]);
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ }
+}
+
+// Exercises ProjectStore::GetChunkRange with whole-chunk and partial-range requests. Each scenario
+// repeats a request while passing the same modification tag variable as the preceding call and
+// expects the repeated request to report NotModified.
+TEST_CASE("project.store.partial.read")
+{
+    using namespace std::literals;
+    using namespace testutils;
+
+    ScopedTemporaryDirectory TempDir;
+
+    GcManager Gc;
+    CidStore CidStore(Gc);
+    CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+    CidStore.Initialize(CidConfig);
+
+    std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
+    ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
+    std::filesystem::path RootDir = TempDir.Path() / "root"sv;
+    std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
+
+    std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv;
+    std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv;
+    {
+        // Create (truncate) the project file inside its own scope.
+        CreateDirectories(Project1FilePath.parent_path());
+        BasicFile ProjectFile;
+        ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate);
+    }
+
+    // Four ops: one without attachments, then ops with one, four and two attachments.
+    std::vector<Oid> OpIds;
+    OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()});
+    std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments;
+    {
+        Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv,
+                                                                    "proj1"sv,
+                                                                    RootDir.string(),
+                                                                    EngineRootDir.string(),
+                                                                    Project1RootDir.string(),
+                                                                    Project1FilePath.string()));
+        Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, {});
+        CHECK(Oplog);
+        Attachments[OpIds[0]] = {};
+        Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77});
+        Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{7123, 9583, 690, 99});
+        Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122});
+        // Bind by reference: iterating by value would copy every map entry together with its attachment vector.
+        for (const auto& [OpId, OpAttachments] : Attachments)
+        {
+            Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(OpId, OpAttachments));
+        }
+    }
+
+    // Re-open the project and oplog that were created in the scope above.
+    Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
+    CHECK(Project1);
+    Ref<ProjectStore::Oplog> Oplog1 = Project1->OpenOplog("oplog1"sv, false, true);
+    CHECK(Oplog1);
+
+    {
+        // Whole-chunk request (offset 0, size ~0ull) for the only attachment of OpIds[1].
+        uint64_t ModificationTag = 0;
+
+        auto Result = ProjectStore.GetChunkRange(Log(),
+                                                 *Project1,
+                                                 *Oplog1,
+                                                 Attachments[OpIds[1]][0].first,
+                                                 0,
+                                                 ~0ull,
+                                                 ZenContentType::kCompressedBinary,
+                                                 &ModificationTag);
+
+        CHECK_EQ(ProjectStore::GetChunkRangeResult::EError::Ok, Result.Error);
+
+        IoHash RawHash;
+        // Start from a known value so the CHECK below never reads an indeterminate size.
+        uint64_t RawSize = 0;
+        CompressedBuffer Attachment = CompressedBuffer::FromCompressed(Result.Chunk, RawHash, RawSize);
+        CHECK(RawSize == Attachments[OpIds[1]][0].second.DecodeRawSize());
+
+        // Identical request, same tag variable.
+        auto Result2 = ProjectStore.GetChunkRange(Log(),
+                                                  *Project1,
+                                                  *Oplog1,
+                                                  Attachments[OpIds[1]][0].first,
+                                                  0,
+                                                  ~0ull,
+                                                  ZenContentType::kCompressedBinary,
+                                                  &ModificationTag);
+        CHECK_EQ(ProjectStore::GetChunkRangeResult::EError::NotModified, Result2.Error);
+    }
+
+    {
+        // Whole-chunk request for the second attachment of OpIds[2].
+        uint64_t FullChunkModificationTag = 0;
+        {
+            auto Result = ProjectStore.GetChunkRange(Log(),
+                                                     *Project1,
+                                                     *Oplog1,
+                                                     Attachments[OpIds[2]][1].first,
+                                                     0,
+                                                     ~0ull,
+                                                     ZenContentType::kCompressedBinary,
+                                                     &FullChunkModificationTag);
+            CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::Ok);
+            CHECK(Result.Chunk);
+            CHECK(CompressedBuffer::FromCompressedNoValidate(std::move(Result.Chunk)).DecodeRawSize() ==
+                  Attachments[OpIds[2]][1].second.DecodeRawSize());
+        }
+        {
+            // Identical request, same tag variable.
+            auto Result = ProjectStore.GetChunkRange(Log(),
+                                                     *Project1,
+                                                     *Oplog1,
+                                                     Attachments[OpIds[2]][1].first,
+                                                     0,
+                                                     ~0ull,
+                                                     ZenContentType::kCompressedBinary,
+                                                     &FullChunkModificationTag);
+            CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::NotModified);
+        }
+    }
+
+    {
+        // Partial request (offset 5, size 1773) for the second attachment of OpIds[2].
+        uint64_t PartialChunkModificationTag = 0;
+        {
+            auto Result = ProjectStore.GetChunkRange(Log(),
+                                                     *Project1,
+                                                     *Oplog1,
+                                                     Attachments[OpIds[2]][1].first,
+                                                     5,
+                                                     1773,
+                                                     ZenContentType::kCompressedBinary,
+                                                     &PartialChunkModificationTag);
+            CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::Ok);
+
+            IoHash PartialRawHash;
+            // Start from a known value so the CHECK below never reads an indeterminate size.
+            uint64_t PartialRawSize = 0;
+            CompressedBuffer PartialCompressedResult = CompressedBuffer::FromCompressed(Result.Chunk, PartialRawHash, PartialRawSize);
+            CHECK(PartialRawSize >= 1773);
+
+            // Compare the byte at raw offset 5 of the original attachment with the first byte
+            // decompressed from the partial result at the matching offset.
+            uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5);
+            SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed);
+            SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress();
+            const uint8_t* FullDataPtr = &(reinterpret_cast<const uint8_t*>(FullDecompressed.GetView().GetData())[5]);
+            const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData());
+            CHECK(FullDataPtr[0] == PartialDataPtr[0]);
+        }
+
+        {
+            // Same attachment at offset 0, passing the tag variable used by the offset-5 request.
+            auto Result = ProjectStore.GetChunkRange(Log(),
+                                                     *Project1,
+                                                     *Oplog1,
+                                                     Attachments[OpIds[2]][1].first,
+                                                     0,
+                                                     1773,
+                                                     ZenContentType::kCompressedBinary,
+                                                     &PartialChunkModificationTag);
+            CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::NotModified);
+        }
+    }
+}
+
+// Builds one chunk block out of a list of generated attachments and checks that
+// IterateChunkBlock succeeds on the decompressed block.
+TEST_CASE("project.store.block")
+{
+    using namespace std::literals;
+    using namespace testutils;
+
+    std::vector<std::size_t> AttachmentSizes = {7633, 6825, 5738, 8031, 7225, 566,  3656, 6006, 24,   3466, 1093, 4269, 2257, 3685, 3489,
+                                                7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251,
+                                                491,  5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225};
+
+    std::vector<std::pair<Oid, CompressedBuffer>> GeneratedAttachments = CreateAttachments(AttachmentSizes);
+
+    // One (raw hash, fetch callback) entry per attachment; each callback hands back its own copy of the buffer.
+    std::vector<std::pair<IoHash, FetchChunkFunc>> ChunkFetchers;
+    ChunkFetchers.reserve(AttachmentSizes.size());
+    for (const auto& Entry : GeneratedAttachments)
+    {
+        const CompressedBuffer& Payload = Entry.second;
+        ChunkFetchers.emplace_back(Payload.DecodeRawHash(), [Buffer = Payload](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
+            return {Buffer.DecodeRawSize(), Buffer};
+        });
+    }
+
+    ChunkBlockDescription BlockDescription;
+    CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(ChunkFetchers), BlockDescription);
+
+    // The visitor ignores every chunk; only the return value of the iteration is checked.
+    auto IgnoreChunk = [](CompressedBuffer&&, const IoHash&) {};
+    uint64_t HeaderSize;
+    CHECK(IterateChunkBlock(CompressedBlock.Decompress(), IgnoreChunk, HeaderSize));
+}
+
+// Exercises Oplog::IterateOplog and Oplog::IterateOplogWithKey on an oplog that holds NumTestOids
+// entries, using default paging as well as in-range and out-of-range Paging{Start, Size} values.
+TEST_CASE("project.store.iterateoplog")
+{
+    using namespace std::literals;
+    using namespace testutils;
+
+    ScopedTemporaryDirectory TempDir;
+
+    GcManager Gc;
+    CidStore CidStore(Gc);
+    CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+    CidStore.Initialize(CidConfig);
+
+    std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
+    ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
+    std::filesystem::path RootDir = TempDir.Path() / "root"sv;
+    std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
+
+    std::filesystem::path ProjectRootDir = TempDir.Path() / "game"sv;
+    std::filesystem::path ProjectFilePath = TempDir.Path() / "game"sv / "game.uproject"sv;
+    {
+        // Create (truncate) the project file inside its own scope.
+        CreateDirectories(ProjectFilePath.parent_path());
+        BasicFile ProjectFile;
+        ProjectFile.Open(ProjectFilePath, BasicFile::Mode::kTruncate);
+    }
+    std::filesystem::path ProjectOplogPath = TempDir.Path() / "game"sv / "saves"sv / "cooked"sv / ".projectstore"sv;
+    {
+        // Create (truncate) the oplog marker file inside its own scope.
+        CreateDirectories(ProjectOplogPath.parent_path());
+        BasicFile OplogFile;
+        OplogFile.Open(ProjectOplogPath, BasicFile::Mode::kTruncate);
+    }
+
+    Ref<ProjectStore::Project> TestProject(ProjectStore.NewProject(BasePath / "proj"sv,
+                                                                   "proj"sv,
+                                                                   RootDir.string(),
+                                                                   EngineRootDir.string(),
+                                                                   ProjectRootDir.string(),
+                                                                   ProjectFilePath.string()));
+    Ref<ProjectStore::Oplog> Oplog = TestProject->NewOplog("oplog"sv, ProjectOplogPath);
+    CHECK(Oplog);
+
+    // Per-op bookkeeping: the id used to build the op, its string form and a "seen" flag.
+    struct TestOidData
+    {
+        Oid         KeyAsOidNotOplogId = Oid::NewOid();
+        std::string Key                = KeyAsOidNotOplogId.ToString();
+        bool        bFound             = false;
+    };
+    constexpr int NumTestOids = 4;
+    TestOidData   TestOids[NumTestOids];
+    for (const TestOidData& TestOid : TestOids)
+    {
+        Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(TestOid.KeyAsOidNotOplogId, {}));
+    }
+    int Count = 0;
+
+    auto ResetTest = [&Count, &TestOids]() {
+        Count = 0;
+        for (TestOidData& TestOid : TestOids)
+        {
+            TestOid.bFound = false;
+        }
+    };
+    // Callback for IterateOplog: counts every visited op.
+    auto IncrementCount = [&Count](CbObjectView /* Op */) { ++Count; };
+    // Callback for IterateOplogWithKey: counts (and flags) ops whose "key" field matches a test key.
+    auto MarkFound = [&TestOids, &Count](ProjectStore::LogSequenceNumber /* LSN */, const Oid& /* InId */, CbObjectView Op) {
+        for (TestOidData& TestOid : TestOids)
+        {
+            if (Op["key"sv].AsString() == TestOid.Key)
+            {
+                TestOid.bFound = true;
+                ++Count;
+            }
+        }
+    };
+
+    using Paging = ProjectStore::Oplog::Paging;
+
+    // Each helper resets the bookkeeping, runs one iteration with the given paging and returns the resulting Count.
+    auto CountOps = [&](const Paging& EntryPaging) {
+        ResetTest();
+        Oplog->IterateOplog(IncrementCount, EntryPaging);
+        return Count;
+    };
+    auto CountKeyedOps = [&](const Paging& EntryPaging) {
+        ResetTest();
+        Oplog->IterateOplogWithKey(MarkFound, EntryPaging);
+        return Count;
+    };
+
+    // Tests of IterateOpLog and IterateOplogWithKey, with various Paging arguments
+    {
+        CHECK(CountOps(Paging{}) == NumTestOids);
+        CHECK(CountKeyedOps(Paging{}) == NumTestOids);
+
+        // IterateOplogWithKey called without an explicit Paging argument
+        ResetTest();
+        Oplog->IterateOplogWithKey(MarkFound);
+        CHECK(Count == NumTestOids);
+
+        for (int Start = 0; Start < NumTestOids; ++Start)
+        {
+            for (int Size = 0; Size < NumTestOids - Start; ++Size)
+            {
+                CHECK(CountOps(Paging{Start, Size}) == Size);
+                CHECK(CountKeyedOps(Paging{Start, Size}) == Size);
+            }
+
+            // Out of range Size arguments
+            CHECK(CountOps(Paging{Start, -1}) == NumTestOids - Start);
+            CHECK(CountKeyedOps(Paging{Start, NumTestOids * 2}) == NumTestOids - Start);
+        }
+
+        // Out of range Start arguments
+        for (int Size = 0; Size < NumTestOids; ++Size)
+        {
+            CHECK(CountOps(Paging{-1, Size}) == Size);
+            CHECK(CountKeyedOps(Paging{-1, Size}) == Size);
+
+            CHECK(CountOps(Paging{NumTestOids, Size}) == 0);
+            CHECK(CountKeyedOps(Paging{NumTestOids, Size}) == 0);
+        }
+
+        // Out of range Start and Size arguments
+        CHECK(CountOps(Paging{-1, -1}) == NumTestOids);
+        CHECK(CountKeyedOps(Paging{-1, -1}) == NumTestOids);
+
+        CHECK(CountOps(Paging{-1, 2 * NumTestOids}) == NumTestOids);
+        CHECK(CountKeyedOps(Paging{-1, 2 * NumTestOids}) == NumTestOids);
+
+        CHECK(CountOps(Paging{NumTestOids, -1}) == 0);
+        CHECK(CountKeyedOps(Paging{NumTestOids, -1}) == 0);
+
+        CHECK(CountOps(Paging{NumTestOids, 2 * NumTestOids}) == 0);
+        CHECK(CountKeyedOps(Paging{NumTestOids, 2 * NumTestOids}) == 0);
+    }
+}
+
+#endif
+
+// Intentionally empty function.
+// NOTE(review): presumably exists so that other code can reference a symbol from this translation
+// unit and thereby force it to be linked in - confirm against callers.
+void
+prj_forcelink()
+{
+}
+
+} // namespace zen