aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-02-05 16:52:42 +0100
committerGitHub <[email protected]>2024-02-05 16:52:42 +0100
commit222392fff48e1659ec8bc59e42de09f3625111ad (patch)
tree4e17c75beabd7704f2bca3feb26d0aa9e7de204e /src
parent5.4.1-pre0 (diff)
downloadzen-222392fff48e1659ec8bc59e42de09f3625111ad.tar.xz
zen-222392fff48e1659ec8bc59e42de09f3625111ad.zip
compress large attachments on demand (#647)
- Improvement: Speed up oplog export by fetching/compressing big attachments on demand - Improvement: Speed up oplog export by batch-fetching small attachments - Improvement: Speed up oplog import by batching writes of oplog ops - Improvement: Tweak oplog export default block size and embed size limit - Improvement: Add more messaging and progress during oplog import/export
Diffstat (limited to 'src')
-rw-r--r--src/zencore/compositebuffer.cpp35
-rw-r--r--src/zencore/include/zencore/stream.h4
-rw-r--r--src/zenserver/projectstore/projectstore.cpp183
-rw-r--r--src/zenserver/projectstore/projectstore.h5
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp710
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h14
6 files changed, 561 insertions, 390 deletions
diff --git a/src/zencore/compositebuffer.cpp b/src/zencore/compositebuffer.cpp
index f29f6e810..583ef19c6 100644
--- a/src/zencore/compositebuffer.cpp
+++ b/src/zencore/compositebuffer.cpp
@@ -155,7 +155,11 @@ CompositeBuffer::ViewOrCopyRange(Iterator& It, uint64_t Size, UniqueBuffer& Copy
// A hot path for this code is when we call CompressedBuffer::FromCompressed which
// is only interested in reading the header (first 64 bytes or so) and then throws
// away the materialized data.
- MutableMemoryView WriteView;
+ if (CopyBuffer.GetSize() < Size)
+ {
+ CopyBuffer = UniqueBuffer::Alloc(Size);
+ }
+ MutableMemoryView WriteView = CopyBuffer.GetMutableView();
size_t SegmentCount = m_Segments.size();
ZEN_ASSERT(It.SegmentIndex < SegmentCount);
uint64_t SizeLeft = Size;
@@ -163,31 +167,10 @@ CompositeBuffer::ViewOrCopyRange(Iterator& It, uint64_t Size, UniqueBuffer& Copy
{
const SharedBuffer& Segment = m_Segments[It.SegmentIndex];
size_t SegmentSize = Segment.GetSize();
- if (Size == SizeLeft && Size <= (SegmentSize - It.OffsetInSegment))
- {
- IoBuffer SubSegment(Segment.AsIoBuffer(), It.OffsetInSegment, SizeLeft);
- MemoryView View = SubSegment.GetView();
- It.OffsetInSegment += SizeLeft;
- ZEN_ASSERT_SLOW(It.OffsetInSegment <= SegmentSize);
- if (It.OffsetInSegment == SegmentSize)
- {
- It.SegmentIndex++;
- It.OffsetInSegment = 0;
- }
- return View;
- }
- if (WriteView.GetSize() == 0)
- {
- if (CopyBuffer.GetSize() < Size)
- {
- CopyBuffer = UniqueBuffer::Alloc(Size);
- }
- WriteView = CopyBuffer.GetMutableView();
- }
- size_t CopySize = zen::Min(SegmentSize - It.OffsetInSegment, SizeLeft);
- IoBuffer SubSegment(Segment.AsIoBuffer(), It.OffsetInSegment, CopySize);
- MemoryView ReadView = SubSegment.GetView();
- WriteView = WriteView.CopyFrom(ReadView);
+ size_t CopySize = zen::Min(SegmentSize - It.OffsetInSegment, SizeLeft);
+ IoBuffer SubSegment(Segment.AsIoBuffer(), It.OffsetInSegment, CopySize);
+ MemoryView ReadView = SubSegment.GetView();
+ WriteView = WriteView.CopyFrom(ReadView);
It.OffsetInSegment += CopySize;
ZEN_ASSERT_SLOW(It.OffsetInSegment <= SegmentSize);
if (It.OffsetInSegment == SegmentSize)
diff --git a/src/zencore/include/zencore/stream.h b/src/zencore/include/zencore/stream.h
index a9d35ef1b..a28290041 100644
--- a/src/zencore/include/zencore/stream.h
+++ b/src/zencore/include/zencore/stream.h
@@ -34,8 +34,8 @@ public:
inline const uint8_t* Data() const { return m_Buffer.data(); }
inline const uint8_t* GetData() const { return m_Buffer.data(); }
- inline uint64_t Size() const { return m_Buffer.size(); }
- inline uint64_t GetSize() const { return m_Buffer.size(); }
+ inline uint64_t Size() const { return m_Offset; }
+ inline uint64_t GetSize() const { return m_Offset; }
void Reset();
inline MemoryView GetView() const { return MemoryView(m_Buffer.data(), m_Offset); }
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index ea219f9b0..72a8e1409 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -448,13 +448,40 @@ struct ProjectStore::OplogStorage : public RefCounted
return CbObject(SharedBuffer(std::move(OpBuffer)));
}
- OplogEntry AppendOp(MemoryView Buffer, uint32_t OpCoreHash, Oid KeyHash)
+ struct AppendOpData
+ {
+ MemoryView Buffer;
+ uint32_t OpCoreHash;
+ Oid KeyHash;
+ };
+
+ static OplogStorage::AppendOpData GetAppendOpData(const CbObjectView& Core)
+ {
+ using namespace std::literals;
+
+ AppendOpData OpData;
+
+ OpData.Buffer = Core.GetView();
+ const uint64_t WriteSize = OpData.Buffer.GetSize();
+ OpData.OpCoreHash = uint32_t(XXH3_64bits(OpData.Buffer.GetData(), WriteSize) & 0xffffFFFF);
+
+ ZEN_ASSERT(WriteSize != 0);
+
+ XXH3_128Stream KeyHasher;
+ Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ XXH3_128 KeyHash128 = KeyHasher.GetHash();
+ memcpy(&OpData.KeyHash, KeyHash128.Hash, sizeof OpData.KeyHash);
+
+ return OpData;
+ }
+
+ OplogEntry AppendOp(const AppendOpData& OpData)
{
ZEN_TRACE_CPU("Store::OplogStorage::AppendOp");
using namespace std::literals;
- uint64_t WriteSize = Buffer.GetSize();
+ uint64_t WriteSize = OpData.Buffer.GetSize();
RwLock::ExclusiveLockScope Lock(m_RwLock);
const uint64_t WriteOffset = m_NextOpsOffset;
@@ -467,15 +494,70 @@ struct ProjectStore::OplogStorage : public RefCounted
OplogEntry Entry = {.OpLsn = OpLsn,
.OpCoreOffset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign),
.OpCoreSize = uint32_t(WriteSize),
- .OpCoreHash = OpCoreHash,
- .OpKeyHash = KeyHash};
+ .OpCoreHash = OpData.OpCoreHash,
+ .OpKeyHash = OpData.KeyHash};
m_Oplog.Append(Entry);
- m_OpBlobs.Write(Buffer.GetData(), WriteSize, WriteOffset);
+ m_OpBlobs.Write(OpData.Buffer.GetData(), WriteSize, WriteOffset);
return Entry;
}
+ std::vector<OplogEntry> AppendOps(std::span<const AppendOpData> Ops)
+ {
+ ZEN_TRACE_CPU("Store::OplogStorage::AppendOps");
+
+ using namespace std::literals;
+
+ size_t OpCount = Ops.size();
+ std::vector<std::pair<uint64_t, uint64_t>> OffsetAndSizes;
+ std::vector<uint32_t> OpLsns;
+ OffsetAndSizes.resize(OpCount);
+ OpLsns.resize(OpCount);
+
+ for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
+ {
+ OffsetAndSizes[OpIndex].second = Ops[OpIndex].Buffer.GetSize();
+ }
+
+ uint64_t WriteStart = 0;
+ uint64_t WriteLength = 0;
+ {
+ RwLock::ExclusiveLockScope Lock(m_RwLock);
+ WriteStart = m_NextOpsOffset;
+ ZEN_ASSERT(IsMultipleOf(WriteStart, m_OpsAlign));
+ uint64_t WriteOffset = WriteStart;
+ for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
+ {
+ OffsetAndSizes[OpIndex].first = WriteOffset - WriteStart;
+ OpLsns[OpIndex] = ++m_MaxLsn;
+ WriteOffset = RoundUp(WriteOffset + OffsetAndSizes[OpIndex].second, m_OpsAlign);
+ }
+ WriteLength = WriteOffset - WriteStart;
+ m_NextOpsOffset = RoundUp(WriteOffset, m_OpsAlign);
+ }
+
+ IoBuffer WriteBuffer(WriteLength);
+
+ std::vector<OplogEntry> Entries;
+ Entries.resize(OpCount);
+ for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
+ {
+ MutableMemoryView WriteBufferView = WriteBuffer.GetMutableView().RightChop(OffsetAndSizes[OpIndex].first);
+ WriteBufferView.CopyFrom(Ops[OpIndex].Buffer);
+ Entries[OpIndex] = {.OpLsn = OpLsns[OpIndex],
+ .OpCoreOffset = gsl::narrow_cast<uint32_t>((WriteStart + OffsetAndSizes[OpIndex].first) / m_OpsAlign),
+ .OpCoreSize = uint32_t(OffsetAndSizes[OpIndex].second),
+ .OpCoreHash = Ops[OpIndex].OpCoreHash,
+ .OpKeyHash = Ops[OpIndex].KeyHash};
+ }
+
+ m_OpBlobs.Write(WriteBuffer.GetData(), WriteBuffer.GetSize(), WriteStart);
+ m_Oplog.Append(Entries);
+
+ return Entries;
+ }
+
void AppendTombstone(Oid KeyHash)
{
OplogEntry Entry = {.OpKeyHash = KeyHash};
@@ -1243,6 +1325,17 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
return EntryId;
}
+RefPtr<ProjectStore::OplogStorage>
+ProjectStore::Oplog::GetStorage()
+{
+ RefPtr<OplogStorage> Storage;
+ {
+ RwLock::SharedLockScope _(m_OplogLock);
+ Storage = m_Storage;
+ }
+ return Storage;
+}
+
uint32_t
ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core)
{
@@ -1250,35 +1343,61 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core)
using namespace std::literals;
- OplogEntryMapping Mapping = GetMapping(Core);
+ RefPtr<OplogStorage> Storage = GetStorage();
+ if (!m_Storage)
+ {
+ return 0xffffffffu;
+ }
- MemoryView Buffer = Core.GetView();
- const uint64_t WriteSize = Buffer.GetSize();
- const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF);
+ OplogEntryMapping Mapping = GetMapping(Core);
+ OplogStorage::AppendOpData OpData = OplogStorage::GetAppendOpData(Core);
- ZEN_ASSERT(WriteSize != 0);
+ const OplogEntry OpEntry = m_Storage->AppendOp(OpData);
- XXH3_128Stream KeyHasher;
- Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
- XXH3_128 KeyHash128 = KeyHasher.GetHash();
- Oid KeyHash;
- memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash);
+ RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
+ const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry);
- RefPtr<OplogStorage> Storage;
+ return EntryId;
+}
+
+std::vector<uint32_t>
+ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores)
+{
+ ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntries");
+
+ using namespace std::literals;
+
+ RefPtr<OplogStorage> Storage = GetStorage();
+ if (!m_Storage)
{
- RwLock::SharedLockScope _(m_OplogLock);
- Storage = m_Storage;
+ return std::vector<uint32_t>(Cores.size(), 0xffffffffu);
}
- if (!m_Storage)
+
+ size_t OpCount = Cores.size();
+ std::vector<OplogEntryMapping> Mappings;
+ std::vector<OplogStorage::AppendOpData> OpDatas;
+ Mappings.resize(OpCount);
+ OpDatas.resize(OpCount);
+
+ for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
{
- return 0xffffffffu;
+ const CbObjectView& Core = Cores[OpIndex];
+ OpDatas[OpIndex] = OplogStorage::GetAppendOpData(Core);
+ Mappings[OpIndex] = GetMapping(Core);
}
- const OplogEntry OpEntry = m_Storage->AppendOp(Buffer, OpCoreHash, KeyHash);
- RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
- const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry);
+ std::vector<OplogEntry> OpEntries = Storage->AppendOps(OpDatas);
- return EntryId;
+ std::vector<uint32_t> EntryIds;
+ EntryIds.resize(OpCount);
+ {
+ RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
+ for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
+ {
+ EntryIds[OpIndex] = RegisterOplogEntry(OplogLock, Mappings[OpIndex], OpEntries[OpIndex]);
+ }
+ }
+ return EntryIds;
}
//////////////////////////////////////////////////////////////////////////
@@ -2730,7 +2849,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
}
Project->TouchOplog(OplogId);
- size_t MaxBlockSize = 128u * 1024u * 1024u;
+ size_t MaxBlockSize = RemoteStoreOptions::DefaultMaxBlockSize;
if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false)
{
if (auto Value = ParseInt<size_t>(Param))
@@ -2738,7 +2857,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
MaxBlockSize = Value.value();
}
}
- size_t MaxChunkEmbedSize = 1024u * 1024u;
+ size_t MaxChunkEmbedSize = RemoteStoreOptions::DefaultMaxChunkEmbedSize;
if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false)
{
if (auto Value = ParseInt<size_t>(Param))
@@ -2758,9 +2877,9 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
/* BuildBlocks */ false,
/* IgnoreMissingAttachemnts */ false,
[](CompressedBuffer&&, const IoHash) {},
- [](const IoHash&) {},
+ [](const IoHash&, const TGetAttachmentBufferFunc&) {},
[](const std::unordered_set<IoHash, IoHash::Hasher>) {},
- nullptr);
+ /* EmbedLooseFiles*/ false);
OutResponse = std::move(ContainerResult.ContainerObject);
return ConvertResult(ContainerResult);
@@ -3119,8 +3238,8 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op
using namespace std::literals;
- size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(64u * 1024u * 1024u);
- size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
bool Force = Params["force"sv].AsBool(false);
bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false);
@@ -3180,8 +3299,8 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
using namespace std::literals;
- size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(64u * 1024u * 1024u);
- size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
bool Force = Params["force"sv].AsBool(false);
bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index f9611653b..5b873b758 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -109,7 +109,8 @@ public:
*/
uint32_t AppendNewOplogEntry(CbPackage Op);
- uint32_t AppendNewOplogEntry(CbObjectView Core);
+ uint32_t AppendNewOplogEntry(CbObjectView Core);
+ std::vector<uint32_t> AppendNewOplogEntries(std::span<CbObjectView> Cores);
enum UpdateType
{
@@ -166,6 +167,8 @@ public:
RefPtr<OplogStorage> m_Storage;
std::string m_OplogId;
+ RefPtr<OplogStorage> GetStorage();
+
/** Scan oplog and register each entry, thus updating the in-memory tracking tables
*/
void ReplayLog();
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 445179983..a8aa22526 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -197,17 +197,15 @@ CreateBlock(WorkerThreadPool& WorkerPool,
{
return;
}
- if (!Chunks.empty())
+ ZEN_ASSERT(!Chunks.empty());
+ CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); // Move to callback and return IoHash
+ IoHash BlockHash = CompressedBlock.DecodeRawHash();
{
- CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); // Move to callback and return IoHash
- IoHash BlockHash = CompressedBlock.DecodeRawHash();
- AsyncOnBlock(std::move(CompressedBlock), BlockHash);
- {
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope __(SectionsLock);
- Blocks[BlockIndex].BlockHash = BlockHash;
- }
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope __(SectionsLock);
+ Blocks[BlockIndex].BlockHash = BlockHash;
}
+ AsyncOnBlock(std::move(CompressedBlock), BlockHash);
});
}
@@ -234,30 +232,27 @@ BuildContainer(CidStore& ChunkStore,
const std::vector<Block>& KnownBlocks,
WorkerThreadPool& WorkerPool,
const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
- const std::function<void(const IoHash&)>& OnLargeAttachment,
+ const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
- tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutLooseAttachments,
+ bool EmbedLooseFiles,
JobContext* OptionalContext,
AsyncRemoteResult& RemoteResult)
{
using namespace std::literals;
- std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
- CbObjectWriter SectionOpsWriter;
- SectionOpsWriter.BeginArray("ops"sv);
-
size_t OpCount = 0;
CbObject OplogContainerObject;
{
+ std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
+ std::unordered_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseAttachments;
+
RwLock BlocksLock;
std::vector<Block> Blocks;
CompressedBuffer OpsBuffer;
std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
- size_t BlockSize = 0;
- std::vector<SharedBuffer> ChunksInBlock;
std::unordered_map<IoHash, int, IoHash::Hasher> Attachments;
auto RewriteOp = [&](int LSN, CbObjectView Op, const std::function<void(CbObjectView)>& CB) {
@@ -282,111 +277,69 @@ BuildContainer(CidStore& ChunkStore,
if (DataHash == IoHash::Zero)
{
+ std::string_view ServerPath = View["serverpath"sv].AsString();
+ std::filesystem::path FilePath = Project.RootDir / ServerPath;
+ if (!std::filesystem::is_regular_file(FilePath))
{
- // Read file contents into memory and compress
-
- std::string_view ServerPath = View["serverpath"sv].AsString();
- std::filesystem::path FilePath = Project.RootDir / ServerPath;
- BasicFile DataFile;
- std::error_code Ec;
- DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec);
- if (Ec)
- {
- ExtendableStringBuilder<1024> Sb;
- Sb.Append("Failed to find attachment '");
- Sb.Append(FilePath.string());
- Sb.Append("' for op: \n");
- Op.ToJson(Sb);
-
- ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", FilePath, Sb.ToView()));
- if (IgnoreMissingAttachments)
- {
- continue;
- }
- else
- {
- throw std::system_error(
- Ec,
- fmt::format("failed to open file '{}', mode: {:x}", FilePath, uint32_t(BasicFile::Mode::kRead)));
- }
- }
-
- IoBuffer FileIoBuffer = DataFile.ReadAll();
- DataFile.Close();
-
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer)));
-
- DataHash = Compressed.DecodeRawHash();
- uint64_t PayloadSize = Compressed.GetCompressed().GetSize();
- if (PayloadSize > MaxChunkEmbedSize)
+ ExtendableStringBuilder<1024> Sb;
+ Sb.Append("Failed to find attachment '");
+ Sb.Append(FilePath.string());
+ Sb.Append("' for op: \n");
+ Op.ToJson(Sb);
+
+ ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", FilePath, Sb.ToView()));
+ if (IgnoreMissingAttachments)
{
- // Write it out as a temporary file
- IoBuffer AttachmentBuffer;
- std::filesystem::path AttachmentPath = Oplog.TempPath() / DataHash.ToHexString();
- if (std::filesystem::is_regular_file(AttachmentPath))
- {
- AttachmentBuffer = IoBufferBuilder::MakeFromFile(AttachmentPath);
- if (AttachmentBuffer.GetSize() != PayloadSize)
- {
- AttachmentBuffer = IoBuffer{};
- }
- }
- if (!AttachmentBuffer)
- {
- BasicFile BlockFile;
- uint32_t RetriesLeft = 3;
- BlockFile.Open(AttachmentPath, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) {
- if (RetriesLeft == 0)
- {
- return false;
- }
- ZEN_WARN("Failed to create temp attachment '{}': '{}', retries left: {}.",
- AttachmentPath,
- Ec.message(),
- RetriesLeft);
- Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms
- RetriesLeft--;
- return true;
- });
- uint64_t Offset = 0;
- for (const SharedBuffer& Buffer : Compressed.GetCompressed().GetSegments())
- {
- BlockFile.Write(Buffer.GetView(), Offset);
- Offset += Buffer.GetSize();
- }
- void* FileHandle = BlockFile.Detach();
- AttachmentBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true);
-
- AttachmentBuffer.SetDeleteOnClose(true);
- ZEN_DEBUG("Saved temp attachment {}, {}", DataHash, NiceBytes(PayloadSize));
- }
- OutLooseAttachments->insert_or_assign(DataHash, AttachmentBuffer);
+ continue;
}
else
{
- // If it is small we just hang on to the compressed buffer
- OutLooseAttachments->insert_or_assign(DataHash, Compressed.GetCompressed().Flatten().AsIoBuffer());
+ throw std::runtime_error(fmt::format("failed to open file '{}'", FilePath));
}
}
+ SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath));
+ // Large file, just hash it for now and leave compression for later via callback
+ // We don't want to spend time compressing a large attachment if we don't need to upload it
+ DataHash = IoHash::HashBuffer(DataBuffer);
+ if (DataBuffer.GetSize() > MaxChunkEmbedSize)
+ {
+ LargeChunkHashes.insert(DataHash);
+ }
+ LooseAttachments.insert_or_assign(
+ DataHash,
+ [AttachmentBuffer = std::move(DataBuffer)](const IoHash& DataHash) -> IoBuffer {
+ Stopwatch AttachmentTimer;
+ uint64_t RawSize = AttachmentBuffer.GetSize();
+ CompressedBuffer Compressed =
+ CompressedBuffer::Compress(AttachmentBuffer, OodleCompressor::Mermaid, OodleCompressionLevel::Normal);
+ ZEN_ASSERT(Compressed.DecodeRawHash() == DataHash);
+ uint64_t PayloadSize = Compressed.GetCompressedSize();
+ ZEN_INFO("Compressed loose file attachment {} ({} -> {}) in {}",
+ DataHash,
+ NiceBytes(RawSize),
+ NiceBytes(PayloadSize),
+ NiceTimeSpanMs(static_cast<uint64_t>(AttachmentTimer.GetElapsedTimeMs())));
+ return Compressed.GetCompressed().Flatten().AsIoBuffer();
+ });
+ }
- // Rewrite file array entry with new data reference
- CbObjectWriter Writer;
- RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
- if (Field.GetName() == "data"sv)
- {
- // omit this field as we will write it explicitly ourselves
- return true;
- }
- return false;
- });
- Writer.AddBinaryAttachment("data"sv, DataHash);
+ // Rewrite file array entry with new data reference
+ CbObjectWriter Writer;
+ RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
+ if (Field.GetName() == "data"sv)
+ {
+ // omit this field as we will write it explicitly ourselves
+ return true;
+ }
+ return false;
+ });
+ Writer.AddBinaryAttachment("data"sv, DataHash);
- CbObject RewrittenOp = Writer.Save();
- Cbo.AddObject(std::move(RewrittenOp));
- CopyField = false;
+ CbObject RewrittenOp = Writer.Save();
+ Cbo.AddObject(std::move(RewrittenOp));
+ CopyField = false;
- Attachments.insert_or_assign(DataHash, LSN);
- }
+ Attachments.insert_or_assign(DataHash, LSN);
}
if (CopyField)
@@ -423,41 +376,66 @@ BuildContainer(CidStore& ChunkStore,
ReportMessage(OptionalContext, "Building exported oplog and collecting attachments");
- Stopwatch Timer;
- tsl::robin_map<int, std::string> OpLSNToKey;
+ Stopwatch Timer;
+ tsl::robin_map<int, std::string> OpLSNToKey;
+ CompressedBuffer CompressedOpsSection;
{
- Stopwatch RewriteOplogTimer;
- Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObjectView Op) {
- if (RemoteResult.IsError())
- {
- return;
- }
- std::string_view Key = Op["key"sv].AsString();
- OpLSNToKey.insert({LSN, std::string(Key)});
- Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); });
- if (OutLooseAttachments != nullptr)
- {
- RewriteOp(LSN, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; });
- }
- else
- {
- SectionOpsWriter << Op;
- }
- OpCount++;
+ CbObjectWriter SectionOpsWriter;
+ SectionOpsWriter.BeginArray("ops"sv);
+ {
+ Stopwatch RewriteOplogTimer;
+ Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObjectView Op) {
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ std::string_view Key = Op["key"sv].AsString();
+ OpLSNToKey.insert({LSN, std::string(Key)});
+ Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); });
+ if (EmbedLooseFiles)
+ {
+ RewriteOp(LSN, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; });
+ }
+ else
+ {
+ SectionOpsWriter << Op;
+ }
+ OpCount++;
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ return;
+ }
+ if (OpCount % 100000 == 0)
+ {
+ ReportMessage(OptionalContext, fmt::format("Building oplog, at op {}...", OpCount));
+ }
+ });
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ return {};
}
- if (OpCount % 100000 == 0)
- {
- ReportMessage(OptionalContext, fmt::format("Building oplog, at op {}...", OpCount));
- }
- });
- ReportMessage(OptionalContext,
- fmt::format("Rewrote {} ops to new oplog in {}",
- OpCount,
- NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs()))));
+ ReportMessage(OptionalContext,
+ fmt::format("Rewrote {} ops to new oplog in {}",
+ OpCount,
+ NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs()))));
+ }
+ SectionOpsWriter.EndArray(); // "ops"
+
+ {
+ Stopwatch CompressOpsTimer;
+ CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer(),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::Normal);
+ ReportMessage(OptionalContext,
+ fmt::format("Compressed oplog section {} ({} -> {}) in {}",
+ CompressedOpsSection.DecodeRawHash(),
+ NiceBytes(CompressedOpsSection.DecodeRawSize()),
+ NiceBytes(CompressedOpsSection.GetCompressedSize()),
+ NiceTimeSpanMs(static_cast<uint64_t>(CompressOpsTimer.GetElapsedTimeMs()))));
+ }
}
if (IsCancelled(OptionalContext))
@@ -565,96 +543,205 @@ BuildContainer(CidStore& ChunkStore,
SortedAttachments.size(),
OpLSNToKey.size()));
- auto GetPayload = [&](const IoHash& AttachmentHash) {
- if (OutLooseAttachments != nullptr)
+ for (const IoHash& AttachmentHash : LargeChunkHashes)
+ {
+ if (IsCancelled(OptionalContext))
{
- auto PayloadIt = OutLooseAttachments->find(AttachmentHash);
- if (PayloadIt != OutLooseAttachments->end())
- {
- return PayloadIt->second;
- }
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ return {};
+ }
+
+ if (auto It = LooseAttachments.find(AttachmentHash); It != LooseAttachments.end())
+ {
+ OnLargeAttachment(AttachmentHash, std::move(It->second));
+ LooseAttachments.erase(It);
}
- return ChunkStore.FindChunkByCid(AttachmentHash);
+ else
+ {
+ OnLargeAttachment(AttachmentHash,
+ [&ChunkStore](const IoHash& AttachmentHash) { return ChunkStore.FindChunkByCid(AttachmentHash); });
+ }
+ }
+ size_t LargeAttachmentCount = LargeChunkHashes.size();
+
+ Latch BlockCreateLatch(1);
+ size_t GeneratedBlockCount = 0;
+ size_t BlockSize = 0;
+ std::vector<SharedBuffer> ChunksInBlock;
+ int LastLSNOp = -1;
+ auto GetPayload = [&](const IoHash& AttachmentHash) {
+ if (auto It = LooseAttachments.find(AttachmentHash); It != LooseAttachments.end())
+ {
+ IoBuffer Payload = It->second(AttachmentHash);
+ LooseAttachments.erase(It);
+ return Payload;
+ }
+ return ChunkStore.FindChunkByCid(AttachmentHash);
};
- Latch BlockCreateLatch(1);
- size_t GeneratedBlockCount = 0;
- size_t LargeAttachmentCount = 0;
- int LastLSNOp = -1;
+ uint32_t ResolvedLargeCount = 0;
+ uint32_t ResolvedSmallCount = 0;
+ uint32_t ResolvedFailedCount = 0;
+ uint32_t ComposedBlocks = 0;
- for (const IoHash& AttachmentHash : SortedAttachments)
+ for (auto HashIt = SortedAttachments.begin(); HashIt != SortedAttachments.end();)
{
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- BlockCreateLatch.CountDown();
- while (!BlockCreateLatch.Wait(1000))
+ break;
+ }
+ ReportProgress(OptionalContext,
+ fmt::format("Resolving attachments: {} large, {} small, {} blocks built",
+ ResolvedLargeCount,
+ ResolvedSmallCount,
+ ComposedBlocks),
+ SortedAttachments.size(),
+ SortedAttachments.size() - (ResolvedLargeCount + ResolvedSmallCount + ResolvedFailedCount));
+
+ std::vector<IoHash> SmallChunkHashes;
+ SmallChunkHashes.reserve(8);
+ while (HashIt != SortedAttachments.end() && SmallChunkHashes.size() < 8)
+ {
+ if (LargeChunkHashes.contains(*HashIt))
{
- ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
+ ResolvedLargeCount++;
}
- return {};
+ else
+ {
+ SmallChunkHashes.push_back(*HashIt);
+ }
+ ++HashIt;
}
- auto It = Attachments.find(AttachmentHash);
- ZEN_ASSERT(It != Attachments.end());
- IoBuffer Payload = GetPayload(AttachmentHash);
- if (!Payload)
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> ResolvedSmallChunks;
+ RwLock ResolvedSmallChunksLock;
+ Latch ResolvedSmallChunksLatch(1);
+ for (const IoHash& SmallChunkHash : SmallChunkHashes)
{
- std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second);
- ZEN_ASSERT(Op.has_value());
- ExtendableStringBuilder<1024> Sb;
- Sb.Append("Failed to find attachment '");
- Sb.Append(AttachmentHash.ToHexString());
- Sb.Append("' for op: \n");
- Op.value().ToJson(Sb);
+ ResolvedSmallChunksLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&GetPayload, SmallChunkHash, &ResolvedSmallChunksLatch, &ResolvedSmallChunksLock, &ResolvedSmallChunks]() {
+ auto _ = MakeGuard([&ResolvedSmallChunksLatch] { ResolvedSmallChunksLatch.CountDown(); });
- ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView()));
+ IoBuffer Payload = GetPayload(SmallChunkHash);
+ {
+ RwLock::ExclusiveLockScope __(ResolvedSmallChunksLock);
+ ResolvedSmallChunks.insert_or_assign(SmallChunkHash, std::move(Payload));
+ }
+ });
+ }
+ ResolvedSmallChunksLatch.CountDown();
+ ResolvedSmallChunksLatch.Wait();
- if (IgnoreMissingAttachments)
+ for (const IoHash& AttachmentHash : SmallChunkHashes)
+ {
+ auto ResolvedIt = ResolvedSmallChunks.find(AttachmentHash);
+ ZEN_ASSERT(ResolvedIt != ResolvedSmallChunks.end());
+ IoBuffer Payload = std::move(ResolvedIt->second);
+ auto It = Attachments.find(AttachmentHash);
+ ZEN_ASSERT(It != Attachments.end());
+ if (!Payload)
{
- continue;
+ std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second);
+ ZEN_ASSERT(Op.has_value());
+ ExtendableStringBuilder<1024> Sb;
+ Sb.Append("Failed to find attachment '");
+ Sb.Append(AttachmentHash.ToHexString());
+ Sb.Append("' for op: \n");
+ Op.value().ToJson(Sb);
+
+ ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView()));
+
+ if (IgnoreMissingAttachments)
+ {
+ ResolvedFailedCount++;
+ continue;
+ }
+ else
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
+ }
+ return {};
+ }
}
- else
+
+ uint64_t PayloadSize = Payload.GetSize();
+ if (PayloadSize > MaxChunkEmbedSize)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
- BlockCreateLatch.CountDown();
- while (!BlockCreateLatch.Wait(1000))
+ if (LargeChunkHashes.insert(AttachmentHash).second)
{
- ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
+ OnLargeAttachment(AttachmentHash, [Payload = std::move(Payload)](const IoHash&) { return std::move(Payload); });
+ LargeAttachmentCount++;
}
- return {};
+ ResolvedLargeCount++;
+ continue;
+ }
+ else
+ {
+ ResolvedSmallCount++;
}
- }
- uint64_t PayloadSize = Payload.GetSize();
- if (PayloadSize > MaxChunkEmbedSize)
- {
- if (LargeChunkHashes.insert(AttachmentHash).second)
+ if (!BlockAttachmentHashes.insert(AttachmentHash).second)
{
- OnLargeAttachment(AttachmentHash);
- LargeAttachmentCount++;
+ continue;
}
- continue;
- }
- if (!BlockAttachmentHashes.insert(AttachmentHash).second)
- {
- continue;
- }
+ const int CurrentOpLSN = It->second;
- const int CurrentOpLSN = It->second;
+ BlockSize += PayloadSize;
+ if (BuildBlocks)
+ {
+ ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload)));
+ }
+ else
+ {
+ Payload = {};
+ }
- BlockSize += PayloadSize;
- if (BuildBlocks)
- {
- ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload)));
- }
- else
- {
- Payload = {};
+ if (BlockSize >= MaxBlockSize && (CurrentOpLSN != LastLSNOp))
+ {
+ size_t BlockIndex = AddBlock(BlocksLock, Blocks);
+ if (BuildBlocks)
+ {
+ CreateBlock(WorkerPool,
+ BlockCreateLatch,
+ std::move(ChunksInBlock),
+ BlocksLock,
+ Blocks,
+ BlockIndex,
+ AsyncOnBlock,
+ RemoteResult);
+ ComposedBlocks++;
+ }
+ else
+ {
+ ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
+ OnBlockChunks(BlockAttachmentHashes);
+ }
+ {
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope _(BlocksLock);
+ Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
+ BlockAttachmentHashes.begin(),
+ BlockAttachmentHashes.end());
+ }
+ BlockAttachmentHashes.clear();
+ ChunksInBlock.clear();
+ BlockSize = 0;
+ GeneratedBlockCount++;
+ }
+ LastLSNOp = CurrentOpLSN;
}
+ }
- if (BlockSize >= MaxBlockSize && (CurrentOpLSN != LastLSNOp))
+ if (BlockSize > 0)
+ {
+ if (!IsCancelled(OptionalContext))
{
size_t BlockIndex = AddBlock(BlocksLock, Blocks);
if (BuildBlocks)
@@ -667,6 +754,7 @@ BuildContainer(CidStore& ChunkStore,
BlockIndex,
AsyncOnBlock,
RemoteResult);
+ ComposedBlocks++;
}
else
{
@@ -685,51 +773,15 @@ BuildContainer(CidStore& ChunkStore,
BlockSize = 0;
GeneratedBlockCount++;
}
- LastLSNOp = CurrentOpLSN;
- }
- if (BlockSize > 0)
- {
- size_t BlockIndex = AddBlock(BlocksLock, Blocks);
- if (BuildBlocks)
- {
- CreateBlock(WorkerPool,
- BlockCreateLatch,
- std::move(ChunksInBlock),
- BlocksLock,
- Blocks,
- BlockIndex,
- AsyncOnBlock,
- RemoteResult);
- }
- else
- {
- ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
- OnBlockChunks(BlockAttachmentHashes);
- }
- {
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope _(BlocksLock);
- Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
- BlockAttachmentHashes.begin(),
- BlockAttachmentHashes.end());
- }
- BlockAttachmentHashes.clear();
- ChunksInBlock.clear();
- BlockSize = 0;
- GeneratedBlockCount++;
}
- SectionOpsWriter.EndArray(); // "ops"
+ ReportProgress(OptionalContext,
+ fmt::format("Resolving attachments: {} large, {} small, {} blocks built",
+ ResolvedLargeCount,
+ ResolvedSmallCount,
+ ComposedBlocks),
+ SortedAttachments.size(),
+ 0);
- if (IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- BlockCreateLatch.CountDown();
- while (!BlockCreateLatch.Wait(1000))
- {
- ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
- }
- return {};
- }
ReportMessage(OptionalContext,
fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}",
SortedAttachments.size(),
@@ -738,7 +790,6 @@ BuildContainer(CidStore& ChunkStore,
LargeAttachmentCount,
NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs()))));
- CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer());
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
@@ -754,10 +805,6 @@ BuildContainer(CidStore& ChunkStore,
}
return {};
}
- ReportMessage(OptionalContext,
- fmt::format("Added oplog section {}, {}",
- CompressedOpsSection.DecodeRawHash(),
- NiceBytes(CompressedOpsSection.GetCompressedSize())));
BlockCreateLatch.CountDown();
while (!BlockCreateLatch.Wait(1000))
@@ -856,9 +903,9 @@ BuildContainer(CidStore& ChunkStore,
bool BuildBlocks,
bool IgnoreMissingAttachments,
const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
- const std::function<void(const IoHash&)>& OnLargeAttachment,
+ const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
- tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutOptionalTempAttachments)
+ bool EmbedLooseFiles)
{
WorkerThreadPool& WorkerPool = GetSmallWorkerPool();
@@ -875,7 +922,7 @@ BuildContainer(CidStore& ChunkStore,
AsyncOnBlock,
OnLargeAttachment,
OnBlockChunks,
- OutOptionalTempAttachments,
+ EmbedLooseFiles,
nullptr,
RemoteResult);
return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject};
@@ -891,18 +938,18 @@ struct UploadInfo
};
void
-UploadAttachments(WorkerThreadPool& WorkerPool,
- CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments,
- const std::vector<std::vector<IoHash>>& BlockChunks,
- const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks,
- const tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>& TempAttachments,
- const std::unordered_set<IoHash, IoHash::Hasher>& Needs,
- bool ForceAll,
- UploadInfo& Info,
- AsyncRemoteResult& RemoteResult,
- JobContext* OptionalContext)
+UploadAttachments(WorkerThreadPool& WorkerPool,
+ CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments,
+ const std::vector<std::vector<IoHash>>& BlockChunks,
+ const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks,
+ const tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LooseFileAttachments,
+ const std::unordered_set<IoHash, IoHash::Hasher>& Needs,
+ bool ForceAll,
+ UploadInfo& Info,
+ AsyncRemoteResult& RemoteResult,
+ JobContext* OptionalContext)
{
using namespace std::literals;
@@ -1004,16 +1051,6 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
continue;
}
- IoBuffer Payload;
- if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
- {
- Payload = BlockIt->second;
- }
- else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end())
- {
- Payload = LooseTmpFileIt->second;
- }
-
SaveAttachmentsLatch.AddCount(1);
AttachmentsToSave++;
WorkerPool.ScheduleWork([&ChunkStore,
@@ -1022,7 +1059,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
&RemoteResult,
RawHash,
&CreatedBlocks,
- TempPayload = std::move(Payload),
+ &LooseFileAttachments,
&Info,
OptionalContext]() {
auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
@@ -1030,7 +1067,19 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
{
return;
}
- IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash);
+ IoBuffer Payload;
+ if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
+ {
+ Payload = BlockIt->second;
+ }
+ else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
+ {
+ Payload = LooseTmpFileIt->second(RawHash);
+ }
+ else
+ {
+ Payload = ChunkStore.FindChunkByCid(RawHash);
+ }
if (!Payload)
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
@@ -1258,10 +1307,11 @@ SaveOplog(CidStore& ChunkStore,
CreateDirectories(AttachmentTempPath);
}
- AsyncRemoteResult RemoteResult;
- RwLock AttachmentsLock;
- std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments;
- std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks;
+ AsyncRemoteResult RemoteResult;
+ RwLock AttachmentsLock;
+ std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments;
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks;
+ tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles;
auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock,
const IoHash& BlockHash) {
@@ -1331,10 +1381,12 @@ SaveOplog(CidStore& ChunkStore,
ZEN_DEBUG("Found {} block chunks", Chunks.size());
};
- auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments](const IoHash& AttachmentHash) {
+ auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments, &LooseLargeFiles](const IoHash& AttachmentHash,
+ TGetAttachmentBufferFunc&& GetBufferFunc) {
{
RwLock::ExclusiveLockScope _(AttachmentsLock);
LargeAttachments.insert(AttachmentHash);
+ LooseLargeFiles.insert_or_assign(AttachmentHash, std::move(GetBufferFunc));
}
ZEN_DEBUG("Found attachment {}", AttachmentHash);
};
@@ -1394,8 +1446,7 @@ SaveOplog(CidStore& ChunkStore,
}
}
- tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> TempAttachments;
- CbObject OplogContainerObject = BuildContainer(ChunkStore,
+ CbObject OplogContainerObject = BuildContainer(ChunkStore,
Project,
Oplog,
MaxBlockSize,
@@ -1407,7 +1458,7 @@ SaveOplog(CidStore& ChunkStore,
OnBlock,
OnLargeAttachment,
OnBlockChunks,
- EmbedLooseFiles ? &TempAttachments : nullptr,
+ EmbedLooseFiles,
OptionalContext,
/* out */ RemoteResult);
if (!RemoteResult.IsError())
@@ -1451,7 +1502,7 @@ SaveOplog(CidStore& ChunkStore,
LargeAttachments,
BlockChunks,
CreatedBlocks,
- TempAttachments,
+ LooseLargeFiles,
ContainerSaveResult.Needs,
ForceUpload,
Info,
@@ -1510,7 +1561,7 @@ SaveOplog(CidStore& ChunkStore,
LargeAttachments,
BlockChunks,
CreatedBlocks,
- TempAttachments,
+ LooseLargeFiles,
ContainerFinalizeResult.Needs,
false,
Info,
@@ -1518,7 +1569,7 @@ SaveOplog(CidStore& ChunkStore,
OptionalContext);
}
- TempAttachments.clear();
+ LooseLargeFiles.clear();
CreatedBlocks.clear();
}
RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
@@ -1612,54 +1663,65 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
- CbObject SectionObject = LoadCompactBinaryObject(SectionPayload);
- if (!SectionObject)
- {
- ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type"));
- return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest),
- Timer.GetElapsedTimeMs() / 1000.500,
- "Section has unexpected data type",
- "Failed to save oplog container"};
- }
-
- CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
- const uint64_t OpCount = OpsArray.Num();
- uint64_t OpsLoaded = 0;
- ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpsArray.Num()));
- BinaryWriter Writer;
- for (CbFieldView OpEntry : OpsArray)
{
- CbObjectView Op = OpEntry.AsObjectView();
- Op.CopyTo(Writer);
- CbObjectView TypedOp(Writer.GetData());
- const uint32_t OpLsn = Oplog.AppendNewOplogEntry(TypedOp);
- Writer.Reset();
- if (OpLsn == ProjectStore::Oplog::kInvalidOp)
+ CbObject SectionObject = LoadCompactBinaryObject(SectionPayload);
+ if (!SectionObject)
{
- ReportMessage(OptionalContext, fmt::format("Failed to save op {}", OpsLoaded));
+ ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type"));
return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest),
Timer.GetElapsedTimeMs() / 1000.500,
- "Failed saving op",
+ "Section has unexpected data type",
"Failed to save oplog container"};
}
- ZEN_DEBUG("oplog entry #{}", OpLsn);
- if (OpCount > 100000)
- {
- if (IsCancelled(OptionalContext))
+
+ CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
+ const uint64_t OpCount = OpsArray.Num();
+
+ ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount));
+
+ const size_t OpsBatchSize = 8192;
+ std::vector<uint8_t> OpsData;
+ std::vector<size_t> OpDataOffsets;
+ size_t OpsCompleteCount = 0;
+
+ OpsData.reserve(OpsBatchSize);
+
+ auto AppendBatch = [&]() {
+ std::vector<CbObjectView> Ops;
+ Ops.reserve(OpDataOffsets.size());
+ for (size_t OpDataOffset : OpDataOffsets)
{
- return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::OK),
- Timer.GetElapsedTimeMs() / 1000.500,
- "Operation cancelled",
- ""};
+ Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset]));
}
- if (OpsLoaded % 10000 == 0)
+ std::vector<uint32_t> OpLsns = Oplog.AppendNewOplogEntries(Ops);
+ OpsCompleteCount += OpLsns.size();
+ OpsData.clear();
+ OpDataOffsets.clear();
+ ReportProgress(OptionalContext,
+ fmt::format("Writing oplog, {} remaining...", OpCount - OpsCompleteCount),
+ OpCount,
+ OpCount - OpsCompleteCount);
+ };
+
+ BinaryWriter Writer;
+ for (CbFieldView OpEntry : OpsArray)
+ {
+ CbObjectView Op = OpEntry.AsObjectView();
+ Op.CopyTo(Writer);
+ OpDataOffsets.push_back(OpsData.size());
+ OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize());
+ Writer.Reset();
+
+ if (OpDataOffsets.size() == OpsBatchSize)
{
- ReportProgress(OptionalContext, "Writing oplog", OpCount, OpCount - OpsLoaded);
+ AppendBatch();
}
}
- OpsLoaded++;
+ if (!OpDataOffsets.empty())
+ {
+ AppendBatch();
+ }
}
- ReportProgress(OptionalContext, "Writing oplog", OpCount, 0);
return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500};
}
@@ -1975,7 +2037,7 @@ LoadOplog(CidStore& ChunkStore,
NiceBytes(Info.AttachmentBlockBytesDownloaded.load()),
Info.AttachmentsDownloaded.load(),
NiceBytes(Info.AttachmentBytesDownloaded.load()),
- NiceBytes(Info.AttachmentsStored.load()),
+ Info.AttachmentsStored.load(),
NiceBytes(Info.AttachmentBytesStored.load()),
Info.MissingAttachmentCount.load()));
return Result;
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index 7f0cb0ebb..c5ebca646 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -84,10 +84,15 @@ public:
struct RemoteStoreOptions
{
- size_t MaxBlockSize = 128u * 1024u * 1024u;
- size_t MaxChunkEmbedSize = 1024u * 1024u;
+ static const size_t DefaultMaxBlockSize = 32u * 1024u * 1024u;
+ static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u;
+
+ size_t MaxBlockSize = DefaultMaxBlockSize;
+ size_t MaxChunkEmbedSize = DefaultMaxChunkEmbedSize;
};
+typedef std::function<IoBuffer(const IoHash& AttachmentHash)> TGetAttachmentBufferFunc;
+
RemoteProjectStore::LoadContainerResult BuildContainer(
CidStore& ChunkStore,
ProjectStore::Project& Project,
@@ -97,10 +102,9 @@ RemoteProjectStore::LoadContainerResult BuildContainer(
bool BuildBlocks,
bool IgnoreMissingAttachments,
const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
- const std::function<void(const IoHash&)>& OnLargeAttachment,
+ const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
- tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>*
- OutOptionalTempAttachments); // Set OutOptionalTempAttachments to nullptr to avoid embedding loose "additional files"
+ bool EmbedLooseFiles);
class JobContext;