diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-02 16:35:22 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-02 16:35:22 +0200 |
| commit | 289379b55d19e08c47afb54a363fda9478623b9d (patch) | |
| tree | da2eb6aa2f95833c0c7063c6f71dc9576512d7c1 /src/zenserver | |
| parent | fix for RPC replay issue (wrong content-type) (#536) (diff) | |
| download | zen-289379b55d19e08c47afb54a363fda9478623b9d.tar.xz zen-289379b55d19e08c47afb54a363fda9478623b9d.zip | |
move projectstore to zenstore (#541)
Diffstat (limited to 'src/zenserver')
| -rw-r--r-- | src/zenserver/projectstore/httpprojectstore.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver/projectstore/oplogreferencedset.cpp | 95 | ||||
| -rw-r--r-- | src/zenserver/projectstore/oplogreferencedset.h | 46 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 8098 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 583 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 2 | ||||
| -rw-r--r-- | src/zenserver/vfs/vfsimpl.cpp | 2 | ||||
| -rw-r--r-- | src/zenserver/vfs/vfsimpl.h | 3 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 1 | ||||
| -rw-r--r-- | src/zenserver/zenserver.h | 2 |
10 files changed, 5 insertions, 8831 deletions
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index 175d131e1..c227c0f9e 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -2,8 +2,6 @@ #include "httpprojectstore.h" -#include "oplogreferencedset.h" -#include "projectstore.h" #include "remoteprojectstore.h" #include <zencore/compactbinarybuilder.h> @@ -18,6 +16,8 @@ #include <zencore/stream.h> #include <zencore/trace.h> #include <zenhttp/packageformat.h> +#include <zenstore/oplogreferencedset.h> +#include <zenstore/projectstore.h> #include <zenstore/zenstore.h> #include <zenutil/openprocesscache.h> #include <zenutil/workerpools.h> diff --git a/src/zenserver/projectstore/oplogreferencedset.cpp b/src/zenserver/projectstore/oplogreferencedset.cpp deleted file mode 100644 index b5cbc6f4b..000000000 --- a/src/zenserver/projectstore/oplogreferencedset.cpp +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "oplogreferencedset.h" - -#include "projectstore.h" - -#include <zencore/iobuffer.h> -#include <zencore/logging.h> -#include <zencore/string.h> - -namespace zen { - -std::optional<OplogReferencedSet> -OplogReferencedSet::LoadFromChunk(const IoBuffer& ChunkData) -{ - using namespace std::literals; - - if (!ChunkData || ChunkData.Size() == 0) - { - return std::optional<OplogReferencedSet>(); - } - std::string_view ChunkText(reinterpret_cast<const char*>(ChunkData.Data()), ChunkData.Size()); - - AsciiSet TrimWhitespace(" \t\r\n"); - const char* FirstNonComment = nullptr; - uint64_t Version = 0; - - // Parse the initial comment block: the leading block of empty or comment lines. - ForEachStrTok(ChunkText, '\n', [&Version, &FirstNonComment, &TrimWhitespace](std::string_view Line) { - Line = AsciiSet::TrimSuffixWith(AsciiSet::TrimPrefixWith(Line, TrimWhitespace), TrimWhitespace); - if (Line.empty()) - { - return true; // empty line, keep reading - } - if (!Line.starts_with('#')) - { - FirstNonComment = Line.data(); - return false; // non-comment line, stop - } - - // Comment line in the header block of comments in the file. Interpret it if it is a version line. - // Skip past the '#' and whitespace. - Line.remove_prefix(1); - Line = AsciiSet::TrimPrefixWith(Line, TrimWhitespace); - - // Parse "# Version <uint64>". - constexpr std::string_view VersionStr("Version "sv); - if (Line.starts_with(VersionStr)) - { - std::string_view VersionToken = Line; - VersionToken.remove_prefix(VersionStr.length()); - VersionToken.remove_suffix(AsciiSet::TrimPrefixWithout(VersionToken, TrimWhitespace).length()); - std::optional<uint64_t> ParsedVersion = ParseInt<uint64_t>(VersionToken); - if (ParsedVersion) - { - Version = *ParsedVersion; - } - } - return true; - }); - - // Report no referencedset if the version is below our minimum version. - constexpr uint64_t MinSupportedVersion = 1; - if (Version < MinSupportedVersion) - { - ZEN_WARN("ReferencedSet is below the minimum supported version, ignoring it. Version: {}, minimum version: {}.", - Version, - MinSupportedVersion); - return std::optional<OplogReferencedSet>(); - } - - // Parse the remaining lines after the leading comment block. - ChunkText.remove_prefix(FirstNonComment ? FirstNonComment - ChunkText.data() : ChunkText.length()); - - eastl::fixed_vector<uint8_t, 256> TmpBuffer; - - OplogReferencedSet Result; - ForEachStrTok(ChunkText, '\n', [&Result, &TrimWhitespace, &TmpBuffer](std::string_view Line) { - Line = AsciiSet::TrimSuffixWith(AsciiSet::TrimPrefixWith(Line, TrimWhitespace), TrimWhitespace); - if (!Line.empty() && !Line.starts_with('#')) - { - Result.Set.emplace(OpKeyStringAsOid(Line, TmpBuffer)); - } - return true; - }); - return std::optional<OplogReferencedSet>(std::move(Result)); -} - -void -OplogReferencedSet::Clear() -{ - Set.clear(); -} - -} // namespace zen diff --git a/src/zenserver/projectstore/oplogreferencedset.h b/src/zenserver/projectstore/oplogreferencedset.h deleted file mode 100644 index dcc156060..000000000 --- a/src/zenserver/projectstore/oplogreferencedset.h +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/uid.h> - -#include <optional> -#include <string_view> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <tsl/robin_set.h> -ZEN_THIRD_PARTY_INCLUDES_END - -namespace zen { - -class IoBuffer; - -/** - * @brief Records which keys in the oplog (entry["op"]["key"]) are in the ReferencedSet reported by the client that uploaded the oplog. - * - * An oplog can contain ops from an earlier incremental cook result that are no longer referenced in the most recent cook; - * the OplogReferencedSet allows clients that need to view only referenced-by-head-cook entries to trim the oplog down to - * those entries. - * - * Keys are case-sensitive; client must ensure that capitalization matches between the ReferencedSet and the oplog keys. - */ -class OplogReferencedSet -{ -public: - inline bool Contains(const Oid& OplogId) const { return Set.contains(OplogId); } - static inline bool IsNonPackage(std::string_view OplogKey) - { - // A referencedset always includes all non-package keys - return OplogKey.empty() || !OplogKey.starts_with('/'); - } - void Clear(); - - static std::optional<OplogReferencedSet> LoadFromChunk(const IoBuffer& ChunkData); - - static constexpr std::string_view ReferencedSetOplogKey = "ReferencedSet"; - -private: - tsl::robin_set<Oid, Oid::Hasher> Set; -}; - -} // namespace zen diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp deleted file mode 100644 index 5034a250a..000000000 --- a/src/zenserver/projectstore/projectstore.cpp +++ /dev/null @@ -1,8098 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "projectstore.h" - -#include <zencore/assertfmt.h> -#include <zencore/compactbinaryutil.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/except.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/memory/llm.h> -#include <zencore/scopeguard.h> -#include <zencore/stream.h> -#include <zencore/timer.h> -#include <zencore/trace.h> -#include <zenstore/caslog.h> -#include <zenstore/cidstore.h> -#include <zenstore/scrubcontext.h> -#include <zenutil/parallelwork.h> -#include <zenutil/referencemetadata.h> -#include <zenutil/workerpools.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <tsl/robin_set.h> -#include <xxh3.h> -ZEN_THIRD_PARTY_INCLUDES_END - -#if ZEN_WITH_TESTS -# include <zencore/testing.h> -# include <zencore/testutils.h> -# include <zenutil/chunkblock.h> - -# include <unordered_map> -#endif // ZEN_WITH_TESTS - -namespace zen { - -const FLLMTag& -GetProjectstoreTag() -{ - static FLLMTag _("store", FLLMTag("project")); - - return _; -} - -namespace { - bool PrepareDirectoryDelete(const std::filesystem::path& Dir, std::filesystem::path& OutDeleteDir) - { - std::filesystem::path DroppedBucketPath; - do - { - if (!IsDir(Dir)) - { - return true; - } - - StringBuilder<64> MovedId; - Oid::NewOid().ToString(MovedId); - - std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), MovedId); - DroppedBucketPath = Dir.parent_path() / DroppedName; - if (IsDir(DroppedBucketPath)) - { - if (!DeleteDirectories(DroppedBucketPath)) - { - ZEN_INFO("Drop directory '{}' for '{}' already exists but could not be removed, attempting different name.", - DroppedBucketPath, - Dir); - continue; - } - if (IsDir(DroppedBucketPath)) - { - ZEN_INFO("Drop directory '{}' for '{}' still exists after remove, attempting different name.", DroppedBucketPath, Dir); - continue; - } - } - - int RenameAttempt = 0; - do - { - std::error_code Ec; - RenameDirectory(Dir, DroppedBucketPath, Ec); - if (!Ec) - { - OutDeleteDir = DroppedBucketPath; - return true; - } - if (IsDir(DroppedBucketPath)) - { - ZEN_INFO("Can't rename '{}' to still existing drop directory '{}'. Reason: '{}'. Attempting different name.", - Dir, - DroppedBucketPath, - Ec.message()); - break; - } - if (++RenameAttempt == 10) - { - ZEN_INFO("Can't rename '{}' to drop directory '{}' after {} attempts. Reason: {}.", - Dir, - DroppedBucketPath, - RenameAttempt, - Ec.message()); - return false; - } - ZEN_INFO("Can't rename '{}' to drop directory '{}', pausing and retrying. Reason: {}.", - Dir, - DroppedBucketPath, - Ec.message()); - Sleep(100); - } while (true); - - } while (true); - - return false; - } - - bool IsFileOlderThan(const std::filesystem::path& CheckPath, const std::filesystem::path& ReferencePath) - { - std::error_code Ec; - std::filesystem::file_time_type CheckWriteTime = std::filesystem::last_write_time(CheckPath, Ec); - if (Ec) - { - return true; - } - std::filesystem::file_time_type ReferenceWriteTime = std::filesystem::last_write_time(ReferencePath, Ec); - if (Ec) - { - return true; - } - return CheckWriteTime < ReferenceWriteTime; - } - - std::pair<int32_t, int32_t> GetPagedRange(int32_t TotalSize, const ProjectStore::Oplog::Paging& EntryPaging) - { - int32_t Start = std::clamp(EntryPaging.Start, 0, TotalSize); - int32_t End = EntryPaging.Count < 0 ? TotalSize : (Start + std::min<int32_t>(EntryPaging.Count, TotalSize - Start)); - return {Start, End}; - } - -#pragma pack(push) -#pragma pack(1) - struct OplogIndexHeader - { - OplogIndexHeader() {} - - struct BodyV1 - { - uint64_t LogPosition = 0; - uint32_t LSNCount = 0; - uint64_t KeyCount = 0; - uint32_t OpAddressMapCount = 0; - uint32_t LatestOpMapCount = 0; - uint64_t ChunkMapCount = 0; - uint64_t MetaMapCount = 0; - uint64_t FileMapCount = 0; - }; - - struct BodyV2 - { - uint64_t LogPosition = 0; - uint64_t KeyCount = 0; - uint32_t OpPayloadIndexCount = 0; - uint32_t OpPayloadCount = 0; - uint32_t ChunkMapCount = 0; - uint32_t MetaMapCount = 0; - uint32_t FileMapCount = 0; - uint32_t MaxLSN = 0; - uint32_t NextOpCoreOffset = 0; // note: Multiple of oplog data alignment! - uint32_t Reserved[2] = {0, 0}; - }; - - static constexpr uint32_t ExpectedMagic = 0x7569647a; // 'zidx'; - static constexpr uint32_t Version1 = 1; - static constexpr uint32_t Version2 = 2; - static constexpr uint32_t CurrentVersion = Version2; - static constexpr uint64_t DataAlignment = 8; - - uint32_t Magic = ExpectedMagic; - uint32_t Version = CurrentVersion; - - union - { - BodyV1 V1; - BodyV2 V2; - }; - - uint32_t Checksum = 0; - - static uint32_t ComputeChecksum(const OplogIndexHeader& Header) - { - return XXH32(&Header.Magic, sizeof(OplogIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); - } - }; - -#pragma pack(pop) - - static_assert(sizeof(OplogIndexHeader) == 64); - - static std::uint64_t GetModificationTagFromRawHash(const IoHash& Hash) - { - IoHash::Hasher H; - return H(Hash); - } - - static std::uint64_t GetModificationTagFromModificationTime(IoBuffer FileBuffer) - { - IoBufferFileReference FileRef; - if (FileBuffer.GetFileReference(FileRef)) - { - std::error_code Ec; - uint64_t ModificationTick = GetModificationTickFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - return ModificationTick; - } - } - return {}; - } - -} // namespace - -////////////////////////////////////////////////////////////////////////// - -Oid -ComputeOpKey(const CbObjectView& Op) -{ - using namespace std::literals; - - eastl::fixed_vector<uint8_t, 256> KeyData; - - Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { - auto Begin = reinterpret_cast<const uint8_t*>(Data); - auto End = Begin + Size; - KeyData.insert(KeyData.end(), Begin, End); - }); - - XXH3_128 KeyHash128; - - // This logic currently exists to work around a problem caused by misusing the xxhash - // functions in the past. Short keys are evaluated using the old and buggy - // path but longer paths are evaluated properly. In the future all key lengths - // should be evaluated using the proper path, this is a temporary workaround to - // maintain compatibility with existing disk state. - if (KeyData.size() < 240) - { - XXH3_128Stream_deprecated KeyHasher; - KeyHasher.Append(KeyData.data(), KeyData.size()); - KeyHash128 = KeyHasher.GetHash(); - } - else - { - KeyHash128 = XXH3_128::HashMemory(KeyData.data(), KeyData.size()); - } - - Oid KeyHash; - memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash); - - return KeyHash; -} - -struct ProjectStore::OplogStorage : public RefCounted -{ - OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath) - { - } - - ~OplogStorage() - { - ZEN_DEBUG("oplog '{}/{}': closing oplog storage at {}", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - m_OplogStoragePath); - try - { - m_Oplog.Close(); - m_OpBlobs.Close(); - } - catch (const std::exception& Ex) - { - ZEN_WARN("oplog '{}/{}': flushing oplog at '{}' failed. Reason: '{}'", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - m_OplogStoragePath, - Ex.what()); - } - } - - [[nodiscard]] bool Exists() const { return Exists(m_OplogStoragePath); } - [[nodiscard]] static bool Exists(const std::filesystem::path& BasePath) - { - return IsFile(GetLogPath(BasePath)) && IsFile(GetBlobsPath(BasePath)); - } - [[nodiscard]] bool IsValid() const { return IsValid(m_OplogStoragePath); } - [[nodiscard]] static bool IsValid(const std::filesystem::path& BasePath) - { - return TCasLogFile<OplogEntry>::IsValid(GetLogPath(BasePath)); - } - void WipeState() const - { - std::error_code Ec; - RemoveFile(GetLogPath(), Ec); - RemoveFile(GetBlobsPath(), Ec); - } - - static bool Delete(const std::filesystem::path& BasePath) { return DeleteDirectories(BasePath); } - - uint64_t OpBlobsSize() const { return FileSizeFromPath(GetBlobsPath()); } - - uint64_t OpsSize() const { return OpsSize(m_OplogStoragePath); } - static uint64_t OpsSize(const std::filesystem::path& BasePath) - { - if (Exists(BasePath)) - { - std::error_code DummyEc; - return FileSizeFromPath(GetLogPath(BasePath)) + FileSizeFromPath(GetBlobsPath(BasePath)); - } - return 0; - } - - uint32_t MaxLSN() const { return m_MaxLsn; } - void SetMaxLSNAndNextWriteAddress(uint32_t MaxLSN, const OplogEntryAddress& NextOpFileOffset) - { - m_MaxLsn.store(MaxLSN); - m_NextOpsOffset = RoundUp((NextOpFileOffset.Offset * m_OpsAlign) + NextOpFileOffset.Size, m_OpsAlign); - } - - void SetMaxLSNAndNextOpOffset(uint32_t MaxLSN, uint32_t NextOpOffset) - { - m_MaxLsn.store(MaxLSN); - m_NextOpsOffset = NextOpOffset * m_OpsAlign; - } - - uint32_t GetNextOpOffset() const { return gsl::narrow<uint32_t>(m_NextOpsOffset / m_OpsAlign); } - - enum EMode - { - Create, - Read, - Write - }; - - void Open(EMode InMode) - { - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::OplogStorage::Open"); - - CasLogFile::Mode LogMode = CasLogFile::Mode::kRead; - BasicFile::Mode BlobsMode = BasicFile::Mode::kRead; - - if (InMode == EMode::Create) - { - ZEN_DEBUG("oplog '{}/{}': initializing storage at '{}'", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - m_OplogStoragePath); - - DeleteDirectories(m_OplogStoragePath); - CreateDirectories(m_OplogStoragePath); - - LogMode = CasLogFile::Mode::kTruncate; - BlobsMode = BasicFile::Mode::kTruncate; - } - else if (InMode == EMode::Write) - { - LogMode = CasLogFile::Mode::kWrite; - BlobsMode = BasicFile::Mode::kWrite; - ZEN_DEBUG("oplog '{}/{}': opening storage at '{}'", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - m_OplogStoragePath); - } - else if (InMode == EMode::Read) - { - ZEN_DEBUG("oplog '{}/{}': opening read only storage at '{}'", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - m_OplogStoragePath); - } - - m_Oplog.Open(GetLogPath(m_OplogStoragePath), LogMode); - m_Oplog.Initialize(); - - m_OpBlobs.Open(GetBlobsPath(m_OplogStoragePath), BlobsMode); - - ZEN_ASSERT(IsPow2(m_OpsAlign)); - ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1))); - } - - IoBuffer GetOpBuffer(BasicFileBuffer& OpBlobsBuffer, const OplogEntry& LogEntry) const - { - ZEN_MEMSCOPE(GetProjectstoreTag()); - - const uint64_t OpFileOffset = LogEntry.OpCoreAddress.Offset * m_OpsAlign; - const MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreAddress.Size, OpFileOffset); - if (OpBufferView.GetSize() == LogEntry.OpCoreAddress.Size) - { - return IoBuffer(IoBuffer::Wrap, OpBufferView.GetData(), OpBufferView.GetSize()); - } - else - { - IoBuffer OpBuffer(LogEntry.OpCoreAddress.Size); - OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreAddress.Size, OpFileOffset); - return OpBuffer; - } - } - - uint64_t GetEffectiveBlobsSize(std::span<const Oplog::OplogPayload> Addresses) const - { - uint64_t EffectiveSize = 0; - for (const Oplog::OplogPayload& OpPayload : Addresses) - { - EffectiveSize += RoundUp(OpPayload.Address.Size, m_OpsAlign); - } - return EffectiveSize; - } - - void Compact(std::span<const ProjectStore::LogSequenceNumber> LSNs, - std::function<void(const Oid& OpKeyHash, - ProjectStore::LogSequenceNumber OldLSN, - ProjectStore::LogSequenceNumber NewLSN, - const OplogEntryAddress& NewAddress)>&& Callback, - bool RetainLSNs, - bool DryRun) - { - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::OplogStorage::Compact"); - - ZEN_INFO("oplog '{}/{}': compacting at '{}'", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - m_OplogStoragePath); - - Stopwatch Timer; - - StringBuilder<64> OplogName; - Oid::NewOid().ToString(OplogName); - - std::filesystem::path OplogPath = m_OplogStoragePath / OplogName.c_str(); - - std::error_code Ec; - TCasLogFile<OplogEntry> Oplog; - Oplog.Open(OplogPath, CasLogFile::Mode::kTruncate); - (void)Oplog.Initialize(); - - TemporaryFile OpBlobs; - OpBlobs.CreateTemporary(m_OplogStoragePath, Ec); - if (Ec) - { - throw std::system_error(Ec, fmt::format("Failed to create temp file for op blob at '{}'", m_OplogStoragePath)); - } - - try - { - std::vector<OplogEntry> Ops; - Ops.reserve(LSNs.size()); - - ProjectStore::LsnMap<size_t> LSNToIndex; - LSNToIndex.reserve(LSNs.size()); - for (ProjectStore::LogSequenceNumber LSN : LSNs) - { - LSNToIndex[LSN] = (size_t)-1; - } - - RwLock::ExclusiveLockScope Lock(m_RwLock); - - const uint64_t SkipEntryCount = 0; - m_Oplog.Replay( - [&](const OplogEntry& LogEntry) { - if (auto It = LSNToIndex.find(LogEntry.OpLsn); It != LSNToIndex.end()) - { - if (It->second != (size_t)-1) - { - Ops[It->second] = LogEntry; - } - else - { - LSNToIndex[LogEntry.OpLsn] = Ops.size(); - Ops.push_back(LogEntry); - } - } - }, - SkipEntryCount); - - std::sort(Ops.begin(), Ops.end(), [&](const OplogEntry& Lhs, const OplogEntry& Rhs) { - return Lhs.OpCoreAddress.Offset < Rhs.OpCoreAddress.Offset; - }); - - std::vector<ProjectStore::LogSequenceNumber> OldLSNs; - OldLSNs.reserve(Ops.size()); - - uint64_t OpWriteOffset = 0; - uint32_t MaxLSN = 0; - { - BasicFileBuffer OldBlobsBuffer(m_OpBlobs, 65536); - BasicFileWriter NewOpBlobsBuffer(OpBlobs, 65536); - - for (OplogEntry& LogEntry : Ops) - { - OldLSNs.push_back(LogEntry.OpLsn); - - IoBuffer OpBuffer = GetOpBuffer(OldBlobsBuffer, LogEntry); - if (RetainLSNs) - { - MaxLSN = Max(MaxLSN, LogEntry.OpLsn.Number); - } - else - { - LogEntry.OpLsn = ProjectStore::LogSequenceNumber(++MaxLSN); - } - LogEntry.OpCoreAddress.Offset = gsl::narrow<uint32_t>(OpWriteOffset / m_OpsAlign); - NewOpBlobsBuffer.Write(OpBuffer.GetData(), LogEntry.OpCoreAddress.Size, OpWriteOffset); - OpWriteOffset = RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign); - } - Oplog.Append(Ops); - } - - uint64_t OldOpLogSize = m_Oplog.GetLogSize(); - uint64_t OldOpBlobsSize = m_OpBlobs.FileSize(); - - if (!DryRun) - { - m_Oplog.Close(); - m_OpBlobs.Close(); - - Oplog.Close(); - RenameFile(OplogPath, GetLogPath(), Ec); - if (Ec) - { - throw std::system_error( - Ec, - fmt::format("Oplog::Compact failed to rename temporary oplog blob storage file from '{}' to '{}'", - OplogPath, - GetLogPath())); - } - OpBlobs.MoveTemporaryIntoPlace(GetBlobsPath(), Ec); - if (Ec) - { - // We failed late - clean everything up as best we can - RemoveFile(OpBlobs.GetPath(), Ec); - RemoveFile(GetLogPath(), Ec); - RemoveFile(GetBlobsPath(), Ec); - throw std::system_error(Ec, - fmt::format("Oplog::Compact failed to rename temporary oplog file from '{}' to '{}'", - OpBlobs.GetPath(), - GetBlobsPath())); - } - - m_Oplog.Open(GetLogPath(), CasLogFile::Mode::kWrite); - m_Oplog.Initialize(); - - m_OpBlobs.Open(GetBlobsPath(), BasicFile::Mode::kWrite); - - m_MaxLsn.store(MaxLSN); - m_NextOpsOffset.store(OpWriteOffset); - } - - for (size_t Index = 0; Index < Ops.size(); Index++) - { - const OplogEntry& LogEntry = Ops[Index]; - Callback(LogEntry.OpKeyHash, OldLSNs[Index], LogEntry.OpLsn, LogEntry.OpCoreAddress); - } - - ZEN_INFO("oplog '{}/{}': compact completed in {} - Max LSN# {}, New size: {}, old size {}.", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - m_MaxLsn.load(), - NiceBytes(m_Oplog.GetLogSize() + m_OpBlobs.FileSize()), - NiceBytes(OldOpLogSize + OldOpBlobsSize)); - } - catch (const std::exception& /*Ex*/) - { - RemoveFile(OpBlobs.GetPath(), Ec); - throw; - } - } - - static std::filesystem::path GetLogPath(const std::filesystem::path& OplogStoragePath) - { - using namespace std::literals; - return OplogStoragePath / "ops.zlog"sv; - } - - static std::filesystem::path GetBlobsPath(const std::filesystem::path& OplogStoragePath) - { - using namespace std::literals; - return OplogStoragePath / "ops.zops"sv; - } - - std::filesystem::path GetLogPath() const { return GetLogPath(m_OplogStoragePath); } - - std::filesystem::path GetBlobsPath() const { return GetBlobsPath(m_OplogStoragePath); } - - void ReadOplogEntriesFromLog(std::vector<OplogEntry>& OutOpLogEntries, uint64_t& OutInvalidEntries, uint64_t SkipEntryCount) - { - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::OplogStorage::ReadOplogEntriesFromLog"); - - if (m_Oplog.GetLogCount() == SkipEntryCount) - { - return; - } - - Stopwatch Timer; - - uint64_t OpsBlockSize = m_OpBlobs.FileSize(); - - { - tsl::robin_map<Oid, size_t, Oid::Hasher> LatestKeys; - if (m_Oplog.GetLogCount() > SkipEntryCount) - { - LatestKeys.reserve(m_Oplog.GetLogCount() - SkipEntryCount); - } - - m_Oplog.Replay( - [&](const OplogEntry& LogEntry) { - if (LogEntry.IsTombstone()) - { - if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It == LatestKeys.end()) - { - ZEN_SCOPED_WARN("found tombstone referencing unknown key {}", LogEntry.OpKeyHash); - ++OutInvalidEntries; - return; - } - } - else if (LogEntry.OpCoreAddress.Size == 0) - { - ZEN_SCOPED_WARN("skipping zero size op {}", LogEntry.OpKeyHash); - ++OutInvalidEntries; - return; - } - else if (LogEntry.OpLsn.Number == 0) - { - ZEN_SCOPED_WARN("skipping zero lsn op {}", LogEntry.OpKeyHash); - ++OutInvalidEntries; - return; - } - - const uint64_t OpFileOffset = LogEntry.OpCoreAddress.Offset * m_OpsAlign; - if ((OpFileOffset + LogEntry.OpCoreAddress.Size) > OpsBlockSize) - { - ZEN_SCOPED_WARN("skipping out of bounds op {}", LogEntry.OpKeyHash); - ++OutInvalidEntries; - return; - } - - if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It != LatestKeys.end()) - { - OplogEntry& Entry = OutOpLogEntries[It->second]; - - if (LogEntry.IsTombstone() && Entry.IsTombstone()) - { - ZEN_SCOPED_WARN("found double tombstone - {}", LogEntry.OpKeyHash); - } - - Entry = LogEntry; - } - else - { - const size_t OpIndex = OutOpLogEntries.size(); - LatestKeys[LogEntry.OpKeyHash] = OpIndex; - OutOpLogEntries.push_back(LogEntry); - } - }, - SkipEntryCount); - } - } - - void ReplayLog(std::function<void(CbObjectView, const OplogEntry&)>&& Handler, uint64_t SkipEntryCount = 0) - { - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::OplogStorage::ReplayLog"); - - if (m_Oplog.GetLogCount() == SkipEntryCount) - { - return; - } - - Stopwatch Timer; - - std::vector<OplogEntry> OpLogEntries; - uint64_t InvalidEntries = 0; - - ReadOplogEntriesFromLog(OpLogEntries, InvalidEntries, SkipEntryCount); - - uint64_t TombstoneEntries = 0; - - BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); - - uint32_t MaxOpLsn = m_MaxLsn; - uint64_t NextOpFileOffset = m_NextOpsOffset; - - std::sort(OpLogEntries.begin(), OpLogEntries.end(), [&](const OplogEntry& Lhs, const OplogEntry& Rhs) { - return Lhs.OpCoreAddress.Offset < Rhs.OpCoreAddress.Offset; - }); - - for (const OplogEntry& LogEntry : OpLogEntries) - { - if (LogEntry.IsTombstone()) - { - TombstoneEntries++; - } - else - { - IoBuffer OpBuffer = GetOpBuffer(OpBlobsBuffer, LogEntry); - - // Verify checksum, ignore op data if incorrect - - const uint32_t ExpectedOpCoreHash = LogEntry.OpCoreHash; - const uint32_t OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.GetData(), LogEntry.OpCoreAddress.Size) & 0xffffFFFF); - - if (OpCoreHash != ExpectedOpCoreHash) - { - ZEN_WARN("oplog '{}/{}': skipping bad checksum op - {}. Expected: {}, found: {}", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - LogEntry.OpKeyHash, - ExpectedOpCoreHash, - OpCoreHash); - } - else if (CbValidateError Err = ValidateCompactBinary(OpBuffer.GetView(), CbValidateMode::Default); - Err != CbValidateError::None) - { - ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Error: '{}'", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - LogEntry.OpKeyHash, - ToString(Err)); - } - else - { - CbObjectView OpView(OpBuffer.GetData()); - if (OpView.GetSize() != OpBuffer.GetSize()) - { - ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - LogEntry.OpKeyHash, - OpView.GetSize(), - OpBuffer.GetSize()); - } - else - { - Handler(OpView, LogEntry); - MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn.Number); - const uint64_t EntryNextOpFileOffset = - RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign); - NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset); - } - } - } - } - - m_MaxLsn = MaxOpLsn; - m_NextOpsOffset = NextOpFileOffset; - - ZEN_INFO("oplog '{}/{}': replay from '{}' completed in {} - Max LSN# {}, Next offset: {}, {} tombstones, {} invalid entries", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - m_OplogStoragePath, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - m_MaxLsn.load(), - m_NextOpsOffset.load(), - TombstoneEntries, - InvalidEntries); - } - - void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries, - const std::span<const Oplog::PayloadIndex> Order, - std::function<void(LogSequenceNumber Lsn, CbObjectView)>&& Handler) - { - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries"); - - BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); - - for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order) - { - const Oplog::OplogPayload& Entry = Entries[EntryOffset]; - - const uint64_t OpFileOffset = Entry.Address.Offset * m_OpsAlign; - MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset); - if (OpBufferView.GetSize() == Entry.Address.Size) - { - if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error == CbValidateError::None) - { - CbObjectView OpView(OpBufferView.GetData()); - if (OpView.GetSize() != OpBufferView.GetSize()) - { - ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - Entry.Lsn.Number, - OpView.GetSize(), - OpBufferView.GetSize()); - } - else - { - Handler(Entry.Lsn, OpView); - } - } - else - { - ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - Entry.Lsn.Number, - ToString(Error)); - } - } - else - { - IoBuffer OpBuffer(Entry.Address.Size); - OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset); - OpBufferView = OpBuffer.GetView(); - if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error == CbValidateError::None) - { - CbObjectView OpView(OpBuffer.Data()); - if (OpView.GetSize() != OpBuffer.GetSize()) - { - ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - Entry.Lsn.Number, - OpView.GetSize(), - OpBuffer.GetSize()); - } - else - { - Handler(Entry.Lsn, OpView); - } - } - else - { - ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - Entry.Lsn.Number, - ToString(Error)); - } - } - } - } - - CbObject GetOp(const OplogEntryAddress& Entry) - { - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::OplogStorage::GetOp"); - - IoBuffer OpBuffer(Entry.Size); - - const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign; - m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset); - - return CbObject(SharedBuffer(std::move(OpBuffer))); - } - - struct AppendOpData - { - MemoryView Buffer; - uint32_t OpCoreHash; - Oid KeyHash; - }; - - static OplogStorage::AppendOpData GetAppendOpData(const CbObjectView& Core) - { - ZEN_MEMSCOPE(GetProjectstoreTag()); - using namespace std::literals; - - AppendOpData OpData; - - OpData.Buffer = Core.GetView(); - const uint64_t WriteSize = OpData.Buffer.GetSize(); - OpData.OpCoreHash = uint32_t(XXH3_64bits(OpData.Buffer.GetData(), WriteSize) & 0xffffFFFF); - - ZEN_ASSERT(WriteSize != 0); - - OpData.KeyHash = ComputeOpKey(Core); - - return OpData; - } - - OplogEntry AppendOp(const AppendOpData& OpData) - { - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::OplogStorage::AppendOp"); - - uint64_t WriteSize = OpData.Buffer.GetSize(); - - RwLock::ExclusiveLockScope Lock(m_RwLock); - const uint64_t WriteOffset = m_NextOpsOffset; - const LogSequenceNumber OpLsn(++m_MaxLsn); - if (!OpLsn.Number) - { - ZEN_ERROR("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId()); - throw std::runtime_error(fmt::format("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId())); - } - m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign); - Lock.ReleaseNow(); - - ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign)); - - OplogEntry Entry = {.OpLsn = OpLsn, - .OpCoreAddress = {.Offset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign), .Size = uint32_t(WriteSize)}, - .OpCoreHash = OpData.OpCoreHash, - .OpKeyHash = OpData.KeyHash}; - - m_OpBlobs.Write(OpData.Buffer.GetData(), WriteSize, WriteOffset); - m_Oplog.Append(Entry); - - return Entry; - } - - std::vector<OplogEntry> AppendOps(std::span<const AppendOpData> Ops) - { - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::OplogStorage::AppendOps"); - - size_t OpCount = Ops.size(); - std::vector<std::pair<uint64_t, uint64_t>> OffsetAndSizes; - std::vector<LogSequenceNumber> OpLsns; - OffsetAndSizes.resize(OpCount); - OpLsns.resize(OpCount); - - for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) - { - OffsetAndSizes[OpIndex].second = Ops[OpIndex].Buffer.GetSize(); - } - - uint64_t WriteStart = 0; - uint64_t WriteLength = 0; - { - RwLock::ExclusiveLockScope Lock(m_RwLock); - WriteStart = m_NextOpsOffset; - ZEN_ASSERT(IsMultipleOf(WriteStart, m_OpsAlign)); - uint64_t WriteOffset = WriteStart; - for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) - { - OffsetAndSizes[OpIndex].first = WriteOffset - WriteStart; - OpLsns[OpIndex] = LogSequenceNumber(++m_MaxLsn); - if (!OpLsns[OpIndex]) - { - ZEN_ERROR("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId()); - throw std::runtime_error(fmt::format("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId())); - } - WriteOffset = RoundUp(WriteOffset + OffsetAndSizes[OpIndex].second, m_OpsAlign); - } - WriteLength = WriteOffset - WriteStart; - m_NextOpsOffset = RoundUp(WriteOffset, m_OpsAlign); - } - - IoBuffer WriteBuffer(WriteLength); - - std::vector<OplogEntry> Entries; - Entries.resize(OpCount); - for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) - { - MutableMemoryView WriteBufferView = WriteBuffer.GetMutableView().RightChop(OffsetAndSizes[OpIndex].first); - WriteBufferView.CopyFrom(Ops[OpIndex].Buffer); - Entries[OpIndex] = { - .OpLsn = OpLsns[OpIndex], - .OpCoreAddress = {.Offset = gsl::narrow_cast<uint32_t>((WriteStart + OffsetAndSizes[OpIndex].first) / m_OpsAlign), - .Size = uint32_t(OffsetAndSizes[OpIndex].second)}, - .OpCoreHash = Ops[OpIndex].OpCoreHash, - .OpKeyHash = Ops[OpIndex].KeyHash}; - } - - m_OpBlobs.Write(WriteBuffer.GetData(), WriteBuffer.GetSize(), WriteStart); - m_Oplog.Append(Entries); - - return Entries; - } - - void AppendTombstone(Oid KeyHash) - { - OplogEntry Entry = {.OpKeyHash = KeyHash}; - Entry.MakeTombstone(); - - m_Oplog.Append(Entry); - } - - void Flush() - { - m_Oplog.Flush(); - m_OpBlobs.Flush(); - } - uint64_t LogCount() const { return m_Oplog.GetLogCount(); } - - LoggerRef Log() { return m_OwnerOplog->Log(); } - -private: - ProjectStore::Oplog* m_OwnerOplog; - std::filesystem::path m_OplogStoragePath; - mutable RwLock m_RwLock; - TCasLogFile<OplogEntry> m_Oplog; - BasicFile m_OpBlobs; - std::atomic<uint64_t> m_NextOpsOffset{0}; - uint64_t m_OpsAlign = 32; - std::atomic<uint32_t> m_MaxLsn{0}; -}; - -////////////////////////////////////////////////////////////////////////// - -ProjectStore::Oplog::Oplog(const LoggerRef& InLog, - std::string_view ProjectIdentifier, - std::string_view Id, - CidStore& Store, - const std::filesystem::path& BasePath, - const std::filesystem::path& MarkerPath, - EMode State) -: m_Log(InLog) -, m_OuterProjectId(ProjectIdentifier) -, m_OplogId(Id) -, m_CidStore(Store) -, m_BasePath(BasePath) -, m_MarkerPath(MarkerPath) -, m_TempPath(m_BasePath / std::string_view("temp")) -, m_MetaPath(m_BasePath / std::string_view("ops.meta")) -, m_Mode(State) -, m_MetaValid(false) -, m_PendingPrepOpAttachmentsRetainEnd(GcClock::Now()) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - using namespace std::literals; - - m_Storage = new OplogStorage(this, m_BasePath); - OplogStorage::EMode StorageMode = OplogStorage::EMode::Write; - if (m_Mode == EMode::kBasicReadOnly) - { - StorageMode = OplogStorage::EMode::Read; - } - else - { - bool StoreExists = m_Storage->Exists(); - if (StoreExists) - { - if (!m_Storage->IsValid()) - { - ZEN_WARN("Invalid oplog found at '{}'. Wiping state for oplog.", m_BasePath); - m_Storage->WipeState(); - std::error_code DummyEc; - RemoveFile(m_MetaPath, DummyEc); - StoreExists = false; - } - } - if (!StoreExists) - { - StorageMode = OplogStorage::EMode::Create; - } - } - m_Storage->Open(StorageMode); - m_MetaValid = !IsFileOlderThan(m_MetaPath, m_Storage->GetBlobsPath()); - - CleanDirectory(m_TempPath, /*ForceRemoveReadOnlyFiles*/ false); -} - -ProjectStore::Oplog::~Oplog() -{ - if (m_Storage) - { - Flush(); - } -} - -void -ProjectStore::Oplog::Flush() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - ZEN_TRACE_CPU("Oplog::Flush"); - - if (m_Mode == EMode::kFull) - { - RwLock::SharedLockScope Lock(m_OplogLock); - - if (!m_Storage) - { - return; - } - - m_Storage->Flush(); - if (!m_MetaValid) - { - std::error_code DummyEc; - RemoveFile(m_MetaPath, DummyEc); - } - - uint64_t LogCount = m_Storage->LogCount(); - if (m_LogFlushPosition != LogCount || m_IsLegacySnapshot) - { - WriteIndexSnapshot(); - } - } -} - -void -ProjectStore::Oplog::RefreshLsnToPayloadOffsetMap(RwLock::ExclusiveLockScope&) -{ - if (!m_LsnToPayloadOffsetMap) - { - m_LsnToPayloadOffsetMap = std::make_unique<LsnMap<PayloadIndex>>(); - m_LsnToPayloadOffsetMap->reserve(m_OpLogPayloads.size()); - for (uint32_t PayloadOffset = 0; PayloadOffset < m_OpLogPayloads.size(); PayloadOffset++) - { - const OplogPayload& Payload = m_OpLogPayloads[PayloadOffset]; - if (Payload.Lsn.Number != 0 && Payload.Address.Size != 0) - { - m_LsnToPayloadOffsetMap->insert_or_assign(Payload.Lsn, PayloadIndex(PayloadOffset)); - } - } - } -} - -void -ProjectStore::Oplog::Scrub(ScrubContext& Ctx) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - ZEN_ASSERT(m_Mode == EMode::kFull); - - std::vector<std::pair<LogSequenceNumber, Oid>> BadEntries; - - using namespace std::literals; - - IterateOplogWithKey([&](LogSequenceNumber Lsn, const Oid& Key, CbObjectView Op) { - { - const Oid KeyHash = ComputeOpKey(Op); - if (KeyHash != Key) - { - BadEntries.push_back({Lsn, Key}); - ZEN_WARN("Scrub: oplog data does not match information from index (op:{} != index:{})", KeyHash, Key); - return; - } - } - - // TODO: Should we really delete an Op because it points to a missing or malformed Cid chunk? - - Op.IterateAttachments([&](CbFieldView Visitor) { - const IoHash Cid = Visitor.AsAttachment(); - if (Ctx.IsBadCid(Cid)) - { - // oplog entry references a CAS chunk which has been flagged as bad - BadEntries.push_back({Lsn, Key}); - return; - } - if (!m_CidStore.ContainsChunk(Cid)) - { - // oplog entry references a CAS chunk which is not present - BadEntries.push_back({Lsn, Key}); - return; - } - }); - }); - - if (!BadEntries.empty()) - { - if (Ctx.RunRecovery()) - { - ZEN_WARN("oplog '{}/{}': scrubbing found {} bad ops in oplog @ '{}', these will be removed from the index", - m_OuterProjectId, - m_OplogId, - BadEntries.size(), - m_BasePath); - - // Actually perform some clean-up - RwLock::ExclusiveLockScope Lock(m_OplogLock); - - RefreshLsnToPayloadOffsetMap(Lock); - - for (const auto& BadEntry : BadEntries) - { - const LogSequenceNumber BadLsn = BadEntry.first; - const Oid& BadKey = BadEntry.second; - if (auto It = m_OpToPayloadOffsetMap.find(BadKey); It != m_OpToPayloadOffsetMap.end()) - { - OplogPayload& Payload = m_OpLogPayloads[It->second]; - if (Payload.Lsn == BadLsn) - { - m_OpToPayloadOffsetMap.erase(It); - m_Storage->AppendTombstone(BadKey); - } - } - if (auto LsnIt = m_LsnToPayloadOffsetMap->find(BadLsn); LsnIt != m_LsnToPayloadOffsetMap->end()) - { - const PayloadIndex LsnPayloadOffset = LsnIt->second; - OplogPayload& LsnPayload = m_OpLogPayloads[LsnPayloadOffset]; - LsnPayload = {.Lsn = LogSequenceNumber(0), .Address = {.Offset = 0, .Size = 0}}; - } - m_LsnToPayloadOffsetMap->erase(BadLsn); - } - if (!BadEntries.empty()) - { - m_MetaValid = false; - } - } - else - { - ZEN_WARN("oplog '{}/{}': scrubbing found {} bad ops in oplog @ '{}' but no cleanup will be performed", - m_OuterProjectId, - m_OplogId, - BadEntries.size(), - m_BasePath); - } - } -} - -uint64_t -ProjectStore::Oplog::TotalSize(const std::filesystem::path& BasePath) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - using namespace std::literals; - - uint64_t Size = OplogStorage::OpsSize(BasePath); - std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; - if (IsFile(StateFilePath)) - { - Size += FileSizeFromPath(StateFilePath); - } - std::filesystem::path MetaFilePath = BasePath / "ops.meta"sv; - if (IsFile(MetaFilePath)) - { - Size += FileSizeFromPath(MetaFilePath); - } - std::filesystem::path IndexFilePath = BasePath / "ops.zidx"sv; - if (IsFile(IndexFilePath)) - { - Size += FileSizeFromPath(IndexFilePath); - } - - return Size; -} - -uint64_t -ProjectStore::Oplog::TotalSize() const -{ - return TotalSize(m_BasePath); -} - -void -ProjectStore::Oplog::ResetState() -{ - RwLock::ExclusiveLockScope _(m_OplogLock); - m_ChunkMap.clear(); - m_MetaMap.clear(); - m_FileMap.clear(); - m_OpToPayloadOffsetMap.clear(); - m_OpLogPayloads.clear(); - m_LsnToPayloadOffsetMap.reset(); - m_Storage = {}; -} - -bool -ProjectStore::Oplog::PrepareForDelete(std::filesystem::path& OutRemoveDirectory) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - RwLock::ExclusiveLockScope _(m_OplogLock); - m_UpdateCaptureRefCounter = 0; - m_CapturedOps.reset(); - m_CapturedAttachments.reset(); - m_PendingPrepOpAttachments.clear(); - m_ChunkMap.clear(); - m_MetaMap.clear(); - m_FileMap.clear(); - m_OpToPayloadOffsetMap.clear(); - m_OpLogPayloads.clear(); - m_LsnToPayloadOffsetMap.reset(); - m_Storage = {}; - if (PrepareDirectoryDelete(m_BasePath, OutRemoveDirectory)) - { - return true; - } - return false; -} - -bool -ProjectStore::Oplog::ExistsAt(const std::filesystem::path& BasePath) -{ - using namespace std::literals; - - std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; - return IsFile(StateFilePath); -} - -bool -ProjectStore::Oplog::Exists() const -{ - return ExistsAt(m_BasePath); -} - -void -ProjectStore::Oplog::Read() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - using namespace std::literals; - ZEN_TRACE_CPU("Oplog::Read"); - ZEN_LOG_SCOPE("Oplog::Read '{}'", m_OplogId); - - ZEN_DEBUG("oplog '{}/{}': reading from '{}'. State: {}", - m_OuterProjectId, - m_OplogId, - m_BasePath, - m_Mode == EMode::kFull ? "Full" : "BasicNoLookups"); - - std::optional<CbObject> Config = ReadStateFile(m_BasePath, [this]() { return Log(); }); - if (Config.has_value()) - { - if (Config.value().GetSize() == 0) - { - // Invalid config file - return; - } - m_MarkerPath = Config.value()["gcpath"sv].AsU8String(); - } - - if (!m_MetaValid) - { - std::error_code DummyEc; - RemoveFile(m_MetaPath, DummyEc); - } - - ZEN_ASSERT(!m_LsnToPayloadOffsetMap); - - ReadIndexSnapshot(); - - if (m_Mode == EMode::kFull) - { - m_Storage->ReplayLog( - [&](CbObjectView Op, const OplogEntry& OpEntry) { - const OplogEntryMapping OpMapping = GetMapping(Op); - // Update chunk id maps - for (const ChunkMapping& Chunk : OpMapping.Chunks) - { - m_ChunkMap.insert_or_assign(Chunk.Id, Chunk.Hash); - } - - for (const FileMapping& File : OpMapping.Files) - { - if (File.Hash != IoHash::Zero) - { - m_ChunkMap.insert_or_assign(File.Id, File.Hash); - } - m_FileMap.insert_or_assign(File.Id, - FileMapEntry{.ServerPath = File.Hash == IoHash::Zero ? File.ServerPath : std::string(), - .ClientPath = File.ClientPath}); - } - - for (const ChunkMapping& Meta : OpMapping.Meta) - { - m_MetaMap.insert_or_assign(Meta.Id, Meta.Hash); - } - - const PayloadIndex PayloadOffset(m_OpLogPayloads.size()); - - m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, PayloadOffset); - m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress}); - }, - m_LogFlushPosition); - - if (m_Storage->LogCount() != m_LogFlushPosition || m_IsLegacySnapshot) - { - WriteIndexSnapshot(); - } - } - else - { - std::vector<OplogEntry> OpLogEntries; - uint64_t InvalidEntries; - m_Storage->ReadOplogEntriesFromLog(OpLogEntries, InvalidEntries, m_LogFlushPosition); - for (const OplogEntry& OpEntry : OpLogEntries) - { - const PayloadIndex PayloadOffset(m_OpLogPayloads.size()); - - m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, PayloadOffset); - m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress}); - } - } -} - -void -ProjectStore::Oplog::Write() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_ASSERT(m_Mode != EMode::kBasicReadOnly); - - using namespace std::literals; - - BinaryWriter Mem; - - CbObjectWriter Cfg; - - Cfg << "gcpath"sv << PathToUtf8(m_MarkerPath); - - Cfg.Save(Mem); - - std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; - - ZEN_DEBUG("oplog '{}/{}': persisting config to '{}'", m_OuterProjectId, m_OplogId, StateFilePath); - - TemporaryFile::SafeWriteFile(StateFilePath, Mem.GetView()); -} - -void -ProjectStore::Oplog::Update(const std::filesystem::path& MarkerPath) -{ - if (m_MarkerPath == MarkerPath) - { - return; - } - Write(); -} - -bool -ProjectStore::Oplog::Reset() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_ASSERT(m_Mode == EMode::kFull); - std::filesystem::path MovedDir; - - { - RwLock::ExclusiveLockScope OplogLock(m_OplogLock); - m_Storage = {}; - if (!PrepareDirectoryDelete(m_BasePath, MovedDir)) - { - m_Storage = new OplogStorage(this, m_BasePath); - const bool StoreExists = m_Storage->Exists(); - m_Storage->Open(StoreExists ? OplogStorage::EMode::Write : OplogStorage::EMode::Create); - m_MetaValid = !IsFileOlderThan(m_MetaPath, m_Storage->GetBlobsPath()); - return false; - } - m_ChunkMap.clear(); - m_MetaMap.clear(); - m_FileMap.clear(); - m_OpToPayloadOffsetMap.clear(); - m_OpLogPayloads.clear(); - m_LsnToPayloadOffsetMap.reset(); - m_Storage = new OplogStorage(this, m_BasePath); - m_Storage->Open(OplogStorage::EMode::Create); - m_MetaValid = false; - CleanDirectory(m_TempPath, /*ForceRemoveReadOnlyFiles*/ false); - Write(); - } - // Erase content on disk - if (!MovedDir.empty()) - { - OplogStorage::Delete(MovedDir); - } - return true; -} - -bool -ProjectStore::Oplog::CanUnload() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - RwLock::SharedLockScope _(m_OplogLock); - - uint64_t LogCount = m_Storage->LogCount(); - if (m_LogFlushPosition != LogCount) - { - return false; // The oplog is not flushed so likely this is an active oplog - } - - if (!m_PendingPrepOpAttachments.empty()) - { - return false; // We have a pending oplog prep operation in flight - } - if (m_UpdateCaptureRefCounter > 0) - { - return false; // GC capture is enable for the oplog - } - return true; -} - -std::optional<CbObject> -ProjectStore::Oplog::ReadStateFile(const std::filesystem::path& BasePath, std::function<LoggerRef()>&& Log) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Oplog::ReadStateFile"); - using namespace std::literals; - - std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; - if (IsFile(StateFilePath)) - { - // ZEN_INFO("oplog '{}/{}': config read from '{}'", m_OuterProjectId, m_OplogId, StateFilePath); - - BasicFile Blob; - Blob.Open(StateFilePath, BasicFile::Mode::kRead); - - IoBuffer Obj = Blob.ReadAll(); - CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); - - if (ValidationError != CbValidateError::None) - { - ZEN_ERROR("validation error {} hit for oplog config at '{}'", ToString(ValidationError), StateFilePath); - return CbObject(); - } - - return LoadCompactBinaryObject(Obj); - } - ZEN_INFO("config for oplog not found at '{}'. Assuming legacy store", StateFilePath); - return {}; -} - -ProjectStore::Oplog::ValidationResult -ProjectStore::Oplog::Validate(const std::filesystem::path& ProjectRootDir, - std::atomic_bool& IsCancelledFlag, - WorkerThreadPool* OptionalWorkerPool) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - using namespace std::literals; - - ValidationResult Result; - - const size_t OpCount = OplogCount(); - - std::vector<Oid> KeyHashes; - std::vector<std::string> Keys; - std::vector<std::vector<IoHash>> Attachments; - std::vector<OplogEntryMapping> Mappings; - - KeyHashes.reserve(OpCount); - Keys.reserve(OpCount); - Mappings.reserve(OpCount); - - IterateOplogWithKey([&](LogSequenceNumber LSN, const Oid& Key, CbObjectView OpView) { - Result.LSNLow = Min(Result.LSNLow, LSN); - Result.LSNHigh = Max(Result.LSNHigh, LSN); - KeyHashes.push_back(Key); - Keys.emplace_back(std::string(OpView["key"sv].AsString())); - - std::vector<IoHash> OpAttachments; - OpView.IterateAttachments([&OpAttachments](CbFieldView Attachment) { OpAttachments.push_back(Attachment.AsAttachment()); }); - Attachments.emplace_back(std::move(OpAttachments)); - - Mappings.push_back(GetMapping(OpView)); - }); - - Result.OpCount = gsl::narrow<uint32_t>(Keys.size()); - - RwLock ResultLock; - - auto ValidateOne = [&](uint32_t OpIndex) { - const Oid& KeyHash = KeyHashes[OpIndex]; - const std::string& Key = Keys[OpIndex]; - const OplogEntryMapping& Mapping(Mappings[OpIndex]); - bool HasMissingEntries = false; - for (const ChunkMapping& Chunk : Mapping.Chunks) - { - if (!m_CidStore.ContainsChunk(Chunk.Hash)) - { - ResultLock.WithExclusiveLock([&]() { Result.MissingChunks.push_back({KeyHash, Chunk}); }); - HasMissingEntries = true; - } - } - - for (const ChunkMapping& Meta : Mapping.Meta) - { - if (!m_CidStore.ContainsChunk(Meta.Hash)) - { - ResultLock.WithExclusiveLock([&]() { Result.MissingMetas.push_back({KeyHash, Meta}); }); - HasMissingEntries = true; - } - } - - for (const FileMapping& File : Mapping.Files) - { - if (File.Hash == IoHash::Zero) - { - std::filesystem::path FilePath = ProjectRootDir / File.ServerPath; - if (!IsFile(FilePath)) - { - ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); - HasMissingEntries = true; - } - } - else if (!m_CidStore.ContainsChunk(File.Hash)) - { - ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); - HasMissingEntries = true; - } - } - - const std::vector<IoHash>& OpAttachments = Attachments[OpIndex]; - for (const IoHash& Attachment : OpAttachments) - { - if (!m_CidStore.ContainsChunk(Attachment)) - { - ResultLock.WithExclusiveLock([&]() { Result.MissingAttachments.push_back({KeyHash, Attachment}); }); - HasMissingEntries = true; - } - } - - if (HasMissingEntries) - { - ResultLock.WithExclusiveLock([&]() { Result.OpKeys.push_back({KeyHash, Key}); }); - } - }; - - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); - try - { - for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) - { - if (AbortFlag) - { - break; - } - if (OptionalWorkerPool) - { - Work.ScheduleWork(*OptionalWorkerPool, [&ValidateOne, Index = OpIndex](std::atomic<bool>& AbortFlag) { - ZEN_MEMSCOPE(GetProjectstoreTag()); - if (AbortFlag) - { - return; - } - ValidateOne(Index); - }); - } - else - { - ValidateOne(OpIndex); - } - } - } - catch (const std::exception& Ex) - { - AbortFlag.store(true); - ZEN_WARN("Failed validating oplogs in {}. Reason: '{}'", m_BasePath, Ex.what()); - } - Work.Wait(); - - { - // Check if we were deleted while we were checking the references without a lock... - RwLock::SharedLockScope _(m_OplogLock); - if (!m_Storage) - { - Result = {}; - } - } - - return Result; -} - -void -ProjectStore::Oplog::WriteIndexSnapshot() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Oplog::WriteIndexSnapshot"); - - ZEN_ASSERT(m_Mode == EMode::kFull); - - ZEN_DEBUG("oplog '{}/{}': write store snapshot at '{}'", m_OuterProjectId, m_OplogId, m_BasePath); - uint64_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("oplog '{}/{}': wrote store snapshot for '{}' containing {} entries in {}", - m_OuterProjectId, - m_OplogId, - m_BasePath, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - namespace fs = std::filesystem; - - const fs::path IndexPath = m_BasePath / "ops.zidx"; - try - { - std::vector<Oid> Keys; - std::vector<PayloadIndex> OpPayloadOffsets; - std::vector<IoHash> ChunkMapEntries; - std::vector<IoHash> MetaMapEntries; - std::vector<uint32_t> FilePathLengths; - std::vector<std::string> FilePaths; - uint64_t IndexLogPosition = 0; - { - IndexLogPosition = m_Storage->LogCount(); - Keys.reserve(m_OpToPayloadOffsetMap.size() + m_ChunkMap.size() + m_MetaMap.size() + m_FileMap.size()); - OpPayloadOffsets.reserve(m_OpToPayloadOffsetMap.size()); - for (const auto& Kv : m_OpToPayloadOffsetMap) - { - Keys.push_back(Kv.first); - OpPayloadOffsets.push_back(Kv.second); - } - - ChunkMapEntries.reserve(m_ChunkMap.size()); - for (const auto& It : m_ChunkMap) - { - Keys.push_back(It.first); - ChunkMapEntries.push_back(It.second); - } - - MetaMapEntries.reserve(m_MetaMap.size()); - for (const auto& It : m_MetaMap) - { - Keys.push_back(It.first); - MetaMapEntries.push_back(It.second); - } - - FilePathLengths.reserve(m_FileMap.size() * 2); - FilePaths.reserve(m_FileMap.size() * 2); - for (const auto& It : m_FileMap) - { - Keys.push_back(It.first); - FilePathLengths.push_back(gsl::narrow<uint32_t>(It.second.ServerPath.length())); - FilePathLengths.push_back(gsl::narrow<uint32_t>(It.second.ClientPath.length())); - FilePaths.push_back(It.second.ServerPath); - FilePaths.push_back(It.second.ClientPath); - } - } - - TemporaryFile ObjectIndexFile; - std::error_code Ec; - ObjectIndexFile.CreateTemporary(IndexPath.parent_path(), Ec); - if (Ec) - { - throw std::system_error(Ec, fmt::format("Failed to create temp file for index snapshot at '{}'", IndexPath)); - } - - { - BasicFileWriter IndexFile(ObjectIndexFile, 65536u); - - OplogIndexHeader Header; - Header.V2 = {.LogPosition = IndexLogPosition, - .KeyCount = gsl::narrow<uint64_t>(Keys.size()), - .OpPayloadIndexCount = gsl::narrow<uint32_t>(OpPayloadOffsets.size()), - .OpPayloadCount = gsl::narrow<uint32_t>(m_OpLogPayloads.size()), - .ChunkMapCount = gsl::narrow<uint32_t>(ChunkMapEntries.size()), - .MetaMapCount = gsl::narrow<uint32_t>(MetaMapEntries.size()), - .FileMapCount = gsl::narrow<uint32_t>(FilePathLengths.size() / 2), - .MaxLSN = m_Storage->MaxLSN(), - .NextOpCoreOffset = m_Storage->GetNextOpOffset()}; - - Header.Checksum = OplogIndexHeader::ComputeChecksum(Header); - - uint64_t Offset = 0; - IndexFile.Write(&Header, sizeof(OplogIndexHeader), Offset); - Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); - - IndexFile.Write(Keys.data(), Keys.size() * sizeof(Oid), Offset); - Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); - - IndexFile.Write(OpPayloadOffsets.data(), OpPayloadOffsets.size() * sizeof(PayloadIndex), Offset); - Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); - - IndexFile.Write(m_OpLogPayloads.data(), m_OpLogPayloads.size() * sizeof(OplogPayload), Offset); - Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); - - IndexFile.Write(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset); - Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); - - IndexFile.Write(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset); - Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); - - IndexFile.Write(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset); - Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); - - for (const auto& FilePath : FilePaths) - { - IndexFile.Write(FilePath.c_str(), FilePath.length(), Offset); - Offset += FilePath.length(); - } - } - - ObjectIndexFile.Flush(); - ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); - if (Ec) - { - throw std::system_error(Ec, - fmt::format("Snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", - ObjectIndexFile.GetPath(), - IndexPath, - Ec.message())); - } - EntryCount = m_OpLogPayloads.size(); - m_LogFlushPosition = IndexLogPosition; - m_IsLegacySnapshot = false; - } - catch (const std::exception& Err) - { - ZEN_WARN("oplog '{}/{}': snapshot FAILED, reason: '{}'", m_OuterProjectId, m_OplogId, Err.what()); - } -} - -void -ProjectStore::Oplog::ReadIndexSnapshot() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Oplog::ReadIndexSnapshot"); - - const std::filesystem::path IndexPath = m_BasePath / "ops.zidx"; - if (IsFile(IndexPath)) - { - uint64_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_DEBUG("oplog '{}/{}': index read from '{}' containing {} entries in {}", - m_OuterProjectId, - m_OplogId, - IndexPath, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - try - { - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); - uint64_t Size = ObjectIndexFile.FileSize(); - if (Size >= sizeof(OplogIndexHeader)) - { - OplogIndexHeader Header; - uint64_t Offset = 0; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - Offset += sizeof(OplogIndexHeader); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - - if (Header.Magic == OplogIndexHeader::ExpectedMagic) - { - uint32_t Checksum = OplogIndexHeader::ComputeChecksum(Header); - if (Header.Checksum != Checksum) - { - ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Checksum mismatch. Expected: {}, Found: {}", - m_OuterProjectId, - m_OplogId, - IndexPath, - Header.Checksum, - Checksum); - return; - } - - if (Header.Version == OplogIndexHeader::Version1) - { - uint32_t MaxLSN = 0; - OplogEntryAddress LastOpAddress{.Offset = 0, .Size = 0}; - - if (Header.V1.LatestOpMapCount + Header.V1.ChunkMapCount + Header.V1.MetaMapCount + Header.V1.FileMapCount != - Header.V1.KeyCount) - { - ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Key count mismatch. Expected: {}, Found: {}", - m_OuterProjectId, - m_OplogId, - IndexPath, - Header.V1.LatestOpMapCount + Header.V1.ChunkMapCount + Header.V1.MetaMapCount + Header.V1.FileMapCount, - Header.V1.KeyCount); - return; - } - - std::vector<uint32_t> LSNEntries(Header.V1.LSNCount); - ObjectIndexFile.Read(LSNEntries.data(), LSNEntries.size() * sizeof(uint32_t), Offset); - Offset += LSNEntries.size() * sizeof(uint32_t); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - size_t LSNOffset = 0; - - std::vector<Oid> Keys(Header.V1.KeyCount); - ObjectIndexFile.Read(Keys.data(), Keys.size() * sizeof(Oid), Offset); - Offset += Keys.size() * sizeof(Oid); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - size_t KeyOffset = 0; - - { - std::vector<OplogEntryAddress> AddressMapEntries(Header.V1.OpAddressMapCount); - ObjectIndexFile.Read(AddressMapEntries.data(), AddressMapEntries.size() * sizeof(OplogEntryAddress), Offset); - Offset += AddressMapEntries.size() * sizeof(OplogEntryAddress); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - - tsl::robin_map<uint32_t, PayloadIndex> LsnToPayloadOffset; - LsnToPayloadOffset.reserve(AddressMapEntries.size()); - - m_OpLogPayloads.reserve(AddressMapEntries.size()); - for (const OplogEntryAddress& Address : AddressMapEntries) - { - const uint32_t LSN = LSNEntries[LSNOffset++]; - LsnToPayloadOffset.insert_or_assign(LSN, PayloadIndex(m_OpLogPayloads.size())); - - m_OpLogPayloads.push_back({.Lsn = LogSequenceNumber(LSN), .Address = Address}); - if (Address.Offset > LastOpAddress.Offset) - { - LastOpAddress = Address; - } - MaxLSN = Max(MaxLSN, LSN); - } - - { - std::vector<uint32_t> LatestOpMapEntries(Header.V1.LatestOpMapCount); - ObjectIndexFile.Read(LatestOpMapEntries.data(), LatestOpMapEntries.size() * sizeof(uint32_t), Offset); - Offset += LatestOpMapEntries.size() * sizeof(uint32_t); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - m_OpToPayloadOffsetMap.reserve(LatestOpMapEntries.size()); - for (uint32_t Lsn : LatestOpMapEntries) - { - if (auto It = LsnToPayloadOffset.find(Lsn); It != LsnToPayloadOffset.end()) - { - m_OpToPayloadOffsetMap.insert_or_assign(Keys[KeyOffset++], It->second); - MaxLSN = Max(MaxLSN, Lsn); - } - } - } - } - if (m_Mode == EMode::kFull) - { - { - std::vector<IoHash> ChunkMapEntries(Header.V1.ChunkMapCount); - ObjectIndexFile.Read(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset); - Offset += ChunkMapEntries.size() * sizeof(IoHash); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - m_ChunkMap.reserve(ChunkMapEntries.size()); - for (const IoHash& ChunkId : ChunkMapEntries) - { - m_ChunkMap.insert_or_assign(Keys[KeyOffset++], ChunkId); - } - } - { - std::vector<IoHash> MetaMapEntries(Header.V1.MetaMapCount); - ObjectIndexFile.Read(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset); - Offset += MetaMapEntries.size() * sizeof(IoHash); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - m_MetaMap.reserve(MetaMapEntries.size()); - for (const IoHash& ChunkId : MetaMapEntries) - { - m_MetaMap.insert_or_assign(Keys[KeyOffset++], ChunkId); - } - } - { - m_FileMap.reserve(Header.V1.FileMapCount); - - std::vector<uint32_t> FilePathLengths(Header.V1.FileMapCount * 2); - ObjectIndexFile.Read(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset); - Offset += FilePathLengths.size() * sizeof(uint32_t); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - - BasicFileBuffer IndexFile(ObjectIndexFile, 65536); - auto ReadString([&IndexFile, &Offset](uint32_t Length) -> std::string { - MemoryView StringData = IndexFile.MakeView(Length, Offset); - if (StringData.GetSize() != Length) - { - throw std::runtime_error(fmt::format("Invalid format. Expected to read {} bytes but got {}", - Length, - uint32_t(StringData.GetSize()))); - } - Offset += Length; - return std::string((const char*)StringData.GetData(), Length); - }); - for (uint64_t FileLengthOffset = 0; FileLengthOffset < FilePathLengths.size();) - { - std::string ServerPath = ReadString(FilePathLengths[FileLengthOffset++]); - std::string ClientPath = ReadString(FilePathLengths[FileLengthOffset++]); - m_FileMap.insert_or_assign( - Keys[KeyOffset++], - FileMapEntry{.ServerPath = std::move(ServerPath), .ClientPath = std::move(ClientPath)}); - } - } - } - m_LogFlushPosition = Header.V1.LogPosition; - m_Storage->SetMaxLSNAndNextWriteAddress(MaxLSN, LastOpAddress); - EntryCount = Header.V1.LSNCount; - m_IsLegacySnapshot = true; - } - else if (Header.Version == OplogIndexHeader::Version2) - { - std::vector<Oid> Keys(Header.V2.KeyCount); - - ObjectIndexFile.Read(Keys.data(), Keys.size() * sizeof(Oid), Offset); - Offset += Keys.size() * sizeof(Oid); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - - std::vector<PayloadIndex> OpPayloadOffsets(Header.V2.OpPayloadIndexCount); - ObjectIndexFile.Read(OpPayloadOffsets.data(), OpPayloadOffsets.size() * sizeof(PayloadIndex), Offset); - Offset += OpPayloadOffsets.size() * sizeof(PayloadIndex); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - - m_OpLogPayloads.resize(Header.V2.OpPayloadCount); - ObjectIndexFile.Read(m_OpLogPayloads.data(), m_OpLogPayloads.size() * sizeof(OplogPayload), Offset); - Offset += m_OpLogPayloads.size() * sizeof(OplogPayload); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - - size_t KeyOffset = 0; - - m_OpToPayloadOffsetMap.reserve(Header.V2.OpPayloadIndexCount); - for (const PayloadIndex PayloadOffset : OpPayloadOffsets) - { - const Oid& Key = Keys[KeyOffset++]; - ZEN_ASSERT_SLOW(PayloadOffset.Index < Header.V2.OpPayloadCount); - m_OpToPayloadOffsetMap.insert({Key, PayloadOffset}); - } - - if (m_Mode == EMode::kFull) - { - { - std::vector<IoHash> ChunkMapEntries(Header.V2.ChunkMapCount); - ObjectIndexFile.Read(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset); - Offset += ChunkMapEntries.size() * sizeof(IoHash); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - m_ChunkMap.reserve(ChunkMapEntries.size()); - for (const IoHash& ChunkId : ChunkMapEntries) - { - m_ChunkMap.insert_or_assign(Keys[KeyOffset++], ChunkId); - } - } - { - std::vector<IoHash> MetaMapEntries(Header.V2.MetaMapCount); - ObjectIndexFile.Read(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset); - Offset += MetaMapEntries.size() * sizeof(IoHash); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - m_MetaMap.reserve(MetaMapEntries.size()); - for (const IoHash& ChunkId : MetaMapEntries) - { - m_MetaMap.insert_or_assign(Keys[KeyOffset++], ChunkId); - } - } - { - m_FileMap.reserve(Header.V2.FileMapCount); - - std::vector<uint32_t> FilePathLengths(Header.V2.FileMapCount * 2); - ObjectIndexFile.Read(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset); - Offset += FilePathLengths.size() * sizeof(uint32_t); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - - BasicFileBuffer IndexFile(ObjectIndexFile, 65536); - auto ReadString([&IndexFile, &Offset](uint32_t Length) -> std::string { - MemoryView StringData = IndexFile.MakeView(Length, Offset); - if (StringData.GetSize() != Length) - { - throw std::runtime_error(fmt::format("Invalid format. Expected to read {} bytes but got {}", - Length, - uint32_t(StringData.GetSize()))); - } - Offset += Length; - return std::string((const char*)StringData.GetData(), Length); - }); - for (uint64_t FileLengthOffset = 0; FileLengthOffset < FilePathLengths.size();) - { - std::string ServerPath = ReadString(FilePathLengths[FileLengthOffset++]); - std::string ClientPath = ReadString(FilePathLengths[FileLengthOffset++]); - m_FileMap.insert_or_assign( - Keys[KeyOffset++], - FileMapEntry{.ServerPath = std::move(ServerPath), .ClientPath = std::move(ClientPath)}); - } - } - } - m_LogFlushPosition = Header.V2.LogPosition; - m_Storage->SetMaxLSNAndNextOpOffset(Header.V2.MaxLSN, Header.V2.NextOpCoreOffset); - EntryCount = Header.V2.OpPayloadCount; - m_IsLegacySnapshot = false; - } - else - { - ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Unsupported version number {}", - m_OuterProjectId, - m_OplogId, - IndexPath, - Header.Version); - return; - } - } - else - { - ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'", m_OuterProjectId, m_OplogId, IndexPath); - } - } - } - catch (const std::exception& Ex) - { - m_OpToPayloadOffsetMap.clear(); - m_OpLogPayloads.clear(); - m_ChunkMap.clear(); - m_MetaMap.clear(); - m_FileMap.clear(); - m_LogFlushPosition = 0; - ZEN_ERROR("oplog '{}/{}': failed reading index snapshot from '{}'. Reason: '{}'", - m_OuterProjectId, - m_OplogId, - IndexPath, - Ex.what()); - } - } -} - -uint32_t -ProjectStore::Oplog::GetUnusedSpacePercent() const -{ - RwLock::SharedLockScope OplogLock(m_OplogLock); - return GetUnusedSpacePercentLocked(); -} - -uint32_t -ProjectStore::Oplog::GetUnusedSpacePercentLocked() const -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - const uint64_t ActualBlobsSize = m_Storage->OpBlobsSize(); - if (ActualBlobsSize == 0) - { - return 0; - } - - const uint64_t EffectiveBlobsSize = m_Storage->GetEffectiveBlobsSize(m_OpLogPayloads); - - if (EffectiveBlobsSize < ActualBlobsSize) - { - return gsl::narrow<uint32_t>((100 * (ActualBlobsSize - EffectiveBlobsSize)) / ActualBlobsSize); - } - - return 0; -} - -void -ProjectStore::Oplog::Compact(bool DryRun, bool RetainLSNs, std::string_view LogPrefix) -{ - RwLock::ExclusiveLockScope Lock(m_OplogLock); - Compact(Lock, DryRun, RetainLSNs, LogPrefix); -} - -void -ProjectStore::Oplog::Compact(RwLock::ExclusiveLockScope&, bool DryRun, bool RetainLSNs, std::string_view LogPrefix) -{ - ZEN_ASSERT(m_Mode == EMode::kFull); - - ZEN_MEMSCOPE(GetProjectstoreTag()); - - Stopwatch Timer; - - std::vector<LogSequenceNumber> LatestLSNs; - LatestLSNs.reserve(m_OpLogPayloads.size()); - for (const auto& Kv : m_OpToPayloadOffsetMap) - { - const OplogPayload& OpPayload = m_OpLogPayloads[Kv.second]; - LatestLSNs.push_back(OpPayload.Lsn); - } - - std::vector<OplogPayload> OpPayloads; - OpPayloads.reserve(LatestLSNs.size()); - - OidMap<PayloadIndex> OpToPayloadOffsetMap; - OpToPayloadOffsetMap.reserve(LatestLSNs.size()); - - uint64_t PreSize = TotalSize(); - - m_Storage->Compact( - LatestLSNs, - [&](const Oid& OpKeyHash, LogSequenceNumber, LogSequenceNumber NewLsn, const OplogEntryAddress& NewAddress) { - OpToPayloadOffsetMap.insert({OpKeyHash, PayloadIndex(OpPayloads.size())}); - OpPayloads.push_back({.Lsn = NewLsn, .Address = NewAddress}); - }, - RetainLSNs, - /*DryRun*/ DryRun); - - if (!DryRun) - { - m_OpToPayloadOffsetMap.swap(OpToPayloadOffsetMap); - m_OpLogPayloads.swap(OpPayloads); - m_LsnToPayloadOffsetMap.reset(); - WriteIndexSnapshot(); - } - - uint64_t PostSize = TotalSize(); - uint64_t FreedSize = (PreSize > PostSize) ? (PreSize - PostSize) : 0; - - ZEN_INFO("{} oplog '{}/{}': Compacted in {}. New size: {}, freeing: {}", - LogPrefix, - m_OuterProjectId, - m_OplogId, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - NiceBytes(PostSize), - NiceBytes(FreedSize)); -} - -IoBuffer -ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash) -{ - ZEN_ASSERT(m_Mode == EMode::kFull); - return m_CidStore.FindChunkByCid(RawHash); -} - -bool -ProjectStore::Oplog::IterateChunks(std::span<IoHash> RawHashes, - bool IncludeModTag, - const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool, - uint64_t LargeSizeLimit) -{ - ZEN_ASSERT(m_Mode == EMode::kFull); - return m_CidStore.IterateChunks( - RawHashes, - [&](size_t Index, const IoBuffer& Payload) { - return AsyncCallback(Index, Payload, IncludeModTag ? GetModificationTagFromRawHash(RawHashes[Index]) : 0); - }, - OptionalWorkerPool, - LargeSizeLimit); -} - -bool -ProjectStore::Oplog::IterateChunks(const std::filesystem::path& ProjectRootDir, - std::span<Oid> ChunkIds, - bool IncludeModTag, - const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool, - uint64_t LargeSizeLimit) -{ - ZEN_ASSERT(m_Mode == EMode::kFull); - ZEN_MEMSCOPE(GetProjectstoreTag()); - - std::vector<size_t> CidChunkIndexes; - std::vector<IoHash> CidChunkHashes; - std::vector<size_t> FileChunkIndexes; - std::vector<std::filesystem::path> FileChunkPaths; - { - RwLock::SharedLockScope OplogLock(m_OplogLock); - for (size_t ChunkIndex = 0; ChunkIndex < ChunkIds.size(); ChunkIndex++) - { - const Oid& ChunkId = ChunkIds[ChunkIndex]; - if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end()) - { - CidChunkIndexes.push_back(ChunkIndex); - CidChunkHashes.push_back(ChunkIt->second); - } - else if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) - { - CidChunkIndexes.push_back(ChunkIndex); - CidChunkHashes.push_back(ChunkIt->second); - } - else if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) - { - FileChunkIndexes.push_back(ChunkIndex); - FileChunkPaths.emplace_back(ProjectRootDir / FileIt->second.ServerPath); - } - } - } - if (OptionalWorkerPool) - { - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); - try - { - for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) - { - if (AbortFlag) - { - break; - } - Work.ScheduleWork( - *OptionalWorkerPool, - [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback]( - std::atomic<bool>& AbortFlag) { - if (AbortFlag) - { - return; - } - size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; - const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; - try - { - IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); - if (!Payload) - { - ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[FileChunkIndex], FilePath); - } - - if (!AsyncCallback(FileChunkIndex, - Payload, - IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0)) - { - AbortFlag.store(true); - } - } - catch (const std::exception& Ex) - { - ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'", - m_OuterProjectId, - m_OplogId, - FileChunkIndex, - FilePath, - Ex.what()); - } - }); - } - - if (!CidChunkHashes.empty() && !AbortFlag) - { - m_CidStore.IterateChunks( - CidChunkHashes, - [&](size_t Index, const IoBuffer& Payload) { - size_t CidChunkIndex = CidChunkIndexes[Index]; - if (AbortFlag) - { - return false; - } - return AsyncCallback(CidChunkIndex, - Payload, - IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0); - }, - OptionalWorkerPool, - LargeSizeLimit); - } - } - catch (const std::exception& Ex) - { - AbortFlag.store(true); - ZEN_WARN("Failed iterating oplog chunks in {}. Reason: '{}'", m_BasePath, Ex.what()); - } - - Work.Wait(); - - return !AbortFlag; - } - else - { - if (!CidChunkHashes.empty()) - { - m_CidStore.IterateChunks( - CidChunkHashes, - [&](size_t Index, const IoBuffer& Payload) { - size_t CidChunkIndex = CidChunkIndexes[Index]; - return AsyncCallback(CidChunkIndex, Payload, IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0); - }, - OptionalWorkerPool, - LargeSizeLimit); - } - - for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) - { - size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; - const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; - IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); - if (Payload) - { - bool Result = AsyncCallback(FileChunkIndex, Payload, IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0); - if (!Result) - { - return false; - } - } - } - } - return true; -} - -IoBuffer -ProjectStore::Oplog::FindChunk(const std::filesystem::path& ProjectRootDir, const Oid& ChunkId, uint64_t* OptOutModificationTag) -{ - ZEN_ASSERT(m_Mode == EMode::kFull); - - RwLock::SharedLockScope OplogLock(m_OplogLock); - if (!m_Storage) - { - return IoBuffer{}; - } - - if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end()) - { - IoHash ChunkHash = ChunkIt->second; - OplogLock.ReleaseNow(); - - IoBuffer Result = m_CidStore.FindChunkByCid(ChunkHash); - if (Result && OptOutModificationTag != nullptr) - { - *OptOutModificationTag = GetModificationTagFromRawHash(ChunkHash); - } - return Result; - } - - if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) - { - std::filesystem::path FilePath = ProjectRootDir / FileIt->second.ServerPath; - - OplogLock.ReleaseNow(); - - IoBuffer Result = IoBufferBuilder::MakeFromFile(FilePath); - if (!Result) - { - ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkId, FilePath); - } - else if (OptOutModificationTag != nullptr) - { - *OptOutModificationTag = GetModificationTagFromModificationTime(Result); - } - return Result; - } - - if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) - { - IoHash ChunkHash = MetaIt->second; - OplogLock.ReleaseNow(); - - IoBuffer Result = m_CidStore.FindChunkByCid(ChunkHash); - if (Result && OptOutModificationTag != nullptr) - { - *OptOutModificationTag = GetModificationTagFromRawHash(ChunkHash); - } - return Result; - } - - return {}; -} - -std::vector<ProjectStore::Oplog::ChunkInfo> -ProjectStore::Oplog::GetAllChunksInfo(const std::filesystem::path& ProjectRootDir) -{ - ZEN_ASSERT(m_Mode == EMode::kFull); - - ZEN_MEMSCOPE(GetProjectstoreTag()); - - // First just capture all the chunk ids - - std::vector<ChunkInfo> InfoArray; - - { - RwLock::SharedLockScope _(m_OplogLock); - - if (m_Storage) - { - const size_t NumEntries = m_FileMap.size() + m_ChunkMap.size(); - - InfoArray.reserve(NumEntries); - - for (const auto& Kv : m_FileMap) - { - InfoArray.push_back({.ChunkId = Kv.first}); - } - - for (const auto& Kv : m_ChunkMap) - { - InfoArray.push_back({.ChunkId = Kv.first}); - } - } - } - - for (ChunkInfo& Info : InfoArray) - { - if (IoBuffer Chunk = FindChunk(ProjectRootDir, Info.ChunkId, nullptr)) - { - Info.ChunkSize = Chunk.GetSize(); - } - } - - return InfoArray; -} - -void -ProjectStore::Oplog::IterateChunkMap(std::function<void(const Oid&, const IoHash&)>&& Fn) -{ - ZEN_ASSERT(m_Mode == EMode::kFull); - - RwLock::SharedLockScope _(m_OplogLock); - if (!m_Storage) - { - return; - } - - for (const auto& Kv : m_ChunkMap) - { - Fn(Kv.first, Kv.second); - } -} - -void -ProjectStore::Oplog::IterateFileMap( - std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn) -{ - ZEN_ASSERT(m_Mode == EMode::kFull); - - RwLock::SharedLockScope _(m_OplogLock); - if (!m_Storage) - { - return; - } - - for (const auto& Kv : m_FileMap) - { - Fn(Kv.first, Kv.second.ServerPath, Kv.second.ClientPath); - } -} - -void -ProjectStore::Oplog::IterateOplog(std::function<void(CbObjectView)>&& Handler, const Paging& EntryPaging) -{ - RwLock::SharedLockScope _(m_OplogLock); - IterateOplogLocked(std::move(Handler), EntryPaging); -} - -std::vector<ProjectStore::Oplog::PayloadIndex> -ProjectStore::Oplog::GetSortedOpPayloadRangeLocked(const Paging& EntryPaging, - tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher>* OutOptionalReverseKeyMap) -{ - std::pair<int32_t, int32_t> StartAndEnd = GetPagedRange(int32_t(m_OpToPayloadOffsetMap.size()), EntryPaging); - if (StartAndEnd.first == StartAndEnd.second) - { - return {}; - } - - const int32_t ReplayCount = StartAndEnd.second - StartAndEnd.first; - - auto Start = m_OpToPayloadOffsetMap.cbegin(); - std::advance(Start, StartAndEnd.first); - - auto End = Start; - std::advance(End, ReplayCount); - - std::vector<PayloadIndex> ReplayOrder; - ReplayOrder.reserve(ReplayCount); - - for (auto It = Start; It != End; It++) - { - const PayloadIndex PayloadOffset = It->second; - if (OutOptionalReverseKeyMap != nullptr) - { - OutOptionalReverseKeyMap->insert_or_assign(PayloadOffset, It->first); - } - ReplayOrder.push_back(PayloadOffset); - } - - std::sort(ReplayOrder.begin(), ReplayOrder.end(), [this](const PayloadIndex Lhs, const PayloadIndex Rhs) { - const OplogEntryAddress& LhsEntry = m_OpLogPayloads[Lhs].Address; - const OplogEntryAddress& RhsEntry = m_OpLogPayloads[Rhs].Address; - return LhsEntry.Offset < RhsEntry.Offset; - }); - - return ReplayOrder; -} - -void -ProjectStore::Oplog::IterateOplogLocked(std::function<void(CbObjectView)>&& Handler, const Paging& EntryPaging) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::Oplog::IterateOplogLocked"); - if (!m_Storage) - { - return; - } - - std::vector<PayloadIndex> ReplayOrder = GetSortedOpPayloadRangeLocked(EntryPaging, nullptr); - if (!ReplayOrder.empty()) - { - m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber, CbObjectView Op) { Handler(Op); }); - } -} - -void -ProjectStore::Oplog::IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Handler) -{ - IterateOplogWithKey(std::move(Handler), Paging{}); -} - -void -ProjectStore::Oplog::IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Handler, - const Paging& EntryPaging) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher> ReverseKeyMap; - std::vector<PayloadIndex> ReplayOrder; - - { - RwLock::SharedLockScope _(m_OplogLock); - if (m_Storage) - { - ReplayOrder = GetSortedOpPayloadRangeLocked(EntryPaging, &ReverseKeyMap); - if (!ReplayOrder.empty()) - { - uint32_t EntryIndex = 0; - m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, CbObjectView Op) { - const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex]; - Handler(Lsn, ReverseKeyMap.at(PayloadOffset), Op); - EntryIndex++; - }); - } - } - } -} - -static constexpr uint32_t OplogMetaDataExpectedMagic = 0x6f'74'6d'62; // 'omta'; - -void -ProjectStore::Oplog::GetAttachmentsLocked(std::vector<IoHash>& OutAttachments, bool StoreMetaDataOnDisk) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::Oplog::GetAttachmentsLocked"); - if (!m_Storage) - { - return; - } - - if (StoreMetaDataOnDisk && m_MetaValid) - { - IoBuffer MetadataPayload = IoBufferBuilder::MakeFromFile(m_MetaPath); - if (MetadataPayload) - { - ZEN_TRACE_CPU("Store::Oplog::GetAttachmentsFromMetaData"); - if (GetAttachmentsFromMetaData<Oid, IoHash>( - MetadataPayload, - OplogMetaDataExpectedMagic, - [&](std::span<const Oid> Keys, std::span<const uint32_t> AttachmentCounts, std::span<const IoHash> Attachments) { - ZEN_UNUSED(Keys); - ZEN_UNUSED(AttachmentCounts); - OutAttachments.insert(OutAttachments.end(), Attachments.begin(), Attachments.end()); - })) - { - return; - } - } - } - - std::vector<Oid> Keys; - std::vector<uint32_t> AttachmentCounts; - size_t AttachmentOffset = OutAttachments.size(); - - IterateOplogLocked( - [&](CbObjectView Op) { - using namespace std::literals; - size_t CurrentAttachmentCount = OutAttachments.size(); - Op.IterateAttachments([&](CbFieldView Visitor) { OutAttachments.emplace_back(Visitor.AsAttachment()); }); - if (StoreMetaDataOnDisk) - { - const Oid KeyHash = ComputeOpKey(Op); - Keys.push_back(KeyHash); - AttachmentCounts.push_back(gsl::narrow<uint32_t>(OutAttachments.size() - CurrentAttachmentCount)); - } - }, - Paging{}); - if (StoreMetaDataOnDisk) - { - const IoHash* FirstAttachment = OutAttachments.data() + AttachmentOffset; - size_t AttachmentCount = OutAttachments.size() - AttachmentOffset; - IoBuffer MetaPayload = BuildReferenceMetaData<Oid>(OplogMetaDataExpectedMagic, - Keys, - AttachmentCounts, - std::span<const IoHash>(FirstAttachment, AttachmentCount)) - .Flatten() - .AsIoBuffer(); - - const std::filesystem::path MetaPath = m_MetaPath; - std::error_code Ec; - TemporaryFile::SafeWriteFile(MetaPath, MetaPayload.GetView(), Ec); - if (Ec) - { - m_MetaValid = false; - ZEN_WARN("oplog '{}/{}': unable to set meta data meta path: '{}'. Reason: '{}'", - m_OuterProjectId, - m_OplogId, - MetaPath, - Ec.message()); - } - else - { - m_MetaValid = true; - } - } -} - -size_t -ProjectStore::Oplog::GetOplogEntryCount() const -{ - RwLock::SharedLockScope _(m_OplogLock); - if (!m_Storage) - { - return 0; - } - return m_OpToPayloadOffsetMap.size(); -} - -ProjectStore::LogSequenceNumber -ProjectStore::Oplog::GetOpIndexByKey(const Oid& Key) -{ - RwLock::SharedLockScope _(m_OplogLock); - if (!m_Storage) - { - return {}; - } - if (const auto LatestOp = m_OpToPayloadOffsetMap.find(Key); LatestOp != m_OpToPayloadOffsetMap.end()) - { - return m_OpLogPayloads[LatestOp->second].Lsn; - } - return {}; -} - -std::optional<CbObject> -ProjectStore::Oplog::GetOpByKey(const Oid& Key) -{ - RwLock::SharedLockScope _(m_OplogLock); - if (!m_Storage) - { - return {}; - } - - if (const auto LatestOp = m_OpToPayloadOffsetMap.find(Key); LatestOp != m_OpToPayloadOffsetMap.end()) - { - return m_Storage->GetOp(m_OpLogPayloads[LatestOp->second].Address); - } - - return {}; -} - -std::optional<CbObject> -ProjectStore::Oplog::GetOpByIndex(ProjectStore::LogSequenceNumber Index) -{ - { - RwLock::SharedLockScope _(m_OplogLock); - if (!m_Storage) - { - return {}; - } - - if (m_LsnToPayloadOffsetMap) - { - if (auto It = m_LsnToPayloadOffsetMap->find(Index); It != m_LsnToPayloadOffsetMap->end()) - { - return m_Storage->GetOp(m_OpLogPayloads[It->second].Address); - } - } - } - - RwLock::ExclusiveLockScope Lock(m_OplogLock); - if (!m_Storage) - { - return {}; - } - - RefreshLsnToPayloadOffsetMap(Lock); - if (auto It = m_LsnToPayloadOffsetMap->find(Index); It != m_LsnToPayloadOffsetMap->end()) - { - return m_Storage->GetOp(m_OpLogPayloads[It->second].Address); - } - return {}; -} - -void -ProjectStore::Oplog::CaptureAddedAttachments(std::span<const IoHash> AttachmentHashes) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - m_OplogLock.WithExclusiveLock([this, AttachmentHashes]() { - if (m_CapturedAttachments) - { - m_CapturedAttachments->reserve(m_CapturedAttachments->size() + AttachmentHashes.size()); - m_CapturedAttachments->insert(m_CapturedAttachments->end(), AttachmentHashes.begin(), AttachmentHashes.end()); - } - }); -} - -void -ProjectStore::Oplog::EnableUpdateCapture() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - m_OplogLock.WithExclusiveLock([&]() { - if (m_UpdateCaptureRefCounter == 0) - { - ZEN_ASSERT(!m_CapturedOps); - ZEN_ASSERT(!m_CapturedAttachments); - m_CapturedOps = std::make_unique<std::vector<Oid>>(); - m_CapturedAttachments = std::make_unique<std::vector<IoHash>>(); - } - else - { - ZEN_ASSERT(m_CapturedOps); - ZEN_ASSERT(m_CapturedAttachments); - } - m_UpdateCaptureRefCounter++; - }); -} - -void -ProjectStore::Oplog::DisableUpdateCapture() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - m_OplogLock.WithExclusiveLock([&]() { - ZEN_ASSERT(m_CapturedOps); - ZEN_ASSERT(m_CapturedAttachments); - ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); - m_UpdateCaptureRefCounter--; - if (m_UpdateCaptureRefCounter == 0) - { - m_CapturedOps.reset(); - m_CapturedAttachments.reset(); - } - }); -} - -void -ProjectStore::Oplog::IterateCapturedOpsLocked( - std::function<bool(const Oid& Key, ProjectStore::LogSequenceNumber LSN, const CbObjectView& UpdateOp)>&& Callback) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - if (m_CapturedOps) - { - if (!m_Storage) - { - return; - } - for (const Oid& OpKey : *m_CapturedOps) - { - if (const auto AddressEntryIt = m_OpToPayloadOffsetMap.find(OpKey); AddressEntryIt != m_OpToPayloadOffsetMap.end()) - { - const OplogPayload& OpPayload = m_OpLogPayloads[AddressEntryIt->second]; - Callback(OpKey, OpPayload.Lsn, m_Storage->GetOp(OpPayload.Address)); - } - } - } -} - -std::vector<IoHash> -ProjectStore::Oplog::GetCapturedAttachmentsLocked() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - if (m_CapturedAttachments) - { - return *m_CapturedAttachments; - } - return {}; -} - -std::vector<IoHash> -ProjectStore::Oplog::CheckPendingChunkReferences(std::span<const IoHash> ChunkHashes, const GcClock::Duration& RetainTime) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - m_OplogLock.WithExclusiveLock([&]() { - GcClock::TimePoint Now = GcClock::Now(); - if (m_PendingPrepOpAttachmentsRetainEnd < Now) - { - m_PendingPrepOpAttachments.clear(); - } - m_PendingPrepOpAttachments.insert(ChunkHashes.begin(), ChunkHashes.end()); - GcClock::TimePoint NewEndPoint = Now + RetainTime; - if (m_PendingPrepOpAttachmentsRetainEnd < NewEndPoint) - { - m_PendingPrepOpAttachmentsRetainEnd = NewEndPoint; - } - }); - - std::vector<IoHash> MissingChunks; - MissingChunks.reserve(ChunkHashes.size()); - for (const IoHash& FileHash : ChunkHashes) - { - if (!m_CidStore.ContainsChunk(FileHash)) - { - MissingChunks.push_back(FileHash); - } - } - - return MissingChunks; -} - -void -ProjectStore::Oplog::RemovePendingChunkReferences(std::span<const IoHash> ChunkHashes) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - m_OplogLock.WithExclusiveLock([&]() { - GcClock::TimePoint Now = GcClock::Now(); - if (m_PendingPrepOpAttachmentsRetainEnd < Now) - { - m_PendingPrepOpAttachments.clear(); - } - else - { - for (const IoHash& Chunk : ChunkHashes) - { - m_PendingPrepOpAttachments.erase(Chunk); - } - } - }); -} - -std::vector<IoHash> -ProjectStore::Oplog::GetPendingChunkReferencesLocked() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - std::vector<IoHash> Result; - Result.reserve(m_PendingPrepOpAttachments.size()); - Result.insert(Result.end(), m_PendingPrepOpAttachments.begin(), m_PendingPrepOpAttachments.end()); - GcClock::TimePoint Now = GcClock::Now(); - if (m_PendingPrepOpAttachmentsRetainEnd < Now) - { - m_PendingPrepOpAttachments.clear(); - } - return Result; -} - -void -ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, - const Oid& FileId, - const IoHash& Hash, - std::string_view ServerPath, - std::string_view ClientPath) -{ - FileMapEntry Entry; - - if (Hash != IoHash::Zero) - { - m_ChunkMap.insert_or_assign(FileId, Hash); - } - else - { - Entry.ServerPath = ServerPath; - } - - Entry.ClientPath = ClientPath; - - m_FileMap[FileId] = std::move(Entry); -} - -void -ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, const Oid& ChunkId, const IoHash& Hash) -{ - m_ChunkMap.insert_or_assign(ChunkId, Hash); -} - -void -ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, const Oid& ChunkId, const IoHash& Hash) -{ - m_MetaMap.insert_or_assign(ChunkId, Hash); -} - -ProjectStore::Oplog::OplogEntryMapping -ProjectStore::Oplog::GetMapping(CbObjectView Core) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - using namespace std::literals; - - OplogEntryMapping Result; - - // Update chunk id maps - for (CbFieldView Field : Core) - { - std::string_view FieldName = Field.GetName(); - if (FieldName == "package"sv) - { - CbObjectView PackageObj = Field.AsObjectView(); - Oid Id = PackageObj["id"sv].AsObjectId(); - IoHash Hash = PackageObj["data"sv].AsBinaryAttachment(); - Result.Chunks.emplace_back(ChunkMapping{Id, Hash}); - ZEN_DEBUG("oplog {}/{}: package data {} -> {}", m_OuterProjectId, m_OplogId, Id, Hash); - continue; - } - if (FieldName == "bulkdata"sv) - { - CbArrayView BulkDataArray = Field.AsArrayView(); - for (CbFieldView& Entry : BulkDataArray) - { - CbObjectView BulkObj = Entry.AsObjectView(); - Oid Id = BulkObj["id"sv].AsObjectId(); - IoHash Hash = BulkObj["data"sv].AsBinaryAttachment(); - Result.Chunks.emplace_back(ChunkMapping{Id, Hash}); - ZEN_DEBUG("oplog {}/{}: bulkdata {} -> {}", m_OuterProjectId, m_OplogId, Id, Hash); - } - continue; - } - if (FieldName == "packagedata"sv) - { - CbArrayView PackageDataArray = Field.AsArrayView(); - for (CbFieldView& Entry : PackageDataArray) - { - CbObjectView PackageDataObj = Entry.AsObjectView(); - Oid Id = PackageDataObj["id"sv].AsObjectId(); - IoHash Hash = PackageDataObj["data"sv].AsBinaryAttachment(); - Result.Chunks.emplace_back(ChunkMapping{Id, Hash}); - ZEN_DEBUG("oplog {}/{}: package {} -> {}", m_OuterProjectId, m_OplogId, Id, Hash); - } - continue; - } - if (FieldName == "files"sv) - { - CbArrayView FilesArray = Field.AsArrayView(); - Result.Files.reserve(FilesArray.Num()); - for (CbFieldView& Entry : FilesArray) - { - CbObjectView FileObj = Entry.AsObjectView(); - - std::string_view ServerPath = FileObj["serverpath"sv].AsString(); - std::string_view ClientPath = FileObj["clientpath"sv].AsString(); - Oid Id = FileObj["id"sv].AsObjectId(); - IoHash Hash = FileObj["data"sv].AsBinaryAttachment(); - if (ServerPath.empty() && Hash == IoHash::Zero) - { - ZEN_WARN("oplog {}/{}: invalid file for entry '{}', missing both 'serverpath' and 'data' fields", - m_OuterProjectId, - m_OplogId, - Id); - continue; - } - if (ClientPath.empty()) - { - ZEN_WARN("oplog {}/{}: invalid file for entry '{}', missing 'clientpath' field", m_OuterProjectId, m_OplogId, Id); - continue; - } - - Result.Files.emplace_back(FileMapping{Id, Hash, std::string(ServerPath), std::string(ClientPath)}); - ZEN_DEBUG("oplog {}/{}: file {} -> {}, ServerPath: {}, ClientPath: {}", - m_OuterProjectId, - m_OplogId, - Id, - Hash, - ServerPath, - ClientPath); - } - continue; - } - if (FieldName == "meta"sv) - { - CbArrayView MetaArray = Field.AsArrayView(); - Result.Meta.reserve(MetaArray.Num()); - for (CbFieldView& Entry : MetaArray) - { - CbObjectView MetaObj = Entry.AsObjectView(); - Oid Id = MetaObj["id"sv].AsObjectId(); - IoHash Hash = MetaObj["data"sv].AsBinaryAttachment(); - Result.Meta.emplace_back(ChunkMapping{Id, Hash}); - auto NameString = MetaObj["name"sv].AsString(); - ZEN_DEBUG("oplog {}/{}: meta data ({}) {} -> {}", m_OuterProjectId, m_OplogId, NameString, Id, Hash); - } - continue; - } - } - - return Result; -} - -ProjectStore::LogSequenceNumber -ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, - const OplogEntryMapping& OpMapping, - const OplogEntry& OpEntry) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing - // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however - - using namespace std::literals; - - // Update chunk id maps - for (const ChunkMapping& Chunk : OpMapping.Chunks) - { - AddChunkMapping(OplogLock, Chunk.Id, Chunk.Hash); - } - - for (const FileMapping& File : OpMapping.Files) - { - AddFileMapping(OplogLock, File.Id, File.Hash, File.ServerPath, File.ClientPath); - } - - for (const ChunkMapping& Meta : OpMapping.Meta) - { - AddMetaMapping(OplogLock, Meta.Id, Meta.Hash); - } - - const PayloadIndex PayloadOffset(m_OpLogPayloads.size()); - m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, PayloadOffset); - m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress}); - - if (m_LsnToPayloadOffsetMap) - { - m_LsnToPayloadOffsetMap->insert_or_assign(OpEntry.OpLsn, PayloadOffset); - } - return OpEntry.OpLsn; -} - -ProjectStore::LogSequenceNumber -ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) -{ - ZEN_ASSERT(m_Mode == EMode::kFull); - - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntry"); - - const CbObject& Core = OpPackage.GetObject(); - const ProjectStore::LogSequenceNumber EntryId = AppendNewOplogEntry(Core); - if (!EntryId) - { - // The oplog has been deleted so just drop this - return EntryId; - } - - // Persist attachments after oplog entry so GC won't find attachments without references - - uint64_t AttachmentBytes = 0; - uint64_t NewAttachmentBytes = 0; - - auto Attachments = OpPackage.GetAttachments(); - - if (!Attachments.empty()) - { - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; - std::vector<uint64_t> WriteRawSizes; - - WriteAttachmentBuffers.reserve(Attachments.size()); - WriteRawHashes.reserve(Attachments.size()); - WriteRawSizes.reserve(Attachments.size()); - - for (const auto& Attach : Attachments) - { - ZEN_ASSERT(Attach.IsCompressedBinary()); - - const CompressedBuffer& AttachmentData = Attach.AsCompressedBinary(); - const uint64_t AttachmentSize = AttachmentData.DecodeRawSize(); - WriteAttachmentBuffers.push_back(AttachmentData.GetCompressed().Flatten().AsIoBuffer()); - WriteRawHashes.push_back(Attach.GetHash()); - WriteRawSizes.push_back(AttachmentSize); - AttachmentBytes += AttachmentSize; - } - - std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); - for (size_t Index = 0; Index < InsertResults.size(); Index++) - { - if (InsertResults[Index].New) - { - NewAttachmentBytes += WriteRawSizes[Index]; - } - } - } - - ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId.Number, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes)); - - return EntryId; -} - -RefPtr<ProjectStore::OplogStorage> -ProjectStore::Oplog::GetStorage() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - RefPtr<OplogStorage> Storage; - { - RwLock::SharedLockScope _(m_OplogLock); - Storage = m_Storage; - } - return Storage; -} - -ProjectStore::LogSequenceNumber -ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core) -{ - ZEN_ASSERT(m_Mode == EMode::kFull); - - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntry"); - - using namespace std::literals; - - RefPtr<OplogStorage> Storage = GetStorage(); - if (!Storage) - { - return {}; - } - - OplogEntryMapping Mapping = GetMapping(Core); - OplogStorage::AppendOpData OpData = OplogStorage::GetAppendOpData(Core); - - const OplogEntry OpEntry = Storage->AppendOp(OpData); - - RwLock::ExclusiveLockScope OplogLock(m_OplogLock); - const ProjectStore::LogSequenceNumber EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry); - if (m_CapturedOps) - { - m_CapturedOps->push_back(OpData.KeyHash); - } - m_MetaValid = false; - - return EntryId; -} - -std::vector<ProjectStore::LogSequenceNumber> -ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores) -{ - ZEN_ASSERT(m_Mode == EMode::kFull); - - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntries"); - - using namespace std::literals; - - RefPtr<OplogStorage> Storage = GetStorage(); - if (!Storage) - { - return std::vector<ProjectStore::LogSequenceNumber>(Cores.size(), LogSequenceNumber{}); - } - - size_t OpCount = Cores.size(); - std::vector<OplogEntryMapping> Mappings; - std::vector<OplogStorage::AppendOpData> OpDatas; - Mappings.resize(OpCount); - OpDatas.resize(OpCount); - - for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) - { - const CbObjectView& Core = Cores[OpIndex]; - OpDatas[OpIndex] = OplogStorage::GetAppendOpData(Core); - Mappings[OpIndex] = GetMapping(Core); - } - - std::vector<OplogEntry> OpEntries = Storage->AppendOps(OpDatas); - - std::vector<ProjectStore::LogSequenceNumber> EntryIds; - EntryIds.resize(OpCount); - { - { - RwLock::ExclusiveLockScope OplogLock(m_OplogLock); - if (m_CapturedOps) - { - m_CapturedOps->reserve(m_CapturedOps->size() + OpCount); - } - for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) - { - EntryIds[OpIndex] = RegisterOplogEntry(OplogLock, Mappings[OpIndex], OpEntries[OpIndex]); - if (m_CapturedOps) - { - m_CapturedOps->push_back(OpDatas[OpIndex].KeyHash); - } - } - } - m_MetaValid = false; - } - return EntryIds; -} - -////////////////////////////////////////////////////////////////////////// - -ProjectStore::Project::Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath) -: m_ProjectStore(PrjStore) -, m_CidStore(Store) -, m_OplogStoragePath(BasePath) -, m_LastAccessTimes({std::make_pair(std::string(), GcClock::TickCount())}) -{ -} - -ProjectStore::Project::~Project() -{ - // Only write access times if we have not been explicitly deleted - if (!m_OplogStoragePath.empty()) - { - WriteAccessTimes(); - } -} - -bool -ProjectStore::Project::Exists(const std::filesystem::path& BasePath) -{ - return IsFile(BasePath / "Project.zcb"); -} - -void -ProjectStore::Project::Read() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Project::Read"); - - using namespace std::literals; - - std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv; - - ZEN_DEBUG("project '{}': reading config from '{}'", Identifier, ProjectStateFilePath); - - BasicFile Blob; - Blob.Open(ProjectStateFilePath, BasicFile::Mode::kRead); - - IoBuffer Obj = Blob.ReadAll(); - CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); - - if (ValidationError == CbValidateError::None) - { - CbObject Cfg = LoadCompactBinaryObject(Obj); - - Identifier = std::filesystem::path(Cfg["id"sv].AsU8String()).string(); - RootDir = std::filesystem::path(Cfg["root"sv].AsU8String()).string(); - ProjectRootDir = std::filesystem::path(Cfg["project"sv].AsU8String()).string(); - EngineRootDir = std::filesystem::path(Cfg["engine"sv].AsU8String()).string(); - ProjectFilePath = std::filesystem::path(Cfg["projectfile"sv].AsU8String()).string(); - } - else - { - ZEN_ERROR("validation error {} hit for '{}'", ToString(ValidationError), ProjectStateFilePath); - } - - ReadAccessTimes(); -} - -void -ProjectStore::Project::Write() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Project::Write"); - - using namespace std::literals; - - BinaryWriter Mem; - - CbObjectWriter Cfg; - Cfg << "id"sv << Identifier; - Cfg << "root"sv << PathToUtf8(RootDir); - Cfg << "project"sv << PathToUtf8(ProjectRootDir); - Cfg << "engine"sv << PathToUtf8(EngineRootDir); - Cfg << "projectfile"sv << PathToUtf8(ProjectFilePath); - - Cfg.Save(Mem); - - CreateDirectories(m_OplogStoragePath); - - std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv; - - ZEN_INFO("project '{}': persisting config to '{}'", Identifier, ProjectStateFilePath); - - TemporaryFile::SafeWriteFile(ProjectStateFilePath, Mem.GetView()); -} - -void -ProjectStore::Project::ReadAccessTimes() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - using namespace std::literals; - - std::filesystem::path ProjectAccessTimesFilePath = m_OplogStoragePath / "AccessTimes.zcb"sv; - if (!IsFile(ProjectAccessTimesFilePath)) - { - return; - } - - ZEN_DEBUG("project '{}': reading access times '{}'", Identifier, ProjectAccessTimesFilePath); - - BasicFile Blob; - Blob.Open(ProjectAccessTimesFilePath, BasicFile::Mode::kRead); - - IoBuffer Obj = Blob.ReadAll(); - CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); - - if (ValidationError == CbValidateError::None) - { - CbObject Reader = LoadCompactBinaryObject(Obj); - - uint64_t Count = Reader["count"sv].AsUInt64(0); - if (Count > 0) - { - std::vector<uint64_t> Ticks; - Ticks.reserve(Count); - CbArrayView TicksArray = Reader["ticks"sv].AsArrayView(); - for (CbFieldView& TickView : TicksArray) - { - Ticks.emplace_back(TickView.AsUInt64()); - } - CbArrayView IdArray = Reader["ids"sv].AsArrayView(); - uint64_t Index = 0; - - for (CbFieldView& IdView : IdArray) - { - std::string_view Id = IdView.AsString(); - m_LastAccessTimes.insert_or_assign(std::string(Id), Ticks[Index++]); - } - } - - ////// Legacy format read - { - CbArrayView LastAccessTimes = Reader["lastaccess"sv].AsArrayView(); - for (CbFieldView& Entry : LastAccessTimes) - { - CbObjectView AccessTime = Entry.AsObjectView(); - std::string_view Id = AccessTime["id"sv].AsString(); - GcClock::Tick AccessTick = AccessTime["tick"sv].AsUInt64(); - m_LastAccessTimes.insert_or_assign(std::string(Id), AccessTick); - } - } - } - else - { - ZEN_WARN("project '{}': validation error {} hit for '{}'", Identifier, ToString(ValidationError), ProjectAccessTimesFilePath); - } -} - -void -ProjectStore::Project::WriteAccessTimes() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - using namespace std::literals; - - CbObjectWriter Writer(32 + (m_LastAccessTimes.size() * 16)); - - { - RwLock::SharedLockScope _(m_LastAccessTimesLock); - - Writer.AddInteger("count", gsl::narrow<uint64_t>(m_LastAccessTimes.size())); - Writer.BeginArray("ids"); - - for (const auto& It : m_LastAccessTimes) - { - Writer << It.first; - } - Writer.EndArray(); - Writer.BeginArray("ticks"); - for (const auto& It : m_LastAccessTimes) - { - Writer << gsl::narrow<uint64_t>(It.second); - } - Writer.EndArray(); - } - - CbObject Data = Writer.Save(); - - try - { - CreateDirectories(m_OplogStoragePath); - - std::filesystem::path ProjectAccessTimesFilePath = m_OplogStoragePath / "AccessTimes.zcb"sv; - - ZEN_DEBUG("project '{}': persisting access times for '{}'", Identifier, ProjectAccessTimesFilePath); - - TemporaryFile::SafeWriteFile(ProjectAccessTimesFilePath, Data.GetView()); - } - catch (const std::exception& Err) - { - ZEN_WARN("project '{}': writing access times FAILED, reason: '{}'", Identifier, Err.what()); - } -} - -LoggerRef -ProjectStore::Project::Log() const -{ - return m_ProjectStore->Log(); -} - -std::filesystem::path -ProjectStore::Project::BasePathForOplog(std::string_view OplogId) const -{ - return m_OplogStoragePath / OplogId; -} - -Ref<ProjectStore::Oplog> -ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - RwLock::ExclusiveLockScope _(m_ProjectLock); - - std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); - - try - { - Stopwatch Timer; - - Ref<Oplog> NewLog(new Oplog(Log(), Identifier, OplogId, m_CidStore, OplogBasePath, MarkerPath, Oplog::EMode::kFull)); - m_Oplogs.insert_or_assign(std::string{OplogId}, NewLog); - - NewLog->Write(); - - ZEN_INFO("oplog '{}/{}': created oplog at '{}' in {}", - Identifier, - OplogId, - OplogBasePath, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - - if (m_CapturedOplogs) - { - m_CapturedOplogs->push_back(std::string(OplogId)); - } - - return NewLog; - } - catch (const std::exception&) - { - // In case of failure we need to ensure there's no half constructed entry around - // - // (This is probably already ensured by the try_emplace implementation?) - - m_Oplogs.erase(std::string{OplogId}); - - return {}; - } -} - -Ref<ProjectStore::Oplog> -ProjectStore::Project::ReadOplog(std::string_view OplogId) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::OpenOplog"); - - std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); - - RwLock::SharedLockScope Lock(m_ProjectLock); - - if (auto It = m_Oplogs.find(std::string{OplogId}); It != m_Oplogs.end()) - { - if (Oplog::ExistsAt(OplogBasePath)) - { - return It->second; - } - else - { - return {}; - } - } - - if (Oplog::ExistsAt(OplogBasePath)) - { - Stopwatch Timer; - - Ref<Oplog> ExistingLog(new Oplog(m_ProjectStore->Log(), - Identifier, - OplogId, - m_CidStore, - OplogBasePath, - std::filesystem::path{}, - Oplog::EMode::kBasicReadOnly)); - - ExistingLog->Read(); - Lock.ReleaseNow(); - - ZEN_INFO("oplog '{}/{}': read oplog at '{}' in {}", Identifier, OplogId, OplogBasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - - return ExistingLog; - } - return {}; -} - -Ref<ProjectStore::Oplog> -ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bool VerifyPathOnDisk) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::OpenOplog"); - - { - RwLock::SharedLockScope ProjectLock(m_ProjectLock); - - auto OplogIt = m_Oplogs.find(std::string(OplogId)); - - if (OplogIt != m_Oplogs.end()) - { - bool ReOpen = false; - - if (VerifyPathOnDisk) - { - std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); - - if (!Oplog::ExistsAt(OplogBasePath)) - { - // Somebody deleted the oplog on disk behind our back - ProjectLock.ReleaseNow(); - std::filesystem::path DeletePath; - if (!RemoveOplog(OplogId, DeletePath)) - { - ZEN_WARN("Failed to clean up deleted oplog {}/{}", Identifier, OplogId, OplogBasePath); - } - - ReOpen = true; - } - } - - if (!ReOpen) - { - return OplogIt->second; - } - } - } - - std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); - - RwLock::ExclusiveLockScope Lock(m_ProjectLock); - if (auto It = m_Oplogs.find(std::string{OplogId}); It != m_Oplogs.end()) - { - return It->second; - } - if (Oplog::ExistsAt(OplogBasePath)) - { - try - { - Stopwatch Timer; - - Ref<Oplog> ExistingLog(new Oplog(m_ProjectStore->Log(), - Identifier, - OplogId, - m_CidStore, - OplogBasePath, - std::filesystem::path{}, - Oplog::EMode::kFull)); - - m_Oplogs.insert_or_assign(std::string{OplogId}, ExistingLog); - ExistingLog->Read(); - Lock.ReleaseNow(); - - ZEN_INFO("oplog '{}/{}': opened oplog at '{}' in {}", - Identifier, - OplogId, - OplogBasePath, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - - if (AllowCompact) - { - const uint32_t CompactUnusedThreshold = 50; - ExistingLog->CompactIfUnusedExceeds(/*DryRun*/ false, - CompactUnusedThreshold, - fmt::format("Compact on initial open of oplog {}/{}: ", Identifier, OplogId)); - } - return ExistingLog; - } - catch (const std::exception& Ex) - { - ZEN_WARN("oplog '{}/{}': failed to open oplog at '{}': {}", Identifier, OplogId, OplogBasePath, Ex.what()); - m_Oplogs.erase(std::string{OplogId}); - } - } - - return {}; -} - -void -ProjectStore::Oplog::CompactIfUnusedExceeds(bool DryRun, uint32_t CompactUnusedThreshold, std::string_view LogPrefix) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - RwLock::ExclusiveLockScope OplogLock(m_OplogLock); - if (!m_Storage) - { - return; - } - uint32_t UnusedPercent = GetUnusedSpacePercentLocked(); - if (UnusedPercent >= CompactUnusedThreshold) - { - Compact(OplogLock, - DryRun, - /*RetainLSNs*/ m_Storage->MaxLSN() <= - 0xff000000ul, // If we have less than 16 miln entries left of our LSN range, allow renumbering of LSNs - LogPrefix); - } -} - -bool -ProjectStore::Project::TryUnloadOplog(std::string_view OplogId) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - RwLock::ExclusiveLockScope _(m_ProjectLock); - if (auto OplogIt = m_Oplogs.find(std::string(OplogId)); OplogIt != m_Oplogs.end()) - { - Ref<Oplog>& Oplog = OplogIt->second; - - if (Oplog->CanUnload()) - { - m_Oplogs.erase(OplogIt); - return true; - } - return false; - } - - return false; -} - -bool -ProjectStore::Project::RemoveOplog(std::string_view OplogId, std::filesystem::path& OutDeletePath) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - { - RwLock::ExclusiveLockScope _(m_ProjectLock); - - if (auto OplogIt = m_Oplogs.find(std::string(OplogId)); OplogIt == m_Oplogs.end()) - { - std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); - - if (Oplog::ExistsAt(OplogBasePath)) - { - if (!PrepareDirectoryDelete(OplogBasePath, OutDeletePath)) - { - return false; - } - } - } - else - { - Ref<Oplog>& Oplog = OplogIt->second; - if (!Oplog->PrepareForDelete(OutDeletePath)) - { - return false; - } - m_Oplogs.erase(OplogIt); - } - } - m_LastAccessTimesLock.WithExclusiveLock([&]() { m_LastAccessTimes.erase(std::string(OplogId)); }); - return true; -} - -bool -ProjectStore::Project::DeleteOplog(std::string_view OplogId) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - std::filesystem::path DeletePath; - if (!RemoveOplog(OplogId, DeletePath)) - { - return false; - } - - // Erase content on disk - if (!DeletePath.empty()) - { - if (!OplogStorage::Delete(DeletePath)) - { - ZEN_WARN("oplog '{}/{}': failed to remove old oplog path '{}'", Identifier, OplogId, DeletePath); - return false; - } - } - return true; -} - -std::vector<std::string> -ProjectStore::Project::ScanForOplogs() const -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - RwLock::SharedLockScope _(m_ProjectLock); - - std::vector<std::string> Oplogs; - if (Project::Exists(m_OplogStoragePath)) - { - DirectoryContent DirContent; - GetDirectoryContent(m_OplogStoragePath, DirectoryContentFlags::IncludeDirs, DirContent); - Oplogs.reserve(DirContent.Directories.size()); - for (const std::filesystem::path& DirPath : DirContent.Directories) - { - std::string DirName = PathToUtf8(DirPath.filename()); - if (DirName.starts_with("[dropped]")) - { - continue; - } - if (Oplog::ExistsAt(DirPath)) - { - Oplogs.push_back(DirPath.filename().string()); - } - } - } - - return Oplogs; -} - -void -ProjectStore::Project::IterateOplogs(std::function<void(const RwLock::SharedLockScope&, const Oplog&)>&& Fn) const -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - RwLock::SharedLockScope Lock(m_ProjectLock); - - for (auto& Kv : m_Oplogs) - { - Fn(Lock, *Kv.second); - } -} - -void -ProjectStore::Project::IterateOplogs(std::function<void(const RwLock::SharedLockScope&, Oplog&)>&& Fn) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - RwLock::SharedLockScope Lock(m_ProjectLock); - - for (auto& Kv : m_Oplogs) - { - Fn(Lock, *Kv.second); - } -} - -void -ProjectStore::Project::Flush() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Project::Flush"); - - // We only need to flush oplogs that we have already loaded - IterateOplogs([&](const RwLock::SharedLockScope&, Oplog& Ops) { Ops.Flush(); }); - WriteAccessTimes(); -} - -void -ProjectStore::Project::Scrub(ScrubContext& Ctx) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - // Scrubbing needs to check all existing oplogs - std::vector<std::string> OpLogs = ScanForOplogs(); - for (const std::string& OpLogId : OpLogs) - { - OpenOplog(OpLogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); - } - IterateOplogs([&](const RwLock::SharedLockScope&, Oplog& Ops) { - if (!IsExpired(GcClock::TimePoint::min(), Ops)) - { - Ops.Scrub(Ctx); - } - }); -} - -uint64_t -ProjectStore::Project::TotalSize(const std::filesystem::path& BasePath) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - using namespace std::literals; - - uint64_t Size = 0; - std::filesystem::path AccessTimesFilePath = BasePath / "AccessTimes.zcb"sv; - if (IsFile(AccessTimesFilePath)) - { - Size += FileSizeFromPath(AccessTimesFilePath); - } - std::filesystem::path ProjectFilePath = BasePath / "Project.zcb"sv; - if (IsFile(ProjectFilePath)) - { - Size += FileSizeFromPath(ProjectFilePath); - } - - return Size; -} - -uint64_t -ProjectStore::Project::TotalSize() const -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - uint64_t Result = TotalSize(m_OplogStoragePath); - { - std::vector<std::string> OpLogs = ScanForOplogs(); - for (const std::string& OpLogId : OpLogs) - { - std::filesystem::path OplogBasePath = BasePathForOplog(OpLogId); - Result += Oplog::TotalSize(OplogBasePath); - } - } - return Result; -} - -bool -ProjectStore::Project::PrepareForDelete(std::filesystem::path& OutDeletePath) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - RwLock::ExclusiveLockScope _(m_ProjectLock); - - for (auto& It : m_Oplogs) - { - It.second->ResetState(); - } - - m_Oplogs.clear(); - - bool Success = PrepareDirectoryDelete(m_OplogStoragePath, OutDeletePath); - if (!Success) - { - return false; - } - m_OplogStoragePath.clear(); - return true; -} - -void -ProjectStore::Project::EnableUpdateCapture() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - m_ProjectLock.WithExclusiveLock([&]() { - if (m_UpdateCaptureRefCounter == 0) - { - ZEN_ASSERT(!m_CapturedOplogs); - m_CapturedOplogs = std::make_unique<std::vector<std::string>>(); - } - else - { - ZEN_ASSERT(m_CapturedOplogs); - } - m_UpdateCaptureRefCounter++; - }); -} - -void -ProjectStore::Project::DisableUpdateCapture() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - m_ProjectLock.WithExclusiveLock([&]() { - ZEN_ASSERT(m_CapturedOplogs); - ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); - m_UpdateCaptureRefCounter--; - if (m_UpdateCaptureRefCounter == 0) - { - m_CapturedOplogs.reset(); - } - }); -} - -std::vector<std::string> -ProjectStore::Project::GetCapturedOplogsLocked() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - if (m_CapturedOplogs) - { - return *m_CapturedOplogs; - } - return {}; -} - -std::vector<RwLock::SharedLockScope> -ProjectStore::Project::GetGcReferencerLocks() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - std::vector<RwLock::SharedLockScope> Locks; - Locks.emplace_back(RwLock::SharedLockScope(m_ProjectLock)); - Locks.reserve(1 + m_Oplogs.size()); - for (auto& Kv : m_Oplogs) - { - Locks.emplace_back(Kv.second->GetGcReferencerLock()); - } - return Locks; -} - -bool -ProjectStore::Project::IsExpired(const std::string& EntryName, - const std::filesystem::path& MarkerPath, - const GcClock::TimePoint ExpireTime) const -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - if (!MarkerPath.empty()) - { - std::error_code Ec; - if (IsFile(MarkerPath, Ec)) - { - if (Ec) - { - ZEN_WARN("{} '{}{}{}', Failed to check expiry via marker file '{}', assuming not expired", - EntryName.empty() ? "project"sv : "oplog"sv, - Identifier, - EntryName.empty() ? ""sv : "/"sv, - EntryName, - MarkerPath.string()); - return false; - } - return false; - } - } - - const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); - - RwLock::SharedLockScope _(m_LastAccessTimesLock); - if (auto It = m_LastAccessTimes.find(EntryName); It != m_LastAccessTimes.end()) - { - if (It->second <= ExpireTicks) - { - return true; - } - } - return false; -} - -bool -ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime) const -{ - return IsExpired(std::string(), ProjectFilePath, ExpireTime); -} - -bool -ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog) const -{ - return IsExpired(Oplog.OplogId(), Oplog.MarkerPath(), ExpireTime); -} - -bool -ProjectStore::Project::IsOplogTouchedSince(const GcClock::TimePoint TouchTime, std::string_view Oplog) const -{ - const GcClock::Tick TouchTicks = TouchTime.time_since_epoch().count(); - - RwLock::ExclusiveLockScope _(m_LastAccessTimesLock); - if (auto It = m_LastAccessTimes.find(std::string(Oplog)); It != m_LastAccessTimes.end()) - { - if (It->second > TouchTicks) - { - return true; - } - } - return false; -} - -bool -ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime, std::string_view OplogId) const -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - using namespace std::literals; - - { - RwLock::SharedLockScope Lock(m_ProjectLock); - auto OplogIt = m_Oplogs.find(std::string(OplogId)); - - if (OplogIt != m_Oplogs.end()) - { - Lock.ReleaseNow(); - return IsExpired(ExpireTime, *OplogIt->second); - } - } - - std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); - std::optional<CbObject> OplogConfig = Oplog::ReadStateFile(OplogBasePath, [this]() { return Log(); }); - std::filesystem::path MarkerPath = OplogConfig.has_value() ? OplogConfig.value()["gcpath"sv].AsU8String() : std::u8string(); - return IsExpired(std::string(OplogId), MarkerPath, ExpireTime); -} - -void -ProjectStore::Project::TouchProject() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - RwLock::ExclusiveLockScope _(m_LastAccessTimesLock); - m_LastAccessTimes.insert_or_assign(std::string(), GcClock::TickCount()); -} - -void -ProjectStore::Project::TouchOplog(std::string_view Oplog) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_ASSERT(!Oplog.empty()); - - RwLock::ExclusiveLockScope _(m_LastAccessTimesLock); - m_LastAccessTimes.insert_or_assign(std::string(Oplog), GcClock::TickCount()); -} - -GcClock::TimePoint -ProjectStore::Project::LastOplogAccessTime(std::string_view Oplog) const -{ - RwLock::SharedLockScope _(m_LastAccessTimesLock); - if (auto It = m_LastAccessTimes.find(std::string(Oplog)); It != m_LastAccessTimes.end()) - { - return GcClock::TimePointFromTick(It->second); - } - return GcClock::TimePoint::min(); -} - -////////////////////////////////////////////////////////////////////////// - -ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, const Configuration& Config) -: m_Log(logging::Get("project")) -, m_Gc(Gc) -, m_CidStore(Store) -, m_ProjectBasePath(BasePath) -, m_Config(Config) -, m_DiskWriteBlocker(Gc.GetDiskWriteBlocker()) -{ - ZEN_INFO("initializing project store at '{}'", m_ProjectBasePath); - // m_Log.set_level(spdlog::level::debug); - m_Gc.AddGcStorage(this); - m_Gc.AddGcReferencer(*this); - m_Gc.AddGcReferenceLocker(*this); -} - -ProjectStore::~ProjectStore() -{ - ZEN_INFO("closing project store at '{}'", m_ProjectBasePath); - m_Gc.RemoveGcReferenceLocker(*this); - m_Gc.RemoveGcReferencer(*this); - m_Gc.RemoveGcStorage(this); -} - -std::filesystem::path -ProjectStore::BasePathForProject(std::string_view ProjectId) -{ - return m_ProjectBasePath / ProjectId; -} - -void -ProjectStore::DiscoverProjects() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - if (!IsDir(m_ProjectBasePath)) - { - return; - } - - DirectoryContent DirContent; - GetDirectoryContent(m_ProjectBasePath, DirectoryContentFlags::IncludeDirs, DirContent); - - for (const std::filesystem::path& DirPath : DirContent.Directories) - { - std::string DirName = PathToUtf8(DirPath.filename()); - if (DirName.starts_with("[dropped]")) - { - continue; - } - OpenProject(DirName); - } -} - -void -ProjectStore::IterateProjects(std::function<void(Project& Prj)>&& Fn) -{ - RwLock::SharedLockScope _(m_ProjectsLock); - - for (auto& Kv : m_Projects) - { - Fn(*Kv.second.Get()); - } -} - -void -ProjectStore::Flush() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::Flush"); - - ZEN_INFO("flushing project store at '{}'", m_ProjectBasePath); - std::vector<Ref<Project>> Projects; - { - RwLock::SharedLockScope _(m_ProjectsLock); - Projects.reserve(m_Projects.size()); - - for (auto& Kv : m_Projects) - { - Projects.push_back(Kv.second); - } - } - - WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); - try - { - for (const Ref<Project>& Project : Projects) - { - Work.ScheduleWork( - WorkerPool, - [this, Project](std::atomic<bool>&) { - try - { - Project->Flush(); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what()); - } - }, - 0); - } - } - catch (const std::exception& Ex) - { - AbortFlag.store(true); - ZEN_WARN("Failed projects in {}. Reason: '{}'", m_ProjectBasePath, Ex.what()); - } - - Work.Wait(); -} - -void -ProjectStore::ScrubStorage(ScrubContext& Ctx) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_INFO("scrubbing '{}'", m_ProjectBasePath); - - DiscoverProjects(); - - std::vector<Ref<Project>> Projects; - { - RwLock::SharedLockScope Lock(m_ProjectsLock); - Projects.reserve(m_Projects.size()); - - for (auto& Kv : m_Projects) - { - if (Kv.second->IsExpired(GcClock::TimePoint::min())) - { - continue; - } - Projects.push_back(Kv.second); - } - } - for (const Ref<Project>& Project : Projects) - { - Project->Scrub(Ctx); - } -} - -GcStorageSize -ProjectStore::StorageSize() const -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::StorageSize"); - - using namespace std::literals; - - GcStorageSize Result; - { - if (IsDir(m_ProjectBasePath)) - { - DirectoryContent ProjectsFolderContent; - GetDirectoryContent(m_ProjectBasePath, DirectoryContentFlags::IncludeDirs, ProjectsFolderContent); - - for (const std::filesystem::path& ProjectBasePath : ProjectsFolderContent.Directories) - { - std::filesystem::path ProjectStateFilePath = ProjectBasePath / "Project.zcb"sv; - if (IsFile(ProjectStateFilePath)) - { - Result.DiskSize += Project::TotalSize(ProjectBasePath); - DirectoryContent DirContent; - GetDirectoryContent(ProjectBasePath, DirectoryContentFlags::IncludeDirs, DirContent); - for (const std::filesystem::path& OplogBasePath : DirContent.Directories) - { - Result.DiskSize += Oplog::TotalSize(OplogBasePath); - } - } - } - } - } - return Result; -} - -Ref<ProjectStore::Project> -ProjectStore::OpenProject(std::string_view ProjectId) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::OpenProject"); - - { - RwLock::SharedLockScope _(m_ProjectsLock); - if (auto ProjIt = m_Projects.find(std::string{ProjectId}); ProjIt != m_Projects.end()) - { - return ProjIt->second; - } - } - - RwLock::ExclusiveLockScope _(m_ProjectsLock); - if (auto ProjIt = m_Projects.find(std::string{ProjectId}); ProjIt != m_Projects.end()) - { - return ProjIt->second; - } - - std::filesystem::path BasePath = BasePathForProject(ProjectId); - - if (Project::Exists(BasePath)) - { - try - { - ZEN_INFO("project '{}': opening project at '{}'", ProjectId, BasePath); - - Ref<Project>& Prj = - m_Projects - .try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath))) - .first->second; - Prj->Identifier = ProjectId; - Prj->Read(); - return Prj; - } - catch (const std::exception& e) - { - ZEN_WARN("project '{}': failed to open at {} ({})", ProjectId, BasePath, e.what()); - m_Projects.erase(std::string{ProjectId}); - } - } - - return {}; -} - -Ref<ProjectStore::Project> -ProjectStore::NewProject(const std::filesystem::path& BasePath, - std::string_view ProjectId, - const std::filesystem::path& RootDir, - const std::filesystem::path& EngineRootDir, - const std::filesystem::path& ProjectRootDir, - const std::filesystem::path& ProjectFilePath) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::NewProject"); - - RwLock::ExclusiveLockScope _(m_ProjectsLock); - - ZEN_INFO("project '{}': creating project at '{}'", ProjectId, BasePath); - - Ref<Project>& Prj = - m_Projects.try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath))) - .first->second; - Prj->Identifier = ProjectId; - Prj->RootDir = RootDir; - Prj->EngineRootDir = EngineRootDir; - Prj->ProjectRootDir = ProjectRootDir; - Prj->ProjectFilePath = ProjectFilePath; - Prj->Write(); - - if (m_CapturedProjects) - { - m_CapturedProjects->push_back(std::string(ProjectId)); - } - - return Prj; -} - -bool -ProjectStore::UpdateProject(std::string_view ProjectId, - const std::filesystem::path& RootDir, - const std::filesystem::path& EngineRootDir, - const std::filesystem::path& ProjectRootDir, - const std::filesystem::path& ProjectFilePath) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::UpdateProject"); - - RwLock::ExclusiveLockScope ProjectsLock(m_ProjectsLock); - - auto ProjIt = m_Projects.find(std::string{ProjectId}); - - if (ProjIt == m_Projects.end()) - { - return false; - } - Ref<ProjectStore::Project> Prj = ProjIt->second; - - Prj->RootDir = RootDir; - Prj->EngineRootDir = EngineRootDir; - Prj->ProjectRootDir = ProjectRootDir; - Prj->ProjectFilePath = ProjectFilePath; - Prj->Write(); - - ZEN_INFO("project '{}': updated", ProjectId); - - return true; -} - -bool -ProjectStore::RemoveProject(std::string_view ProjectId, std::filesystem::path& OutDeletePath) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - RwLock::ExclusiveLockScope ProjectsLock(m_ProjectsLock); - - auto ProjIt = m_Projects.find(std::string{ProjectId}); - - if (ProjIt == m_Projects.end()) - { - return true; - } - - bool Success = ProjIt->second->PrepareForDelete(OutDeletePath); - - if (!Success) - { - return false; - } - m_Projects.erase(ProjIt); - return true; -} - -bool -ProjectStore::DeleteProject(std::string_view ProjectId) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::DeleteProject"); - - ZEN_INFO("project '{}': deleting", ProjectId); - - std::filesystem::path DeletePath; - if (!RemoveProject(ProjectId, DeletePath)) - { - return false; - } - - if (!DeletePath.empty()) - { - if (!DeleteDirectories(DeletePath)) - { - ZEN_WARN("project '{}': failed to remove old project path '{}'", ProjectId, DeletePath); - return false; - } - } - - return true; -} - -bool -ProjectStore::Exists(std::string_view ProjectId) -{ - return Project::Exists(BasePathForProject(ProjectId)); -} - -CbArray -ProjectStore::GetProjectsList() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::GetProjectsList"); - - using namespace std::literals; - - DiscoverProjects(); - - CbWriter Response; - Response.BeginArray(); - - IterateProjects([&Response](Project& Prj) { - Response.BeginObject(); - Response << "Id"sv << Prj.Identifier; - Response << "RootDir"sv << Prj.RootDir.string(); - Response << "ProjectRootDir"sv << PathToUtf8(Prj.ProjectRootDir); - Response << "EngineRootDir"sv << PathToUtf8(Prj.EngineRootDir); - Response << "ProjectFilePath"sv << PathToUtf8(Prj.ProjectFilePath); - Response.EndObject(); - }); - Response.EndArray(); - return Response.Save().AsArray(); -} - -CbObject -ProjectStore::GetProjectFiles(LoggerRef InLog, Project& Project, Oplog& Oplog, const std::unordered_set<std::string>& WantedFieldNames) -{ - auto Log = [&InLog]() { return InLog; }; - - using namespace std::literals; - - const bool WantsAllFields = WantedFieldNames.empty(); - - const bool WantsIdField = WantsAllFields || WantedFieldNames.contains("id"); - const bool WantsClientPathField = WantsAllFields || WantedFieldNames.contains("clientpath"); - const bool WantsServerPathField = WantsAllFields || WantedFieldNames.contains("serverpath"); - const bool WantsRawSizeField = WantsAllFields || WantedFieldNames.contains("rawsize"); - const bool WantsSizeField = WantsAllFields || WantedFieldNames.contains("size"); - - std::vector<Oid> Ids; - std::vector<std::string> ServerPaths; - std::vector<std::string> ClientPaths; - std::vector<uint64_t> Sizes; - std::vector<uint64_t> RawSizes; - - size_t Count = 0; - Oplog.IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) { - if (WantsIdField || WantsRawSizeField || WantsSizeField) - { - Ids.push_back(Id); - } - if (WantsServerPathField) - { - ServerPaths.push_back(std::string(ServerPath)); - } - if (WantsClientPathField) - { - ClientPaths.push_back(std::string(ClientPath)); - } - Count++; - }); - if (WantsRawSizeField || WantsSizeField) - { - if (WantsSizeField) - { - Sizes.resize(Ids.size(), (uint64_t)-1); - } - if (WantsRawSizeField) - { - RawSizes.resize(Ids.size(), (uint64_t)-1); - } - - Oplog.IterateChunks( - Project.RootDir, - Ids, - false, - [&](size_t Index, const IoBuffer& Payload, uint64_t /*ModTag*/) { - try - { - if (Payload) - { - if (Payload.GetContentType() == ZenContentType::kCompressedBinary) - { - if (WantsRawSizeField) - { - IoHash _; - if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index])) - { - if (WantsSizeField) - { - Sizes[Index] = Payload.GetSize(); - } - } - else - { - ZEN_WARN("oplog '{}/{}': payload for project file info for id {} is not a valid compressed binary.", - Project.Identifier, - Oplog.OplogId(), - Ids[Index]); - } - } - else if (WantsSizeField) - { - Sizes[Index] = Payload.GetSize(); - } - } - else - { - if (WantsSizeField) - { - Sizes[Index] = Payload.GetSize(); - } - if (WantsRawSizeField) - { - RawSizes[Index] = Payload.GetSize(); - } - } - } - else - { - ZEN_WARN("oplog '{}/{}': failed getting payload for project file info for id {}.", - Project.Identifier, - Oplog.OplogId(), - Ids[Index]); - } - } - catch (const std::exception& Ex) - { - ZEN_WARN("oplog '{}/{}': failed getting project file info for id {}. Reason: '{}'", - Project.Identifier, - Oplog.OplogId(), - Ids[Index], - Ex.what()); - } - return true; - }, - &GetSmallWorkerPool(EWorkloadType::Burst), - 256u * 1024u); - } - - CbObjectWriter Response(64u + Count * ((WantsIdField ? (5 + sizeof(Oid::OidBits)) : 0) + (WantsServerPathField ? 64 : 0) + - (WantsClientPathField ? 64 : 0) + (WantsSizeField ? 16 : 0) + (WantsRawSizeField ? 16 : 0))); - Response.BeginArray("files"sv); - for (size_t Index = 0; Index < Count; Index++) - { - Response.BeginObject(); - if (WantsIdField) - { - Response << "id"sv << Ids[Index]; - } - if (WantsServerPathField) - { - Response << "serverpath"sv << ServerPaths[Index]; - } - if (WantsClientPathField) - { - Response << "clientpath"sv << ClientPaths[Index]; - } - if (WantsSizeField && Sizes[Index] != (uint64_t)-1) - { - Response << "size"sv << Sizes[Index]; - } - if (WantsRawSizeField && RawSizes[Index] != (uint64_t)-1) - { - Response << "rawsize"sv << RawSizes[Index]; - } - Response.EndObject(); - } - Response.EndArray(); - - return Response.Save(); -} - -CbObject -ProjectStore::GetProjectChunkInfos(LoggerRef InLog, Project& Project, Oplog& Oplog, const std::unordered_set<std::string>& WantedFieldNames) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("ProjectStore::GetProjectChunkInfos"); - - auto Log = [&InLog]() { return InLog; }; - - using namespace std::literals; - - const bool WantsAllFields = WantedFieldNames.empty(); - - const bool WantsIdField = WantsAllFields || WantedFieldNames.contains("id"); - const bool WantsRawHashField = WantsAllFields || WantedFieldNames.contains("rawhash"); - const bool WantsRawSizeField = WantsAllFields || WantedFieldNames.contains("rawsize"); - const bool WantsSizeField = WantsAllFields || WantedFieldNames.contains("size"); - - std::vector<Oid> Ids; - std::vector<IoHash> Hashes; - std::vector<uint64_t> RawSizes; - std::vector<uint64_t> Sizes; - - size_t Count = 0; - size_t EstimatedCount = Oplog.OplogCount(); - if (WantsIdField) - { - Ids.reserve(EstimatedCount); - } - if (WantsRawHashField || WantsRawSizeField || WantsSizeField) - { - Hashes.reserve(EstimatedCount); - } - Oplog.IterateChunkMap([&](const Oid& Id, const IoHash& Hash) { - if (WantsIdField) - { - Ids.push_back(Id); - } - if (WantsRawHashField || WantsRawSizeField || WantsSizeField) - { - Hashes.push_back(Hash); - } - Count++; - }); - - if (WantsRawSizeField || WantsSizeField) - { - if (WantsRawSizeField) - { - RawSizes.resize(Hashes.size(), (uint64_t)-1); - } - if (WantsSizeField) - { - Sizes.resize(Hashes.size(), (uint64_t)-1); - } - - WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); - (void)Oplog.IterateChunks( - Hashes, - false, - [&](size_t Index, const IoBuffer& Payload, uint64_t /*ModTag*/) -> bool { - try - { - if (Payload) - { - if (Payload.GetContentType() == ZenContentType::kCompressedBinary) - { - if (WantsRawSizeField) - { - ZEN_ASSERT_SLOW(RawSizes[Index] == (uint64_t)-1); - IoHash _; - if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index])) - { - if (WantsSizeField) - { - ZEN_ASSERT_SLOW(Sizes[Index] == (uint64_t)-1); - Sizes[Index] = Payload.GetSize(); - } - } - else - { - ZEN_WARN("oplog '{}/{}': payload for project chunk for id {} is not a valid compressed binary.", - Project.Identifier, - Oplog.OplogId(), - Ids[Index]); - } - } - else if (WantsSizeField) - { - ZEN_ASSERT_SLOW(Sizes[Index] == (uint64_t)-1); - Sizes[Index] = Payload.GetSize(); - } - } - else - { - if (WantsSizeField) - { - ZEN_ASSERT_SLOW(Sizes[Index] == (uint64_t)-1); - Sizes[Index] = Payload.GetSize(); - } - if (WantsRawSizeField) - { - ZEN_ASSERT_SLOW(Sizes[Index] == (uint64_t)-1); - RawSizes[Index] = Payload.GetSize(); - } - } - } - else - { - ZEN_WARN("oplog '{}/{}': failed getting payload for chunk for id {}", - Project.Identifier, - Oplog.OplogId(), - Ids[Index]); - } - } - catch (const std::exception& Ex) - { - ZEN_WARN("oplog '{}/{}': failed getting project chunk info for id {}. Reason: '{}'", - Project.Identifier, - Oplog.OplogId(), - Ids[Index], - Ex.what()); - } - return true; - }, - &WorkerPool, - 256u * 1024u); - } - - CbObjectWriter Response(64u + Count * ((WantsIdField ? (5 + sizeof(Oid::OidBits)) : 0) + - (WantsRawHashField ? (10 + sizeof(IoHash::Hash)) : 0) + (WantsSizeField ? 16 : 0) + - (WantsRawSizeField ? 16 : 0))); - Response.BeginArray("chunkinfos"sv); - - for (size_t Index = 0; Index < Count; Index++) - { - Response.BeginObject(); - if (WantsIdField) - { - Response << "id"sv << Ids[Index]; - } - if (WantsRawHashField) - { - Response << "rawhash"sv << Hashes[Index]; - } - if (WantsSizeField && Sizes[Index] != (uint64_t)-1) - { - Response << "size"sv << Sizes[Index]; - } - if (WantsRawSizeField && RawSizes[Index] != (uint64_t)-1) - { - Response << "rawsize"sv << RawSizes[Index]; - } - Response.EndObject(); - } - Response.EndArray(); - - return Response.Save(); -} - -CbObject -ProjectStore::GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, const Oid& ChunkId) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("ProjectStore::GetChunkInfo"); - - using namespace std::literals; - - auto Log = [&InLog]() { return InLog; }; - - IoBuffer Chunk = Oplog.FindChunk(Project.RootDir, ChunkId, nullptr); - if (!Chunk) - { - return {}; - } - - uint64_t ChunkSize = Chunk.GetSize(); - if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) - { - IoHash RawHash; - uint64_t RawSize; - bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize); - if (!IsCompressed) - { - throw std::runtime_error( - fmt::format("Chunk info request for malformed chunk id '{}/{}'/'{}'", Project.Identifier, Oplog.OplogId(), ChunkId)); - } - ChunkSize = RawSize; - } - - CbObjectWriter Response; - Response << "size"sv << ChunkSize; - return Response.Save(); -} - -static ProjectStore::GetChunkRangeResult -ExtractRange(IoBuffer&& Chunk, uint64_t Offset, uint64_t Size, ZenContentType AcceptType) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - ProjectStore::GetChunkRangeResult Result; - - Result.ContentType = Chunk.GetContentType(); - - if (Result.ContentType == ZenContentType::kCompressedBinary) - { - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize); - if (!Compressed) - { - Result.Error = ProjectStore::GetChunkRangeResult::EError::MalformedContent; - Result.ErrorDescription = fmt::format("Malformed payload, not a compressed buffer"); - return Result; - } - - const bool IsFullRange = (Offset == 0) && ((Size == ~(0ull)) || (Size == RawSize)); - - if (IsFullRange) - { - if (AcceptType == ZenContentType::kBinary) - { - Result.Chunk = Compressed.DecompressToComposite(); - Result.ContentType = ZenContentType::kBinary; - } - else - { - Result.Chunk = Compressed.GetCompressed(); - } - Result.RawSize = 0; - } - else - { - if (Size == ~(0ull) || (Offset + Size) > RawSize) - { - if (Offset < RawSize) - { - Size = RawSize - Offset; - } - else - { - Size = 0; - } - } - - if (Size == 0) - { - Result.Error = ProjectStore::GetChunkRangeResult::EError::OutOfRange; - Result.ErrorDescription = - fmt::format("Chunk request for range outside of compressed chunk. Offset: {}, Size: {}, ChunkSize: {}", - Offset, - Size, - RawSize); - return Result; - } - - if (AcceptType == ZenContentType::kBinary) - { - Result.Chunk = CompositeBuffer(Compressed.Decompress(Offset, Size)); - Result.ContentType = ZenContentType::kBinary; - } - else - { - // Value will be a range of compressed blocks that covers the requested range - // The client will have to compensate for any offsets that do not land on an even block size multiple - Result.Chunk = Compressed.GetRange(Offset, Size).GetCompressed(); - } - Result.RawSize = RawSize; - } - Result.RawHash = RawHash; - } - else - { - const uint64_t ChunkSize = Chunk.GetSize(); - - const bool IsFullRange = (Offset == 0) && ((Size == ~(0ull)) || (Size == ChunkSize)); - if (IsFullRange) - { - Result.Chunk = CompositeBuffer(SharedBuffer(std::move(Chunk))); - Result.RawSize = 0; - } - else - { - if (Size == ~(0ull) || (Offset + Size) > ChunkSize) - { - if (Offset < ChunkSize) - { - Size = ChunkSize - Offset; - } - else - { - Size = 0; - } - } - - if (Size == 0) - { - Result.Error = ProjectStore::GetChunkRangeResult::EError::OutOfRange; - Result.ErrorDescription = - fmt::format("Chunk request for range outside of compressed chunk. Offset: {}, Size: {}, ChunkSize: {}", - Offset, - Size, - ChunkSize); - return Result; - } - - Result.Chunk = CompositeBuffer(SharedBuffer(IoBuffer(std::move(Chunk), Offset, Size))); - Result.RawSize = ChunkSize; - } - } - Result.Error = ProjectStore::GetChunkRangeResult::EError::Ok; - return Result; -} - -ProjectStore::GetChunkRangeResult -ProjectStore::GetChunkRange(LoggerRef InLog, - Project& Project, - Oplog& Oplog, - const Oid& ChunkId, - uint64_t Offset, - uint64_t Size, - ZenContentType AcceptType, - uint64_t* OptionalInOutModificationTag) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - ZEN_TRACE_CPU("ProjectStore::GetChunkRange"); - - auto Log = [&InLog]() { return InLog; }; - - uint64_t OldTag = OptionalInOutModificationTag == nullptr ? 0 : *OptionalInOutModificationTag; - IoBuffer Chunk = Oplog.FindChunk(Project.RootDir, ChunkId, OptionalInOutModificationTag); - if (!Chunk) - { - return GetChunkRangeResult{.Error = GetChunkRangeResult::EError::NotFound, - .ErrorDescription = fmt::format("Chunk range request for chunk {}/{}/{} failed, payload not found", - Project.Identifier, - Oplog.OplogId(), - ChunkId)}; - } - if (OptionalInOutModificationTag != nullptr && OldTag == *OptionalInOutModificationTag) - { - return {.Error = GetChunkRangeResult::EError::NotModified}; - } - - return ExtractRange(std::move(Chunk), Offset, Size, AcceptType); -} - -IoBuffer -ProjectStore::GetChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("ProjectStore::GetChunk"); - ZEN_UNUSED(Project, Oplog); - - IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash); - - if (!Chunk) - { - return {}; - } - - Chunk.SetContentType(ZenContentType::kCompressedBinary); - return Chunk; -} - -IoBuffer -ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const Oid& ChunkId) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("ProjectStore::GetChunk"); - - Ref<Project> Project = OpenProject(ProjectId); - if (!Project) - { - return {}; - } - Ref<Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact */ false, /*VerifyPathOnDisk*/ false); - if (!Oplog) - { - return {}; - } - return Oplog->FindChunk(Project->RootDir, ChunkId, /*OptOutModificationTag*/ nullptr); -} - -IoBuffer -ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const IoHash& Cid) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("ProjectStore::GetChunk"); - - Ref<Project> Project = OpenProject(ProjectId); - if (!Project) - { - return {}; - } - Ref<Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact */ false, /*VerifyPathOnDisk*/ false); - if (!Oplog) - { - return {}; - } - return m_CidStore.FindChunkByCid(Cid); -} - -bool -ProjectStore::PutChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash, IoBuffer&& Chunk) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("ProjectStore::PutChunk"); - - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); - if (RawHash != ChunkHash) - { - throw std::runtime_error( - fmt::format("Chunk request for invalid payload format for chunk {}/{}/{}", Project.Identifier, Oplog.OplogId(), ChunkHash)); - } - - Oplog.CaptureAddedAttachments(std::vector<IoHash>{ChunkHash}); - CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk, ChunkHash); - return Result.New; -} - -std::vector<ProjectStore::ChunkResult> -ProjectStore::GetChunks(Project& Project, Oplog& Oplog, std::span<const ChunkRequest> Requests) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("ProjectStore::GetChunks"); - - ZEN_ASSERT(!Requests.empty()); - - std::vector<ProjectStore::ChunkResult> Results; - size_t RequestCount = Requests.size(); - - Results.resize(RequestCount); - - if (RequestCount > 1) - { - std::vector<IoHash> ChunkRawHashes; - std::vector<size_t> ChunkRawHashesRequestIndex; - std::vector<Oid> ChunkIds; - std::vector<size_t> ChunkIdsRequestIndex; - - ChunkRawHashes.reserve(RequestCount); - ChunkRawHashesRequestIndex.reserve(RequestCount); - ChunkIds.reserve(RequestCount); - ChunkIdsRequestIndex.reserve(RequestCount); - - for (size_t RequestIndex = 0; RequestIndex < Requests.size(); RequestIndex++) - { - const ChunkRequest& Request = Requests[RequestIndex]; - if (Request.Id.index() == 0) - { - ChunkRawHashes.push_back(std::get<IoHash>(Request.Id)); - ChunkRawHashesRequestIndex.push_back(RequestIndex); - } - else - { - ChunkIds.push_back(std::get<Oid>(Request.Id)); - ChunkIdsRequestIndex.push_back(RequestIndex); - } - } - - WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); - if (!ChunkRawHashes.empty()) - { - Oplog.IterateChunks( - ChunkRawHashes, - true, - [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool { - if (Payload) - { - size_t RequestIndex = ChunkRawHashesRequestIndex[Index]; - const ChunkRequest& Request = Requests[RequestIndex]; - ChunkResult& Result = Results[RequestIndex]; - Result.Exists = true; - if (!Request.SkipData) - { - Result.ChunkBuffer = std::move(Payload); - Result.ChunkBuffer.MakeOwned(); - } - Result.ModTag = ModTag; - } - return true; - }, - &WorkerPool, - 8u * 1024); - } - if (!ChunkIdsRequestIndex.empty()) - { - Oplog.IterateChunks( - Project.RootDir, - ChunkIds, - true, - [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool { - if (Payload) - { - size_t RequestIndex = ChunkIdsRequestIndex[Index]; - const ChunkRequest& Request = Requests[RequestIndex]; - ChunkResult& Result = Results[RequestIndex]; - Result.Exists = true; - if (!Request.SkipData) - { - Result.ChunkBuffer = std::move(Payload); - Result.ChunkBuffer.MakeOwned(); - } - Result.ModTag = ModTag; - } - return true; - }, - &WorkerPool, - 8u * 1024); - } - } - else - { - const ChunkRequest& Request = Requests.front(); - ChunkResult& Result = Results.front(); - - if (Request.Id.index() == 0) - { - const IoHash& ChunkHash = std::get<IoHash>(Request.Id); - IoBuffer Payload = m_CidStore.FindChunkByCid(ChunkHash); - if (Payload) - { - Result.Exists = true; - Result.ModTag = GetModificationTagFromRawHash(ChunkHash); - if (!Request.SkipData) - { - Result.ChunkBuffer = std::move(Payload); - Result.ChunkBuffer.MakeOwned(); - } - } - } - else - { - const Oid& ChunkId = std::get<Oid>(Request.Id); - uint64_t ModTag = 0; - IoBuffer Payload = Oplog.FindChunk(Project.RootDir, ChunkId, &ModTag); - if (Payload) - { - Result.Exists = true; - Result.ModTag = ModTag; - if (!Request.SkipData) - { - Result.ChunkBuffer = std::move(Payload); - Result.ChunkBuffer.MakeOwned(); - } - } - } - } - return Results; -} - -std::vector<ProjectStore::ChunkRequest> -ProjectStore::ParseChunksRequests(Project& Project, Oplog& Oplog, const CbObject& Cb) -{ - ZEN_TRACE_CPU("Store::Rpc::getchunks"); - - using namespace std::literals; - - std::vector<ChunkRequest> Requests; - - if (auto RequestFieldView = Cb["Request"sv]; RequestFieldView.IsObject()) - { - CbObjectView RequestView = RequestFieldView.AsObjectView(); - bool SkipData = RequestView["SkipData"].AsBool(false); - CbArrayView ChunksArray = RequestView["Chunks"sv].AsArrayView(); - - size_t RequestCount = ChunksArray.Num(); - if (RequestCount > 0) - { - Requests.reserve(RequestCount); - for (CbFieldView FieldView : ChunksArray) - { - CbObjectView ChunkObject = FieldView.AsObjectView(); - ChunkRequest ChunkRequest = {.Offset = ChunkObject["Offset"sv].AsUInt64(0), - .Size = ChunkObject["Size"sv].AsUInt64((uint64_t)-1), - .SkipData = SkipData}; - if (CbFieldView InputModificationTagView = ChunkObject.FindView("ModTag"); InputModificationTagView.IsInteger()) - { - ChunkRequest.ModTag = InputModificationTagView.AsUInt64(); - } - if (CbFieldView RawHashView = ChunkObject.FindView("RawHash"sv); RawHashView.IsHash()) - { - const IoHash ChunkHash = RawHashView.AsHash(); - ChunkRequest.Id = ChunkHash; - } - else if (CbFieldView IdView = ChunkObject.FindView("Oid"sv); IdView.IsObjectId()) - { - const Oid ChunkId = IdView.AsObjectId(); - ChunkRequest.Id = ChunkId; - } - else - { - throw std::runtime_error( - fmt::format("oplog '{}/{}': malformed getchunks rpc request object, chunk request has no identifier", - Project.Identifier, - Oplog.OplogId())); - } - Requests.emplace_back(std::move(ChunkRequest)); - } - } - } - else if (CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); ChunksArray) - { - // Legacy full chunks only by rawhash - - size_t RequestCount = ChunksArray.Num(); - - Requests.reserve(RequestCount); - - std::vector<IoHash> Cids; - Cids.reserve(ChunksArray.Num()); - for (CbFieldView FieldView : ChunksArray) - { - Requests.push_back(ProjectStore::ChunkRequest{.Id = FieldView.AsHash()}); - } - } - else - { - throw std::runtime_error(fmt::format("oplog '{}/{}': malformed getchunks rpc request object", Project.Identifier, Oplog.OplogId())); - } - return Requests; -} - -CbPackage -ProjectStore::WriteChunksRequestResponse(Project& Project, - Oplog& Oplog, - std::vector<ChunkRequest>&& Requests, - std::vector<ChunkResult>&& Results) -{ - using namespace std::literals; - - CbPackage ResponsePackage; - - CbObjectWriter ResponseWriter(32 + Requests.size() * 64u); - ResponseWriter.BeginArray("Chunks"sv); - { - for (size_t Index = 0; Index < Requests.size(); Index++) - { - const ChunkRequest& Request = Requests[Index]; - ChunkResult& Result = Results[Index]; - if (Result.Exists) - { - ResponseWriter.BeginObject(); - { - if (Request.Id.index() == 0) - { - const IoHash& RawHash = std::get<IoHash>(Request.Id); - ResponseWriter.AddHash("Id", RawHash); - } - else - { - const Oid& Id = std::get<Oid>(Request.Id); - ResponseWriter.AddObjectId("Id", Id); - } - if (!Request.ModTag.has_value() || Request.ModTag.value() != Result.ModTag) - { - ResponseWriter.AddInteger("ModTag", Result.ModTag); - if (!Request.SkipData) - { - auto ExtractRangeResult = ExtractRange(std::move(Result.ChunkBuffer), - Request.Offset, - Request.Size, - ZenContentType::kCompressedBinary); - if (ExtractRangeResult.Error == GetChunkRangeResult::EError::Ok) - { - if (ExtractRangeResult.ContentType == ZenContentType::kCompressedBinary) - { - ZEN_ASSERT(ExtractRangeResult.RawHash != IoHash::Zero); - CompressedBuffer CompressedValue = - CompressedBuffer::FromCompressedNoValidate(std::move(ExtractRangeResult.Chunk)); - ZEN_ASSERT(CompressedValue); - - if (ExtractRangeResult.RawSize != 0) - { - // This really could use some thought so we don't send the same data if we get a request for - // multiple ranges from the same chunk block - - uint64_t FragmentRawOffset = 0; - OodleCompressor Compressor; - OodleCompressionLevel CompressionLevel; - uint64_t BlockSize = 0; - if (CompressedValue.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) - { - if (BlockSize > 0) - { - FragmentRawOffset = (Request.Offset / BlockSize) * BlockSize; - } - else - { - FragmentRawOffset = Request.Offset; - } - uint64_t FragmentRawLength = CompressedValue.DecodeRawSize(); - - IoHashStream FragmentHashStream; - FragmentHashStream.Append(ExtractRangeResult.RawHash.Hash, - sizeof(ExtractRangeResult.RawHash.Hash)); - FragmentHashStream.Append(&FragmentRawOffset, sizeof(FragmentRawOffset)); - FragmentHashStream.Append(&FragmentRawLength, sizeof(FragmentRawLength)); - IoHash FragmentHash = FragmentHashStream.GetHash(); - - ResponseWriter.AddHash("FragmentHash", FragmentHash); - ResponseWriter.AddInteger("FragmentOffset", FragmentRawOffset); - ResponseWriter.AddInteger("RawSize", ExtractRangeResult.RawSize); - ResponsePackage.AddAttachment(CbAttachment(CompressedValue, FragmentHash)); - } - else - { - std::string ErrorString = "Failed to get compression parameters from partial compressed buffer"; - ResponseWriter.AddString("Error", ErrorString); - ZEN_WARN("oplog '{}/{}': {}", Project.Identifier, Oplog.OplogId(), ErrorString); - } - } - else - { - ResponseWriter.AddHash("RawHash"sv, ExtractRangeResult.RawHash); - ResponsePackage.AddAttachment(CbAttachment(std::move(CompressedValue), ExtractRangeResult.RawHash)); - } - } - else - { - IoHashStream HashStream; - ZEN_ASSERT(Request.Id.index() == 1); - const Oid& Id = std::get<Oid>(Request.Id); - HashStream.Append(Id.OidBits, sizeof(Id.OidBits)); - HashStream.Append(&Request.Offset, sizeof(Request.Offset)); - HashStream.Append(&Request.Size, sizeof(Request.Size)); - IoHash Hash = HashStream.GetHash(); - - ResponseWriter.AddHash("Hash"sv, Hash); - if (ExtractRangeResult.RawSize != 0) - { - ResponseWriter.AddInteger("Size", ExtractRangeResult.RawSize); - } - ResponsePackage.AddAttachment(CbAttachment(std::move(ExtractRangeResult.Chunk), Hash)); - } - } - else - { - std::string ErrorString = - fmt::format("Failed fetching chunk range ({})", ExtractRangeResult.ErrorDescription); - ResponseWriter.AddString("Error", ErrorString); - ZEN_WARN("oplog '{}/{}': {}", Project.Identifier, Oplog.OplogId(), ErrorString); - } - } - } - } - ResponseWriter.EndObject(); - } - } - } - ResponseWriter.EndArray(); - ResponsePackage.SetObject(ResponseWriter.Save()); - - return ResponsePackage; -} - -bool -ProjectStore::AreDiskWritesAllowed() const -{ - return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); -} - -void -ProjectStore::EnableUpdateCapture() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - m_ProjectsLock.WithExclusiveLock([&]() { - if (m_UpdateCaptureRefCounter == 0) - { - ZEN_ASSERT(!m_CapturedProjects); - m_CapturedProjects = std::make_unique<std::vector<std::string>>(); - } - else - { - ZEN_ASSERT(m_CapturedProjects); - } - m_UpdateCaptureRefCounter++; - }); -} - -void -ProjectStore::DisableUpdateCapture() -{ - m_ProjectsLock.WithExclusiveLock([&]() { - ZEN_ASSERT(m_CapturedProjects); - ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); - m_UpdateCaptureRefCounter--; - if (m_UpdateCaptureRefCounter == 0) - { - m_CapturedProjects.reset(); - } - }); -} - -std::vector<std::string> -ProjectStore::GetCapturedProjectsLocked() -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - if (m_CapturedProjects) - { - return *m_CapturedProjects; - } - return {}; -} - -std::string -ProjectStore::GetGcName(GcCtx&) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - - return fmt::format("projectstore: '{}'", m_ProjectBasePath.string()); -} - -class ProjectStoreGcStoreCompactor : public GcStoreCompactor -{ -public: - ProjectStoreGcStoreCompactor(ProjectStore& ProjectStore, - const std::filesystem::path& BasePath, - std::vector<std::filesystem::path>&& OplogPathsToRemove, - std::vector<std::filesystem::path>&& ProjectPathsToRemove) - : m_ProjectStore(ProjectStore) - , m_BasePath(BasePath) - , m_OplogPathsToRemove(std::move(OplogPathsToRemove)) - , m_ProjectPathsToRemove(std::move(ProjectPathsToRemove)) - { - } - - virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>&) override - { - ZEN_TRACE_CPU("Store::CompactStore"); - ZEN_MEMSCOPE(GetProjectstoreTag()); - - auto Log = [&Ctx]() { return Ctx.Logger; }; - - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: projectstore [COMPACT] '{}': RemovedDisk: {} in {}", - m_BasePath, - NiceBytes(Stats.RemovedDisk), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - size_t CompactOplogCount = 0; - if (Ctx.Settings.IsDeleteMode) - { - for (const std::filesystem::path& OplogPath : m_OplogPathsToRemove) - { - uint64_t OplogSize = ProjectStore::Oplog::TotalSize(OplogPath); - if (DeleteDirectories(OplogPath)) - { - ZEN_DEBUG("GCV2: projectstore [COMPACT] '{}': removed oplog folder '{}', removed {}", - m_BasePath, - OplogPath, - NiceBytes(OplogSize)); - Stats.RemovedDisk += OplogSize; - } - else - { - ZEN_WARN("GCV2: projectstore [COMPACT] '{}': Failed to remove oplog folder '{}'", m_BasePath, OplogPath); - } - } - - for (const std::filesystem::path& ProjectPath : m_ProjectPathsToRemove) - { - uint64_t ProjectSize = ProjectStore::Project::TotalSize(ProjectPath); - if (DeleteDirectories(ProjectPath)) - { - ZEN_DEBUG("GCV2: projectstore [COMPACT] '{}': removed project folder '{}', removed {}", - m_BasePath, - ProjectPath, - NiceBytes(ProjectSize)); - Stats.RemovedDisk += ProjectSize; - } - else - { - ZEN_WARN("GCV2: projectstore [COMPACT] '{}': Failed to remove project folder '{}'", m_BasePath, ProjectPath); - } - } - } - - for (auto ProjectIt : m_ProjectStore.m_Projects) - { - Ref<ProjectStore::Project> Project = ProjectIt.second; - std::vector<std::string> OplogsToCompact = Project->GetOplogsToCompact(); - CompactOplogCount += OplogsToCompact.size(); - for (const std::string& OplogId : OplogsToCompact) - { - Ref<ProjectStore::Oplog> OpLog; - { - RwLock::SharedLockScope __(Project->m_ProjectLock); - if (auto OpIt = Project->m_Oplogs.find(OplogId); OpIt != Project->m_Oplogs.end()) - { - OpLog = OpIt->second; - } - else - { - Stopwatch OplogTimer; - std::filesystem::path OplogBasePath = Project->BasePathForOplog(OplogId); - OpLog = new ProjectStore::Oplog( - Project->Log(), - Project->Identifier, - OplogId, - Project->m_CidStore, - OplogBasePath, - std::filesystem::path{}, - ProjectStore::Oplog::EMode::kFull); // We need it to be a full read so we can write a new index snapshot - OpLog->Read(); - if (Ctx.Settings.Verbose) - { - ZEN_INFO("GCV2: projectstore [COMPACT] '{}': read oplog '{}/{}' at '{}' in {}", - m_BasePath, - Project->Identifier, - OplogId, - OplogBasePath, - NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); - } - } - - if (OpLog) - { - const uint64_t PreSize = OpLog->TotalSize(); - - OpLog->Compact(!Ctx.Settings.IsDeleteMode, - /*RetainLSNs*/ true, - fmt::format("GCV2: projectstore [COMPACT] '{}': ", m_BasePath)); - - const uint64_t PostSize = OpLog->TotalSize(); - - const uint64_t FreedSize = (PreSize > PostSize) ? (PreSize - PostSize) : 0; - - Stats.RemovedDisk += FreedSize; - } - } - } - } - - if (!Ctx.Settings.IsDeleteMode) - { - ZEN_DEBUG("GCV2: projectstore [COMPACT] '{}': Skipped deleting of {} oplogs and {} projects, skipped compacting {} oplogs", - m_BasePath, - m_OplogPathsToRemove.size(), - m_ProjectPathsToRemove.size(), - CompactOplogCount); - } - - m_ProjectPathsToRemove.clear(); - m_OplogPathsToRemove.clear(); - } - - virtual std::string GetGcName(GcCtx&) override { return fmt::format("projectstore: '{}'", m_BasePath.string()); } - -private: - ProjectStore& m_ProjectStore; - std::filesystem::path m_BasePath; - std::vector<std::filesystem::path> m_OplogPathsToRemove; - std::vector<std::filesystem::path> m_ProjectPathsToRemove; -}; - -GcStoreCompactor* -ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) -{ - ZEN_TRACE_CPU("Store::RemoveExpiredData"); - ZEN_MEMSCOPE(GetProjectstoreTag()); - - auto Log = [&Ctx]() { return Ctx.Logger; }; - - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: projectstore [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {} in {}", - m_ProjectBasePath, - Stats.CheckedCount, - Stats.FoundCount, - Stats.DeletedCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - std::vector<std::filesystem::path> OplogPathsToRemove; - std::vector<std::filesystem::path> ProjectPathsToRemove; - - std::vector<Ref<Project>> ExpiredProjects; - std::vector<Ref<Project>> Projects; - - DiscoverProjects(); - - { - RwLock::SharedLockScope Lock(m_ProjectsLock); - for (auto& Kv : m_Projects) - { - Stats.CheckedCount++; - if (Kv.second->IsExpired(Ctx.Settings.ProjectStoreExpireTime)) - { - ExpiredProjects.push_back(Kv.second); - continue; - } - Projects.push_back(Kv.second); - } - } - - size_t ExpiredOplogCount = 0; - for (const Ref<Project>& Project : Projects) - { - if (Ctx.IsCancelledFlag) - { - break; - } - - std::vector<std::string> ExpiredOplogs; - - std::vector<std::string> OpLogs = Project->ScanForOplogs(); - for (const std::string& OplogId : OpLogs) - { - Stats.CheckedCount++; - if (Project->IsExpired(Ctx.Settings.ProjectStoreExpireTime, OplogId)) - { - ExpiredOplogs.push_back(OplogId); - } - else if (!Project->IsOplogTouchedSince(GcClock::Now() - std::chrono::minutes(15), OplogId)) - { - if (Project->TryUnloadOplog(OplogId)) - { - ZEN_INFO("GCV2: projectstore [REMOVE EXPIRED] '{}': Unloaded oplog {}/{} due to inactivity", - m_ProjectBasePath, - Project->Identifier, - OplogId); - } - } - } - - std::filesystem::path ProjectPath = BasePathForProject(Project->Identifier); - ExpiredOplogCount += ExpiredOplogs.size(); - if (Ctx.Settings.IsDeleteMode) - { - for (const std::string& OplogId : ExpiredOplogs) - { - std::filesystem::path RemovePath; - if (Project->RemoveOplog(OplogId, RemovePath)) - { - if (!RemovePath.empty()) - { - OplogPathsToRemove.push_back(RemovePath); - } - Stats.DeletedCount++; - } - } - Project->Flush(); - } - } - - if (Ctx.Settings.IsDeleteMode) - { - for (const Ref<Project>& Project : ExpiredProjects) - { - std::string ProjectId = Project->Identifier; - { - { - if (!Project->IsExpired(Ctx.Settings.ProjectStoreExpireTime)) - { - ZEN_DEBUG( - "GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. Project no longer " - "expired.", - m_ProjectBasePath, - ProjectId); - continue; - } - } - std::filesystem::path RemovePath; - bool Success = RemoveProject(ProjectId, RemovePath); - if (!Success) - { - ZEN_DEBUG( - "GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. Project folder is locked.", - m_ProjectBasePath, - ProjectId); - continue; - } - if (!RemovePath.empty()) - { - ProjectPathsToRemove.push_back(RemovePath); - } - } - } - Stats.DeletedCount += ExpiredProjects.size(); - } - - size_t ExpiredProjectCount = ExpiredProjects.size(); - Stats.FoundCount += ExpiredOplogCount + ExpiredProjectCount; - return new ProjectStoreGcStoreCompactor(*this, m_ProjectBasePath, std::move(OplogPathsToRemove), std::move(ProjectPathsToRemove)); -} - -class ProjectStoreReferenceChecker : public GcReferenceChecker -{ -public: - ProjectStoreReferenceChecker(ProjectStore& InProjectStore) : m_ProjectStore(InProjectStore) { m_ProjectStore.EnableUpdateCapture(); } - - virtual ~ProjectStoreReferenceChecker() - { - try - { - m_ProjectStore.DisableUpdateCapture(); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("~ProjectStoreReferenceChecker threw exception: '{}'", Ex.what()); - } - } - - virtual std::string GetGcName(GcCtx&) override { return "projectstore"; } - virtual void PreCache(GcCtx&) override {} - - virtual void UpdateLockedState(GcCtx& Ctx) override - { - ZEN_TRACE_CPU("Store::UpdateLockedState"); - - auto Log = [&Ctx]() { return Ctx.Logger; }; - - Stopwatch Timer; - - std::vector<Ref<ProjectStore::Oplog>> AddedOplogs; - - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} in {} new oplogs", - "projectstore", - m_References.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - AddedOplogs.size()); - }); - - std::vector<std::string> AddedProjects = m_ProjectStore.GetCapturedProjectsLocked(); - for (const std::string& AddedProject : AddedProjects) - { - if (auto It = m_ProjectStore.m_Projects.find(AddedProject); It != m_ProjectStore.m_Projects.end()) - { - ProjectStore::Project& Project = *It->second; - for (auto& OplogPair : Project.m_Oplogs) - { - Ref<ProjectStore::Oplog> Oplog = OplogPair.second; - AddedOplogs.push_back(Oplog); - } - } - } - for (auto& ProjectPair : m_ProjectStore.m_Projects) - { - ProjectStore::Project& Project = *ProjectPair.second; - - std::vector<std::string> AddedOplogNames(Project.GetCapturedOplogsLocked()); - for (const std::string& OplogName : AddedOplogNames) - { - if (auto It = Project.m_Oplogs.find(OplogName); It != Project.m_Oplogs.end()) - { - Ref<ProjectStore::Oplog> Oplog = It->second; - AddedOplogs.push_back(Oplog); - } - } - } - - for (const Ref<ProjectStore::Oplog>& Oplog : AddedOplogs) - { - size_t BaseReferenceCount = m_References.size(); - - Stopwatch InnerTimer; - const auto __ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} from {}", - Oplog->m_BasePath, - m_References.size() - BaseReferenceCount, - NiceTimeSpanMs(InnerTimer.GetElapsedTimeMs()), - Oplog->OplogId()); - }); - - Oplog->GetAttachmentsLocked(m_References, Ctx.Settings.StoreProjectAttachmentMetaData); - if (std::vector<IoHash> PendingChunkReferences = Oplog->GetPendingChunkReferencesLocked(); !PendingChunkReferences.empty()) - { - m_References.insert(m_References.end(), PendingChunkReferences.begin(), PendingChunkReferences.end()); - } - } - - FilterReferences(Ctx, fmt::format("projectstore [LOCKSTATE] '{}'", "projectstore"), m_References); - } - - virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override - { - ZEN_TRACE_CPU("Store::GetUnusedReferences"); - - auto Log = [&Ctx]() { return Ctx.Logger; }; - - size_t InitialCount = IoCids.size(); - size_t UsedCount = InitialCount; - - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", - "projectstore", - UsedCount, - InitialCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_References, IoCids); - UsedCount = IoCids.size() - UnusedReferences.size(); - return UnusedReferences; - } - -private: - ProjectStore& m_ProjectStore; - std::vector<IoHash> m_References; -}; - -class ProjectStoreOplogReferenceChecker : public GcReferenceChecker -{ -public: - ProjectStoreOplogReferenceChecker(ProjectStore& InProjectStore, Ref<ProjectStore::Project> InProject, std::string_view InOplog) - : m_ProjectStore(InProjectStore) - , m_Project(InProject) - , m_OplogId(InOplog) - { - m_Project->EnableUpdateCapture(); - } - - virtual ~ProjectStoreOplogReferenceChecker() - { - try - { - m_Project->DisableUpdateCapture(); - - if (m_OplogHasUpdateCapture) - { - RwLock::SharedLockScope _(m_Project->m_ProjectLock); - if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) - { - Ref<ProjectStore::Oplog> Oplog = It->second; - Oplog->DisableUpdateCapture(); - m_OplogHasUpdateCapture = false; - } - } - } - catch (const std::exception& Ex) - { - ZEN_ERROR("~ProjectStoreOplogReferenceChecker threw exception: '{}'", Ex.what()); - } - } - - virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}/{}'", m_Project->Identifier, m_OplogId); } - - virtual void PreCache(GcCtx& Ctx) override - { - ZEN_TRACE_CPU("Store::Oplog::PreCache"); - - auto Log = [&Ctx]() { return Ctx.Logger; }; - - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': precached {} references in {} from {}/{}", - m_OplogBasePath, - m_References.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - m_Project->Identifier, - m_OplogId); - }); - - m_OplogBasePath = m_Project->BasePathForOplog(m_OplogId); - { - Ref<ProjectStore::Oplog> Oplog; - - RwLock::SharedLockScope __(m_Project->m_ProjectLock); - if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) - { - Oplog = It->second; - Oplog->EnableUpdateCapture(); - m_OplogHasUpdateCapture = true; - } - else if (ProjectStore::Oplog::ExistsAt(m_OplogBasePath)) - { - Stopwatch OplogTimer; - Oplog = new ProjectStore::Oplog(m_Project->Log(), - m_Project->Identifier, - m_OplogId, - m_Project->m_CidStore, - m_OplogBasePath, - std::filesystem::path{}, - ProjectStore::Oplog::EMode::kBasicReadOnly); - Oplog->Read(); - if (Ctx.Settings.Verbose) - { - ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': read oplog '{}/{}' in {}", - m_OplogBasePath, - m_Project->Identifier, - m_OplogId, - NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); - } - } - else - { - return; - } - - RwLock::SharedLockScope ___(Oplog->m_OplogLock); - if (Ctx.IsCancelledFlag) - { - return; - } - - GcClock::TimePoint CompactExpireTime = GcClock::Now() - std::chrono::minutes(30); - if (!m_Project->IsOplogTouchedSince(CompactExpireTime, m_OplogId)) - { - const uint32_t CompactUnusedThreshold = 25; - if (Oplog->GetUnusedSpacePercent() >= CompactUnusedThreshold) - { - m_Project->AddOplogToCompact(m_OplogId); - } - } - - Oplog->GetAttachmentsLocked(m_References, Ctx.Settings.StoreProjectAttachmentMetaData); - m_OplogAccessTime = m_Project->LastOplogAccessTime(m_OplogId); - } - FilterReferences(Ctx, fmt::format("projectstore [PRECACHE] '{}'", m_OplogBasePath), m_References); - } - - virtual void UpdateLockedState(GcCtx& Ctx) override - { - ZEN_TRACE_CPU("Store::Oplog::UpdateLockedState"); - - auto Log = [&Ctx]() { return Ctx.Logger; }; - - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} from {}/{}", - m_OplogBasePath, - m_AddedReferences.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - m_Project->Identifier, - m_OplogId); - }); - - if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) - { - Ref<ProjectStore::Oplog> Oplog = It->second; - Oplog->IterateCapturedOpsLocked([&](const Oid& Key, ProjectStore::LogSequenceNumber LSN, const CbObjectView& UpdateOp) -> bool { - ZEN_UNUSED(Key, LSN); - UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_AddedReferences.emplace_back(Visitor.AsAttachment()); }); - return true; - }); - std::vector<IoHash> AddedAttachments = Oplog->GetCapturedAttachmentsLocked(); - m_AddedReferences.insert(m_AddedReferences.end(), AddedAttachments.begin(), AddedAttachments.end()); - if (std::vector<IoHash> PendingChunkReferences = Oplog->GetPendingChunkReferencesLocked(); !PendingChunkReferences.empty()) - { - m_AddedReferences.insert(m_AddedReferences.end(), PendingChunkReferences.begin(), PendingChunkReferences.end()); - } - } - else if (m_Project->LastOplogAccessTime(m_OplogId) > m_OplogAccessTime && ProjectStore::Oplog::ExistsAt(m_OplogBasePath)) - { - Stopwatch OplogTimer; - { - Ref<ProjectStore::Oplog> Oplog(new ProjectStore::Oplog(m_Project->Log(), - m_Project->Identifier, - m_OplogId, - m_Project->m_CidStore, - m_OplogBasePath, - std::filesystem::path{}, - ProjectStore::Oplog::EMode::kBasicReadOnly)); - Oplog->Read(); - if (Ctx.Settings.Verbose) - { - ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': read oplog '{}/{}' in {}", - m_OplogBasePath, - m_Project->Identifier, - m_OplogId, - NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); - } - - OplogTimer.Reset(); - - Oplog->GetAttachmentsLocked(m_AddedReferences, Ctx.Settings.StoreProjectAttachmentMetaData); - } - if (Ctx.Settings.Verbose) - { - ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': read referenced attachments from oplog '{}/{}' in {}", - m_OplogBasePath, - m_Project->Identifier, - m_OplogId, - NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); - } - } - FilterReferences(Ctx, fmt::format("projectstore [LOCKSTATE] '{}'", m_OplogBasePath), m_AddedReferences); - } - - virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override - { - ZEN_TRACE_CPU("Store::Oplog::GetUnusedReferences"); - - auto Log = [&Ctx]() { return Ctx.Logger; }; - - const size_t InitialCount = IoCids.size(); - size_t UsedCount = InitialCount; - - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {} from {}/{}", - m_OplogBasePath, - UsedCount, - InitialCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - m_Project->Identifier, - m_OplogId); - }); - - std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_References, IoCids); - UnusedReferences = KeepUnusedReferences(m_AddedReferences, UnusedReferences); - UsedCount = IoCids.size() - UnusedReferences.size(); - return UnusedReferences; - } - - ProjectStore& m_ProjectStore; - Ref<ProjectStore::Project> m_Project; - std::string m_OplogId; - std::filesystem::path m_OplogBasePath; - bool m_OplogHasUpdateCapture = false; - std::vector<IoHash> m_References; - std::vector<IoHash> m_AddedReferences; - GcClock::TimePoint m_OplogAccessTime; -}; - -std::vector<GcReferenceChecker*> -ProjectStore::CreateReferenceCheckers(GcCtx& Ctx) -{ - ZEN_TRACE_CPU("Store::CreateReferenceCheckers"); - - auto Log = [&Ctx]() { return Ctx.Logger; }; - - size_t ProjectCount = 0; - size_t OplogCount = 0; - - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: projectstore [CREATE CHECKERS] '{}': opened {} projects and {} oplogs in {}", - m_ProjectBasePath, - ProjectCount, - OplogCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - DiscoverProjects(); - - std::vector<Ref<ProjectStore::Project>> Projects; - std::vector<GcReferenceChecker*> Checkers; - Checkers.emplace_back(new ProjectStoreReferenceChecker(*this)); - { - RwLock::SharedLockScope Lock(m_ProjectsLock); - Projects.reserve(m_Projects.size()); - - for (auto& Kv : m_Projects) - { - Projects.push_back(Kv.second); - } - } - ProjectCount += Projects.size(); - try - { - for (const Ref<ProjectStore::Project>& Project : Projects) - { - std::vector<std::string> OpLogs = Project->ScanForOplogs(); - Checkers.reserve(Checkers.size() + OpLogs.size()); - for (const std::string& OpLogId : OpLogs) - { - Checkers.emplace_back(new ProjectStoreOplogReferenceChecker(*this, Project, OpLogId)); - OplogCount++; - } - } - } - catch (const std::exception&) - { - while (!Checkers.empty()) - { - delete Checkers.back(); - Checkers.pop_back(); - } - throw; - } - - return Checkers; -} - -std::vector<RwLock::SharedLockScope> -ProjectStore::LockState(GcCtx& Ctx) -{ - ZEN_TRACE_CPU("Store::LockState"); - - auto Log = [&Ctx]() { return Ctx.Logger; }; - - std::vector<RwLock::SharedLockScope> Locks; - Locks.emplace_back(RwLock::SharedLockScope(m_ProjectsLock)); - for (auto& ProjectIt : m_Projects) - { - std::vector<RwLock::SharedLockScope> ProjectLocks = ProjectIt.second->GetGcReferencerLocks(); - for (auto It = std::make_move_iterator(ProjectLocks.begin()); It != std::make_move_iterator(ProjectLocks.end()); It++) - { - Locks.emplace_back(std::move(*It)); - } - } - return Locks; -} - -class ProjectStoreOplogReferenceValidator : public GcReferenceValidator -{ -public: - ProjectStoreOplogReferenceValidator(ProjectStore& InProjectStore, std::string_view InProject, std::string_view InOplog) - : m_ProjectStore(InProjectStore) - , m_ProjectId(InProject) - , m_OplogId(InOplog) - { - } - - virtual ~ProjectStoreOplogReferenceValidator() {} - - virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}/{}'", m_ProjectId, m_OplogId); } - - virtual void Validate(GcCtx& Ctx, GcReferenceValidatorStats& Stats) override - { - ZEN_TRACE_CPU("Store::Validate"); - - auto Log = [&Ctx]() { return Ctx.Logger; }; - - ProjectStore::Oplog::ValidationResult Result; - - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - std::string Status = Result.IsEmpty() ? "OK" : "Missing data"; - ZEN_INFO("GCV2: projectstore [VALIDATE] '{}/{}': Validated in {}. OpCount: {}, MinLSN: {}, MaxLSN: {}, Status: {}", - m_ProjectId, - m_OplogId, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - Result.OpCount, - Result.LSNLow.Number, - Result.LSNHigh.Number, - Status); - }); - Ref<ProjectStore::Oplog> Oplog; - Ref<ProjectStore::Project> Project = m_ProjectStore.OpenProject(m_ProjectId); - if (Project) - { - { - RwLock::SharedLockScope __(Project->m_ProjectLock); - if (auto It = Project->m_Oplogs.find(m_OplogId); It != Project->m_Oplogs.end()) - { - Oplog = It->second; - } - else - { - Stopwatch OplogTimer; - - std::filesystem::path OplogBasePath = Project->BasePathForOplog(m_OplogId); - Oplog = Ref<ProjectStore::Oplog>(new ProjectStore::Oplog(Project->Log(), - Project->Identifier, - m_OplogId, - Project->m_CidStore, - OplogBasePath, - std::filesystem::path{}, - ProjectStore::Oplog::EMode::kBasicReadOnly)); - Oplog->Read(); - if (Ctx.Settings.Verbose) - { - ZEN_INFO("GCV2: projectstore [VALIDATE] '{}': read oplog '{}/{}' in {}", - OplogBasePath, - Project->Identifier, - m_OplogId, - NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); - } - - if (Ctx.IsCancelledFlag) - { - return; - } - } - } - - if (Oplog) - { - Result = Oplog->Validate(Project->RootDir, Ctx.IsCancelledFlag, nullptr); - if (Ctx.IsCancelledFlag) - { - return; - } - Stats.CheckedCount = Result.OpCount; - Stats.MissingChunks = Result.MissingChunks.size(); - Stats.MissingFiles = Result.MissingFiles.size(); - Stats.MissingMetas = Result.MissingMetas.size(); - Stats.MissingAttachments = Result.MissingAttachments.size(); - } - - if (!Result.IsEmpty()) - { - ZEN_WARN("GCV2: projectstore [VALIDATE] '{}/{}': Missing data: Files: {}, Chunks: {}, Metas: {}, Attachments: {}", - m_ProjectId, - m_OplogId, - Result.MissingFiles.size(), - Result.MissingChunks.size(), - Result.MissingMetas.size(), - Result.MissingAttachments.size()); - } - } - } - - ProjectStore& m_ProjectStore; - std::string m_ProjectId; - std::string m_OplogId; -}; - -std::vector<GcReferenceValidator*> -ProjectStore::CreateReferenceValidators(GcCtx& Ctx) -{ - if (Ctx.Settings.SkipCidDelete) - { - return {}; - } - - auto Log = [&Ctx]() { return Ctx.Logger; }; - - DiscoverProjects(); - - std::vector<std::pair<std::string, std::string>> Oplogs; - { - RwLock::SharedLockScope _(m_ProjectsLock); - for (auto& ProjectPair : m_Projects) - { - ProjectStore::Project& Project = *ProjectPair.second; - std::vector<std::string> OpLogs = Project.ScanForOplogs(); - for (const std::string& OplogName : OpLogs) - { - Oplogs.push_back({Project.Identifier, OplogName}); - } - } - } - std::vector<GcReferenceValidator*> Validators; - Validators.reserve(Oplogs.size()); - for (const std::pair<std::string, std::string>& Oplog : Oplogs) - { - Validators.push_back(new ProjectStoreOplogReferenceValidator(*this, Oplog.first, Oplog.second)); - } - - return Validators; -} - -////////////////////////////////////////////////////////////////////////// - -Oid -OpKeyStringAsOid(std::string_view OpKey) -{ - eastl::fixed_vector<uint8_t, 512> Buffer; - return OpKeyStringAsOid(OpKey, Buffer); -} - -////////////////////////////////////////////////////////////////////////// - -#if ZEN_WITH_TESTS - -namespace testutils { - using namespace std::literals; - - static std::string OidAsString(const Oid& Id) - { - StringBuilder<25> OidStringBuilder; - Id.ToString(OidStringBuilder); - return OidStringBuilder.ToString(); - } - - static CbPackage CreateBulkDataOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) - { - CbPackage Package; - CbObjectWriter Object; - Object << "key"sv << OidAsString(Id); - if (!Attachments.empty()) - { - Object.BeginArray("bulkdata"); - for (const auto& Attachment : Attachments) - { - CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); - Object.BeginObject(); - Object << "id"sv << Attachment.first; - Object << "type"sv - << "Standard"sv; - Object << "data"sv << Attach; - Object.EndObject(); - - Package.AddAttachment(Attach); - } - Object.EndArray(); - } - Package.SetObject(Object.Save()); - return Package; - }; - - static CbPackage CreateFilesOplogPackage(const Oid& Id, - const std::filesystem::path ProjectRootDir, - const std::span<const std::pair<Oid, std::filesystem::path>>& Attachments) - { - CbPackage Package; - CbObjectWriter Object; - Object << "key"sv << OidAsString(Id); - if (!Attachments.empty()) - { - Object.BeginArray("files"); - for (const auto& Attachment : Attachments) - { - std::filesystem::path ServerPath = std::filesystem::relative(Attachment.second, ProjectRootDir); - std::filesystem::path ClientPath = ServerPath; // dummy - Object.BeginObject(); - Object << "id"sv << Attachment.first; - Object << "serverpath"sv << ServerPath.string(); - Object << "clientpath"sv << ClientPath.string(); - Object.EndObject(); - } - Object.EndArray(); - } - Package.SetObject(Object.Save()); - return Package; - }; - - static std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments( - const std::span<const size_t>& Sizes, - OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast, - uint64_t BlockSize = 0) - { - std::vector<std::pair<Oid, CompressedBuffer>> Result; - Result.reserve(Sizes.size()); - for (size_t Size : Sizes) - { - CompressedBuffer Compressed = - CompressedBuffer::Compress(SharedBuffer(CreateSemiRandomBlob(Size)), OodleCompressor::Mermaid, CompressionLevel, BlockSize); - Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); - } - return Result; - } - - static uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64_t RawOffset) - { - if (RawOffset > 0) - { - uint64_t BlockSize = 0; - OodleCompressor Compressor; - OodleCompressionLevel CompressionLevel; - if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) - { - return 0; - } - return BlockSize > 0 ? RawOffset % BlockSize : 0; - } - return 0; - } - - template<typename ChunkType> - CbObject BuildChunksRequest(bool SkipData, - std::string_view IdName, - const std::vector<ChunkType>& Chunks, - const std::vector<std::pair<size_t, size_t>>& Ranges, - const std::vector<uint64_t>& ModTags) - { - CbObjectWriter Request; - Request.BeginObject("Request"sv); - { - if (SkipData) - { - Request.AddBool("SkipData"sv, true); - } - if (!Chunks.empty()) - { - Request.BeginArray("Chunks"); - for (size_t Index = 0; Index < Chunks.size(); Index++) - { - Request.BeginObject(); - { - Request << IdName << Chunks[Index]; - if (!ModTags.empty()) - { - Request << "ModTag" << ModTags[Index]; - } - if (!Ranges.empty()) - { - Request << "Offset" << Ranges[Index].first; - Request << "Size" << Ranges[Index].second; - } - } - Request.EndObject(); - } - Request.EndArray(); - } - } - Request.EndObject(); - return Request.Save(); - }; - - CbObject BuildChunksRequest(bool SkipData, - const std::vector<Oid>& Chunks, - const std::vector<std::pair<size_t, size_t>>& Ranges, - const std::vector<uint64_t>& ModTags) - { - return BuildChunksRequest<Oid>(SkipData, "Oid", Chunks, Ranges, ModTags); - } - - CbObject BuildChunksRequest(bool SkipData, - const std::vector<IoHash>& Chunks, - const std::vector<std::pair<size_t, size_t>>& Ranges, - const std::vector<uint64_t>& ModTags) - { - return BuildChunksRequest<IoHash>(SkipData, "RawHash", Chunks, Ranges, ModTags); - } - -} // namespace testutils - -TEST_CASE("project.opkeys") -{ - using namespace std::literals; - - const std::string_view LongKey = - "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" - "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" - "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"sv; - - // Not a test per se, this code just exercises the key computation logic to ensure all - // edge cases are handled by the bug workaround logic - - for (int i = 1; i < 300; ++i) - { - CbObjectWriter Cbo; - Cbo << "key"sv << LongKey.substr(0, i); - - const Oid KeyId = ComputeOpKey(Cbo.Save()); - } - - { - CbObjectWriter Cbo; - Cbo << "key"sv - << "abcdef"; - - const Oid KeyId = ComputeOpKey(Cbo.Save()); - const Oid CorrectId = Oid::FromHexString( - "7a03540e" - "ecb0daa9" - "00f2949e"); - - CHECK(KeyId == CorrectId); - } - - { - CbObjectWriter Cbo; - Cbo << "key"sv - << "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" - "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" - "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"; - - const Oid KeyId = ComputeOpKey(Cbo.Save()); - const Oid CorrectId = Oid::FromHexString( - "c5e88c79" - "06b7fa38" - "7b0d2efd"); - - CHECK(KeyId == CorrectId); - } -} - -TEST_CASE("project.store.create") -{ - using namespace std::literals; - - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - CidStore CidStore(Gc); - CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); - - std::string_view ProjectName("proj1"sv); - std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); - std::filesystem::path RootDir = TempDir.Path() / "root"; - std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; - std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; - std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; - - Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / ProjectName, - ProjectName, - RootDir.string(), - EngineRootDir.string(), - ProjectRootDir.string(), - ProjectFilePath.string())); - CHECK(ProjectStore.DeleteProject(ProjectName)); - CHECK(!Project->Exists(BasePath)); -} - -TEST_CASE("project.store.lifetimes") -{ - using namespace std::literals; - - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - CidStore CidStore(Gc); - CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); - - std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); - std::filesystem::path RootDir = TempDir.Path() / "root"; - std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; - std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; - std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; - - Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv, - "proj1"sv, - RootDir.string(), - EngineRootDir.string(), - ProjectRootDir.string(), - ProjectFilePath.string())); - Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog1", {}); - CHECK(Oplog); - - std::filesystem::path DeletePath; - CHECK(Project->PrepareForDelete(DeletePath)); - CHECK(!DeletePath.empty()); - CHECK(!Project->OpenOplog("oplog1", /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true)); - // Oplog is now invalid, but pointer can still be accessed since we store old oplog pointers - CHECK(Oplog->OplogCount() == 0); - // Project is still valid since we have a Ref to it - CHECK(Project->Identifier == "proj1"sv); -} - -TEST_CASE("project.store.gc") -{ - using namespace std::literals; - using namespace testutils; - - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - CidStore CidStore(Gc); - CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); - - std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); - std::filesystem::path RootDir = TempDir.Path() / "root"; - std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; - - std::filesystem::path Project1RootDir = TempDir.Path() / "game1"; - std::filesystem::path Project1FilePath = TempDir.Path() / "game1" / "game.uproject"; - { - CreateDirectories(Project1FilePath.parent_path()); - BasicFile ProjectFile; - ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); - } - std::filesystem::path Project1OplogPath = TempDir.Path() / "game1" / "saves" / "cooked" / ".projectstore"; - { - CreateDirectories(Project1OplogPath.parent_path()); - BasicFile OplogFile; - OplogFile.Open(Project1OplogPath, BasicFile::Mode::kTruncate); - } - - std::filesystem::path Project2RootDir = TempDir.Path() / "game2"; - std::filesystem::path Project2FilePath = TempDir.Path() / "game2" / "game.uproject"; - { - CreateDirectories(Project2FilePath.parent_path()); - BasicFile ProjectFile; - ProjectFile.Open(Project2FilePath, BasicFile::Mode::kTruncate); - } - std::filesystem::path Project2Oplog1Path = TempDir.Path() / "game1" / "saves" / "cooked" / ".projectstore"; - { - CreateDirectories(Project2Oplog1Path.parent_path()); - BasicFile OplogFile; - OplogFile.Open(Project2Oplog1Path, BasicFile::Mode::kTruncate); - } - std::filesystem::path Project2Oplog2Path = TempDir.Path() / "game2" / "saves" / "cooked" / ".projectstore"; - { - CreateDirectories(Project2Oplog2Path.parent_path()); - BasicFile OplogFile; - OplogFile.Open(Project2Oplog2Path, BasicFile::Mode::kTruncate); - } - - { - Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, - "proj1"sv, - RootDir.string(), - EngineRootDir.string(), - Project1RootDir.string(), - Project1FilePath.string())); - Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1", Project1OplogPath); - CHECK(Oplog); - - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); - Oplog->AppendNewOplogEntry( - CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}))); - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122}))); - } - - { - Ref<ProjectStore::Project> Project2(ProjectStore.NewProject(BasePath / "proj2"sv, - "proj2"sv, - RootDir.string(), - EngineRootDir.string(), - Project2RootDir.string(), - Project2FilePath.string())); - { - Ref<ProjectStore::Oplog> Oplog = Project2->NewOplog("oplog2", Project2Oplog1Path); - CHECK(Oplog); - - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{177}))); - Oplog->AppendNewOplogEntry( - CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9123, 383, 590, 96}))); - Oplog->AppendNewOplogEntry( - CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{535, 221}))); - } - { - Ref<ProjectStore::Oplog> Oplog = Project2->NewOplog("oplog3", Project2Oplog2Path); - CHECK(Oplog); - - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{137}))); - Oplog->AppendNewOplogEntry( - CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9723, 683, 594, 98}))); - Oplog->AppendNewOplogEntry( - CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{531, 271}))); - } - } - - { - GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24), - .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24), - .IsDeleteMode = true}; - GcResult Result = Gc.CollectGarbage(Settings); - CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); - CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); - CHECK_EQ(21u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); - CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); - CHECK(ProjectStore.OpenProject("proj1"sv)); - CHECK(ProjectStore.OpenProject("proj2"sv)); - } - - { - GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24), - .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24), - .IsDeleteMode = true}; - GcResult Result = Gc.CollectGarbage(Settings); - CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); - CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); - CHECK_EQ(21u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); - CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); - CHECK(ProjectStore.OpenProject("proj1"sv)); - CHECK(ProjectStore.OpenProject("proj2"sv)); - } - - RemoveFile(Project1FilePath); - - { - GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24), - .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24), - .IsDeleteMode = true}; - GcResult Result = Gc.CollectGarbage(Settings); - CHECK_EQ(5u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); - CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); - CHECK_EQ(21u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); - CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); - CHECK(ProjectStore.OpenProject("proj1"sv)); - CHECK(ProjectStore.OpenProject("proj2"sv)); - } - - { - GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24), - .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24), - .CollectSmallObjects = true, - .IsDeleteMode = true}; - GcResult Result = Gc.CollectGarbage(Settings); - CHECK_EQ(4u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); - CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); - CHECK_EQ(21u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); - CHECK_EQ(7u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); - CHECK(!ProjectStore.OpenProject("proj1"sv)); - CHECK(ProjectStore.OpenProject("proj2"sv)); - } - - RemoveFile(Project2Oplog1Path); - { - GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24), - .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24), - .CollectSmallObjects = true, - .IsDeleteMode = true}; - GcResult Result = Gc.CollectGarbage(Settings); - CHECK_EQ(3u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); - CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); - CHECK_EQ(14u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); - CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); - CHECK(!ProjectStore.OpenProject("proj1"sv)); - CHECK(ProjectStore.OpenProject("proj2"sv)); - } - - { - GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24), - .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24), - .CollectSmallObjects = true, - .IsDeleteMode = true}; - GcResult Result = Gc.CollectGarbage(Settings); - CHECK_EQ(3u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); - CHECK_EQ(0u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); - CHECK_EQ(14u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); - CHECK_EQ(0u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); - CHECK(!ProjectStore.OpenProject("proj1"sv)); - CHECK(ProjectStore.OpenProject("proj2"sv)); - } - - RemoveFile(Project2FilePath); - { - GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24), - .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24), - .CollectSmallObjects = true, - .IsDeleteMode = true}; - GcResult Result = Gc.CollectGarbage(Settings); - CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.CheckedCount); - CHECK_EQ(1u, Result.ReferencerStatSum.RemoveExpiredDataStats.DeletedCount); - CHECK_EQ(14u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.CheckedCount); - CHECK_EQ(14u, Result.ReferenceStoreStatSum.RemoveUnreferencedDataStats.DeletedCount); - CHECK(!ProjectStore.OpenProject("proj1"sv)); - CHECK(!ProjectStore.OpenProject("proj2"sv)); - } -} - -TEST_CASE("project.store.gc.prep") -{ - using namespace std::literals; - using namespace testutils; - - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - CidStore CidStore(Gc); - CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); - - std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); - std::filesystem::path RootDir = TempDir.Path() / "root"; - std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; - - std::filesystem::path Project1RootDir = TempDir.Path() / "game1"; - std::filesystem::path Project1FilePath = TempDir.Path() / "game1" / "game.uproject"; - { - CreateDirectories(Project1FilePath.parent_path()); - BasicFile ProjectFile; - ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); - } - std::filesystem::path Project1OplogPath = TempDir.Path() / "game1" / "saves" / "cooked" / ".projectstore"; - { - CreateDirectories(Project1OplogPath.parent_path()); - BasicFile OplogFile; - OplogFile.Open(Project1OplogPath, BasicFile::Mode::kTruncate); - } - - std::vector<std::pair<Oid, CompressedBuffer>> OpAttachments = CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}); - std::vector<IoHash> OpChunkHashes; - for (const auto& Chunk : OpAttachments) - { - OpChunkHashes.push_back(Chunk.second.DecodeRawHash()); - } - - { - Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, - "proj1"sv, - RootDir.string(), - EngineRootDir.string(), - Project1RootDir.string(), - Project1FilePath.string())); - Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); - } - { - Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); - Project1->DeleteOplog("oplog1"sv); - } - - // Equivalent of a `prep` existance check call - for (auto Attachment : OpAttachments) - { - CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); - } - - { - GcSettings Settings = {.CacheExpireTime = GcClock::Now(), - .ProjectStoreExpireTime = GcClock::Now(), - .CollectSmallObjects = true, - .IsDeleteMode = true}; - GcResult Result = Gc.CollectGarbage(Settings); - } - - // If a gc comes in between our prep and op write the chunks will be removed - for (auto Attachment : OpAttachments) - { - CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); - } - - { - // Make sure the chunks are stored but not the referencing op - Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); - Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); - Project1->DeleteOplog("oplog1"sv); - } - { - Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); - Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); - - // Equivalent of a `prep` call with tracking of ops - CHECK(Oplog->CheckPendingChunkReferences(OpChunkHashes, std::chrono::hours(1)).empty()); - } - - for (auto Attachment : OpAttachments) - { - CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); - } - - { - GcSettings Settings = {.CacheExpireTime = GcClock::Now(), - .ProjectStoreExpireTime = GcClock::Now(), - .CollectSmallObjects = true, - .IsDeleteMode = true}; - GcResult Result = Gc.CollectGarbage(Settings); - } - - // Attachments should now be retained - for (auto Attachment : OpAttachments) - { - CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); - } - - { - GcSettings Settings = {.CacheExpireTime = GcClock::Now(), - .ProjectStoreExpireTime = GcClock::Now(), - .CollectSmallObjects = true, - .IsDeleteMode = true}; - GcResult Result = Gc.CollectGarbage(Settings); - } - - // Attachments should now be retained across multiple GCs if retain time is still valud - for (auto Attachment : OpAttachments) - { - CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); - } - - { - Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); - Ref<ProjectStore::Oplog> Oplog = Project1->OpenOplog("oplog1"sv, true, true); - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); - Oplog->RemovePendingChunkReferences(OpChunkHashes); - CHECK(Oplog->GetPendingChunkReferencesLocked().size() == 0); - } - for (auto Attachment : OpAttachments) - { - CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); - } - { - Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); - Project1->DeleteOplog("oplog1"sv); - } - - { - GcSettings Settings = {.CacheExpireTime = GcClock::Now(), - .ProjectStoreExpireTime = GcClock::Now(), - .CollectSmallObjects = true, - .IsDeleteMode = true}; - GcResult Result = Gc.CollectGarbage(Settings); - } - - for (auto Attachment : OpAttachments) - { - CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); - } - - { - Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); - Project1->DeleteOplog("oplog1"sv); - } - { - // Make sure the chunks are stored but not the referencing op - Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); - Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); - Project1->DeleteOplog("oplog1"sv); - } - - // Caution - putting breakpoints and stepping through this part of the test likely makes it fails due to expiry time of pending - // chunks - { - Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); - Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); - - CHECK(Oplog->CheckPendingChunkReferences(OpChunkHashes, std::chrono::milliseconds(100)).empty()); - } - - // This pass they should be retained and while the ops are picked up in GC we are blocked from adding our op - { - GcSettings Settings = {.CacheExpireTime = GcClock::Now(), - .ProjectStoreExpireTime = GcClock::Now(), - .CollectSmallObjects = true, - .IsDeleteMode = true}; - GcResult Result = Gc.CollectGarbage(Settings); - } - for (auto Attachment : OpAttachments) - { - CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); - } - - Sleep(200); - // This pass they should also be retained since our age retention has kept them alive and they will now be picked up and the - // retention cleared - { - GcSettings Settings = {.CacheExpireTime = GcClock::Now(), - .ProjectStoreExpireTime = GcClock::Now(), - .CollectSmallObjects = true, - .IsDeleteMode = true}; - GcResult Result = Gc.CollectGarbage(Settings); - } - for (auto Attachment : OpAttachments) - { - CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); - } - - // This pass the retention time has expired and the last GC pass cleared the entries - { - GcSettings Settings = {.CacheExpireTime = GcClock::Now(), - .ProjectStoreExpireTime = GcClock::Now(), - .CollectSmallObjects = true, - .IsDeleteMode = true}; - GcResult Result = Gc.CollectGarbage(Settings); - } - - for (auto Attachment : OpAttachments) - { - CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); - } -} - -TEST_CASE("project.store.rpc.getchunks") -{ - using namespace std::literals; - using namespace testutils; - - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - CidStore CidStore(Gc); - CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); - - std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; - ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); - std::filesystem::path RootDir = TempDir.Path() / "root"sv; - std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; - - std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv; - std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv; - { - CreateDirectories(Project1FilePath.parent_path()); - BasicFile ProjectFile; - ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); - } - - std::vector<Oid> OpIds; - OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()}); - std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; - Oid FilesOpId = Oid::NewOid(); - std::vector<std::pair<Oid, std::filesystem::path>> FilesOpIdAttachments; - { - Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, - "proj1"sv, - RootDir.string(), - EngineRootDir.string(), - Project1RootDir.string(), - Project1FilePath.string())); - Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, {}); - CHECK(Oplog); - Attachments[OpIds[0]] = {}; - Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77}); - Attachments[OpIds[2]] = - CreateAttachments(std::initializer_list<size_t>{200 * 1024, 314 * 1024, 690, 99}, OodleCompressionLevel::VeryFast, 128 * 1024); - Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122}); - for (auto It : Attachments) - { - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(It.first, It.second)); - } - - std::filesystem::path UncompressedFilePath = RootDir / "content" / "uncompressed_file"; - CreateDirectories(UncompressedFilePath.parent_path()); - IoBuffer FileBlob = CreateRandomBlob(81823 * 2); - WriteFile(UncompressedFilePath, FileBlob); - FilesOpIdAttachments.push_back({Oid::NewOid(), UncompressedFilePath}); - Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(FilesOpId, RootDir, FilesOpIdAttachments)); - } - - auto GetChunks = [](zen::ProjectStore& ProjectStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, const CbObject& Cb) { - std::vector<ProjectStore::ChunkRequest> Requests = ProjectStore.ParseChunksRequests(Project, Oplog, Cb); - std::vector<ProjectStore::ChunkResult> Results = - Requests.empty() ? std::vector<ProjectStore::ChunkResult>{} : ProjectStore.GetChunks(Project, Oplog, Requests); - return ProjectStore.WriteChunksRequestResponse(Project, Oplog, std::move(Requests), std::move(Results)); - }; - - Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); - CHECK(Project1); - Ref<ProjectStore::Oplog> Oplog1 = Project1->OpenOplog("oplog1"sv, false, true); - CHECK(Oplog1); - // Invalid request - { - CbObjectWriter Request; - Request.BeginObject("WrongName"sv); - Request.EndObject(); - CbPackage Response; - CHECK_THROWS(GetChunks(ProjectStore, *Project1, *Oplog1, Request.Save())); - } - - // Empty request - { - const CbPackage& Response = - GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, std::vector<IoHash>{}, {}, {})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(0, Chunks.Num()); - } - // Single non-existing chunk by RawHash - IoHash NotFoundIoHash = IoHash::Max; - { - const CbPackage& Response = - GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundIoHash}, {}, {})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(0, Chunks.Num()); - } - { - const CbPackage& Response = - GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundIoHash}, {}, {})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(0, Chunks.Num()); - } - // Single non-existing chunk by Id - Oid NotFoundId = Oid::NewOid(); - { - const CbPackage& Response = - GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundId}, {}, {})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(0, Chunks.Num()); - } - { - const CbPackage& Response = - GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundId}, {}, {})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(0, Chunks.Num()); - } - // Single existing chunk by RawHash - { - // Fresh fetch - IoHash FirstAttachmentHash = Attachments[OpIds[2]][1].second.DecodeRawHash(); - uint64_t ResponseModTag = 0; - { - const CbPackage& Response = GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {})); - - CHECK_EQ(1, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - IoHash Id = Chunk["Id"].AsHash(); - CHECK_EQ(FirstAttachmentHash, Id); - ResponseModTag = Chunk["ModTag"].AsUInt64(); - CHECK_NE(0, ResponseModTag); - IoHash AttachmentHash = Chunk["RawHash"].AsHash(); - const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); - CHECK_NE(nullptr, Attachment); - CompressedBuffer Buffer = Attachment->AsCompressedBinary(); - CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - // Fetch with matching ModTag - { - const CbPackage& Response = - GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {ResponseModTag})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - IoHash Id = Chunk["Id"].AsHash(); - CHECK_EQ(FirstAttachmentHash, Id); - CHECK(!Chunk.FindView("ModTag")); - CHECK(!Chunk.FindView("RawHash")); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - // Fetch with mismatching ModTag - { - const CbPackage& Response = - GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)})); - - CHECK_EQ(1, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - IoHash Id = Chunk["Id"].AsHash(); - CHECK_EQ(FirstAttachmentHash, Id); - ResponseModTag = Chunk["ModTag"].AsUInt64(); - CHECK_NE(0, ResponseModTag); - IoHash AttachmentHash = Chunk["RawHash"].AsHash(); - const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); - CHECK_NE(nullptr, Attachment); - CompressedBuffer Buffer = Attachment->AsCompressedBinary(); - CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - // Fresh modtime query - { - const CbPackage& Response = GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - IoHash Id = Chunk["Id"].AsHash(); - CHECK_EQ(FirstAttachmentHash, Id); - uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); - CHECK_EQ(ResponseModTag, ResponseModTag2); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - // Modtime query with matching ModTag - { - const CbPackage& Response = - GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {ResponseModTag})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - IoHash Id = Chunk["Id"].AsHash(); - CHECK_EQ(FirstAttachmentHash, Id); - CHECK(!Chunk.FindView("ModTag")); - CHECK(!Chunk.FindView("RawHash")); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - // Modtime query with mismatching ModTag - { - const CbPackage& Response = - GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - IoHash Id = Chunk["Id"].AsHash(); - CHECK_EQ(FirstAttachmentHash, Id); - uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); - CHECK_EQ(ResponseModTag, ResponseModTag2); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - } - // Single existing CID chunk by Id - { - Oid FirstAttachmentId = Attachments[OpIds[2]][1].first; - uint64_t ResponseModTag = 0; - { - // Full chunk request - const CbPackage& Response = - GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {})); - CHECK_EQ(1, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - Oid Id = Chunk["Id"].AsObjectId(); - CHECK_EQ(FirstAttachmentId, Id); - ResponseModTag = Chunk["ModTag"].AsUInt64(); - CHECK_NE(0, ResponseModTag); - IoHash AttachmentHash = Chunk["RawHash"].AsHash(); - const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); - CHECK_NE(nullptr, Attachment); - CompressedBuffer Buffer = Attachment->AsCompressedBinary(); - CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - { - // Partial chunk request - const CbPackage& Response = - GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{130 * 1024, 8100}}, {})); - CHECK_EQ(1, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - Oid Id = Chunk["Id"].AsObjectId(); - CHECK_EQ(FirstAttachmentId, Id); - ResponseModTag = Chunk["ModTag"].AsUInt64(); - CHECK_NE(0, ResponseModTag); - IoHash AttachmentHash = Chunk["FragmentHash"].AsHash(); - const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); - CHECK_NE(nullptr, Attachment); - std::uint64_t FragmentStart = Chunk["FragmentOffset"].AsUInt64(); - CompressedBuffer Buffer = Attachment->AsCompressedBinary(); - CHECK(FragmentStart <= 130 * 1024); - CHECK(FragmentStart + Buffer.DecodeRawSize() >= 130 * 1024 + 8100); - auto ResponseDecompressedBuffer = Buffer.Decompress(130 * 1024 - FragmentStart, 8100); - auto ExpectedDecompressedBuffer = Attachments[OpIds[2]][1].second.Decompress(130 * 1024, 8100); - CHECK(ResponseDecompressedBuffer.AsIoBuffer().GetView().EqualBytes(ExpectedDecompressedBuffer.AsIoBuffer().GetView())); - CHECK_EQ(Chunk["RawSize"sv].AsUInt64(), Attachments[OpIds[2]][1].second.DecodeRawSize()); - CHECK(!Chunk.FindView("Size")); - } - { - // Fetch with matching ModTag - const CbPackage& Response = - GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - Oid Id = Chunk["Id"].AsObjectId(); - CHECK_EQ(FirstAttachmentId, Id); - CHECK(!Chunk.FindView("ModTag")); - CHECK(!Chunk.FindView("RawHash")); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - { - // Fetch with mismatching ModTag - const CbPackage& Response = - GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)})); - CHECK_EQ(1, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - Oid Id = Chunk["Id"].AsObjectId(); - CHECK_EQ(FirstAttachmentId, Id); - uint64_t ResponseModTag3 = Chunk["ModTag"].AsUInt64(); - CHECK_EQ(ResponseModTag, ResponseModTag3); - IoHash AttachmentHash = Chunk["RawHash"].AsHash(); - const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); - CHECK_NE(nullptr, Attachment); - CompressedBuffer Buffer = Attachment->AsCompressedBinary(); - CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - // Fresh modtime query - { - const CbPackage& Response = - GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - Oid Id = Chunk["Id"].AsObjectId(); - CHECK_EQ(FirstAttachmentId, Id); - uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); - CHECK_EQ(ResponseModTag, ResponseModTag2); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - // Modtime query with matching ModTag - { - const CbPackage& Response = - GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - Oid Id = Chunk["Id"].AsObjectId(); - CHECK_EQ(FirstAttachmentId, Id); - CHECK(!Chunk.FindView("ModTag")); - CHECK(!Chunk.FindView("RawHash")); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - // Modtime query with mismatching ModTag - { - const CbPackage& Response = - GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - Oid Id = Chunk["Id"].AsObjectId(); - CHECK_EQ(FirstAttachmentId, Id); - uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); - CHECK_EQ(ResponseModTag, ResponseModTag2); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - } - - // Single existing file chunk by Id - { - Oid FirstAttachmentId = FilesOpIdAttachments[0].first; - uint64_t ResponseModTag = 0; - { - // Full chunk request - const CbPackage& Response = - GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {})); - CHECK_EQ(1, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - Oid Id = Chunk["Id"].AsObjectId(); - CHECK_EQ(FirstAttachmentId, Id); - ResponseModTag = Chunk["ModTag"].AsUInt64(); - CHECK_NE(0, ResponseModTag); - IoHash AttachmentHash = Chunk["Hash"].AsHash(); - const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); - CHECK_NE(nullptr, Attachment); - CompositeBuffer Buffer = Attachment->AsCompositeBinary(); - CHECK_EQ(IoHash::HashBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten()), IoHash::HashBuffer(Buffer)); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - { - // Partial chunk request - const CbPackage& Response = - GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{81823, 5434}}, {})); - CHECK_EQ(1, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - Oid Id = Chunk["Id"].AsObjectId(); - CHECK_EQ(FirstAttachmentId, Id); - ResponseModTag = Chunk["ModTag"].AsUInt64(); - CHECK_NE(0, ResponseModTag); - - IoHash AttachmentHash = Chunk["Hash"].AsHash(); - const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); - CHECK_NE(nullptr, Attachment); - CompositeBuffer Buffer = Attachment->AsCompositeBinary(); - CHECK_EQ(IoHash::HashBuffer(IoBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten(), 81823, 5434)), - IoHash::HashBuffer(Buffer)); - CHECK_EQ(Chunk["Size"sv].AsUInt64(), FileSizeFromPath(FilesOpIdAttachments[0].second)); - CHECK(!Chunk.FindView("RawSize")); - } - { - // Fetch with matching ModTag - const CbPackage& Response = - GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - Oid Id = Chunk["Id"].AsObjectId(); - CHECK_EQ(FirstAttachmentId, Id); - CHECK(!Chunk.FindView("ModTag")); - CHECK(!Chunk.FindView("Hash")); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - { - // Fetch with mismatching ModTag - const CbPackage& Response = - GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)})); - CHECK_EQ(1, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - Oid Id = Chunk["Id"].AsObjectId(); - CHECK_EQ(FirstAttachmentId, Id); - uint64_t ResponseModTag3 = Chunk["ModTag"].AsUInt64(); - CHECK_EQ(ResponseModTag, ResponseModTag3); - IoHash AttachmentHash = Chunk["Hash"].AsHash(); - const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); - CHECK_NE(nullptr, Attachment); - CompositeBuffer Buffer = Attachment->AsCompositeBinary(); - CHECK_EQ(IoHash::HashBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten()), IoHash::HashBuffer(Buffer)); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - // Fresh modtime query - { - const CbPackage& Response = - GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - Oid Id = Chunk["Id"].AsObjectId(); - CHECK_EQ(FirstAttachmentId, Id); - uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); - CHECK_EQ(ResponseModTag, ResponseModTag2); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - // Modtime query with matching ModTag - { - const CbPackage& Response = - GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - Oid Id = Chunk["Id"].AsObjectId(); - CHECK_EQ(FirstAttachmentId, Id); - CHECK(!Chunk.FindView("ModTag")); - CHECK(!Chunk.FindView("Hash")); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - // Modtime query with mismatching ModTag - { - const CbPackage& Response = - GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)})); - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(1, Chunks.Num()); - CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); - - Oid Id = Chunk["Id"].AsObjectId(); - CHECK_EQ(FirstAttachmentId, Id); - uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); - CHECK_EQ(ResponseModTag, ResponseModTag2); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - } - - // Multi RawHash Request - { - std::vector<CompressedBuffer> AttachmentBuffers{Attachments[OpIds[1]][0].second, - Attachments[OpIds[2]][0].second, - Attachments[OpIds[2]][1].second}; - std::vector<IoHash> AttachmentHashes{AttachmentBuffers[0].DecodeRawHash(), - AttachmentBuffers[1].DecodeRawHash(), - AttachmentBuffers[2].DecodeRawHash()}; - std::vector<uint64_t> ResponseModTags(3, 0); - { - // Fresh fetch - const CbPackage& Response = - GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, {})); - - CHECK_EQ(3, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(3, Chunks.Num()); - for (CbFieldView ChunkView : Chunks) - { - CbObjectView Chunk = ChunkView.AsObjectView(); - IoHash Id = Chunk["Id"].AsHash(); - - auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); - CHECK(It != AttachmentHashes.end()); - ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); - CHECK_EQ(AttachmentHashes[Index], Id); - ResponseModTags[Index] = Chunk["ModTag"].AsUInt64(); - CHECK_NE(0, ResponseModTags[Index]); - IoHash AttachmentHash = Chunk["RawHash"].AsHash(); - const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); - CHECK_NE(nullptr, Attachment); - CompressedBuffer Buffer = Attachment->AsCompressedBinary(); - CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); - CHECK(AttachmentBuffers[Index].GetCompressed().Flatten().GetView().EqualBytes(Buffer.GetCompressed().Flatten().GetView())); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - } - { - // Fetch with matching ModTag - const CbPackage& Response = GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, ResponseModTags)); - - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(3, Chunks.Num()); - for (CbFieldView ChunkView : Chunks) - { - CbObjectView Chunk = ChunkView.AsObjectView(); - IoHash Id = Chunk["Id"].AsHash(); - - auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); - CHECK(It != AttachmentHashes.end()); - ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); - CHECK_EQ(AttachmentHashes[Index], Id); - CHECK(!Chunk.FindView("ModTag")); - CHECK(!Chunk.FindView("RawHash")); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - } - { - // Fresh modtime query - const CbPackage& Response = - GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, {})); - - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(3, Chunks.Num()); - for (CbFieldView ChunkView : Chunks) - { - CbObjectView Chunk = ChunkView.AsObjectView(); - IoHash Id = Chunk["Id"].AsHash(); - - auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); - CHECK(It != AttachmentHashes.end()); - ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); - CHECK_EQ(AttachmentHashes[Index], Id); - CHECK_EQ(ResponseModTags[Index], Chunk["ModTag"].AsUInt64()); - CHECK(!Chunk.FindView("RawHash")); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - } - { - // Modtime query with matching ModTags - const CbPackage& Response = GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, ResponseModTags)); - - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(3, Chunks.Num()); - for (CbFieldView ChunkView : Chunks) - { - CbObjectView Chunk = ChunkView.AsObjectView(); - IoHash Id = Chunk["Id"].AsHash(); - - auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); - CHECK(It != AttachmentHashes.end()); - ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); - CHECK_EQ(AttachmentHashes[Index], Id); - CHECK(!Chunk.FindView("ModTag")); - CHECK(!Chunk.FindView("RawHash")); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - } - { - // Modtime query with mismatching ModTags - std::vector<uint64_t> MismatchingModTags(ResponseModTags); - for (uint64_t& Tag : MismatchingModTags) - { - Tag++; - } - const CbPackage& Response = - GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, MismatchingModTags)); - - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(3, Chunks.Num()); - for (CbFieldView ChunkView : Chunks) - { - CbObjectView Chunk = ChunkView.AsObjectView(); - IoHash Id = Chunk["Id"].AsHash(); - - auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); - CHECK(It != AttachmentHashes.end()); - ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); - CHECK_EQ(AttachmentHashes[Index], Id); - CHECK(Chunk["ModTag"].AsUInt64() == ResponseModTags[Index]); - CHECK(!Chunk.FindView("RawHash")); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - } - } - // Multi Id Request - { - std::vector<CompressedBuffer> AttachmentBuffers{Attachments[OpIds[1]][0].second, - Attachments[OpIds[2]][0].second, - Attachments[OpIds[2]][1].second}; - std::vector<IoHash> AttachmentHashes{AttachmentBuffers[0].DecodeRawHash(), - AttachmentBuffers[1].DecodeRawHash(), - AttachmentBuffers[2].DecodeRawHash()}; - std::vector<Oid> AttachedIds{Attachments[OpIds[1]][0].first, Attachments[OpIds[2]][0].first, Attachments[OpIds[2]][1].first}; - std::vector<uint64_t> ResponseModTags(3, 0); - { - // Fresh fetch - const CbPackage& Response = - GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, {})); - - CHECK_EQ(3, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(3, Chunks.Num()); - for (CbFieldView ChunkView : Chunks) - { - CbObjectView Chunk = ChunkView.AsObjectView(); - Oid Id = Chunk["Id"].AsObjectId(); - - auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); - CHECK(It != AttachedIds.end()); - ptrdiff_t Index = std::distance(AttachedIds.begin(), It); - CHECK_EQ(AttachedIds[Index], Id); - ResponseModTags[Index] = Chunk["ModTag"].AsUInt64(); - CHECK_NE(0, ResponseModTags[Index]); - IoHash AttachmentHash = Chunk["RawHash"].AsHash(); - const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); - CHECK_NE(nullptr, Attachment); - CompressedBuffer Buffer = Attachment->AsCompressedBinary(); - CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); - CHECK(AttachmentBuffers[Index].GetCompressed().Flatten().GetView().EqualBytes(Buffer.GetCompressed().Flatten().GetView())); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - } - { - // Fetch with matching ModTag - const CbPackage& Response = GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, ResponseModTags)); - - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(3, Chunks.Num()); - for (CbFieldView ChunkView : Chunks) - { - CbObjectView Chunk = ChunkView.AsObjectView(); - Oid Id = Chunk["Id"].AsObjectId(); - - auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); - CHECK(It != AttachedIds.end()); - ptrdiff_t Index = std::distance(AttachedIds.begin(), It); - CHECK_EQ(AttachedIds[Index], Id); - CHECK(!Chunk.FindView("ModTag")); - CHECK(!Chunk.FindView("RawHash")); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - } - { - // Fresh modtime query - const CbPackage& Response = - GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, {})); - - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(3, Chunks.Num()); - for (CbFieldView ChunkView : Chunks) - { - CbObjectView Chunk = ChunkView.AsObjectView(); - Oid Id = Chunk["Id"].AsObjectId(); - - auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); - CHECK(It != AttachedIds.end()); - ptrdiff_t Index = std::distance(AttachedIds.begin(), It); - CHECK_EQ(AttachedIds[Index], Id); - CHECK_EQ(ResponseModTags[Index], Chunk["ModTag"].AsUInt64()); - CHECK(!Chunk.FindView("RawHash")); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - } - { - // Modtime query with matching ModTags - const CbPackage& Response = GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, ResponseModTags)); - - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(3, Chunks.Num()); - for (CbFieldView ChunkView : Chunks) - { - CbObjectView Chunk = ChunkView.AsObjectView(); - Oid Id = Chunk["Id"].AsObjectId(); - - auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); - CHECK(It != AttachedIds.end()); - ptrdiff_t Index = std::distance(AttachedIds.begin(), It); - CHECK_EQ(AttachedIds[Index], Id); - CHECK(!Chunk.FindView("ModTag")); - CHECK(!Chunk.FindView("RawHash")); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - } - { - // Modtime query with mismatching ModTags - std::vector<uint64_t> MismatchingModTags(ResponseModTags); - for (uint64_t& Tag : MismatchingModTags) - { - Tag++; - } - const CbPackage& Response = GetChunks(ProjectStore, - *Project1, - *Oplog1, - testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, MismatchingModTags)); - - CHECK_EQ(0, Response.GetAttachments().size()); - CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); - CHECK_EQ(3, Chunks.Num()); - for (CbFieldView ChunkView : Chunks) - { - CbObjectView Chunk = ChunkView.AsObjectView(); - Oid Id = Chunk["Id"].AsObjectId(); - - auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); - CHECK(It != AttachedIds.end()); - ptrdiff_t Index = std::distance(AttachedIds.begin(), It); - CHECK_EQ(AttachedIds[Index], Id); - CHECK(Chunk["ModTag"].AsUInt64() == ResponseModTags[Index]); - CHECK(!Chunk.FindView("RawHash")); - CHECK(!Chunk.FindView("Size")); - CHECK(!Chunk.FindView("RawSize")); - } - } - } -} - -TEST_CASE("project.store.partial.read") -{ - using namespace std::literals; - using namespace testutils; - - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - CidStore CidStore(Gc); - CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); - - std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; - ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); - std::filesystem::path RootDir = TempDir.Path() / "root"sv; - std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; - - std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv; - std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv; - { - CreateDirectories(Project1FilePath.parent_path()); - BasicFile ProjectFile; - ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); - } - - std::vector<Oid> OpIds; - OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()}); - std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; - { - Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, - "proj1"sv, - RootDir.string(), - EngineRootDir.string(), - Project1RootDir.string(), - Project1FilePath.string())); - Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, {}); - CHECK(Oplog); - Attachments[OpIds[0]] = {}; - Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77}); - Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{7123, 9583, 690, 99}); - Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122}); - for (auto It : Attachments) - { - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(It.first, It.second)); - } - } - Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); - CHECK(Project1); - Ref<ProjectStore::Oplog> Oplog1 = Project1->OpenOplog("oplog1"sv, false, true); - CHECK(Oplog1); - { - uint64_t ModificationTag = 0; - - auto Result = ProjectStore.GetChunkRange(Log(), - *Project1, - *Oplog1, - Attachments[OpIds[1]][0].first, - 0, - ~0ull, - ZenContentType::kCompressedBinary, - &ModificationTag); - - CHECK_EQ(ProjectStore::GetChunkRangeResult::EError::Ok, Result.Error); - - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Attachment = CompressedBuffer::FromCompressed(Result.Chunk, RawHash, RawSize); - CHECK(RawSize == Attachments[OpIds[1]][0].second.DecodeRawSize()); - - auto Result2 = ProjectStore.GetChunkRange(Log(), - *Project1, - *Oplog1, - Attachments[OpIds[1]][0].first, - 0, - ~0ull, - ZenContentType::kCompressedBinary, - &ModificationTag); - CHECK_EQ(ProjectStore::GetChunkRangeResult::EError::NotModified, Result2.Error); - } - - { - uint64_t FullChunkModificationTag = 0; - { - auto Result = ProjectStore.GetChunkRange(Log(), - *Project1, - *Oplog1, - Attachments[OpIds[2]][1].first, - 0, - ~0ull, - ZenContentType::kCompressedBinary, - &FullChunkModificationTag); - CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::Ok); - CHECK(Result.Chunk); - CHECK(CompressedBuffer::FromCompressedNoValidate(std::move(Result.Chunk)).DecodeRawSize() == - Attachments[OpIds[2]][1].second.DecodeRawSize()); - } - { - auto Result = ProjectStore.GetChunkRange(Log(), - *Project1, - *Oplog1, - Attachments[OpIds[2]][1].first, - 0, - ~0ull, - ZenContentType::kCompressedBinary, - &FullChunkModificationTag); - CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::NotModified); - } - } - { - uint64_t PartialChunkModificationTag = 0; - { - auto Result = ProjectStore.GetChunkRange(Log(), - *Project1, - *Oplog1, - Attachments[OpIds[2]][1].first, - 5, - 1773, - ZenContentType::kCompressedBinary, - &PartialChunkModificationTag); - CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::Ok); - - IoHash PartialRawHash; - uint64_t PartialRawSize; - CompressedBuffer PartialCompressedResult = CompressedBuffer::FromCompressed(Result.Chunk, PartialRawHash, PartialRawSize); - CHECK(PartialRawSize >= 1773); - - uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5); - SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed); - SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress(); - const uint8_t* FullDataPtr = &(reinterpret_cast<const uint8_t*>(FullDecompressed.GetView().GetData())[5]); - const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData()); - CHECK(FullDataPtr[0] == PartialDataPtr[0]); - } - - { - auto Result = ProjectStore.GetChunkRange(Log(), - *Project1, - *Oplog1, - Attachments[OpIds[2]][1].first, - 0, - 1773, - ZenContentType::kCompressedBinary, - &PartialChunkModificationTag); - CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::NotModified); - } - } -} - -TEST_CASE("project.store.block") -{ - using namespace std::literals; - using namespace testutils; - - std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, 2257, 3685, 3489, - 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251, - 491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225}); - - std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes); - std::vector<std::pair<IoHash, FetchChunkFunc>> Chunks; - Chunks.reserve(AttachmentSizes.size()); - for (const auto& It : AttachmentsWithId) - { - Chunks.push_back( - std::make_pair(It.second.DecodeRawHash(), [Buffer = It.second](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { - return {Buffer.DecodeRawSize(), Buffer}; - })); - } - ChunkBlockDescription Block; - CompressedBuffer BlockBuffer = GenerateChunkBlock(std::move(Chunks), Block); - uint64_t HeaderSize; - CHECK(IterateChunkBlock( - BlockBuffer.Decompress(), - [](CompressedBuffer&&, const IoHash&) {}, - HeaderSize)); -} - -TEST_CASE("project.store.iterateoplog") -{ - using namespace std::literals; - using namespace testutils; - - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - CidStore CidStore(Gc); - CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); - - std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; - ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); - std::filesystem::path RootDir = TempDir.Path() / "root"sv; - std::filesystem::path EngineRootDir = TempDir.Path() / "enginesv"; - - std::filesystem::path ProjectRootDir = TempDir.Path() / "game"sv; - std::filesystem::path ProjectFilePath = TempDir.Path() / "game"sv / "game.uproject"sv; - { - CreateDirectories(ProjectFilePath.parent_path()); - BasicFile ProjectFile; - ProjectFile.Open(ProjectFilePath, BasicFile::Mode::kTruncate); - } - std::filesystem::path ProjectOplogPath = TempDir.Path() / "game"sv / "saves"sv / "cooked"sv / ".projectstore"sv; - { - CreateDirectories(ProjectOplogPath.parent_path()); - BasicFile OplogFile; - OplogFile.Open(ProjectOplogPath, BasicFile::Mode::kTruncate); - } - - Ref<ProjectStore::Project> TestProject(ProjectStore.NewProject(BasePath / "proj"sv, - "proj"sv, - RootDir.string(), - EngineRootDir.string(), - ProjectRootDir.string(), - ProjectFilePath.string())); - Ref<ProjectStore::Oplog> Oplog = TestProject->NewOplog("oplog"sv, ProjectOplogPath); - CHECK(Oplog); - - struct TestOidData - { - Oid KeyAsOidNotOplogId = Oid::NewOid(); - std::string Key = KeyAsOidNotOplogId.ToString(); - bool bFound = false; - }; - constexpr int NumTestOids = 4; - TestOidData TestOids[NumTestOids]; - for (const TestOidData& TestOid : TestOids) - { - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(TestOid.KeyAsOidNotOplogId, {})); - } - int Count = 0; - - auto ResetTest = [&Count, &TestOids]() { - Count = 0; - for (TestOidData& TestOid : TestOids) - { - TestOid.bFound = false; - } - }; - auto IncrementCount = [&Count](CbObjectView /* Op */) { ++Count; }; - auto MarkFound = [&TestOids, &Count](ProjectStore::LogSequenceNumber /* LSN */, const Oid& /* InId */, CbObjectView Op) { - for (TestOidData& TestOid : TestOids) - { - if (Op["key"sv].AsString() == TestOid.Key) - { - TestOid.bFound = true; - ++Count; - } - } - }; - - // Tests of IterateOpLog and IterateOplogWithKey, with various Paging arguments - { - ResetTest(); - Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{}); - CHECK(Count == NumTestOids); - ResetTest(); - Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{}); - CHECK(Count == NumTestOids); - ResetTest(); - Oplog->IterateOplogWithKey(MarkFound); - CHECK(Count == NumTestOids); - - Count = 0; - for (int Start = 0; Start < NumTestOids; ++Start) - { - for (int Size = 0; Size < NumTestOids - Start; ++Size) - { - ResetTest(); - Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{Start, Size}); - CHECK(Count == Size); - ResetTest(); - Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{Start, Size}); - CHECK(Count == Size); - } - - // Out of range Size arguments - ResetTest(); - Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{Start, -1}); - CHECK(Count == NumTestOids - Start); - ResetTest(); - Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{Start, NumTestOids * 2}); - CHECK(Count == NumTestOids - Start); - } - - // Out of range Start arguments - for (int Size = 0; Size < NumTestOids; ++Size) - { - ResetTest(); - Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{-1, Size}); - CHECK(Count == Size); - ResetTest(); - Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{-1, Size}); - CHECK(Count == Size); - - ResetTest(); - Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{NumTestOids, Size}); - CHECK(Count == 0); - ResetTest(); - Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{NumTestOids, Size}); - CHECK(Count == 0); - } - // Out of range Start and Size arguments - ResetTest(); - Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{-1, -1}); - CHECK(Count == NumTestOids); - ResetTest(); - Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{-1, -1}); - CHECK(Count == NumTestOids); - - ResetTest(); - Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{-1, 2 * NumTestOids}); - CHECK(Count == NumTestOids); - ResetTest(); - Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{-1, 2 * NumTestOids}); - CHECK(Count == NumTestOids); - - ResetTest(); - Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{NumTestOids, -1}); - CHECK(Count == 0); - ResetTest(); - Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{NumTestOids, -1}); - CHECK(Count == 0); - - ResetTest(); - Oplog->IterateOplog(IncrementCount, ProjectStore::Oplog::Paging{NumTestOids, 2 * NumTestOids}); - CHECK(Count == 0); - ResetTest(); - Oplog->IterateOplogWithKey(MarkFound, ProjectStore::Oplog::Paging{NumTestOids, 2 * NumTestOids}); - CHECK(Count == 0); - } -} - -#endif - -void -prj_forcelink() -{ -} - -} // namespace zen diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h deleted file mode 100644 index 258be5930..000000000 --- a/src/zenserver/projectstore/projectstore.h +++ /dev/null @@ -1,583 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compositebuffer.h> -#include <zencore/uid.h> -#include <zencore/xxhash.h> -#include <zenstore/gc.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <tsl/robin_map.h> -ZEN_THIRD_PARTY_INCLUDES_END - -#include <map> -#include <variant> - -namespace zen { - -class CidStore; -class AuthMgr; -class ScrubContext; - -/** Project Store - - A project store consists of a number of Projects. - - Each project contains a number of oplogs (short for "operation log"). UE uses - one oplog per target platform to store the output of the cook process. - - An oplog consists of a sequence of "op" entries. Each entry is a structured object - containing references to attachments. Attachments are typically the serialized - package data split into separate chunks for bulk data, exports and header - information. - */ -class ProjectStore : public RefCounted, public GcStorage, public GcReferencer, public GcReferenceLocker -{ - struct OplogStorage; - -public: - struct Configuration - { - }; - - ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, const Configuration& Config); - ~ProjectStore(); - - struct LogSequenceNumber - { - uint32_t Number = 0u; - - operator bool() const { return Number != 0u; }; - LogSequenceNumber() = default; - explicit LogSequenceNumber(size_t InNumber) : Number(uint32_t(InNumber)) {} - operator size_t() const { return Number; }; - inline auto operator<=>(const LogSequenceNumber& Other) const = default; - - struct Hasher - { - size_t operator()(const LogSequenceNumber& v) const { return std::hash<uint32_t>()(v.Number); } - }; - }; - - template<class V> - using LsnMap = tsl::robin_map<LogSequenceNumber, V, LogSequenceNumber::Hasher>; - - struct OplogEntryAddress - { - uint32_t Offset; // note: Multiple of m_OpsAlign! - uint32_t Size; - }; - - struct OplogEntry - { - LogSequenceNumber OpLsn; - OplogEntryAddress OpCoreAddress; - uint32_t OpCoreHash; // Used as checksum - Oid OpKeyHash; - uint32_t Reserved; - - inline bool IsTombstone() const { return OpCoreAddress.Offset == 0 && OpCoreAddress.Size == 0 && OpLsn.Number; } - inline void MakeTombstone() - { - OpLsn = {}; - OpCoreAddress.Offset = OpCoreAddress.Size = OpCoreHash = Reserved = 0; - } - }; - - static_assert(IsPow2(sizeof(OplogEntry))); - - struct Oplog : public RefCounted - { - enum class EMode - { - kBasicReadOnly, - kFull - }; - - Oplog(const LoggerRef& Log, - std::string_view ProjectIdentifier, - std::string_view Id, - CidStore& Store, - const std::filesystem::path& BasePath, - const std::filesystem::path& MarkerPath, - EMode State); - ~Oplog(); - - [[nodiscard]] static bool ExistsAt(const std::filesystem::path& BasePath); - bool Exists() const; - - void Read(); - void Write(); - void Update(const std::filesystem::path& MarkerPath); - bool Reset(); - bool CanUnload(); - - struct ChunkInfo - { - Oid ChunkId; - uint64_t ChunkSize; - }; - - struct Paging - { - int32_t Start = -1; - int32_t Count = -1; - }; - - std::vector<ChunkInfo> GetAllChunksInfo(const std::filesystem::path& ProjectRootDir); - void IterateChunkMap(std::function<void(const Oid&, const IoHash& Hash)>&& Fn); - void IterateFileMap(std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn); - void IterateOplog(std::function<void(CbObjectView)>&& Fn, const Paging& EntryPaging); - void IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Fn); - void IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Fn, const Paging& EntryPaging); - void IterateOplogLocked(std::function<void(CbObjectView)>&& Fn, const Paging& EntryPaging); - size_t GetOplogEntryCount() const; - - std::optional<CbObject> GetOpByKey(const Oid& Key); - std::optional<CbObject> GetOpByIndex(LogSequenceNumber Index); - LogSequenceNumber GetOpIndexByKey(const Oid& Key); - - IoBuffer FindChunk(const std::filesystem::path& ProjectRootDir, const Oid& ChunkId, uint64_t* OptOutModificationTag); - IoBuffer GetChunkByRawHash(const IoHash& RawHash); - bool IterateChunks(std::span<IoHash> RawHashes, - bool IncludeModTag, - const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool, - uint64_t LargeSizeLimit); - bool IterateChunks(const std::filesystem::path& ProjectRootDir, - std::span<Oid> ChunkIds, - bool IncludeModTag, - const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool, - uint64_t LargeSizeLimit); - - /** Persist a new oplog entry - * - * Returns the oplog LSN assigned to the new entry, or an invalid number if the entry is rejected - */ - LogSequenceNumber AppendNewOplogEntry(CbPackage Op); - LogSequenceNumber AppendNewOplogEntry(CbObjectView Core); - std::vector<LogSequenceNumber> AppendNewOplogEntries(std::span<CbObjectView> Cores); - - const std::string& OplogId() const { return m_OplogId; } - - const std::filesystem::path& TempPath() const { return m_TempPath; } - const std::filesystem::path& MarkerPath() const { return m_MarkerPath; } - - LoggerRef Log() const { return m_Log; } - void Flush(); - void Scrub(ScrubContext& Ctx); - static uint64_t TotalSize(const std::filesystem::path& BasePath); - uint64_t TotalSize() const; - - std::size_t OplogCount() const - { - RwLock::SharedLockScope _(m_OplogLock); - return m_OpToPayloadOffsetMap.size(); - } - - void ResetState(); - bool PrepareForDelete(std::filesystem::path& OutRemoveDirectory); - - void EnableUpdateCapture(); - void DisableUpdateCapture(); - void CaptureAddedAttachments(std::span<const IoHash> AttachmentHashes); - std::vector<IoHash> GetCapturedAttachmentsLocked(); - std::vector<IoHash> CheckPendingChunkReferences(std::span<const IoHash> ChunkHashes, const GcClock::Duration& RetainTime); - void RemovePendingChunkReferences(std::span<const IoHash> ChunkHashes); - std::vector<IoHash> GetPendingChunkReferencesLocked(); - - RwLock::SharedLockScope GetGcReferencerLock() { return RwLock::SharedLockScope(m_OplogLock); } - - uint32_t GetUnusedSpacePercent() const; - void Compact(bool DryRun, bool RetainLSNs, std::string_view LogPrefix); - - void GetAttachmentsLocked(std::vector<IoHash>& OutAttachments, bool StoreMetaDataOnDisk); - - std::string_view GetOuterProjectIdentifier() const { return m_OuterProjectId; } - void CompactIfUnusedExceeds(bool DryRun, uint32_t CompactUnusedThreshold, std::string_view LogPrefix); - - static std::optional<CbObject> ReadStateFile(const std::filesystem::path& BasePath, std::function<LoggerRef()>&& Log); - - struct ChunkMapping - { - Oid Id; - IoHash Hash; - }; - - struct FileMapping - { - Oid Id; - IoHash Hash; // This is either zero or a cid - std::string ServerPath; // If Hash is valid then this should be empty - std::string ClientPath; - }; - - struct ValidationResult - { - uint32_t OpCount = 0; - LogSequenceNumber LSNLow; - LogSequenceNumber LSNHigh; - std::vector<std::pair<Oid, FileMapping>> MissingFiles; - std::vector<std::pair<Oid, ChunkMapping>> MissingChunks; - std::vector<std::pair<Oid, ChunkMapping>> MissingMetas; - std::vector<std::pair<Oid, IoHash>> MissingAttachments; - std::vector<std::pair<Oid, std::string>> OpKeys; - - bool IsEmpty() const - { - return MissingFiles.empty() && MissingChunks.empty() && MissingMetas.empty() && MissingAttachments.empty(); - } - }; - - ValidationResult Validate(const std::filesystem::path& ProjectRootDir, - std::atomic_bool& IsCancelledFlag, - WorkerThreadPool* OptionalWorkerPool); - - private: - struct FileMapEntry - { - std::string ServerPath; - std::string ClientPath; - }; - - template<class V> - using OidMap = tsl::robin_map<Oid, V, Oid::Hasher>; - - LoggerRef m_Log; - const std::string m_OuterProjectId; - const std::string m_OplogId; - CidStore& m_CidStore; - const std::filesystem::path m_BasePath; - std::filesystem::path m_MarkerPath; - const std::filesystem::path m_TempPath; - const std::filesystem::path m_MetaPath; - - const EMode m_Mode; - - mutable RwLock m_OplogLock; - OidMap<IoHash> m_ChunkMap; // output data chunk id -> CAS address - OidMap<IoHash> m_MetaMap; // meta chunk id -> CAS address - OidMap<FileMapEntry> m_FileMap; // file id -> file map entry - int32_t m_ManifestVersion; // File system manifest version - - struct PayloadIndex - { - uint32_t Index = std::numeric_limits<uint32_t>::max(); - - operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); }; - PayloadIndex() = default; - explicit PayloadIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} - operator size_t() const { return Index; }; - inline auto operator<=>(const PayloadIndex& Other) const = default; - - struct Hasher - { - size_t operator()(const PayloadIndex& v) const { return std::hash<uint32_t>()(v.Index); } - }; - }; - - struct OplogPayload - { - LogSequenceNumber Lsn; - OplogEntryAddress Address; - }; - - OidMap<PayloadIndex> m_OpToPayloadOffsetMap; - std::vector<OplogPayload> m_OpLogPayloads; - std::unique_ptr<LsnMap<PayloadIndex>> m_LsnToPayloadOffsetMap; - - std::atomic<bool> m_MetaValid = false; - - uint32_t m_UpdateCaptureRefCounter = 0; - std::unique_ptr<std::vector<Oid>> m_CapturedOps; - std::unique_ptr<std::vector<IoHash>> m_CapturedAttachments; - std::unordered_set<IoHash, IoHash::Hasher> m_PendingPrepOpAttachments; - GcClock::TimePoint m_PendingPrepOpAttachmentsRetainEnd; - - RefPtr<OplogStorage> m_Storage; - uint64_t m_LogFlushPosition = 0; - bool m_IsLegacySnapshot = false; - - RefPtr<OplogStorage> GetStorage(); - - /** Scan oplog and register each entry, thus updating the in-memory tracking tables - */ - uint32_t GetUnusedSpacePercentLocked() const; - void WriteIndexSnapshot(); - void ReadIndexSnapshot(); - void RefreshLsnToPayloadOffsetMap(RwLock::ExclusiveLockScope&); - - struct OplogEntryMapping - { - std::vector<ChunkMapping> Chunks; - std::vector<ChunkMapping> Meta; - std::vector<FileMapping> Files; - }; - - OplogEntryMapping GetMapping(CbObjectView Core); - - /** Update tracking metadata for a new oplog entry - * - * This is used during replay (and gets called as part of new op append) - * - * Returns the oplog LSN assigned to the new entry, or kInvalidOp if the entry is rejected - */ - LogSequenceNumber RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, - const OplogEntryMapping& OpMapping, - const OplogEntry& OpEntry); - - void AddFileMapping(const RwLock::ExclusiveLockScope& OplogLock, - const Oid& FileId, - const IoHash& Hash, - std::string_view ServerPath, - std::string_view ClientPath); - void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash); - void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash); - void Compact(RwLock::ExclusiveLockScope& Lock, bool DryRun, bool RetainLSNs, std::string_view LogPrefix); - void IterateCapturedOpsLocked(std::function<bool(const Oid& Key, LogSequenceNumber LSN, const CbObjectView& UpdateOp)>&& Callback); - std::vector<PayloadIndex> GetSortedOpPayloadRangeLocked( - const Paging& EntryPaging, - tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher>* OutOptionalReverseKeyMap); - - friend class ProjectStoreOplogReferenceChecker; - friend class ProjectStoreReferenceChecker; - friend class ProjectStoreOplogReferenceValidator; - friend struct OplogStorage; - }; - - struct Project : public RefCounted - { - std::string Identifier; - std::filesystem::path RootDir; - std::filesystem::path EngineRootDir; - std::filesystem::path ProjectRootDir; - std::filesystem::path ProjectFilePath; - - Ref<Oplog> NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath); - Ref<Oplog> OpenOplog(std::string_view OplogId, bool AllowCompact, bool VerifyPathOnDisk); - Ref<Oplog> ReadOplog(std::string_view OplogId); - bool TryUnloadOplog(std::string_view OplogId); - bool DeleteOplog(std::string_view OplogId); - bool RemoveOplog(std::string_view OplogId, std::filesystem::path& OutDeletePath); - void IterateOplogs(std::function<void(const RwLock::SharedLockScope&, const Oplog&)>&& Fn) const; - void IterateOplogs(std::function<void(const RwLock::SharedLockScope&, Oplog&)>&& Fn); - std::vector<std::string> ScanForOplogs() const; - bool IsExpired(const GcClock::TimePoint ExpireTime) const; - bool IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog) const; - bool IsExpired(const GcClock::TimePoint ExpireTime, std::string_view OplogId) const; - bool IsOplogTouchedSince(const GcClock::TimePoint TouchTime, std::string_view Oplog) const; - void TouchProject(); - void TouchOplog(std::string_view Oplog); - GcClock::TimePoint LastOplogAccessTime(std::string_view Oplog) const; - - Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath); - virtual ~Project(); - - void Read(); - void Write(); - [[nodiscard]] static bool Exists(const std::filesystem::path& BasePath); - void Flush(); - void Scrub(ScrubContext& Ctx); - LoggerRef Log() const; - static uint64_t TotalSize(const std::filesystem::path& BasePath); - uint64_t TotalSize() const; - bool PrepareForDelete(std::filesystem::path& OutDeletePath); - - void EnableUpdateCapture(); - void DisableUpdateCapture(); - std::vector<std::string> GetCapturedOplogsLocked(); - - std::vector<RwLock::SharedLockScope> GetGcReferencerLocks(); - - void AddOplogToCompact(std::string_view OplogId) - { - m_OplogsToCompactLock.WithExclusiveLock([&]() { m_OplogsToCompact.insert(std::string(OplogId)); }); - } - std::vector<std::string> GetOplogsToCompact() - { - std::vector<std::string> Result; - m_OplogsToCompactLock.WithExclusiveLock([&]() { - Result.reserve(m_OplogsToCompact.size()); - Result.insert(Result.end(), m_OplogsToCompact.begin(), m_OplogsToCompact.end()); - m_OplogsToCompact.clear(); - }); - return Result; - } - - private: - ProjectStore* m_ProjectStore; - CidStore& m_CidStore; - mutable RwLock m_ProjectLock; - std::map<std::string, Ref<Oplog>> m_Oplogs; - std::filesystem::path m_OplogStoragePath; - mutable RwLock m_LastAccessTimesLock; - mutable tsl::robin_map<std::string, GcClock::Tick> m_LastAccessTimes; - uint32_t m_UpdateCaptureRefCounter = 0; - std::unique_ptr<std::vector<std::string>> m_CapturedOplogs; - - RwLock m_OplogsToCompactLock; - std::unordered_set<std::string> m_OplogsToCompact; - - std::filesystem::path BasePathForOplog(std::string_view OplogId) const; - bool IsExpired(const std::string& EntryName, const std::filesystem::path& MarkerPath, const GcClock::TimePoint ExpireTime) const; - void WriteAccessTimes(); - void ReadAccessTimes(); - - friend class ProjectStoreOplogReferenceChecker; - friend class ProjectStoreReferenceChecker; - friend class ProjectStoreOplogReferenceValidator; - friend class ProjectStoreGcStoreCompactor; - }; - - Ref<Project> OpenProject(std::string_view ProjectId); - Ref<Project> NewProject(const std::filesystem::path& BasePath, - std::string_view ProjectId, - const std::filesystem::path& RootDir, - const std::filesystem::path& EngineRootDir, - const std::filesystem::path& ProjectRootDir, - const std::filesystem::path& ProjectFilePath); - bool UpdateProject(std::string_view ProjectId, - const std::filesystem::path& RootDir, - const std::filesystem::path& EngineRootDir, - const std::filesystem::path& ProjectRootDir, - const std::filesystem::path& ProjectFilePath); - bool RemoveProject(std::string_view ProjectId, std::filesystem::path& OutDeletePath); - bool DeleteProject(std::string_view ProjectId); - bool Exists(std::string_view ProjectId); - void Flush(); - void DiscoverProjects(); - void IterateProjects(std::function<void(Project& Prj)>&& Fn); - - LoggerRef Log() { return m_Log; } - const std::filesystem::path& BasePath() const { return m_ProjectBasePath; } - - // GcStorage - virtual void ScrubStorage(ScrubContext& Ctx) override; - virtual GcStorageSize StorageSize() const override; - - virtual std::string GetGcName(GcCtx& Ctx) override; - virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; - virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; - virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override; - - virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) override; - - CbArray GetProjectsList(); - static CbObject GetProjectFiles(LoggerRef InLog, - Project& Project, - Oplog& Oplog, - const std::unordered_set<std::string>& WantedFieldNames); - - static CbObject GetProjectChunkInfos(LoggerRef InLog, - Project& Project, - Oplog& Oplog, - const std::unordered_set<std::string>& WantedFieldNames); - static CbObject GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, const Oid& ChunkId); - struct GetChunkRangeResult - { - enum class EError : uint8_t - { - Ok, - NotFound, - NotModified, - MalformedContent, - OutOfRange - }; - EError Error = EError(-1); - std::string ErrorDescription; - CompositeBuffer Chunk = CompositeBuffer(); - IoHash RawHash = IoHash::Zero; - uint64_t RawSize = 0; - ZenContentType ContentType = ZenContentType::kUnknownContentType; - }; - static GetChunkRangeResult GetChunkRange(LoggerRef InLog, - Project& Project, - Oplog& Oplog, - const Oid& ChunkId, - uint64_t Offset, - uint64_t Size, - ZenContentType AcceptType, - uint64_t* OptionalInOutModificationTag); - IoBuffer GetChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash); - - IoBuffer GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const Oid& ChunkId); - - IoBuffer GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const IoHash& Cid); - - bool PutChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash, IoBuffer&& Chunk); - - struct ChunkRequest - { - uint64_t Offset = 0; - uint64_t Size = (uint64_t)-1; - std::variant<IoHash, Oid> Id; - std::optional<uint64_t> ModTag; - bool SkipData = false; - }; - struct ChunkResult - { - bool Exists = false; - IoBuffer ChunkBuffer; - uint64_t ModTag = 0; - }; - std::vector<ChunkResult> GetChunks(Project& Project, Oplog& Oplog, std::span<const ChunkRequest> Requests); - - std::vector<ProjectStore::ChunkRequest> ParseChunksRequests(Project& Project, Oplog& Oplog, const CbObject& Cb); - CbPackage WriteChunksRequestResponse(Project& Project, - Oplog& Oplog, - std::vector<ChunkRequest>&& Requests, - std::vector<ChunkResult>&& Results); - - bool AreDiskWritesAllowed() const; - - void EnableUpdateCapture(); - void DisableUpdateCapture(); - std::vector<std::string> GetCapturedProjectsLocked(); - -private: - LoggerRef m_Log; - GcManager& m_Gc; - CidStore& m_CidStore; - std::filesystem::path m_ProjectBasePath; - const Configuration m_Config; - mutable RwLock m_ProjectsLock; - std::map<std::string, Ref<Project>> m_Projects; - const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; - uint32_t m_UpdateCaptureRefCounter = 0; - std::unique_ptr<std::vector<std::string>> m_CapturedProjects; - - std::filesystem::path BasePathForProject(std::string_view ProjectId); - - friend class ProjectStoreGcStoreCompactor; - friend class ProjectStoreOplogReferenceChecker; - friend class ProjectStoreReferenceChecker; -}; - -Oid ComputeOpKey(const CbObjectView& Op); - -Oid OpKeyStringAsOid(std::string_view OpKey); - -template<typename T> -Oid -OpKeyStringAsOid(std::string_view OpKey, T& TmpBuffer) -{ - using namespace std::literals; - - CbObjectWriter Writer; - Writer << "key"sv << OpKey; - Writer.Finalize(); - TmpBuffer.resize(Writer.GetSaveSize()); - MutableMemoryView SaveBuffer(MutableMemoryView(TmpBuffer.data(), TmpBuffer.size())); - - const Oid OpId = ComputeOpKey(Writer.Save(SaveBuffer).AsObjectView()); - - return OpId; -} - -void prj_forcelink(); - -} // namespace zen diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index 488656d8b..11cc58e4d 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -3,7 +3,7 @@ #pragma once #include <zencore/jobqueue.h> -#include "projectstore.h" +#include <zenstore/projectstore.h> #include <zenutil/chunkblock.h> diff --git a/src/zenserver/vfs/vfsimpl.cpp b/src/zenserver/vfs/vfsimpl.cpp index e698d6d19..d22738827 100644 --- a/src/zenserver/vfs/vfsimpl.cpp +++ b/src/zenserver/vfs/vfsimpl.cpp @@ -3,8 +3,6 @@ #include "vfsimpl.h" #include "vfsservice.h" -#include "projectstore/projectstore.h" - #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zenstore/cache/structuredcachestore.h> diff --git a/src/zenserver/vfs/vfsimpl.h b/src/zenserver/vfs/vfsimpl.h index c33df100b..0dabf2c67 100644 --- a/src/zenserver/vfs/vfsimpl.h +++ b/src/zenserver/vfs/vfsimpl.h @@ -4,9 +4,8 @@ #include "vfsservice.h" -#include "projectstore/projectstore.h" - #include <zencore/logging.h> +#include <zenstore/projectstore.h> #include <zenvfs/vfs.h> #if ZEN_WITH_VFS diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index b4a4279ae..e52c0f506 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -1128,7 +1128,6 @@ ZenServer::ToString(ServerState Value) void zenserver_forcelinktests() { - zen::prj_forcelink(); zen::remoteprojectstore_forcelink(); } diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h index bcb02d336..8b786ed22 100644 --- a/src/zenserver/zenserver.h +++ b/src/zenserver/zenserver.h @@ -24,6 +24,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #include <zenhttp/httptest.h> #include <zenstore/cache/structuredcachestore.h> #include <zenstore/gc.h> +#include <zenstore/projectstore.h> #include "admin/admin.h" #include "buildstore/httpbuildstore.h" #include "cache/httpstructuredcache.h" @@ -31,7 +32,6 @@ ZEN_THIRD_PARTY_INCLUDES_END #include "frontend/frontend.h" #include "objectstore/objectstore.h" #include "projectstore/httpprojectstore.h" -#include "projectstore/projectstore.h" #include "stats/statsreporter.h" #include "upstream/upstream.h" #include "vfs/vfsservice.h" |