aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/projectstore.cpp
diff options
context:
space:
mode:
author: Dan Engelbrecht <[email protected]> 2024-02-05 16:52:42 +0100
committer: GitHub <[email protected]> 2024-02-05 16:52:42 +0100
commit: 222392fff48e1659ec8bc59e42de09f3625111ad (patch)
tree: 4e17c75beabd7704f2bca3feb26d0aa9e7de204e /src/zenserver/projectstore/projectstore.cpp
parent: 5.4.1-pre0 (diff)
download: zen-222392fff48e1659ec8bc59e42de09f3625111ad.tar.xz
zen-222392fff48e1659ec8bc59e42de09f3625111ad.zip
compress large attachments on demand (#647)
- Improvement: Speed up oplog export by fetching/compressing big attachments on demand
- Improvement: Speed up oplog export by batch-fetching small attachments
- Improvement: Speed up oplog import by batching writes of oplog ops
- Improvement: Tweak oplog export default block size and embed size limit
- Improvement: Add more messaging and progress during oplog import/export
Diffstat (limited to 'src/zenserver/projectstore/projectstore.cpp')
-rw-r--r-- src/zenserver/projectstore/projectstore.cpp 183
1 files changed, 151 insertions, 32 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index ea219f9b0..72a8e1409 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -448,13 +448,40 @@ struct ProjectStore::OplogStorage : public RefCounted
return CbObject(SharedBuffer(std::move(OpBuffer)));
}
- OplogEntry AppendOp(MemoryView Buffer, uint32_t OpCoreHash, Oid KeyHash)
+ struct AppendOpData
+ {
+ MemoryView Buffer;
+ uint32_t OpCoreHash;
+ Oid KeyHash;
+ };
+
+ static OplogStorage::AppendOpData GetAppendOpData(const CbObjectView& Core)
+ {
+ using namespace std::literals;
+
+ AppendOpData OpData;
+
+ OpData.Buffer = Core.GetView();
+ const uint64_t WriteSize = OpData.Buffer.GetSize();
+ OpData.OpCoreHash = uint32_t(XXH3_64bits(OpData.Buffer.GetData(), WriteSize) & 0xffffFFFF);
+
+ ZEN_ASSERT(WriteSize != 0);
+
+ XXH3_128Stream KeyHasher;
+ Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ XXH3_128 KeyHash128 = KeyHasher.GetHash();
+ memcpy(&OpData.KeyHash, KeyHash128.Hash, sizeof OpData.KeyHash);
+
+ return OpData;
+ }
+
+ OplogEntry AppendOp(const AppendOpData& OpData)
{
ZEN_TRACE_CPU("Store::OplogStorage::AppendOp");
using namespace std::literals;
- uint64_t WriteSize = Buffer.GetSize();
+ uint64_t WriteSize = OpData.Buffer.GetSize();
RwLock::ExclusiveLockScope Lock(m_RwLock);
const uint64_t WriteOffset = m_NextOpsOffset;
@@ -467,15 +494,70 @@ struct ProjectStore::OplogStorage : public RefCounted
OplogEntry Entry = {.OpLsn = OpLsn,
.OpCoreOffset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign),
.OpCoreSize = uint32_t(WriteSize),
- .OpCoreHash = OpCoreHash,
- .OpKeyHash = KeyHash};
+ .OpCoreHash = OpData.OpCoreHash,
+ .OpKeyHash = OpData.KeyHash};
m_Oplog.Append(Entry);
- m_OpBlobs.Write(Buffer.GetData(), WriteSize, WriteOffset);
+ m_OpBlobs.Write(OpData.Buffer.GetData(), WriteSize, WriteOffset);
return Entry;
}
+ std::vector<OplogEntry> AppendOps(std::span<const AppendOpData> Ops)
+ {
+ ZEN_TRACE_CPU("Store::OplogStorage::AppendOps");
+
+ using namespace std::literals;
+
+ size_t OpCount = Ops.size();
+ std::vector<std::pair<uint64_t, uint64_t>> OffsetAndSizes;
+ std::vector<uint32_t> OpLsns;
+ OffsetAndSizes.resize(OpCount);
+ OpLsns.resize(OpCount);
+
+ for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
+ {
+ OffsetAndSizes[OpIndex].second = Ops[OpIndex].Buffer.GetSize();
+ }
+
+ uint64_t WriteStart = 0;
+ uint64_t WriteLength = 0;
+ {
+ RwLock::ExclusiveLockScope Lock(m_RwLock);
+ WriteStart = m_NextOpsOffset;
+ ZEN_ASSERT(IsMultipleOf(WriteStart, m_OpsAlign));
+ uint64_t WriteOffset = WriteStart;
+ for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
+ {
+ OffsetAndSizes[OpIndex].first = WriteOffset - WriteStart;
+ OpLsns[OpIndex] = ++m_MaxLsn;
+ WriteOffset = RoundUp(WriteOffset + OffsetAndSizes[OpIndex].second, m_OpsAlign);
+ }
+ WriteLength = WriteOffset - WriteStart;
+ m_NextOpsOffset = RoundUp(WriteOffset, m_OpsAlign);
+ }
+
+ IoBuffer WriteBuffer(WriteLength);
+
+ std::vector<OplogEntry> Entries;
+ Entries.resize(OpCount);
+ for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
+ {
+ MutableMemoryView WriteBufferView = WriteBuffer.GetMutableView().RightChop(OffsetAndSizes[OpIndex].first);
+ WriteBufferView.CopyFrom(Ops[OpIndex].Buffer);
+ Entries[OpIndex] = {.OpLsn = OpLsns[OpIndex],
+ .OpCoreOffset = gsl::narrow_cast<uint32_t>((WriteStart + OffsetAndSizes[OpIndex].first) / m_OpsAlign),
+ .OpCoreSize = uint32_t(OffsetAndSizes[OpIndex].second),
+ .OpCoreHash = Ops[OpIndex].OpCoreHash,
+ .OpKeyHash = Ops[OpIndex].KeyHash};
+ }
+
+ m_OpBlobs.Write(WriteBuffer.GetData(), WriteBuffer.GetSize(), WriteStart);
+ m_Oplog.Append(Entries);
+
+ return Entries;
+ }
+
void AppendTombstone(Oid KeyHash)
{
OplogEntry Entry = {.OpKeyHash = KeyHash};
@@ -1243,6 +1325,17 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
return EntryId;
}
+RefPtr<ProjectStore::OplogStorage>
+ProjectStore::Oplog::GetStorage()
+{
+ RefPtr<OplogStorage> Storage;
+ {
+ RwLock::SharedLockScope _(m_OplogLock);
+ Storage = m_Storage;
+ }
+ return Storage;
+}
+
uint32_t
ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core)
{
@@ -1250,35 +1343,61 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core)
using namespace std::literals;
- OplogEntryMapping Mapping = GetMapping(Core);
+ RefPtr<OplogStorage> Storage = GetStorage();
+ if (!m_Storage)
+ {
+ return 0xffffffffu;
+ }
- MemoryView Buffer = Core.GetView();
- const uint64_t WriteSize = Buffer.GetSize();
- const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF);
+ OplogEntryMapping Mapping = GetMapping(Core);
+ OplogStorage::AppendOpData OpData = OplogStorage::GetAppendOpData(Core);
- ZEN_ASSERT(WriteSize != 0);
+ const OplogEntry OpEntry = m_Storage->AppendOp(OpData);
- XXH3_128Stream KeyHasher;
- Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
- XXH3_128 KeyHash128 = KeyHasher.GetHash();
- Oid KeyHash;
- memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash);
+ RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
+ const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry);
- RefPtr<OplogStorage> Storage;
+ return EntryId;
+}
+
+std::vector<uint32_t>
+ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores)
+{
+ ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntries");
+
+ using namespace std::literals;
+
+ RefPtr<OplogStorage> Storage = GetStorage();
+ if (!m_Storage)
{
- RwLock::SharedLockScope _(m_OplogLock);
- Storage = m_Storage;
+ return std::vector<uint32_t>(Cores.size(), 0xffffffffu);
}
- if (!m_Storage)
+
+ size_t OpCount = Cores.size();
+ std::vector<OplogEntryMapping> Mappings;
+ std::vector<OplogStorage::AppendOpData> OpDatas;
+ Mappings.resize(OpCount);
+ OpDatas.resize(OpCount);
+
+ for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
{
- return 0xffffffffu;
+ const CbObjectView& Core = Cores[OpIndex];
+ OpDatas[OpIndex] = OplogStorage::GetAppendOpData(Core);
+ Mappings[OpIndex] = GetMapping(Core);
}
- const OplogEntry OpEntry = m_Storage->AppendOp(Buffer, OpCoreHash, KeyHash);
- RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
- const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry);
+ std::vector<OplogEntry> OpEntries = Storage->AppendOps(OpDatas);
- return EntryId;
+ std::vector<uint32_t> EntryIds;
+ EntryIds.resize(OpCount);
+ {
+ RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
+ for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
+ {
+ EntryIds[OpIndex] = RegisterOplogEntry(OplogLock, Mappings[OpIndex], OpEntries[OpIndex]);
+ }
+ }
+ return EntryIds;
}
//////////////////////////////////////////////////////////////////////////
@@ -2730,7 +2849,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
}
Project->TouchOplog(OplogId);
- size_t MaxBlockSize = 128u * 1024u * 1024u;
+ size_t MaxBlockSize = RemoteStoreOptions::DefaultMaxBlockSize;
if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false)
{
if (auto Value = ParseInt<size_t>(Param))
@@ -2738,7 +2857,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
MaxBlockSize = Value.value();
}
}
- size_t MaxChunkEmbedSize = 1024u * 1024u;
+ size_t MaxChunkEmbedSize = RemoteStoreOptions::DefaultMaxChunkEmbedSize;
if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false)
{
if (auto Value = ParseInt<size_t>(Param))
@@ -2758,9 +2877,9 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
/* BuildBlocks */ false,
/* IgnoreMissingAttachemnts */ false,
[](CompressedBuffer&&, const IoHash) {},
- [](const IoHash&) {},
+ [](const IoHash&, const TGetAttachmentBufferFunc&) {},
[](const std::unordered_set<IoHash, IoHash::Hasher>) {},
- nullptr);
+ /* EmbedLooseFiles*/ false);
OutResponse = std::move(ContainerResult.ContainerObject);
return ConvertResult(ContainerResult);
@@ -3119,8 +3238,8 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op
using namespace std::literals;
- size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(64u * 1024u * 1024u);
- size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
bool Force = Params["force"sv].AsBool(false);
bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false);
@@ -3180,8 +3299,8 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
using namespace std::literals;
- size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(64u * 1024u * 1024u);
- size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
bool Force = Params["force"sv].AsBool(false);
bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);