diff options
| author | Dan Engelbrecht <[email protected]> | 2024-03-14 16:50:18 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-03-14 16:50:18 +0100 |
| commit | 0a935231009cb21680d364ef125f0296a5a5bed6 (patch) | |
| tree | 7e55a67ae60883b0eab71a0d636aeec23f307d14 /src/zenserver/projectstore/projectstore.cpp | |
| parent | clean up test linking (#4) (diff) | |
| download | zen-0a935231009cb21680d364ef125f0296a5a5bed6.tar.xz zen-0a935231009cb21680d364ef125f0296a5a5bed6.zip | |
special treatment large oplog attachments v2 (#5)
- Bugfix: Install Ctrl+C handler earlier when doing `zen oplog-export` and `zen oplog-export` to properly cancel jobs
- Improvement: Add ability to block a set of CAS entries from GC in project store
- Improvement: Large attachments and loose files are now split into smaller chunks and stored in blocks during oplog export
Diffstat (limited to 'src/zenserver/projectstore/projectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 255 |
1 files changed, 225 insertions, 30 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 47f8b7357..cfa53c080 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -986,6 +986,17 @@ ProjectStore::Oplog::IterateOplog(std::function<void(CbObjectView)>&& Handler) m_Storage->ReplayLogEntries(Entries, [&](CbObjectView Op) { Handler(Op); }); } +size_t +ProjectStore::Oplog::GetOplogEntryCount() const +{ + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return 0; + } + return m_LatestOpMap.size(); +} + void ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbObjectView)>&& Handler) { @@ -1122,6 +1133,79 @@ ProjectStore::Oplog::AddChunkMappings(const std::unordered_map<Oid, IoHash, Oid: } void +ProjectStore::Oplog::CaptureUpdatedLSN(RwLock::ExclusiveLockScope&, uint32_t LSN) +{ + if (m_UpdatedLSNs) + { + m_UpdatedLSNs->push_back(LSN); + } +} + +void +ProjectStore::Oplog::CaptureAddedAttachments(std::span<const IoHash> AttachmentHashes) +{ + m_OplogLock.WithExclusiveLock([this, AttachmentHashes]() { + if (m_NonGCAttachments) + { + m_NonGCAttachments->reserve(m_NonGCAttachments->size() + AttachmentHashes.size()); + m_NonGCAttachments->insert(m_NonGCAttachments->end(), AttachmentHashes.begin(), AttachmentHashes.end()); + } + }); +} + +void +ProjectStore::Oplog::EnableUpdateCapture() +{ + m_OplogLock.WithExclusiveLock([&]() { + m_UpdatedLSNs = std::make_unique<std::vector<int>>(); + m_NonGCAttachments = std::make_unique<std::vector<IoHash>>(); + }); +} + +void +ProjectStore::Oplog::DisableUpdateCapture() +{ + m_OplogLock.WithExclusiveLock([&]() { + m_UpdatedLSNs.reset(); + m_NonGCAttachments.reset(); + }); +} + +void +ProjectStore::Oplog::IterateUpdatedLSNs(RwLock::SharedLockScope&, std::function<bool(const CbObjectView& UpdateOp)>&& Callback) +{ + if (m_UpdatedLSNs) + { + for (int UpdatedLSN : *m_UpdatedLSNs) + { + std::optional<CbObject> UpdatedOp = GetOpByIndex(UpdatedLSN); + if (UpdatedOp) + { + if (!Callback(UpdatedOp.value())) + { + break; + } + } + } + } +} + +void +ProjectStore::Oplog::IterateAddedAttachments(RwLock::SharedLockScope&, std::function<bool(const IoHash& RawHash)>&& Callback) +{ + if (m_NonGCAttachments) + { + for (const IoHash& ReferenceHash : *m_NonGCAttachments) + { + if (!Callback(ReferenceHash)) + { + break; + } + } + } +} + +void ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, const Oid& FileId, const IoHash& Hash, @@ -1279,10 +1363,7 @@ ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn; - if (m_UpdatedLSNs) - { - m_UpdatedLSNs->push_back(OpEntry.OpLsn); - } + CaptureUpdatedLSN(OplogLock, OpEntry.OpLsn); return OpEntry.OpLsn; } @@ -2803,8 +2884,10 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie Attachments.insert(RawHash); }; + auto OnChunkedAttachment = [](const ChunkedInfo&) {}; + RemoteProjectStore::Result RemoteResult = - SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, nullptr); + SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OnChunkedAttachment, nullptr); if (RemoteResult.ErrorCode) { @@ -2865,6 +2948,15 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, } } + size_t ChunkFileSizeLimit = RemoteStoreOptions::DefaultChunkFileSizeLimit; + if (auto Param = Params.GetValue("chunkfilesizelimit"); Param.empty() == false) + { + if (auto Value = ParseInt<size_t>(Param)) + { + ChunkFileSizeLimit = Value.value(); + } + } + CidStore& ChunkStore = m_CidStore; RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer( @@ -2873,11 +2965,12 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, *Oplog, MaxBlockSize, MaxChunkEmbedSize, + ChunkFileSizeLimit, /* BuildBlocks */ false, - /* IgnoreMissingAttachemnts */ false, - [](CompressedBuffer&&, const IoHash) {}, - [](const IoHash&, const TGetAttachmentBufferFunc&) {}, - [](const std::unordered_set<IoHash, IoHash::Hasher>) {}, + /* IgnoreMissingAttachments */ false, + [](CompressedBuffer&&, const IoHash&) {}, + [](const IoHash&, TGetAttachmentBufferFunc&&) {}, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, /* EmbedLooseFiles*/ false); OutResponse = std::move(ContainerResult.ContainerObject); @@ -3239,6 +3332,7 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize); size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); + size_t ChunkFileSizeLimit = Params["chunkfilesizelimit"sv].AsUInt64(RemoteStoreOptions::DefaultChunkFileSizeLimit); bool Force = Params["force"sv].AsBool(false); bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); @@ -3267,6 +3361,7 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op OplogPtr = &Oplog, MaxBlockSize, MaxChunkEmbedSize, + ChunkFileSizeLimit, EmbedLooseFile, Force, IgnoreMissingAttachments](JobContext& Context) { @@ -3276,6 +3371,7 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op *OplogPtr, MaxBlockSize, MaxChunkEmbedSize, + ChunkFileSizeLimit, EmbedLooseFile, Force, IgnoreMissingAttachments, @@ -3573,7 +3669,7 @@ public: virtual ~ProjectStoreReferenceChecker() { m_OplogLock.reset(); - m_Oplog.m_OplogLock.WithExclusiveLock([&]() { m_Oplog.m_UpdatedLSNs.reset(); }); + m_Oplog.DisableUpdateCapture(); } virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}'", m_Oplog.m_BasePath); } @@ -3598,7 +3694,7 @@ public: m_Oplog.OplogId()); }); - m_Oplog.m_OplogLock.WithExclusiveLock([&]() { m_Oplog.m_UpdatedLSNs = std::make_unique<std::vector<int>>(); }); + m_Oplog.EnableUpdateCapture(); RwLock::SharedLockScope __(m_Oplog.m_OplogLock); if (Ctx.IsCancelledFlag) @@ -3608,7 +3704,6 @@ public: m_Oplog.IterateOplog([&](CbObjectView Op) { Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); }); - m_PreCachedLsn = m_Oplog.GetMaxOpIndex(); } } @@ -3632,18 +3727,10 @@ public: m_OplogLock = std::make_unique<RwLock::SharedLockScope>(m_Oplog.m_OplogLock); - if (m_Oplog.m_UpdatedLSNs) - { - for (int UpdatedLSN : *m_Oplog.m_UpdatedLSNs) - { - std::optional<CbObject> UpdatedOp = m_Oplog.GetOpByIndex(UpdatedLSN); - if (UpdatedOp) - { - CbObjectView Op = UpdatedOp.value(); - Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); - } - } - } + m_Oplog.IterateUpdatedLSNs(*m_OplogLock, [&](const CbObjectView& UpdateOp) -> bool { + UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); + return true; + }); } virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override @@ -3676,12 +3763,21 @@ public: } } } + m_Oplog.IterateAddedAttachments(*m_OplogLock, [&](const IoHash& RawHash) -> bool { + if (IoCids.erase(RawHash) == 1) + { + if (IoCids.empty()) + { + return false; + } + } + return true; + }); } ProjectStore::Oplog& m_Oplog; bool m_PreCache; std::unique_ptr<RwLock::SharedLockScope> m_OplogLock; std::vector<IoHash> m_References; - int m_PreCachedLsn = -1; }; std::vector<GcReferenceChecker*> @@ -3792,13 +3888,16 @@ namespace testutils { return Package; }; - std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes) + std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments( + const std::span<const size_t>& Sizes, + OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast) { std::vector<std::pair<Oid, CompressedBuffer>> Result; Result.reserve(Sizes.size()); for (size_t Size : Sizes) { - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(CreateRandomBlob(Size))); + CompressedBuffer Compressed = + CompressedBuffer::Compress(SharedBuffer(CreateRandomBlob(Size)), OodleCompressor::Mermaid, CompressionLevel); Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); } return Result; @@ -3890,6 +3989,99 @@ TEST_CASE("project.store.lifetimes") CHECK(Project->Identifier == "proj1"sv); } +struct ExportForceDisableBlocksTrue_ForceTempBlocksFalse +{ + static const bool ForceDisableBlocks = true; + static const bool ForceEnableTempBlocks = false; +}; + +struct ExportForceDisableBlocksFalse_ForceTempBlocksFalse +{ + static const bool ForceDisableBlocks = false; + static const bool ForceEnableTempBlocks = false; +}; + +struct ExportForceDisableBlocksFalse_ForceTempBlocksTrue +{ + static const bool ForceDisableBlocks = false; + static const bool ForceEnableTempBlocks = true; +}; + +TEST_CASE_TEMPLATE("project.store.export", + Settings, + ExportForceDisableBlocksTrue_ForceTempBlocksFalse, + ExportForceDisableBlocksFalse_ForceTempBlocksFalse, + ExportForceDisableBlocksFalse_ForceTempBlocksTrue) +{ + using namespace std::literals; + using namespace testutils; + + ScopedTemporaryDirectory TempDir; + ScopedTemporaryDirectory ExportDir; + + auto JobQueue = MakeJobQueue(1, ""sv); + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::filesystem::path BasePath = TempDir.Path() / "projectstore"; + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue); + std::filesystem::path RootDir = TempDir.Path() / "root"; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; + std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; + std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; + + Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + ProjectRootDir.string(), + ProjectFilePath.string())); + ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {}); + CHECK(Oplog != nullptr); + + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}))); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122}))); + Oplog->AppendNewOplogEntry( + CreateOplogPackage(Oid::NewOid(), + CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None))); + + FileRemoteStoreOptions Options = { + RemoteStoreOptions{.MaxBlockSize = 64u * 1024, .MaxChunkEmbedSize = 32 * 1024u, .ChunkFileSizeLimit = 64u * 1024u}, + /*.FolderPath = */ ExportDir.Path(), + /*.Name = */ std::string("oplog1"), + /*OptionalBaseName = */ std::string(), + /*.ForceDisableBlocks = */ Settings::ForceDisableBlocks, + /*.ForceEnableTempBlocks = */ Settings::ForceEnableTempBlocks}; + std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options); + RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); + + RemoteProjectStore::Result ExportResult = SaveOplog(CidStore, + *RemoteStore, + *Project.Get(), + *Oplog, + Options.MaxBlockSize, + Options.MaxChunkEmbedSize, + Options.ChunkFileSizeLimit, + true, + false, + false, + nullptr); + + CHECK(ExportResult.ErrorCode == 0); + + ProjectStore::Oplog* OplogImport = Project->NewOplog("oplog2", {}); + CHECK(OplogImport != nullptr); + RemoteProjectStore::Result ImportResult = LoadOplog(CidStore, *RemoteStore, *OplogImport, false, false, nullptr); + CHECK(ImportResult.ErrorCode == 0); + + RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore, *RemoteStore, *OplogImport, true, false, nullptr); + CHECK(ImportForceResult.ErrorCode == 0); +} + TEST_CASE("project.store.gc") { using namespace std::literals; @@ -4284,12 +4476,15 @@ TEST_CASE("project.store.block") 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251, 491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225}); - std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes); - std::vector<SharedBuffer> Chunks; + std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes); + std::vector<std::pair<IoHash, FetchChunkFunc>> Chunks; Chunks.reserve(AttachmentSizes.size()); for (const auto& It : AttachmentsWithId) { - Chunks.push_back(It.second.GetCompressed().Flatten()); + Chunks.push_back(std::make_pair(It.second.DecodeRawHash(), + [Buffer = It.second.GetCompressed().Flatten().AsIoBuffer()](const IoHash&) -> CompositeBuffer { + return CompositeBuffer(SharedBuffer(Buffer)); + })); } CompressedBuffer Block = GenerateBlock(std::move(Chunks)); IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer(); |