aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/projectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-03-14 16:50:18 +0100
committerGitHub Enterprise <[email protected]>2024-03-14 16:50:18 +0100
commit0a935231009cb21680d364ef125f0296a5a5bed6 (patch)
tree7e55a67ae60883b0eab71a0d636aeec23f307d14 /src/zenserver/projectstore/projectstore.cpp
parentclean up test linking (#4) (diff)
downloadzen-0a935231009cb21680d364ef125f0296a5a5bed6.tar.xz
zen-0a935231009cb21680d364ef125f0296a5a5bed6.zip
special treatment large oplog attachments v2 (#5)
- Bugfix: Install Ctrl+C handler earlier when doing `zen oplog-export` and `zen oplog-export` to properly cancel jobs - Improvement: Add ability to block a set of CAS entries from GC in project store - Improvement: Large attachments and loose files are now split into smaller chunks and stored in blocks during oplog export
Diffstat (limited to 'src/zenserver/projectstore/projectstore.cpp')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp255
1 files changed, 225 insertions, 30 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 47f8b7357..cfa53c080 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -986,6 +986,17 @@ ProjectStore::Oplog::IterateOplog(std::function<void(CbObjectView)>&& Handler)
m_Storage->ReplayLogEntries(Entries, [&](CbObjectView Op) { Handler(Op); });
}
+size_t
+ProjectStore::Oplog::GetOplogEntryCount() const
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+ if (!m_Storage)
+ {
+ return 0;
+ }
+ return m_LatestOpMap.size();
+}
+
void
ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbObjectView)>&& Handler)
{
@@ -1122,6 +1133,79 @@ ProjectStore::Oplog::AddChunkMappings(const std::unordered_map<Oid, IoHash, Oid:
}
void
+ProjectStore::Oplog::CaptureUpdatedLSN(RwLock::ExclusiveLockScope&, uint32_t LSN)
+{
+ if (m_UpdatedLSNs)
+ {
+ m_UpdatedLSNs->push_back(LSN);
+ }
+}
+
+void
+ProjectStore::Oplog::CaptureAddedAttachments(std::span<const IoHash> AttachmentHashes)
+{
+ m_OplogLock.WithExclusiveLock([this, AttachmentHashes]() {
+ if (m_NonGCAttachments)
+ {
+ m_NonGCAttachments->reserve(m_NonGCAttachments->size() + AttachmentHashes.size());
+ m_NonGCAttachments->insert(m_NonGCAttachments->end(), AttachmentHashes.begin(), AttachmentHashes.end());
+ }
+ });
+}
+
+void
+ProjectStore::Oplog::EnableUpdateCapture()
+{
+ m_OplogLock.WithExclusiveLock([&]() {
+ m_UpdatedLSNs = std::make_unique<std::vector<int>>();
+ m_NonGCAttachments = std::make_unique<std::vector<IoHash>>();
+ });
+}
+
+void
+ProjectStore::Oplog::DisableUpdateCapture()
+{
+ m_OplogLock.WithExclusiveLock([&]() {
+ m_UpdatedLSNs.reset();
+ m_NonGCAttachments.reset();
+ });
+}
+
+void
+ProjectStore::Oplog::IterateUpdatedLSNs(RwLock::SharedLockScope&, std::function<bool(const CbObjectView& UpdateOp)>&& Callback)
+{
+ if (m_UpdatedLSNs)
+ {
+ for (int UpdatedLSN : *m_UpdatedLSNs)
+ {
+ std::optional<CbObject> UpdatedOp = GetOpByIndex(UpdatedLSN);
+ if (UpdatedOp)
+ {
+ if (!Callback(UpdatedOp.value()))
+ {
+ break;
+ }
+ }
+ }
+ }
+}
+
+void
+ProjectStore::Oplog::IterateAddedAttachments(RwLock::SharedLockScope&, std::function<bool(const IoHash& RawHash)>&& Callback)
+{
+ if (m_NonGCAttachments)
+ {
+ for (const IoHash& ReferenceHash : *m_NonGCAttachments)
+ {
+ if (!Callback(ReferenceHash))
+ {
+ break;
+ }
+ }
+ }
+}
+
+void
ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&,
const Oid& FileId,
const IoHash& Hash,
@@ -1279,10 +1363,7 @@ ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize});
m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn;
- if (m_UpdatedLSNs)
- {
- m_UpdatedLSNs->push_back(OpEntry.OpLsn);
- }
+ CaptureUpdatedLSN(OplogLock, OpEntry.OpLsn);
return OpEntry.OpLsn;
}
@@ -2803,8 +2884,10 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie
Attachments.insert(RawHash);
};
+ auto OnChunkedAttachment = [](const ChunkedInfo&) {};
+
RemoteProjectStore::Result RemoteResult =
- SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, nullptr);
+ SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OnChunkedAttachment, nullptr);
if (RemoteResult.ErrorCode)
{
@@ -2865,6 +2948,15 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
}
}
+ size_t ChunkFileSizeLimit = RemoteStoreOptions::DefaultChunkFileSizeLimit;
+ if (auto Param = Params.GetValue("chunkfilesizelimit"); Param.empty() == false)
+ {
+ if (auto Value = ParseInt<size_t>(Param))
+ {
+ ChunkFileSizeLimit = Value.value();
+ }
+ }
+
CidStore& ChunkStore = m_CidStore;
RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer(
@@ -2873,11 +2965,12 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
*Oplog,
MaxBlockSize,
MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
/* BuildBlocks */ false,
- /* IgnoreMissingAttachemnts */ false,
- [](CompressedBuffer&&, const IoHash) {},
- [](const IoHash&, const TGetAttachmentBufferFunc&) {},
- [](const std::unordered_set<IoHash, IoHash::Hasher>) {},
+ /* IgnoreMissingAttachments */ false,
+ [](CompressedBuffer&&, const IoHash&) {},
+ [](const IoHash&, TGetAttachmentBufferFunc&&) {},
+ [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {},
/* EmbedLooseFiles*/ false);
OutResponse = std::move(ContainerResult.ContainerObject);
@@ -3239,6 +3332,7 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op
size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize);
size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
+ size_t ChunkFileSizeLimit = Params["chunkfilesizelimit"sv].AsUInt64(RemoteStoreOptions::DefaultChunkFileSizeLimit);
bool Force = Params["force"sv].AsBool(false);
bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false);
@@ -3267,6 +3361,7 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op
OplogPtr = &Oplog,
MaxBlockSize,
MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
EmbedLooseFile,
Force,
IgnoreMissingAttachments](JobContext& Context) {
@@ -3276,6 +3371,7 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op
*OplogPtr,
MaxBlockSize,
MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
EmbedLooseFile,
Force,
IgnoreMissingAttachments,
@@ -3573,7 +3669,7 @@ public:
virtual ~ProjectStoreReferenceChecker()
{
m_OplogLock.reset();
- m_Oplog.m_OplogLock.WithExclusiveLock([&]() { m_Oplog.m_UpdatedLSNs.reset(); });
+ m_Oplog.DisableUpdateCapture();
}
virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}'", m_Oplog.m_BasePath); }
@@ -3598,7 +3694,7 @@ public:
m_Oplog.OplogId());
});
- m_Oplog.m_OplogLock.WithExclusiveLock([&]() { m_Oplog.m_UpdatedLSNs = std::make_unique<std::vector<int>>(); });
+ m_Oplog.EnableUpdateCapture();
RwLock::SharedLockScope __(m_Oplog.m_OplogLock);
if (Ctx.IsCancelledFlag)
@@ -3608,7 +3704,6 @@ public:
m_Oplog.IterateOplog([&](CbObjectView Op) {
Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); });
});
- m_PreCachedLsn = m_Oplog.GetMaxOpIndex();
}
}
@@ -3632,18 +3727,10 @@ public:
m_OplogLock = std::make_unique<RwLock::SharedLockScope>(m_Oplog.m_OplogLock);
- if (m_Oplog.m_UpdatedLSNs)
- {
- for (int UpdatedLSN : *m_Oplog.m_UpdatedLSNs)
- {
- std::optional<CbObject> UpdatedOp = m_Oplog.GetOpByIndex(UpdatedLSN);
- if (UpdatedOp)
- {
- CbObjectView Op = UpdatedOp.value();
- Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); });
- }
- }
- }
+ m_Oplog.IterateUpdatedLSNs(*m_OplogLock, [&](const CbObjectView& UpdateOp) -> bool {
+ UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); });
+ return true;
+ });
}
virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override
@@ -3676,12 +3763,21 @@ public:
}
}
}
+ m_Oplog.IterateAddedAttachments(*m_OplogLock, [&](const IoHash& RawHash) -> bool {
+ if (IoCids.erase(RawHash) == 1)
+ {
+ if (IoCids.empty())
+ {
+ return false;
+ }
+ }
+ return true;
+ });
}
ProjectStore::Oplog& m_Oplog;
bool m_PreCache;
std::unique_ptr<RwLock::SharedLockScope> m_OplogLock;
std::vector<IoHash> m_References;
- int m_PreCachedLsn = -1;
};
std::vector<GcReferenceChecker*>
@@ -3792,13 +3888,16 @@ namespace testutils {
return Package;
};
- std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes)
+ std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(
+ const std::span<const size_t>& Sizes,
+ OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast)
{
std::vector<std::pair<Oid, CompressedBuffer>> Result;
Result.reserve(Sizes.size());
for (size_t Size : Sizes)
{
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(CreateRandomBlob(Size)));
+ CompressedBuffer Compressed =
+ CompressedBuffer::Compress(SharedBuffer(CreateRandomBlob(Size)), OodleCompressor::Mermaid, CompressionLevel);
Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed));
}
return Result;
@@ -3890,6 +3989,99 @@ TEST_CASE("project.store.lifetimes")
CHECK(Project->Identifier == "proj1"sv);
}
+struct ExportForceDisableBlocksTrue_ForceTempBlocksFalse
+{
+ static const bool ForceDisableBlocks = true;
+ static const bool ForceEnableTempBlocks = false;
+};
+
+struct ExportForceDisableBlocksFalse_ForceTempBlocksFalse
+{
+ static const bool ForceDisableBlocks = false;
+ static const bool ForceEnableTempBlocks = false;
+};
+
+struct ExportForceDisableBlocksFalse_ForceTempBlocksTrue
+{
+ static const bool ForceDisableBlocks = false;
+ static const bool ForceEnableTempBlocks = true;
+};
+
+TEST_CASE_TEMPLATE("project.store.export",
+ Settings,
+ ExportForceDisableBlocksTrue_ForceTempBlocksFalse,
+ ExportForceDisableBlocksFalse_ForceTempBlocksFalse,
+ ExportForceDisableBlocksFalse_ForceTempBlocksTrue)
+{
+ using namespace std::literals;
+ using namespace testutils;
+
+ ScopedTemporaryDirectory TempDir;
+ ScopedTemporaryDirectory ExportDir;
+
+ auto JobQueue = MakeJobQueue(1, ""sv);
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ std::filesystem::path BasePath = TempDir.Path() / "projectstore";
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue);
+ std::filesystem::path RootDir = TempDir.Path() / "root";
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
+ std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
+ std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
+
+ Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ ProjectRootDir.string(),
+ ProjectFilePath.string()));
+ ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {});
+ CHECK(Oplog != nullptr);
+
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {}));
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77})));
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99})));
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122})));
+ Oplog->AppendNewOplogEntry(
+ CreateOplogPackage(Oid::NewOid(),
+ CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None)));
+
+ FileRemoteStoreOptions Options = {
+ RemoteStoreOptions{.MaxBlockSize = 64u * 1024, .MaxChunkEmbedSize = 32 * 1024u, .ChunkFileSizeLimit = 64u * 1024u},
+ /*.FolderPath = */ ExportDir.Path(),
+ /*.Name = */ std::string("oplog1"),
+ /*OptionalBaseName = */ std::string(),
+ /*.ForceDisableBlocks = */ Settings::ForceDisableBlocks,
+ /*.ForceEnableTempBlocks = */ Settings::ForceEnableTempBlocks};
+ std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options);
+ RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+
+ RemoteProjectStore::Result ExportResult = SaveOplog(CidStore,
+ *RemoteStore,
+ *Project.Get(),
+ *Oplog,
+ Options.MaxBlockSize,
+ Options.MaxChunkEmbedSize,
+ Options.ChunkFileSizeLimit,
+ true,
+ false,
+ false,
+ nullptr);
+
+ CHECK(ExportResult.ErrorCode == 0);
+
+ ProjectStore::Oplog* OplogImport = Project->NewOplog("oplog2", {});
+ CHECK(OplogImport != nullptr);
+ RemoteProjectStore::Result ImportResult = LoadOplog(CidStore, *RemoteStore, *OplogImport, false, false, nullptr);
+ CHECK(ImportResult.ErrorCode == 0);
+
+ RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore, *RemoteStore, *OplogImport, true, false, nullptr);
+ CHECK(ImportForceResult.ErrorCode == 0);
+}
+
TEST_CASE("project.store.gc")
{
using namespace std::literals;
@@ -4284,12 +4476,15 @@ TEST_CASE("project.store.block")
7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251,
491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225});
- std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes);
- std::vector<SharedBuffer> Chunks;
+ std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes);
+ std::vector<std::pair<IoHash, FetchChunkFunc>> Chunks;
Chunks.reserve(AttachmentSizes.size());
for (const auto& It : AttachmentsWithId)
{
- Chunks.push_back(It.second.GetCompressed().Flatten());
+ Chunks.push_back(std::make_pair(It.second.DecodeRawHash(),
+ [Buffer = It.second.GetCompressed().Flatten().AsIoBuffer()](const IoHash&) -> CompositeBuffer {
+ return CompositeBuffer(SharedBuffer(Buffer));
+ }));
}
CompressedBuffer Block = GenerateBlock(std::move(Chunks));
IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer();