diff options
| author | Dan Engelbrecht <[email protected]> | 2024-02-05 16:52:42 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-02-05 16:52:42 +0100 |
| commit | 222392fff48e1659ec8bc59e42de09f3625111ad (patch) | |
| tree | 4e17c75beabd7704f2bca3feb26d0aa9e7de204e /src/zenserver/projectstore/projectstore.cpp | |
| parent | 5.4.1-pre0 (diff) | |
| download | zen-222392fff48e1659ec8bc59e42de09f3625111ad.tar.xz zen-222392fff48e1659ec8bc59e42de09f3625111ad.zip | |
compress large attachments on demand (#647)
- Improvement: Speed up oplog export by fetching/compressing big attachments on demand
- Improvement: Speed up oplog export by batch-fetching small attachments
- Improvement: Speed up oplog import by batching writes of oplog ops
- Improvement: Tweak oplog export default block size and embed size limit
- Improvement: Add more messaging and progress during oplog import/export
Diffstat (limited to 'src/zenserver/projectstore/projectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 183 |
1 files changed, 151 insertions, 32 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index ea219f9b0..72a8e1409 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -448,13 +448,40 @@ struct ProjectStore::OplogStorage : public RefCounted return CbObject(SharedBuffer(std::move(OpBuffer))); } - OplogEntry AppendOp(MemoryView Buffer, uint32_t OpCoreHash, Oid KeyHash) + struct AppendOpData + { + MemoryView Buffer; + uint32_t OpCoreHash; + Oid KeyHash; + }; + + static OplogStorage::AppendOpData GetAppendOpData(const CbObjectView& Core) + { + using namespace std::literals; + + AppendOpData OpData; + + OpData.Buffer = Core.GetView(); + const uint64_t WriteSize = OpData.Buffer.GetSize(); + OpData.OpCoreHash = uint32_t(XXH3_64bits(OpData.Buffer.GetData(), WriteSize) & 0xffffFFFF); + + ZEN_ASSERT(WriteSize != 0); + + XXH3_128Stream KeyHasher; + Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + XXH3_128 KeyHash128 = KeyHasher.GetHash(); + memcpy(&OpData.KeyHash, KeyHash128.Hash, sizeof OpData.KeyHash); + + return OpData; + } + + OplogEntry AppendOp(const AppendOpData& OpData) { ZEN_TRACE_CPU("Store::OplogStorage::AppendOp"); using namespace std::literals; - uint64_t WriteSize = Buffer.GetSize(); + uint64_t WriteSize = OpData.Buffer.GetSize(); RwLock::ExclusiveLockScope Lock(m_RwLock); const uint64_t WriteOffset = m_NextOpsOffset; @@ -467,15 +494,70 @@ struct ProjectStore::OplogStorage : public RefCounted OplogEntry Entry = {.OpLsn = OpLsn, .OpCoreOffset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign), .OpCoreSize = uint32_t(WriteSize), - .OpCoreHash = OpCoreHash, - .OpKeyHash = KeyHash}; + .OpCoreHash = OpData.OpCoreHash, + .OpKeyHash = OpData.KeyHash}; m_Oplog.Append(Entry); - m_OpBlobs.Write(Buffer.GetData(), WriteSize, WriteOffset); + m_OpBlobs.Write(OpData.Buffer.GetData(), WriteSize, WriteOffset); return Entry; } + std::vector<OplogEntry> 
AppendOps(std::span<const AppendOpData> Ops) + { + ZEN_TRACE_CPU("Store::OplogStorage::AppendOps"); + + using namespace std::literals; + + size_t OpCount = Ops.size(); + std::vector<std::pair<uint64_t, uint64_t>> OffsetAndSizes; + std::vector<uint32_t> OpLsns; + OffsetAndSizes.resize(OpCount); + OpLsns.resize(OpCount); + + for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) + { + OffsetAndSizes[OpIndex].second = Ops[OpIndex].Buffer.GetSize(); + } + + uint64_t WriteStart = 0; + uint64_t WriteLength = 0; + { + RwLock::ExclusiveLockScope Lock(m_RwLock); + WriteStart = m_NextOpsOffset; + ZEN_ASSERT(IsMultipleOf(WriteStart, m_OpsAlign)); + uint64_t WriteOffset = WriteStart; + for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) + { + OffsetAndSizes[OpIndex].first = WriteOffset - WriteStart; + OpLsns[OpIndex] = ++m_MaxLsn; + WriteOffset = RoundUp(WriteOffset + OffsetAndSizes[OpIndex].second, m_OpsAlign); + } + WriteLength = WriteOffset - WriteStart; + m_NextOpsOffset = RoundUp(WriteOffset, m_OpsAlign); + } + + IoBuffer WriteBuffer(WriteLength); + + std::vector<OplogEntry> Entries; + Entries.resize(OpCount); + for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) + { + MutableMemoryView WriteBufferView = WriteBuffer.GetMutableView().RightChop(OffsetAndSizes[OpIndex].first); + WriteBufferView.CopyFrom(Ops[OpIndex].Buffer); + Entries[OpIndex] = {.OpLsn = OpLsns[OpIndex], + .OpCoreOffset = gsl::narrow_cast<uint32_t>((WriteStart + OffsetAndSizes[OpIndex].first) / m_OpsAlign), + .OpCoreSize = uint32_t(OffsetAndSizes[OpIndex].second), + .OpCoreHash = Ops[OpIndex].OpCoreHash, + .OpKeyHash = Ops[OpIndex].KeyHash}; + } + + m_OpBlobs.Write(WriteBuffer.GetData(), WriteBuffer.GetSize(), WriteStart); + m_Oplog.Append(Entries); + + return Entries; + } + void AppendTombstone(Oid KeyHash) { OplogEntry Entry = {.OpKeyHash = KeyHash}; @@ -1243,6 +1325,17 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) return EntryId; } +RefPtr<ProjectStore::OplogStorage> 
+ProjectStore::Oplog::GetStorage() +{ + RefPtr<OplogStorage> Storage; + { + RwLock::SharedLockScope _(m_OplogLock); + Storage = m_Storage; + } + return Storage; +} + uint32_t ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core) { @@ -1250,35 +1343,61 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core) using namespace std::literals; - OplogEntryMapping Mapping = GetMapping(Core); + RefPtr<OplogStorage> Storage = GetStorage(); + if (!m_Storage) + { + return 0xffffffffu; + } - MemoryView Buffer = Core.GetView(); - const uint64_t WriteSize = Buffer.GetSize(); - const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF); + OplogEntryMapping Mapping = GetMapping(Core); + OplogStorage::AppendOpData OpData = OplogStorage::GetAppendOpData(Core); - ZEN_ASSERT(WriteSize != 0); + const OplogEntry OpEntry = m_Storage->AppendOp(OpData); - XXH3_128Stream KeyHasher; - Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); - XXH3_128 KeyHash128 = KeyHasher.GetHash(); - Oid KeyHash; - memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash); + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry); - RefPtr<OplogStorage> Storage; + return EntryId; +} + +std::vector<uint32_t> +ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores) +{ + ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntries"); + + using namespace std::literals; + + RefPtr<OplogStorage> Storage = GetStorage(); + if (!m_Storage) { - RwLock::SharedLockScope _(m_OplogLock); - Storage = m_Storage; + return std::vector<uint32_t>(Cores.size(), 0xffffffffu); } - if (!m_Storage) + + size_t OpCount = Cores.size(); + std::vector<OplogEntryMapping> Mappings; + std::vector<OplogStorage::AppendOpData> OpDatas; + Mappings.resize(OpCount); + OpDatas.resize(OpCount); + + for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) { - return 0xffffffffu; + const 
CbObjectView& Core = Cores[OpIndex]; + OpDatas[OpIndex] = OplogStorage::GetAppendOpData(Core); + Mappings[OpIndex] = GetMapping(Core); } - const OplogEntry OpEntry = m_Storage->AppendOp(Buffer, OpCoreHash, KeyHash); - RwLock::ExclusiveLockScope OplogLock(m_OplogLock); - const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry); + std::vector<OplogEntry> OpEntries = Storage->AppendOps(OpDatas); - return EntryId; + std::vector<uint32_t> EntryIds; + EntryIds.resize(OpCount); + { + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) + { + EntryIds[OpIndex] = RegisterOplogEntry(OplogLock, Mappings[OpIndex], OpEntries[OpIndex]); + } + } + return EntryIds; } ////////////////////////////////////////////////////////////////////////// @@ -2730,7 +2849,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, } Project->TouchOplog(OplogId); - size_t MaxBlockSize = 128u * 1024u * 1024u; + size_t MaxBlockSize = RemoteStoreOptions::DefaultMaxBlockSize; if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false) { if (auto Value = ParseInt<size_t>(Param)) @@ -2738,7 +2857,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, MaxBlockSize = Value.value(); } } - size_t MaxChunkEmbedSize = 1024u * 1024u; + size_t MaxChunkEmbedSize = RemoteStoreOptions::DefaultMaxChunkEmbedSize; if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false) { if (auto Value = ParseInt<size_t>(Param)) @@ -2758,9 +2877,9 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, /* BuildBlocks */ false, /* IgnoreMissingAttachemnts */ false, [](CompressedBuffer&&, const IoHash) {}, - [](const IoHash&) {}, + [](const IoHash&, const TGetAttachmentBufferFunc&) {}, [](const std::unordered_set<IoHash, IoHash::Hasher>) {}, - nullptr); + /* EmbedLooseFiles*/ false); OutResponse = std::move(ContainerResult.ContainerObject); return ConvertResult(ContainerResult); @@ -3119,8 +3238,8 @@ 
ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op using namespace std::literals; - size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(64u * 1024u * 1024u); - size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize); + size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); bool Force = Params["force"sv].AsBool(false); bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); @@ -3180,8 +3299,8 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, using namespace std::literals; - size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(64u * 1024u * 1024u); - size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize); + size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); bool Force = Params["force"sv].AsBool(false); bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); |