aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-03-14 16:50:18 +0100
committerGitHub Enterprise <[email protected]>2024-03-14 16:50:18 +0100
commit0a935231009cb21680d364ef125f0296a5a5bed6 (patch)
tree7e55a67ae60883b0eab71a0d636aeec23f307d14 /src
parentclean up test linking (#4) (diff)
downloadzen-0a935231009cb21680d364ef125f0296a5a5bed6.tar.xz
zen-0a935231009cb21680d364ef125f0296a5a5bed6.zip
special treatment large oplog attachments v2 (#5)
- Bugfix: Install Ctrl+C handler earlier when doing `zen oplog-export` and `zen oplog-import` to properly cancel jobs - Improvement: Add ability to block a set of CAS entries from GC in project store - Improvement: Large attachments and loose files are now split into smaller chunks and stored in blocks during oplog export
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp18
-rw-r--r--src/zencore/blake3.cpp7
-rw-r--r--src/zencore/compress.cpp30
-rw-r--r--src/zencore/include/zencore/compress.h10
-rw-r--r--src/zencore/iobuffer.cpp7
-rw-r--r--src/zencore/iohash.cpp7
-rw-r--r--src/zenserver/projectstore/projectstore.cpp255
-rw-r--r--src/zenserver/projectstore/projectstore.h19
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp1652
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h23
-rw-r--r--src/zenstore/chunkedfile.cpp505
-rw-r--r--src/zenstore/chunking.cpp2
-rw-r--r--src/zenstore/include/zenstore/chunkedfile.h54
-rw-r--r--src/zenutil/basicfile.cpp12
-rw-r--r--src/zenutil/include/zenutil/basicfile.h3
15 files changed, 1950 insertions, 654 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index 2132d428d..e1ee31aaf 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -57,11 +57,14 @@ namespace {
void ExecuteAsyncOperation(HttpClient& Http, std::string_view Url, IoBuffer&& Payload)
{
+ signal(SIGINT, SignalCallbackHandler);
+#if ZEN_PLATFORM_WINDOWS
+ signal(SIGBREAK, SignalCallbackHandler);
+#endif // ZEN_PLATFORM_WINDOWS
if (HttpClient::Response Result = Http.Post(Url, Payload))
{
if (Result.StatusCode == HttpResponseCode::Accepted)
{
- signal(SIGINT, SignalCallbackHandler);
bool Cancelled = false;
std::string_view JobIdText = Result.AsText();
@@ -136,10 +139,17 @@ namespace {
double QueueTimeS = StatusObject["QueueTimeS"].AsDouble();
ZEN_CONSOLE("Queued, waited {:.3} s...", QueueTimeS);
}
- uint32_t AbortCounter = SignalCounter[SIGINT].load();
- if (SignalCounter[SIGINT] > 0)
+ uint32_t InterruptCounter = SignalCounter[SIGINT].load();
+ uint32_t BreakCounter = 0;
+#if ZEN_PLATFORM_WINDOWS
+ BreakCounter = SignalCounter[SIGBREAK].load();
+#endif // ZEN_PLATFORM_WINDOWS
+ if (InterruptCounter > 0 || BreakCounter > 0)
{
- SignalCounter[SIGINT].fetch_sub(AbortCounter);
+ SignalCounter[SIGINT].fetch_sub(InterruptCounter);
+#if ZEN_PLATFORM_WINDOWS
+ SignalCounter[SIGBREAK].fetch_sub(BreakCounter);
+#endif // ZEN_PLATFORM_WINDOWS
if (HttpClient::Response DeleteResult = Http.Delete(fmt::format("/admin/jobs/{}", JobId)))
{
ZEN_CONSOLE("Requested cancel...");
diff --git a/src/zencore/blake3.cpp b/src/zencore/blake3.cpp
index bdbc8fb3e..e4edff227 100644
--- a/src/zencore/blake3.cpp
+++ b/src/zencore/blake3.cpp
@@ -45,14 +45,15 @@ BLAKE3::HashBuffer(const CompositeBuffer& Buffer)
for (const SharedBuffer& Segment : Buffer.GetSegments())
{
- size_t SegmentSize = Segment.GetSize();
- if (SegmentSize >= (65536 + 32768) && Segment.IsFileReference())
+ size_t SegmentSize = Segment.GetSize();
+ static const size_t BufferingSize = 512 * 1024;
+ if (SegmentSize >= (BufferingSize + BufferingSize / 2) && Segment.IsFileReference())
{
const IoBuffer SegmentBuffer = Segment.AsIoBuffer();
size_t Offset = 0;
while (Offset < SegmentSize)
{
- size_t ChunkSize = Min<size_t>(SegmentSize - Offset, 65536u);
+ size_t ChunkSize = Min<size_t>(SegmentSize - Offset, BufferingSize);
IoBuffer SubRange(SegmentBuffer, Offset, ChunkSize);
blake3_hasher_update(&Hasher, SubRange.GetData(), ChunkSize);
Offset += ChunkSize;
diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp
index c41bdac42..a8e8a79f4 100644
--- a/src/zencore/compress.cpp
+++ b/src/zencore/compress.cpp
@@ -193,15 +193,7 @@ class NoneEncoder final : public BaseEncoder
public:
[[nodiscard]] CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t /* BlockSize */) const final
{
- BufferHeader Header;
- Header.Method = CompressionMethod::None;
- Header.BlockCount = 1;
- Header.TotalRawSize = RawData.GetSize();
- Header.TotalCompressedSize = Header.TotalRawSize + sizeof(BufferHeader);
- Header.RawHash = BLAKE3::HashBuffer(RawData);
-
- UniqueBuffer HeaderData = UniqueBuffer::Alloc(sizeof(BufferHeader));
- Header.Write(HeaderData);
+ UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), BLAKE3::HashBuffer(RawData));
return CompositeBuffer(HeaderData.MoveToShared(), RawData.MakeOwned());
}
};
@@ -1301,6 +1293,26 @@ CompressedBuffer::ValidateCompressedHeader(const IoBuffer& CompressedData, IoHas
return detail::BufferHeader::IsValid(SharedBuffer(CompressedData), OutRawHash, OutRawSize);
}
+size_t
+CompressedBuffer::GetHeaderSizeForNoneEncoder()
+{
+ return sizeof(detail::BufferHeader);
+}
+
+UniqueBuffer
+CompressedBuffer::CreateHeaderForNoneEncoder(uint64_t RawSize, const BLAKE3& RawHash)
+{
+ detail::BufferHeader Header;
+ Header.Method = detail::CompressionMethod::None;
+ Header.BlockCount = 1;
+ Header.TotalRawSize = RawSize;
+ Header.TotalCompressedSize = Header.TotalRawSize + sizeof(detail::BufferHeader);
+ Header.RawHash = RawHash;
+ UniqueBuffer HeaderData = UniqueBuffer::Alloc(sizeof(detail::BufferHeader));
+ Header.Write(HeaderData);
+ return HeaderData;
+}
+
uint64_t
CompressedBuffer::DecodeRawSize() const
{
diff --git a/src/zencore/include/zencore/compress.h b/src/zencore/include/zencore/compress.h
index 44431f299..c51b5407f 100644
--- a/src/zencore/include/zencore/compress.h
+++ b/src/zencore/include/zencore/compress.h
@@ -94,10 +94,12 @@ public:
uint64_t& OutRawSize);
[[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressedNoValidate(IoBuffer&& CompressedData);
[[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressedNoValidate(CompositeBuffer&& CompressedData);
- [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(IoBuffer&& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize);
- [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(const IoBuffer& CompressedData,
- IoHash& OutRawHash,
- uint64_t& OutRawSize);
+ [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(IoBuffer&& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize);
+ [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(const IoBuffer& CompressedData,
+ IoHash& OutRawHash,
+ uint64_t& OutRawSize);
+ [[nodiscard]] ZENCORE_API static size_t GetHeaderSizeForNoneEncoder();
+ [[nodiscard]] ZENCORE_API static UniqueBuffer CreateHeaderForNoneEncoder(uint64_t RawSize, const BLAKE3& RawHash);
/** Reset this to null. */
inline void Reset() { CompressedData.Reset(); }
diff --git a/src/zencore/iobuffer.cpp b/src/zencore/iobuffer.cpp
index c8bc4a629..96a893082 100644
--- a/src/zencore/iobuffer.cpp
+++ b/src/zencore/iobuffer.cpp
@@ -704,8 +704,9 @@ IoBufferBuilder::MakeFromTemporaryFile(const std::filesystem::path& FileName)
IoHash
HashBuffer(IoBuffer& Buffer)
{
- size_t BufferSize = Buffer.Size();
- if (BufferSize >= (65536 + 32768))
+ size_t BufferSize = Buffer.Size();
+ static const size_t BufferingSize = 512 * 1024;
+ if (BufferSize >= (BufferingSize + BufferingSize / 2))
{
IoBufferFileReference _;
if (Buffer.GetFileReference(/* out */ _))
@@ -714,7 +715,7 @@ HashBuffer(IoBuffer& Buffer)
IoHashStream HashStream;
while (Offset < BufferSize)
{
- size_t ChunkSize = Min<size_t>(BufferSize - Offset, 65536u);
+ size_t ChunkSize = Min<size_t>(BufferSize - Offset, BufferingSize);
IoBuffer SubRange(Buffer, Offset, ChunkSize);
HashStream.Append(SubRange.GetData(), SubRange.GetSize());
Offset += ChunkSize;
diff --git a/src/zencore/iohash.cpp b/src/zencore/iohash.cpp
index cedee913a..a6bf25f6c 100644
--- a/src/zencore/iohash.cpp
+++ b/src/zencore/iohash.cpp
@@ -31,14 +31,15 @@ IoHash::HashBuffer(const CompositeBuffer& Buffer)
for (const SharedBuffer& Segment : Buffer.GetSegments())
{
- size_t SegmentSize = Segment.GetSize();
- if (SegmentSize >= (65536 + 32768) && Segment.IsFileReference())
+ size_t SegmentSize = Segment.GetSize();
+ static const size_t BufferingSize = 512 * 1024;
+ if (SegmentSize >= (BufferingSize + BufferingSize / 2) && Segment.IsFileReference())
{
const IoBuffer SegmentBuffer = Segment.AsIoBuffer();
size_t Offset = 0;
while (Offset < SegmentSize)
{
- size_t ChunkSize = Min<size_t>(SegmentSize - Offset, 65536u);
+ size_t ChunkSize = Min<size_t>(SegmentSize - Offset, BufferingSize);
IoBuffer SubRange(SegmentBuffer, Offset, ChunkSize);
Hasher.Append(SubRange.GetData(), ChunkSize);
Offset += ChunkSize;
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 47f8b7357..cfa53c080 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -986,6 +986,17 @@ ProjectStore::Oplog::IterateOplog(std::function<void(CbObjectView)>&& Handler)
m_Storage->ReplayLogEntries(Entries, [&](CbObjectView Op) { Handler(Op); });
}
+size_t
+ProjectStore::Oplog::GetOplogEntryCount() const
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+ if (!m_Storage)
+ {
+ return 0;
+ }
+ return m_LatestOpMap.size();
+}
+
void
ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbObjectView)>&& Handler)
{
@@ -1122,6 +1133,79 @@ ProjectStore::Oplog::AddChunkMappings(const std::unordered_map<Oid, IoHash, Oid:
}
void
+ProjectStore::Oplog::CaptureUpdatedLSN(RwLock::ExclusiveLockScope&, uint32_t LSN)
+{
+ if (m_UpdatedLSNs)
+ {
+ m_UpdatedLSNs->push_back(LSN);
+ }
+}
+
+void
+ProjectStore::Oplog::CaptureAddedAttachments(std::span<const IoHash> AttachmentHashes)
+{
+ m_OplogLock.WithExclusiveLock([this, AttachmentHashes]() {
+ if (m_NonGCAttachments)
+ {
+ m_NonGCAttachments->reserve(m_NonGCAttachments->size() + AttachmentHashes.size());
+ m_NonGCAttachments->insert(m_NonGCAttachments->end(), AttachmentHashes.begin(), AttachmentHashes.end());
+ }
+ });
+}
+
+void
+ProjectStore::Oplog::EnableUpdateCapture()
+{
+ m_OplogLock.WithExclusiveLock([&]() {
+ m_UpdatedLSNs = std::make_unique<std::vector<int>>();
+ m_NonGCAttachments = std::make_unique<std::vector<IoHash>>();
+ });
+}
+
+void
+ProjectStore::Oplog::DisableUpdateCapture()
+{
+ m_OplogLock.WithExclusiveLock([&]() {
+ m_UpdatedLSNs.reset();
+ m_NonGCAttachments.reset();
+ });
+}
+
+void
+ProjectStore::Oplog::IterateUpdatedLSNs(RwLock::SharedLockScope&, std::function<bool(const CbObjectView& UpdateOp)>&& Callback)
+{
+ if (m_UpdatedLSNs)
+ {
+ for (int UpdatedLSN : *m_UpdatedLSNs)
+ {
+ std::optional<CbObject> UpdatedOp = GetOpByIndex(UpdatedLSN);
+ if (UpdatedOp)
+ {
+ if (!Callback(UpdatedOp.value()))
+ {
+ break;
+ }
+ }
+ }
+ }
+}
+
+void
+ProjectStore::Oplog::IterateAddedAttachments(RwLock::SharedLockScope&, std::function<bool(const IoHash& RawHash)>&& Callback)
+{
+ if (m_NonGCAttachments)
+ {
+ for (const IoHash& ReferenceHash : *m_NonGCAttachments)
+ {
+ if (!Callback(ReferenceHash))
+ {
+ break;
+ }
+ }
+ }
+}
+
+void
ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&,
const Oid& FileId,
const IoHash& Hash,
@@ -1279,10 +1363,7 @@ ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize});
m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn;
- if (m_UpdatedLSNs)
- {
- m_UpdatedLSNs->push_back(OpEntry.OpLsn);
- }
+ CaptureUpdatedLSN(OplogLock, OpEntry.OpLsn);
return OpEntry.OpLsn;
}
@@ -2803,8 +2884,10 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie
Attachments.insert(RawHash);
};
+ auto OnChunkedAttachment = [](const ChunkedInfo&) {};
+
RemoteProjectStore::Result RemoteResult =
- SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, nullptr);
+ SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OnChunkedAttachment, nullptr);
if (RemoteResult.ErrorCode)
{
@@ -2865,6 +2948,15 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
}
}
+ size_t ChunkFileSizeLimit = RemoteStoreOptions::DefaultChunkFileSizeLimit;
+ if (auto Param = Params.GetValue("chunkfilesizelimit"); Param.empty() == false)
+ {
+ if (auto Value = ParseInt<size_t>(Param))
+ {
+ ChunkFileSizeLimit = Value.value();
+ }
+ }
+
CidStore& ChunkStore = m_CidStore;
RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer(
@@ -2873,11 +2965,12 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
*Oplog,
MaxBlockSize,
MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
/* BuildBlocks */ false,
- /* IgnoreMissingAttachemnts */ false,
- [](CompressedBuffer&&, const IoHash) {},
- [](const IoHash&, const TGetAttachmentBufferFunc&) {},
- [](const std::unordered_set<IoHash, IoHash::Hasher>) {},
+ /* IgnoreMissingAttachments */ false,
+ [](CompressedBuffer&&, const IoHash&) {},
+ [](const IoHash&, TGetAttachmentBufferFunc&&) {},
+ [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {},
/* EmbedLooseFiles*/ false);
OutResponse = std::move(ContainerResult.ContainerObject);
@@ -3239,6 +3332,7 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op
size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize);
size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
+ size_t ChunkFileSizeLimit = Params["chunkfilesizelimit"sv].AsUInt64(RemoteStoreOptions::DefaultChunkFileSizeLimit);
bool Force = Params["force"sv].AsBool(false);
bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false);
@@ -3267,6 +3361,7 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op
OplogPtr = &Oplog,
MaxBlockSize,
MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
EmbedLooseFile,
Force,
IgnoreMissingAttachments](JobContext& Context) {
@@ -3276,6 +3371,7 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op
*OplogPtr,
MaxBlockSize,
MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
EmbedLooseFile,
Force,
IgnoreMissingAttachments,
@@ -3573,7 +3669,7 @@ public:
virtual ~ProjectStoreReferenceChecker()
{
m_OplogLock.reset();
- m_Oplog.m_OplogLock.WithExclusiveLock([&]() { m_Oplog.m_UpdatedLSNs.reset(); });
+ m_Oplog.DisableUpdateCapture();
}
virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}'", m_Oplog.m_BasePath); }
@@ -3598,7 +3694,7 @@ public:
m_Oplog.OplogId());
});
- m_Oplog.m_OplogLock.WithExclusiveLock([&]() { m_Oplog.m_UpdatedLSNs = std::make_unique<std::vector<int>>(); });
+ m_Oplog.EnableUpdateCapture();
RwLock::SharedLockScope __(m_Oplog.m_OplogLock);
if (Ctx.IsCancelledFlag)
@@ -3608,7 +3704,6 @@ public:
m_Oplog.IterateOplog([&](CbObjectView Op) {
Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); });
});
- m_PreCachedLsn = m_Oplog.GetMaxOpIndex();
}
}
@@ -3632,18 +3727,10 @@ public:
m_OplogLock = std::make_unique<RwLock::SharedLockScope>(m_Oplog.m_OplogLock);
- if (m_Oplog.m_UpdatedLSNs)
- {
- for (int UpdatedLSN : *m_Oplog.m_UpdatedLSNs)
- {
- std::optional<CbObject> UpdatedOp = m_Oplog.GetOpByIndex(UpdatedLSN);
- if (UpdatedOp)
- {
- CbObjectView Op = UpdatedOp.value();
- Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); });
- }
- }
- }
+ m_Oplog.IterateUpdatedLSNs(*m_OplogLock, [&](const CbObjectView& UpdateOp) -> bool {
+ UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); });
+ return true;
+ });
}
virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override
@@ -3676,12 +3763,21 @@ public:
}
}
}
+ m_Oplog.IterateAddedAttachments(*m_OplogLock, [&](const IoHash& RawHash) -> bool {
+ if (IoCids.erase(RawHash) == 1)
+ {
+ if (IoCids.empty())
+ {
+ return false;
+ }
+ }
+ return true;
+ });
}
ProjectStore::Oplog& m_Oplog;
bool m_PreCache;
std::unique_ptr<RwLock::SharedLockScope> m_OplogLock;
std::vector<IoHash> m_References;
- int m_PreCachedLsn = -1;
};
std::vector<GcReferenceChecker*>
@@ -3792,13 +3888,16 @@ namespace testutils {
return Package;
};
- std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes)
+ std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(
+ const std::span<const size_t>& Sizes,
+ OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast)
{
std::vector<std::pair<Oid, CompressedBuffer>> Result;
Result.reserve(Sizes.size());
for (size_t Size : Sizes)
{
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(CreateRandomBlob(Size)));
+ CompressedBuffer Compressed =
+ CompressedBuffer::Compress(SharedBuffer(CreateRandomBlob(Size)), OodleCompressor::Mermaid, CompressionLevel);
Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed));
}
return Result;
@@ -3890,6 +3989,99 @@ TEST_CASE("project.store.lifetimes")
CHECK(Project->Identifier == "proj1"sv);
}
+struct ExportForceDisableBlocksTrue_ForceTempBlocksFalse
+{
+ static const bool ForceDisableBlocks = true;
+ static const bool ForceEnableTempBlocks = false;
+};
+
+struct ExportForceDisableBlocksFalse_ForceTempBlocksFalse
+{
+ static const bool ForceDisableBlocks = false;
+ static const bool ForceEnableTempBlocks = false;
+};
+
+struct ExportForceDisableBlocksFalse_ForceTempBlocksTrue
+{
+ static const bool ForceDisableBlocks = false;
+ static const bool ForceEnableTempBlocks = true;
+};
+
+TEST_CASE_TEMPLATE("project.store.export",
+ Settings,
+ ExportForceDisableBlocksTrue_ForceTempBlocksFalse,
+ ExportForceDisableBlocksFalse_ForceTempBlocksFalse,
+ ExportForceDisableBlocksFalse_ForceTempBlocksTrue)
+{
+ using namespace std::literals;
+ using namespace testutils;
+
+ ScopedTemporaryDirectory TempDir;
+ ScopedTemporaryDirectory ExportDir;
+
+ auto JobQueue = MakeJobQueue(1, ""sv);
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ std::filesystem::path BasePath = TempDir.Path() / "projectstore";
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue);
+ std::filesystem::path RootDir = TempDir.Path() / "root";
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
+ std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
+ std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
+
+ Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ ProjectRootDir.string(),
+ ProjectFilePath.string()));
+ ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {});
+ CHECK(Oplog != nullptr);
+
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {}));
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77})));
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99})));
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122})));
+ Oplog->AppendNewOplogEntry(
+ CreateOplogPackage(Oid::NewOid(),
+ CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None)));
+
+ FileRemoteStoreOptions Options = {
+ RemoteStoreOptions{.MaxBlockSize = 64u * 1024, .MaxChunkEmbedSize = 32 * 1024u, .ChunkFileSizeLimit = 64u * 1024u},
+ /*.FolderPath = */ ExportDir.Path(),
+ /*.Name = */ std::string("oplog1"),
+ /*OptionalBaseName = */ std::string(),
+ /*.ForceDisableBlocks = */ Settings::ForceDisableBlocks,
+ /*.ForceEnableTempBlocks = */ Settings::ForceEnableTempBlocks};
+ std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options);
+ RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+
+ RemoteProjectStore::Result ExportResult = SaveOplog(CidStore,
+ *RemoteStore,
+ *Project.Get(),
+ *Oplog,
+ Options.MaxBlockSize,
+ Options.MaxChunkEmbedSize,
+ Options.ChunkFileSizeLimit,
+ true,
+ false,
+ false,
+ nullptr);
+
+ CHECK(ExportResult.ErrorCode == 0);
+
+ ProjectStore::Oplog* OplogImport = Project->NewOplog("oplog2", {});
+ CHECK(OplogImport != nullptr);
+ RemoteProjectStore::Result ImportResult = LoadOplog(CidStore, *RemoteStore, *OplogImport, false, false, nullptr);
+ CHECK(ImportResult.ErrorCode == 0);
+
+ RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore, *RemoteStore, *OplogImport, true, false, nullptr);
+ CHECK(ImportForceResult.ErrorCode == 0);
+}
+
TEST_CASE("project.store.gc")
{
using namespace std::literals;
@@ -4284,12 +4476,15 @@ TEST_CASE("project.store.block")
7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251,
491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225});
- std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes);
- std::vector<SharedBuffer> Chunks;
+ std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes);
+ std::vector<std::pair<IoHash, FetchChunkFunc>> Chunks;
Chunks.reserve(AttachmentSizes.size());
for (const auto& It : AttachmentsWithId)
{
- Chunks.push_back(It.second.GetCompressed().Flatten());
+ Chunks.push_back(std::make_pair(It.second.DecodeRawHash(),
+ [Buffer = It.second.GetCompressed().Flatten().AsIoBuffer()](const IoHash&) -> CompositeBuffer {
+ return CompositeBuffer(SharedBuffer(Buffer));
+ }));
}
CompressedBuffer Block = GenerateBlock(std::move(Chunks));
IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer();
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 897231a2e..d8c053649 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -91,9 +91,11 @@ public:
std::vector<ChunkInfo> GetAllChunksInfo();
void IterateChunkMap(std::function<void(const Oid&, const IoHash& Hash)>&& Fn);
- void IterateFileMap(std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn);
- void IterateOplog(std::function<void(CbObjectView)>&& Fn);
- void IterateOplogWithKey(std::function<void(int, const Oid&, CbObjectView)>&& Fn);
+ void IterateFileMap(std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn);
+ void IterateOplog(std::function<void(CbObjectView)>&& Fn);
+ void IterateOplogWithKey(std::function<void(int, const Oid&, CbObjectView)>&& Fn);
+ size_t GetOplogEntryCount() const;
+
std::optional<CbObject> GetOpByKey(const Oid& Key);
std::optional<CbObject> GetOpByIndex(int Index);
int GetOpIndexByKey(const Oid& Key);
@@ -140,6 +142,14 @@ public:
void AddChunkMappings(const std::unordered_map<Oid, IoHash, Oid::Hasher>& ChunkMappings);
+ void CaptureUpdatedLSN(RwLock::ExclusiveLockScope& OplogLock, uint32_t LSN);
+ void CaptureAddedAttachments(std::span<const IoHash> AttachmentHashes);
+
+ void EnableUpdateCapture();
+ void DisableUpdateCapture();
+ void IterateUpdatedLSNs(RwLock::SharedLockScope& OplogLock, std::function<bool(const CbObjectView& UpdateOp)>&& Callback);
+ void IterateAddedAttachments(RwLock::SharedLockScope& OplogLock, std::function<bool(const IoHash& RawHash)>&& Callback);
+
private:
struct FileMapEntry
{
@@ -164,7 +174,8 @@ public:
tsl::robin_map<int, OplogEntryAddress> m_OpAddressMap; // Index LSN -> op data in ops blob file
OidMap<int> m_LatestOpMap; // op key -> latest op LSN for key
- std::unique_ptr<std::vector<int>> m_UpdatedLSNs;
+ std::unique_ptr<std::vector<int>> m_UpdatedLSNs;
+ std::unique_ptr<std::vector<IoHash>> m_NonGCAttachments;
RefPtr<OplogStorage> m_Storage;
std::string m_OplogId;
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 672292290..ce3411114 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -12,6 +12,7 @@
#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zencore/workthreadpool.h>
+#include <zenstore/chunkedfile.h>
#include <zenstore/cidstore.h>
#include <zenutil/workerpools.h>
@@ -38,6 +39,14 @@ namespace zen {
CbArray("chunks") // Optional, only if we are not creating blocks (Zen)
CbFieldType::BinaryAttachment // Chunk attachment hashes
+ CbArray("chunkedfiles");
+ CbFieldType::Hash "rawhash"
+ CbFieldType::Integer "rawsize"
+ CbArray("chunks");
+ CbFieldType::Hash "chunkhash"
+ CbArray("sequence");
+ CbFieldType::Integer chunks index
+
CompressedBinary ChunkBlock
{
VarUInt ChunkCount
@@ -143,30 +152,36 @@ IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& C
};
CompressedBuffer
-GenerateBlock(std::vector<SharedBuffer>&& Chunks)
+GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks)
{
- size_t ChunkCount = Chunks.size();
- SharedBuffer SizeBuffer;
+ std::vector<SharedBuffer> ChunkSegments;
+ ChunkSegments.resize(1);
+ ChunkSegments.reserve(1 + FetchChunks.size());
+ size_t ChunkCount = FetchChunks.size();
{
IoBuffer TempBuffer(ChunkCount * 9);
MutableMemoryView View = TempBuffer.GetMutableView();
uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData());
uint8_t* BufferEndPtr = BufferStartPtr;
BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr);
- auto It = Chunks.begin();
- while (It != Chunks.end())
+ for (const auto& It : FetchChunks)
{
- BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(It->GetSize()), BufferEndPtr);
- It++;
+ CompositeBuffer Chunk = It.second(It.first);
+ uint64_t ChunkSize = 0;
+ std::span<const SharedBuffer> Segments = Chunk.GetSegments();
+ for (const SharedBuffer& Segment : Segments)
+ {
+ ChunkSize += Segment.GetSize();
+ ChunkSegments.push_back(Segment);
+ }
+ BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr);
}
ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd());
ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr);
- SizeBuffer = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength)));
+ ChunkSegments[0] = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength)));
}
CompressedBuffer CompressedBlock =
- CompressedBuffer::Compress(CompositeBuffer(std::move(SizeBuffer), CompositeBuffer(std::move(Chunks))),
- OodleCompressor::Mermaid,
- OodleCompressionLevel::None);
+ CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None);
return CompressedBlock;
}
@@ -180,7 +195,7 @@ struct Block
void
CreateBlock(WorkerThreadPool& WorkerPool,
Latch& OpSectionsLatch,
- std::vector<SharedBuffer>&& ChunksInBlock,
+ std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock,
RwLock& SectionsLock,
std::vector<Block>& Blocks,
size_t BlockIndex,
@@ -251,12 +266,16 @@ WriteToTempFile(CompressedBuffer&& CompressedBuffer, std::filesystem::path Path)
RetriesLeft--;
return true;
});
-
uint64_t Offset = 0;
- for (const SharedBuffer& Buffer : CompressedBuffer.GetCompressed().GetSegments())
{
- BlockFile.Write(Buffer.GetView(), Offset);
- Offset += Buffer.GetSize();
+ CompositeBuffer Compressed = std::move(CompressedBuffer).GetCompressed();
+ BasicFileWriter BlockWriter(BlockFile, 64u * 1024u);
+ for (const SharedBuffer& Segment : Compressed.GetSegments())
+ {
+ size_t SegmentSize = Segment.GetSize();
+ BlockWriter.Write(Segment.GetData(), SegmentSize, Offset);
+ Offset += SegmentSize;
+ }
}
void* FileHandle = BlockFile.Detach();
BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true);
@@ -270,13 +289,14 @@ BuildContainer(CidStore& ChunkStore,
ProjectStore::Oplog& Oplog,
size_t MaxBlockSize,
size_t MaxChunkEmbedSize,
+ size_t ChunkFileSizeLimit,
bool BuildBlocks,
bool IgnoreMissingAttachments,
const std::vector<Block>& KnownBlocks,
WorkerThreadPool& WorkerPool,
const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
- const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
+ const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles,
JobContext* OptionalContext,
AsyncRemoteResult& RemoteResult)
@@ -287,22 +307,24 @@ BuildContainer(CidStore& ChunkStore,
CbObject OplogContainerObject;
{
- std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
- std::unordered_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseAttachments;
+ struct FoundAttachment
+ {
+ std::filesystem::path RawPath; // If not stored in cid
+ uint64_t Size = 0;
+ Oid Key = Oid::Zero;
+ };
+
+ std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments;
RwLock BlocksLock;
std::vector<Block> Blocks;
CompressedBuffer OpsBuffer;
- std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
-
- std::unordered_map<IoHash, int, IoHash::Hasher> Attachments;
-
std::filesystem::path AttachmentTempPath = Oplog.TempPath();
AttachmentTempPath.append(".pending");
CreateDirectories(AttachmentTempPath);
- auto RewriteOp = [&](int LSN, CbObjectView Op, const std::function<void(CbObjectView)>& CB) {
+ auto RewriteOp = [&](const Oid& Key, CbObjectView Op, const std::function<void(CbObjectView)>& CB) {
bool OpRewritten = false;
CbArrayView Files = Op["files"sv].AsArrayView();
if (Files.Num() == 0)
@@ -316,6 +338,15 @@ BuildContainer(CidStore& ChunkStore,
for (CbFieldView& Field : Files)
{
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ CB(Op);
+ return;
+ }
+
bool CopyField = true;
if (CbObjectView View = Field.AsObjectView())
@@ -344,73 +375,35 @@ BuildContainer(CidStore& ChunkStore,
throw std::runtime_error(fmt::format("failed to open file '{}'", FilePath));
}
}
- SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath));
- // Loose file, just hash it for now and leave compression for later via callback
-
- const uint64_t RawSize = DataBuffer.GetSize();
- if (RawSize > MaxChunkEmbedSize)
- {
- IoHashStream Hasher;
- CompositeBuffer RawData(DataBuffer);
- UniqueBuffer RawBlockCopy;
- CompositeBuffer::Iterator It = RawData.GetIterator(0);
- const uint64_t BlockSize = MaxChunkEmbedSize;
- for (uint64_t RawOffset = 0; RawOffset < RawSize;)
- {
- const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize);
- const MemoryView RawBlock = RawData.ViewOrCopyRange(It, RawBlockSize, RawBlockCopy);
- Hasher.Append(RawBlock);
- RawOffset += RawBlockSize;
- }
- DataHash = Hasher.GetHash();
- LargeChunkHashes.insert(DataHash);
- }
- else
{
- DataHash = IoHash::HashBuffer(DataBuffer);
+ Stopwatch HashTimer;
+ SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath));
+ DataHash = IoHash::HashBuffer(CompositeBuffer(DataBuffer));
+ ZEN_INFO("Hashed loose file '{}' {}: {} in {}",
+ FilePath,
+ NiceBytes(DataBuffer.GetSize()),
+ DataHash,
+ NiceTimeSpanMs(HashTimer.GetElapsedTimeMs()));
}
- LooseAttachments.insert_or_assign(
- DataHash,
- [AttachmentBuffer = std::move(DataBuffer), &Oplog, AttachmentTempPath](const IoHash& DataHash) -> IoBuffer {
- Stopwatch AttachmentTimer;
- uint64_t RawSize = AttachmentBuffer.GetSize();
- CompressedBuffer Compressed =
- CompressedBuffer::Compress(AttachmentBuffer, OodleCompressor::Mermaid, OodleCompressionLevel::Normal);
- ZEN_ASSERT(Compressed.DecodeRawHash() == DataHash);
- uint64_t PayloadSize = Compressed.GetCompressedSize();
- ZEN_INFO("Compressed loose file attachment {} ({} -> {}) in {}",
- DataHash,
- NiceBytes(RawSize),
- NiceBytes(PayloadSize),
- NiceTimeSpanMs(static_cast<uint64_t>(AttachmentTimer.GetElapsedTimeMs())));
- std::filesystem::path AttachmentPath = AttachmentTempPath;
- AttachmentPath.append(DataHash.ToHexString());
-
- IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed), AttachmentPath);
- ZEN_INFO("Saved temp attachment to '{}', {}", AttachmentPath, NiceBytes(TempAttachmentBuffer.GetSize()));
- return TempAttachmentBuffer;
- });
+ // Rewrite file array entry with new data reference
+ CbObjectWriter Writer;
+ RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
+ if (Field.GetName() == "data"sv)
+ {
+ // omit this field as we will write it explicitly ourselves
+ return true;
+ }
+ return false;
+ });
+ Writer.AddBinaryAttachment("data"sv, DataHash);
+ UploadAttachments.insert_or_assign(DataHash, FoundAttachment{.RawPath = FilePath, .Key = Key});
+
+ CbObject RewrittenOp = Writer.Save();
+ Cbo.AddObject(std::move(RewrittenOp));
+ CopyField = false;
}
-
- // Rewrite file array entry with new data reference
- CbObjectWriter Writer;
- RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
- if (Field.GetName() == "data"sv)
- {
- // omit this field as we will write it explicitly ourselves
- return true;
- }
- return false;
- });
- Writer.AddBinaryAttachment("data"sv, DataHash);
-
- CbObject RewrittenOp = Writer.Save();
- Cbo.AddObject(std::move(RewrittenOp));
- CopyField = false;
-
- Attachments.insert_or_assign(DataHash, LSN);
}
if (CopyField)
@@ -449,24 +442,24 @@ BuildContainer(CidStore& ChunkStore,
Stopwatch Timer;
- tsl::robin_map<int, std::string> OpLSNToKey;
- CompressedBuffer CompressedOpsSection;
+ size_t TotalOpCount = Oplog.GetOplogEntryCount();
+ CompressedBuffer CompressedOpsSection;
{
+ Stopwatch RewriteOplogTimer;
CbObjectWriter SectionOpsWriter;
SectionOpsWriter.BeginArray("ops"sv);
{
- Stopwatch RewriteOplogTimer;
- Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObjectView Op) {
+ Oplog.IterateOplogWithKey([&](int, const Oid& Key, CbObjectView Op) {
if (RemoteResult.IsError())
{
return;
}
- std::string_view Key = Op["key"sv].AsString();
- OpLSNToKey.insert({LSN, std::string(Key)});
- Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); });
+ Op.IterateAttachments([&](CbFieldView FieldView) {
+ UploadAttachments.insert_or_assign(FieldView.AsAttachment(), FoundAttachment{.Key = Key});
+ });
if (EmbedLooseFiles)
{
- RewriteOp(LSN, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; });
+ RewriteOp(Key, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; });
}
else
{
@@ -476,30 +469,42 @@ BuildContainer(CidStore& ChunkStore,
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return;
}
- if (OpCount % 100000 == 0)
+ if (OpCount % 1000 == 0)
{
- ReportMessage(OptionalContext, fmt::format("Building oplog, at op {}...", OpCount));
+ ReportProgress(OptionalContext,
+ fmt::format("Building oplog: {} ops processed", OpCount),
+ TotalOpCount,
+ TotalOpCount - OpCount);
}
});
+ if (RemoteResult.IsError())
+ {
+ return {};
+ }
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return {};
}
- ReportMessage(OptionalContext,
- fmt::format("Rewrote {} ops to new oplog in {}",
- OpCount,
- NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs()))));
+ ReportProgress(OptionalContext, fmt::format("Building oplog: {} ops processed", OpCount), TotalOpCount, 0);
}
SectionOpsWriter.EndArray(); // "ops"
+ ReportMessage(OptionalContext,
+ fmt::format("Rewrote {} ops to new oplog in {}",
+ OpCount,
+ NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs()))));
+
{
Stopwatch CompressOpsTimer;
- CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer(),
- OodleCompressor::Mermaid,
- OodleCompressionLevel::Normal);
+ CompressedOpsSection =
+ CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer(), OodleCompressor::Mermaid, OodleCompressionLevel::Fast);
ReportMessage(OptionalContext,
fmt::format("Compressed oplog section {} ({} -> {}) in {}",
CompressedOpsSection.DecodeRawHash(),
@@ -512,357 +517,706 @@ BuildContainer(CidStore& ChunkStore,
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return {};
}
- if (!Attachments.empty() && !KnownBlocks.empty())
- {
- ReportMessage(OptionalContext, fmt::format("Checking {} known blocks for reuse", KnownBlocks.size()));
- Stopwatch ReuseTimer;
-
- size_t SkippedAttachmentCount = 0;
- for (const Block& KnownBlock : KnownBlocks)
+ auto FindReuseBlocks = [](const std::vector<Block>& KnownBlocks,
+ const std::unordered_set<IoHash, IoHash::Hasher>& Attachments,
+ JobContext* OptionalContext) -> std::vector<size_t> {
+ std::vector<size_t> ReuseBlockIndexes;
+ if (!Attachments.empty() && !KnownBlocks.empty())
{
- size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size();
- if (BlockAttachmentCount == 0)
- {
- continue;
- }
- size_t FoundAttachmentCount = 0;
- for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
+ ReportMessage(
+ OptionalContext,
+ fmt::format("Checking {} Attachments against {} known blocks for reuse", Attachments.size(), KnownBlocks.size()));
+ Stopwatch ReuseTimer;
+
+ for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++)
{
- if (Attachments.contains(KnownHash))
+ const Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size();
+ if (BlockAttachmentCount == 0)
{
- FoundAttachmentCount++;
+ continue;
}
- }
-
- size_t ReusePercent = (FoundAttachmentCount * 100) / BlockAttachmentCount;
- // TODO: Configure reuse-level
- if (ReusePercent > 80)
- {
- ZEN_DEBUG("Reusing block {}. {} attachments found, usage level: {}%",
- KnownBlock.BlockHash,
- FoundAttachmentCount,
- ReusePercent);
+ size_t FoundAttachmentCount = 0;
for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
{
- Attachments.erase(KnownHash);
- SkippedAttachmentCount++;
+ if (Attachments.contains(KnownHash))
+ {
+ FoundAttachmentCount++;
+ }
}
- Blocks.push_back(KnownBlock);
- }
- else if (FoundAttachmentCount > 0)
- {
- ZEN_DEBUG("Skipping block {}. {} attachments found, usage level: {}%",
- KnownBlock.BlockHash,
- FoundAttachmentCount,
- ReusePercent);
+ size_t ReusePercent = (FoundAttachmentCount * 100) / BlockAttachmentCount;
+ // TODO: Configure reuse-level
+ if (ReusePercent > 80)
+ {
+ ZEN_DEBUG("Reusing block {}. {} attachments found, usage level: {}%",
+ KnownBlock.BlockHash,
+ FoundAttachmentCount,
+ ReusePercent);
+ ReuseBlockIndexes.push_back(KnownBlockIndex);
+ }
+ else if (FoundAttachmentCount > 0)
+ {
+ ZEN_DEBUG("Skipping block {}. {} attachments found, usage level: {}%",
+ KnownBlock.BlockHash,
+ FoundAttachmentCount,
+ ReusePercent);
+ }
}
}
- ReportMessage(OptionalContext,
- fmt::format("Reusing {} out of {} known blocks, skipping upload of {} attachments, completed in {}",
- Blocks.size(),
- KnownBlocks.size(),
- SkippedAttachmentCount,
- NiceTimeSpanMs(static_cast<uint64_t>(ReuseTimer.GetElapsedTimeMs()))));
- }
+ return ReuseBlockIndexes;
+ };
- if (IsCancelled(OptionalContext))
+ std::unordered_set<IoHash, IoHash::Hasher> FoundHashes;
+ FoundHashes.reserve(UploadAttachments.size());
+ for (const auto& It : UploadAttachments)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- return {};
+ FoundHashes.insert(It.first);
}
- ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size()));
-
- // Sort attachments so we get predictable blocks for the same oplog upload
- std::vector<IoHash> SortedAttachments;
+ size_t ReusedAttachmentCount = 0;
+ std::vector<size_t> ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext);
+ for (size_t KnownBlockIndex : ReusedBlockIndexes)
{
- SortedAttachments.reserve(Attachments.size());
- for (const auto& It : Attachments)
+ const Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
{
- SortedAttachments.push_back(It.first);
+ if (UploadAttachments.erase(KnownHash) == 1)
+ {
+ ReusedAttachmentCount++;
+ }
}
- std::sort(SortedAttachments.begin(),
- SortedAttachments.end(),
- [&Attachments, &OpLSNToKey](const IoHash& Lhs, const IoHash& Rhs) {
- auto LhsLNSIt = Attachments.find(Lhs);
- ZEN_ASSERT_SLOW(LhsLNSIt != Attachments.end());
- auto RhsLNSIt = Attachments.find(Rhs);
- ZEN_ASSERT_SLOW(RhsLNSIt != Attachments.end());
- if (LhsLNSIt->second == RhsLNSIt->second)
- {
- return Lhs < Rhs;
- }
- auto LhsKeyIt = OpLSNToKey.find(LhsLNSIt->second);
- ZEN_ASSERT_SLOW(LhsKeyIt != OpLSNToKey.end());
- auto RhsKeyIt = OpLSNToKey.find(RhsLNSIt->second);
- ZEN_ASSERT_SLOW(RhsKeyIt != OpLSNToKey.end());
- return LhsKeyIt->second < RhsKeyIt->second;
- });
}
- if (IsCancelled(OptionalContext))
+ struct ChunkedFile
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- return {};
- }
- ReportMessage(OptionalContext,
- fmt::format("Assembling {} attachments from {} ops into blocks and loose attachments",
- SortedAttachments.size(),
- OpLSNToKey.size()));
+ IoBuffer Source;
- for (const IoHash& AttachmentHash : LargeChunkHashes)
- {
- if (IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- return {};
- }
+ ChunkedInfoWithSource Chunked;
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkLoookup;
+ };
+ std::vector<ChunkedFile> ChunkedFiles;
- if (auto It = LooseAttachments.find(AttachmentHash); It != LooseAttachments.end())
- {
- OnLargeAttachment(AttachmentHash, std::move(It->second));
- LooseAttachments.erase(It);
- }
- else
- {
- OnLargeAttachment(AttachmentHash,
- [&ChunkStore](const IoHash& AttachmentHash) { return ChunkStore.FindChunkByCid(AttachmentHash); });
- }
- }
- size_t LargeAttachmentCount = LargeChunkHashes.size();
-
- Latch BlockCreateLatch(1);
- size_t GeneratedBlockCount = 0;
- size_t BlockSize = 0;
- std::vector<SharedBuffer> ChunksInBlock;
- int LastLSNOp = -1;
- auto GetPayload = [&](const IoHash& AttachmentHash) {
- if (auto It = LooseAttachments.find(AttachmentHash); It != LooseAttachments.end())
- {
- IoBuffer Payload = It->second(AttachmentHash);
- LooseAttachments.erase(It);
- return Payload;
- }
- return ChunkStore.FindChunkByCid(AttachmentHash);
+ auto ChunkFile = [AttachmentTempPath](const IoHash& RawHash,
+ IoBuffer& RawData,
+ const IoBufferFileReference& FileRef,
+ JobContext*) -> ChunkedFile {
+ ChunkedFile Chunked;
+ Stopwatch Timer;
+
+ uint64_t Offset = FileRef.FileChunkOffset;
+ uint64_t Size = FileRef.FileChunkSize;
+
+ BasicFile SourceFile;
+ SourceFile.Attach(FileRef.FileHandle);
+ auto __ = MakeGuard([&SourceFile]() { SourceFile.Detach(); });
+
+ Chunked.Chunked = ChunkData(SourceFile, Offset, Size, UShaderByteCodeParams);
+ Chunked.Source = RawData;
+
+ ZEN_INFO("Chunked large attachment '{}' {} into {} chunks in {}",
+ RawHash,
+ NiceBytes(Chunked.Chunked.Info.RawSize),
+ Chunked.Chunked.Info.ChunkHashes.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+
+ return Chunked;
};
- uint32_t ResolvedLargeCount = 0;
- uint32_t ResolvedSmallCount = 0;
- uint32_t ResolvedFailedCount = 0;
- uint32_t ComposedBlocks = 0;
+ RwLock ResolveLock;
+ std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes;
+ std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
+ std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments;
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> LooseUploadAttachments;
+ std::unordered_set<IoHash, IoHash::Hasher> MissingHashes;
- uint64_t FetchAttachmentsStartMS = Timer.GetElapsedTimeMs();
+ ReportMessage(OptionalContext, fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount));
- for (auto HashIt = SortedAttachments.begin(); HashIt != SortedAttachments.end(); HashIt++)
+ Latch ResolveAttachmentsLatch(1);
+ for (auto& It : UploadAttachments)
{
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- break;
- }
- if ((ResolvedLargeCount + ResolvedSmallCount) % 1000 == 0)
- {
- ReportProgress(OptionalContext,
- fmt::format("Resolving attachments: {} large, {} small, {} blocks assembled",
- ResolvedLargeCount,
- ResolvedSmallCount,
- ComposedBlocks),
- SortedAttachments.size(),
- SortedAttachments.size() - (ResolvedLargeCount + ResolvedSmallCount + ResolvedFailedCount));
- }
- const IoHash& AttachmentHash(*HashIt);
- if (LargeChunkHashes.contains(AttachmentHash))
- {
- ResolvedLargeCount++;
- continue;
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
}
- IoBuffer Payload = GetPayload(AttachmentHash);
- if (!Payload)
- {
- auto It = Attachments.find(AttachmentHash);
- ZEN_ASSERT(It != Attachments.end());
- std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second);
- ZEN_ASSERT(Op.has_value());
- ExtendableStringBuilder<1024> Sb;
- Sb.Append("Failed to find attachment '");
- Sb.Append(AttachmentHash.ToHexString());
- Sb.Append("' for op: \n");
- Op.value().ToJson(Sb);
- ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView()));
-
- if (IgnoreMissingAttachments)
- {
- ResolvedFailedCount++;
- continue;
- }
- else
+ ResolveAttachmentsLatch.AddCount(1);
+
+ WorkerPool.ScheduleWork([&ChunkStore,
+ UploadAttachment = &It.second,
+ RawHash = It.first,
+ &ResolveAttachmentsLatch,
+ &ResolveLock,
+ &ChunkedHashes,
+ &LargeChunkHashes,
+ &ChunkedUploadAttachments,
+ &LooseUploadAttachments,
+ &MissingHashes,
+ &OnLargeAttachment,
+ &AttachmentTempPath,
+ &ChunkFile,
+ &ChunkedFiles,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ &RemoteResult,
+ OptionalContext]() {
+ auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); });
+ try
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
- BlockCreateLatch.CountDown();
- while (!BlockCreateLatch.Wait(1000))
+ if (IsCancelled(OptionalContext))
{
- ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
+ return;
+ }
+
+ if (!UploadAttachment->RawPath.empty())
+ {
+ const std::filesystem::path& FilePath = UploadAttachment->RawPath;
+ IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath);
+ if (RawData)
+ {
+ if (RawData.GetSize() > ChunkFileSizeLimit)
+ {
+ IoBufferFileReference FileRef;
+ (void)RawData.GetFileReference(FileRef);
+
+ ChunkedFile Chunked = ChunkFile(RawHash, RawData, FileRef, OptionalContext);
+ ResolveLock.WithExclusiveLock(
+ [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() {
+ ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size());
+ ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size());
+ for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
+ {
+ ChunkedHashes.insert(ChunkHash);
+ }
+ ChunkedFiles.emplace_back(std::move(Chunked));
+ });
+ }
+ else if (RawData.GetSize() > (MaxChunkEmbedSize * 2))
+ {
+ // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't
+ // it will be a loose attachment instead of going into a block
+ OnLargeAttachment(RawHash, [RawData = std::move(RawData), AttachmentTempPath](const IoHash& RawHash) {
+ size_t RawSize = RawData.GetSize();
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::VeryFast);
+
+ std::filesystem::path AttachmentPath = AttachmentTempPath;
+ AttachmentPath.append(RawHash.ToHexString());
+
+ IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed), AttachmentPath);
+ ZEN_INFO("Saved temp attachment to '{}', {} ({})",
+ AttachmentPath,
+ NiceBytes(RawSize),
+ NiceBytes(TempAttachmentBuffer.GetSize()));
+ return TempAttachmentBuffer;
+ });
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ }
+ else
+ {
+ size_t RawSize = RawData.GetSize();
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::VeryFast);
+
+ std::filesystem::path AttachmentPath = AttachmentTempPath;
+ AttachmentPath.append(RawHash.ToHexString());
+
+ IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed), AttachmentPath);
+ ZEN_INFO("Saved temp attachment to '{}', {} ({})",
+ AttachmentPath,
+ NiceBytes(RawSize),
+ NiceBytes(TempAttachmentBuffer.GetSize()));
+
+ if (Compressed.GetCompressedSize() > MaxChunkEmbedSize)
+ {
+ OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; });
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ }
+ else
+ {
+ UploadAttachment->Size = Compressed.GetCompressedSize();
+ ResolveLock.WithExclusiveLock(
+ [RawHash, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() {
+ LooseUploadAttachments.insert_or_assign(RawHash, std::move(Data));
+ });
+ }
+ }
+ }
+ else
+ {
+ ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
+ }
+ }
+ else
+ {
+ IoBuffer Data = ChunkStore.FindChunkByCid(RawHash);
+ if (Data)
+ {
+ auto GetForChunking =
+ [](size_t ChunkFileSizeLimit, const IoBuffer& Data, IoBufferFileReference& OutFileRef) -> bool {
+ if (Data.IsWholeFile())
+ {
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize);
+ if (Compressed)
+ {
+ if (VerifyRawSize > ChunkFileSizeLimit)
+ {
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ uint64_t BlockSize;
+ if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+ {
+ if (CompressionLevel == OodleCompressionLevel::None)
+ {
+ CompositeBuffer Decompressed = Compressed.DecompressToComposite();
+ if (Decompressed)
+ {
+ std::span<const SharedBuffer> Segments = Decompressed.GetSegments();
+ if (Segments.size() == 1)
+ {
+ IoBuffer DecompressedData = Segments[0].AsIoBuffer();
+ if (DecompressedData.GetFileReference(OutFileRef))
+ {
+ return true;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ return false;
+ };
+
+ IoBufferFileReference FileRef;
+ if (GetForChunking(ChunkFileSizeLimit, Data, FileRef))
+ {
+ ChunkedFile Chunked = ChunkFile(RawHash, Data, FileRef, OptionalContext);
+ ResolveLock.WithExclusiveLock(
+ [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() {
+ ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size());
+ ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size());
+ for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
+ {
+ ChunkedHashes.insert(ChunkHash);
+ }
+ ChunkedFiles.emplace_back(std::move(Chunked));
+ });
+ }
+ else if (Data.GetSize() > MaxChunkEmbedSize)
+ {
+ OnLargeAttachment(RawHash,
+ [&ChunkStore](const IoHash& RawHash) { return ChunkStore.FindChunkByCid(RawHash); });
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ }
+ else
+ {
+ UploadAttachment->Size = Data.GetSize();
+ }
+ }
+ else
+ {
+ ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
+ }
}
- return {};
}
- }
+ catch (std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to resolve attachment {}", RawHash),
+ Ex.what());
+ }
+ });
+ }
+ ResolveAttachmentsLatch.CountDown();
- uint64_t PayloadSize = Payload.GetSize();
- if (PayloadSize > MaxChunkEmbedSize)
+ while (!ResolveAttachmentsLatch.Wait(1000))
+ {
+ ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining();
+ if (IsCancelled(OptionalContext))
{
- if (LargeChunkHashes.insert(AttachmentHash).second)
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ while (!ResolveAttachmentsLatch.Wait(1000))
{
- OnLargeAttachment(AttachmentHash, [Payload = std::move(Payload)](const IoHash&) { return std::move(Payload); });
- LargeAttachmentCount++;
+ Remaining = ResolveAttachmentsLatch.Remaining();
+ ReportProgress(OptionalContext,
+ fmt::format("Aborting, {} attachments remaining...", Remaining),
+ UploadAttachments.size(),
+ Remaining);
}
- ResolvedLargeCount++;
- continue;
+ ReportProgress(OptionalContext, fmt::format("Resolving attachments, {} remaining...", 0), UploadAttachments.size(), 0);
+ return {};
}
- else
+ ReportProgress(OptionalContext,
+ fmt::format("Resolving attachments, {} remaining...", Remaining),
+ UploadAttachments.size(),
+ Remaining);
+ }
+ if (UploadAttachments.size() > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Resolving attachments, {} remaining...", 0), UploadAttachments.size(), 0);
+ }
+
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
+
+ for (const IoHash& AttachmentHash : MissingHashes)
+ {
+ auto It = UploadAttachments.find(AttachmentHash);
+ ZEN_ASSERT(It != UploadAttachments.end());
+ std::optional<CbObject> Op = Oplog.GetOpByKey(It->second.Key);
+ ZEN_ASSERT(Op.has_value());
+ ExtendableStringBuilder<1024> Sb;
+ Sb.Append("Failed to find attachment '");
+ Sb.Append(AttachmentHash.ToHexString());
+ Sb.Append("' for op: \n");
+ Op.value().ToJson(Sb);
+
+ if (IgnoreMissingAttachments)
{
- ResolvedSmallCount++;
+ ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView()));
}
-
- if (!BlockAttachmentHashes.insert(AttachmentHash).second)
+ else
{
- continue;
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
+ return {};
}
+ UploadAttachments.erase(AttachmentHash);
+ }
- auto It = Attachments.find(AttachmentHash);
- const int CurrentOpLSN = It->second;
+ for (const auto& It : ChunkedUploadAttachments)
+ {
+ UploadAttachments.erase(It.first);
+ }
+ for (const auto& It : LargeChunkHashes)
+ {
+ UploadAttachments.erase(It);
+ }
- BlockSize += PayloadSize;
- if (BuildBlocks)
+ std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext);
+ for (size_t KnownBlockIndex : ReusedBlockIndexes)
+ {
+ const Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
{
- ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload)));
+ if (ChunkedHashes.erase(KnownHash) == 1)
+ {
+ ReusedAttachmentCount++;
+ }
}
- else
+ }
+
+ ReusedBlockIndexes.insert(ReusedBlockIndexes.end(), ReusedBlockFromChunking.begin(), ReusedBlockFromChunking.end());
+ std::sort(ReusedBlockIndexes.begin(), ReusedBlockIndexes.end());
+ auto UniqueKnownBlocksEnd = std::unique(ReusedBlockIndexes.begin(), ReusedBlockIndexes.end());
+ size_t ReuseBlockCount = std::distance(ReusedBlockIndexes.begin(), UniqueKnownBlocksEnd);
+ if (ReuseBlockCount > 0)
+ {
+ Blocks.reserve(ReuseBlockCount);
+ for (auto It = ReusedBlockIndexes.begin(); It != UniqueKnownBlocksEnd; It++)
{
- Payload = {};
+ Blocks.push_back(KnownBlocks[*It]);
}
+ ReportMessage(OptionalContext, fmt::format("Reused {} attachments from {} blocks", ReusedAttachmentCount, ReuseBlockCount));
+ }
+
+ std::vector<std::pair<IoHash, Oid>> SortedUploadAttachments;
+ SortedUploadAttachments.reserve(UploadAttachments.size());
+ for (const auto& It : UploadAttachments)
+ {
+ SortedUploadAttachments.push_back(std::make_pair(It.first, It.second.Key));
+ }
+
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
+
+ ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", SortedUploadAttachments.size(), TotalOpCount));
+
+ // Sort attachments so we get predictable blocks for the same oplog upload
+ std::sort(SortedUploadAttachments.begin(),
+ SortedUploadAttachments.end(),
+ [](const std::pair<IoHash, Oid>& Lhs, const std::pair<IoHash, Oid>& Rhs) {
+ if (Lhs.second == Rhs.second)
+ {
+ // Same key, sort by raw hash
+ return Lhs.first < Rhs.first;
+ }
+ // Sort by key
+ return Lhs.second < Rhs.second;
+ });
+
+ std::vector<size_t> ChunkedFilesOrder;
+ ChunkedFilesOrder.reserve(ChunkedFiles.size());
+ for (size_t Index = 0; Index < ChunkedFiles.size(); Index++)
+ {
+ ChunkedFilesOrder.push_back(Index);
+ }
+ std::sort(ChunkedFilesOrder.begin(), ChunkedFilesOrder.end(), [&ChunkedFiles](size_t Lhs, size_t Rhs) {
+ return ChunkedFiles[Lhs].Chunked.Info.RawHash < ChunkedFiles[Rhs].Chunked.Info.RawHash;
+ });
+
+ // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded
+ // ChunkedHashes contains all chunked up chunks to be composed into blocks
+
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
+ ReportMessage(OptionalContext,
+ fmt::format("Assembling {} attachments and {} chunked parts from {} ops into blocks and loose attachments",
+ SortedUploadAttachments.size(),
+ ChunkedHashes.size(),
+ TotalOpCount));
+
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
+
+ // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded
+ // ChunkedHashes contains all chunked up chunks to be composed into blocks
+
+ size_t ChunkAssembleCount = SortedUploadAttachments.size() + ChunkedHashes.size();
+ size_t ChunksAssembled = 0;
+ ReportMessage(OptionalContext, fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount));
- if (BlockSize >= MaxBlockSize && (CurrentOpLSN != LastLSNOp))
+ Latch BlockCreateLatch(1);
+ size_t GeneratedBlockCount = 0;
+ size_t BlockSize = 0;
+ std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock;
+
+ Oid LastOpKey = Oid::Zero;
+ uint32_t ComposedBlocks = 0;
+
+ uint64_t CreateBlocksStartMS = Timer.GetElapsedTimeMs();
+ try
+ {
+ uint64_t FetchAttachmentsStartMS = Timer.GetElapsedTimeMs();
+ std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
+ auto NewBlock = [&]() {
+ size_t BlockIndex = AddBlock(BlocksLock, Blocks);
+ size_t ChunkCount = ChunksInBlock.size();
+ if (BuildBlocks)
+ {
+ CreateBlock(WorkerPool,
+ BlockCreateLatch,
+ std::move(ChunksInBlock),
+ BlocksLock,
+ Blocks,
+ BlockIndex,
+ AsyncOnBlock,
+ RemoteResult);
+ ComposedBlocks++;
+ }
+ else
+ {
+ ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
+ OnBlockChunks(std::move(ChunksInBlock));
+ }
+ {
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope _(BlocksLock);
+ Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
+ BlockAttachmentHashes.begin(),
+ BlockAttachmentHashes.end());
+ }
+ uint64_t NowMS = Timer.GetElapsedTimeMs();
+ ZEN_INFO("Assembled block {} with {} chunks in {} ({})",
+ BlockIndex,
+ ChunkCount,
+ NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS),
+ NiceBytes(BlockSize));
+ FetchAttachmentsStartMS = NowMS;
+ BlockAttachmentHashes.clear();
+ ChunksInBlock.clear();
+ BlockSize = 0;
+ GeneratedBlockCount++;
+ };
+
+ for (auto HashIt = SortedUploadAttachments.begin(); HashIt != SortedUploadAttachments.end(); HashIt++)
{
- size_t BlockIndex = AddBlock(BlocksLock, Blocks);
- size_t ChunkCount = ChunksInBlock.size();
- if (BuildBlocks)
+ if (IsCancelled(OptionalContext))
{
- CreateBlock(WorkerPool,
- BlockCreateLatch,
- std::move(ChunksInBlock),
- BlocksLock,
- Blocks,
- BlockIndex,
- AsyncOnBlock,
- RemoteResult);
- ComposedBlocks++;
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ break;
}
- else
+ if (ChunksAssembled % 1000 == 0)
{
- ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
- OnBlockChunks(BlockAttachmentHashes);
+ ReportProgress(
+ OptionalContext,
+ fmt::format("Assembling blocks: {} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
+ ChunkAssembleCount,
+ ChunkAssembleCount - ChunksAssembled);
}
+ const IoHash& RawHash(HashIt->first);
+ const Oid CurrentOpKey = HashIt->second;
+ const IoHash& AttachmentHash(HashIt->first);
+ auto InfoIt = UploadAttachments.find(RawHash);
+ ZEN_ASSERT(InfoIt != UploadAttachments.end());
+ uint64_t PayloadSize = InfoIt->second.Size;
+
+ if (BlockAttachmentHashes.insert(AttachmentHash).second)
{
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope _(BlocksLock);
- Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
- BlockAttachmentHashes.begin(),
- BlockAttachmentHashes.end());
+ if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end())
+ {
+ ChunksInBlock.emplace_back(std::make_pair(RawHash, [IoBuffer = SharedBuffer(It->second)](const IoHash&) {
+ return CompositeBuffer(IoBuffer);
+ }));
+ LooseUploadAttachments.erase(It);
+ }
+ else
+ {
+ ChunksInBlock.emplace_back(std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) {
+ return CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash)));
+ }));
+ }
+ BlockSize += PayloadSize;
+
+ if (BlockSize >= MaxBlockSize && (CurrentOpKey != LastOpKey))
+ {
+ NewBlock();
+ }
+ LastOpKey = CurrentOpKey;
+ ChunksAssembled++;
}
- uint64_t NowMS = Timer.GetElapsedTimeMs();
- ZEN_INFO("Assembled block {} with {} chunks in {} ({})",
- BlockIndex,
- ChunkCount,
- NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS),
- NiceBytes(BlockSize));
- FetchAttachmentsStartMS = NowMS;
- BlockAttachmentHashes.clear();
- ChunksInBlock.clear();
- BlockSize = 0;
- GeneratedBlockCount++;
}
- LastLSNOp = CurrentOpLSN;
- }
-
- if (BlockSize > 0)
- {
- if (!IsCancelled(OptionalContext))
+ if (!RemoteResult.IsError())
{
- size_t BlockIndex = AddBlock(BlocksLock, Blocks);
- size_t ChunkCount = ChunksInBlock.size();
- if (BuildBlocks)
+ // Keep the chunked files as separate blocks to make the blocks generated
+ // more consistent
+ if (BlockSize > 0)
{
- CreateBlock(WorkerPool,
- BlockCreateLatch,
- std::move(ChunksInBlock),
- BlocksLock,
- Blocks,
- BlockIndex,
- AsyncOnBlock,
- RemoteResult);
- ComposedBlocks++;
+ NewBlock();
}
- else
+
+ for (size_t ChunkedFileIndex : ChunkedFilesOrder)
{
- ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
- OnBlockChunks(BlockAttachmentHashes);
+ const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex];
+ const ChunkedInfoWithSource& Chunked = ChunkedFile.Chunked;
+ size_t ChunkCount = Chunked.Info.ChunkHashes.size();
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++)
+ {
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ break;
+ }
+ if (ChunksAssembled % 1000 == 0)
+ {
+ ReportProgress(OptionalContext,
+ fmt::format("Assembling blocks: {} attachments processed, {} blocks assembled",
+ ChunksAssembled,
+ ComposedBlocks),
+ ChunkAssembleCount,
+ ChunkAssembleCount - ChunksAssembled);
+ }
+ const IoHash& ChunkHash = ChunkedFile.Chunked.Info.ChunkHashes[ChunkIndex];
+ if (auto FindIt = ChunkedHashes.find(ChunkHash); FindIt != ChunkedHashes.end())
+ {
+ if (BlockAttachmentHashes.insert(ChunkHash).second)
+ {
+ const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex];
+ ChunksInBlock.emplace_back(std::make_pair(
+ ChunkHash,
+ [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](const IoHash&) {
+ return CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::None)
+ .GetCompressed();
+ }));
+ BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size;
+ if (BuildBlocks)
+ {
+ if (BlockSize >= MaxBlockSize)
+ {
+ NewBlock();
+ }
+ }
+ ChunksAssembled++;
+ }
+ ChunkedHashes.erase(FindIt);
+ }
+ }
}
+ }
+
+ if (BlockSize > 0 && !RemoteResult.IsError())
+ {
+ if (!IsCancelled(OptionalContext))
{
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope _(BlocksLock);
- Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
- BlockAttachmentHashes.begin(),
- BlockAttachmentHashes.end());
+ NewBlock();
}
- uint64_t NowMS = Timer.GetElapsedTimeMs();
- ZEN_INFO("Assembled block {} with {} chunks in {} ({})",
- BlockIndex,
- ChunkCount,
- NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS),
- NiceBytes(BlockSize));
- FetchAttachmentsStartMS = NowMS;
-
- BlockAttachmentHashes.clear();
- ChunksInBlock.clear();
- BlockSize = 0;
- GeneratedBlockCount++;
}
- }
- ReportProgress(OptionalContext,
- fmt::format("Resolving attachments: {} large, {} small, {} blocks assembled",
- ResolvedLargeCount,
- ResolvedSmallCount,
- ComposedBlocks),
- SortedAttachments.size(),
- 0);
- ReportMessage(OptionalContext,
- fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}",
- SortedAttachments.size(),
- OpLSNToKey.size(),
- GeneratedBlockCount,
- LargeAttachmentCount,
- NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs()))));
+ ReportProgress(OptionalContext,
+ fmt::format("Assembling blocks: {} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
+ ChunkAssembleCount,
+ 0);
- if (IsCancelled(OptionalContext))
+ ReportMessage(OptionalContext,
+ fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and in {}",
+ ChunkAssembleCount,
+ TotalOpCount,
+ GeneratedBlockCount,
+ NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs()))));
+
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ptrdiff_t Remaining = BlockCreateLatch.Remaining();
+ ReportProgress(OptionalContext,
+ fmt::format("Aborting, {} blocks remaining...", Remaining),
+ GeneratedBlockCount,
+ Remaining);
+ }
+ if (GeneratedBlockCount > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", 0), GeneratedBlockCount, 0);
+ }
+ return {};
+ }
+ }
+ catch (std::exception& Ex)
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
BlockCreateLatch.CountDown();
while (!BlockCreateLatch.Wait(1000))
{
- ptrdiff_t Remaining = BlockCreateLatch.Remaining();
- ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", Remaining), GeneratedBlockCount, Remaining);
}
- if (GeneratedBlockCount > 0)
- {
- ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", 0), GeneratedBlockCount, 0);
- }
- return {};
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), "Block creation failed", Ex.what());
+ throw;
}
BlockCreateLatch.CountDown();
@@ -872,6 +1226,7 @@ BuildContainer(CidStore& ChunkStore,
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
while (!BlockCreateLatch.Wait(1000))
{
Remaining = BlockCreateLatch.Remaining();
@@ -885,9 +1240,13 @@ BuildContainer(CidStore& ChunkStore,
}
ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", Remaining), GeneratedBlockCount, Remaining);
}
+
if (GeneratedBlockCount > 0)
{
+ uint64_t NowMS = Timer.GetElapsedTimeMs();
ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0);
+ ReportMessage(OptionalContext,
+ fmt::format("Created {} blocks in {}", GeneratedBlockCount, NiceTimeSpanMs(NowMS - CreateBlocksStartMS)));
}
if (!RemoteResult.IsError())
@@ -937,6 +1296,35 @@ BuildContainer(CidStore& ChunkStore,
}
}
OplogContinerWriter.EndArray(); // "blocks"sv
+ OplogContinerWriter.BeginArray("chunkedfiles"sv);
+ {
+ for (const ChunkedFile& F : ChunkedFiles)
+ {
+ OplogContinerWriter.BeginObject();
+ {
+ OplogContinerWriter.AddHash("rawhash"sv, F.Chunked.Info.RawHash);
+ OplogContinerWriter.AddInteger("rawsize"sv, F.Chunked.Info.RawSize);
+ OplogContinerWriter.BeginArray("chunks"sv);
+ {
+ for (const IoHash& RawHash : F.Chunked.Info.ChunkHashes)
+ {
+ OplogContinerWriter.AddHash(RawHash);
+ }
+ }
+ OplogContinerWriter.EndArray(); // "chunks"
+ OplogContinerWriter.BeginArray("sequence"sv);
+ {
+ for (uint32_t ChunkIndex : F.Chunked.Info.ChunkSequence)
+ {
+ OplogContinerWriter.AddInteger(ChunkIndex);
+ }
+ }
+ OplogContinerWriter.EndArray(); // "sequence"
+ }
+ OplogContinerWriter.EndObject();
+ }
+ }
+ OplogContinerWriter.EndArray(); // "chunkedfiles"sv
OplogContinerWriter.BeginArray("chunks"sv);
{
@@ -959,11 +1347,12 @@ BuildContainer(CidStore& ChunkStore,
ProjectStore::Oplog& Oplog,
size_t MaxBlockSize,
size_t MaxChunkEmbedSize,
+ size_t ChunkFileSizeLimit,
bool BuildBlocks,
bool IgnoreMissingAttachments,
const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
- const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
+ const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles)
{
WorkerThreadPool& WorkerPool = GetSmallWorkerPool();
@@ -974,6 +1363,7 @@ BuildContainer(CidStore& ChunkStore,
Oplog,
MaxBlockSize,
MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
BuildBlocks,
IgnoreMissingAttachments,
{},
@@ -1001,7 +1391,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments,
- const std::vector<std::vector<IoHash>>& BlockChunks,
+ const std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>>& BlockChunks,
const std::unordered_map<IoHash, IoBuffer, IoHash::Hasher>& CreatedBlocks,
const tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LooseFileAttachments,
const std::unordered_set<IoHash, IoHash::Hasher>& Needs,
@@ -1019,11 +1409,12 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
ReportMessage(OptionalContext, "Filtering needed attachments for upload...");
- std::unordered_set<IoHash, IoHash::Hasher> AttachmentsToUpload;
+ std::unordered_set<IoHash, IoHash::Hasher> AttachmentsToUpload;
+ std::unordered_map<IoHash, FetchChunkFunc, IoHash::Hasher> BulkBlockAttachmentsToUpload;
- size_t BlockAttachmentCountToUpload = 0;
- size_t LargeAttachmentCountToUpload = 0;
- std::atomic<ptrdiff_t> BulkAttachmentCountToUpload = 0;
+ size_t BlockAttachmentCountToUpload = 0;
+ size_t LargeAttachmentCountToUpload = 0;
+ size_t BulkAttachmentCountToUpload = 0;
AttachmentsToUpload.reserve(ForceAll ? CreatedBlocks.size() + LargeAttachments.size() : Needs.size());
for (const auto& CreatedBlock : CreatedBlocks)
@@ -1042,39 +1433,19 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
LargeAttachmentCountToUpload++;
}
}
- for (const std::vector<IoHash>& BlockHashes : BlockChunks)
+ for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& BlockHashes : BlockChunks)
{
- if (ForceAll)
- {
- AttachmentsToUpload.insert(BlockHashes.begin(), BlockHashes.end());
- BulkAttachmentCountToUpload += BlockHashes.size();
- continue;
- }
- for (const IoHash& Hash : BlockHashes)
+ for (const std::pair<IoHash, FetchChunkFunc>& Chunk : BlockHashes)
{
- if (Needs.contains(Hash))
+ if (ForceAll || Needs.contains(Chunk.first))
{
- AttachmentsToUpload.insert(Hash);
+ BulkBlockAttachmentsToUpload.insert(std::make_pair(Chunk.first, Chunk.second));
BulkAttachmentCountToUpload++;
}
}
}
- for (const IoHash& Needed : Needs)
- {
- if (!AttachmentsToUpload.contains(Needed))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- "Invalid attachment",
- fmt::format("Upload requested an unknown attachment '{}'", Needed));
- ReportMessage(
- OptionalContext,
- fmt::format("Failed to upload attachment '{}'. ({}): {}", Needed, RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return;
- }
- }
-
- if (AttachmentsToUpload.empty())
+ if (AttachmentsToUpload.empty() && BulkBlockAttachmentsToUpload.empty())
{
ReportMessage(OptionalContext, "No attachments needed");
return;
@@ -1085,30 +1456,29 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
if (!RemoteResult.IsError())
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
}
return;
}
ReportMessage(OptionalContext,
fmt::format("Saving {} attachments ({} blocks, {} attachments, {} bulk attachments)",
- AttachmentsToUpload.size(),
+ AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(),
BlockAttachmentCountToUpload,
LargeAttachmentCountToUpload,
- BulkAttachmentCountToUpload.load()));
+ BulkAttachmentCountToUpload));
+
+ Stopwatch Timer;
ptrdiff_t AttachmentsToSave(0);
Latch SaveAttachmentsLatch(1);
- for (const IoHash& RawHash : LargeAttachments)
+ for (const IoHash& RawHash : AttachmentsToUpload)
{
if (RemoteResult.IsError())
{
break;
}
- if (!AttachmentsToUpload.contains(RawHash))
- {
- continue;
- }
SaveAttachmentsLatch.AddCount(1);
AttachmentsToSave++;
@@ -1126,10 +1496,12 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
{
return;
}
+ bool IsBlock = false;
IoBuffer Payload;
if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
{
Payload = BlockIt->second;
+ IsBlock = true;
}
else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
{
@@ -1161,76 +1533,24 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
RemoteResult.GetErrorReason()));
return;
}
- Info.AttachmentsUploaded.fetch_add(1);
- Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
- ZEN_INFO("Saved large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(PayloadSize));
- return;
- });
- }
-
- if (IsCancelled(OptionalContext))
- {
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- }
- return;
- }
-
- for (auto& It : CreatedBlocks)
- {
- if (RemoteResult.IsError())
- {
- break;
- }
- const IoHash& RawHash = It.first;
- if (!AttachmentsToUpload.contains(RawHash))
- {
- continue;
- }
- IoBuffer Payload = It.second;
- ZEN_ASSERT(Payload);
- SaveAttachmentsLatch.AddCount(1);
- AttachmentsToSave++;
- WorkerPool.ScheduleWork([&ChunkStore,
- &RemoteStore,
- &SaveAttachmentsLatch,
- &RemoteResult,
- Payload = std::move(Payload),
- RawHash,
- &Info,
- OptionalContext]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
+ if (IsBlock)
{
- return;
+ Info.AttachmentBlocksUploaded.fetch_add(1);
+ Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
+ ZEN_INFO("Saved block attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(PayloadSize));
}
-
- size_t PayloadSize = Payload.GetSize();
- RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash);
- if (Result.ErrorCode)
+ else
{
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachment '{}', {} ({}): {}",
- RawHash,
- NiceBytes(PayloadSize),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- return;
+ Info.AttachmentsUploaded.fetch_add(1);
+ Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
+ ZEN_INFO("Saved large attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(PayloadSize));
}
-
- Info.AttachmentBlocksUploaded.fetch_add(1);
- Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
-
- ZEN_INFO("Saved block attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(PayloadSize));
return;
});
}
@@ -1240,80 +1560,85 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
if (!RemoteResult.IsError())
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
}
return;
}
- for (const std::vector<IoHash>& Chunks : BlockChunks)
+ if (!BulkBlockAttachmentsToUpload.empty())
{
- if (RemoteResult.IsError())
- {
- break;
- }
-
- std::vector<IoHash> NeededChunks;
- NeededChunks.reserve(Chunks.size());
- for (const IoHash& Chunk : Chunks)
+ for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& Chunks : BlockChunks)
{
- if (AttachmentsToUpload.contains(Chunk))
+ if (RemoteResult.IsError())
{
- NeededChunks.push_back(Chunk);
+ break;
}
- }
- if (NeededChunks.empty())
- {
- continue;
- }
- SaveAttachmentsLatch.AddCount(1);
- AttachmentsToSave++;
- WorkerPool.ScheduleWork([&RemoteStore,
- &ChunkStore,
- &SaveAttachmentsLatch,
- &RemoteResult,
- &Chunks,
- NeededChunks = std::move(NeededChunks),
- &BulkAttachmentCountToUpload,
- &Info,
- OptionalContext]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- size_t ChunksSize = 0;
- std::vector<SharedBuffer> ChunkBuffers;
- ChunkBuffers.reserve(NeededChunks.size());
- for (const IoHash& Chunk : NeededChunks)
+ std::vector<IoHash> NeededChunks;
+ NeededChunks.reserve(Chunks.size());
+ for (const std::pair<IoHash, FetchChunkFunc>& Chunk : Chunks)
{
- IoBuffer ChunkPayload = ChunkStore.FindChunkByCid(Chunk);
- if (!ChunkPayload)
+ const IoHash& ChunkHash = Chunk.first;
+ if (BulkBlockAttachmentsToUpload.contains(ChunkHash) && !AttachmentsToUpload.contains(ChunkHash))
{
- RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
- fmt::format("Missing chunk {}"sv, Chunk),
- fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
- ChunkBuffers.clear();
- break;
+ NeededChunks.push_back(Chunk.first);
}
- ChunksSize += ChunkPayload.GetSize();
- ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload)));
}
- RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
- if (Result.ErrorCode)
+ if (NeededChunks.empty())
{
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachments with {} chunks ({}): {}",
- Chunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- return;
+ continue;
}
- Info.AttachmentsUploaded.fetch_add(NeededChunks.size());
- Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
-
- ZEN_INFO("Saved {} bulk attachments in {} ({})",
- Chunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(ChunksSize));
- BulkAttachmentCountToUpload.fetch_sub(Chunks.size());
- });
+
+ SaveAttachmentsLatch.AddCount(1);
+ AttachmentsToSave++;
+ WorkerPool.ScheduleWork([&RemoteStore,
+ &ChunkStore,
+ &SaveAttachmentsLatch,
+ &RemoteResult,
+ NeededChunks = std::move(NeededChunks),
+ &BulkBlockAttachmentsToUpload,
+ &Info,
+ OptionalContext]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ size_t ChunksSize = 0;
+ std::vector<SharedBuffer> ChunkBuffers;
+ ChunkBuffers.reserve(NeededChunks.size());
+ for (const IoHash& Chunk : NeededChunks)
+ {
+ auto It = BulkBlockAttachmentsToUpload.find(Chunk);
+ ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
+ CompositeBuffer ChunkPayload = It->second(It->first);
+ if (!ChunkPayload)
+ {
+ RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
+ fmt::format("Missing chunk {}"sv, Chunk),
+ fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
+ ChunkBuffers.clear();
+ break;
+ }
+ ChunksSize += ChunkPayload.GetSize();
+ ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer()));
+ }
+ RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to save attachments with {} chunks ({}): {}",
+ NeededChunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ return;
+ }
+ Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size());
+ Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
+
+ ZEN_INFO("Saved {} bulk attachments in {} ({})",
+ NeededChunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(ChunksSize));
+ });
+ }
}
SaveAttachmentsLatch.CountDown();
@@ -1325,18 +1650,22 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
if (!RemoteResult.IsError())
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
}
}
- ReportProgress(
- OptionalContext,
- fmt::format("Saving attachments, {} remaining...", BlockChunks.empty() ? Remaining : BulkAttachmentCountToUpload.load()),
- AttachmentsToSave,
- Remaining);
+ ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", Remaining), AttachmentsToSave, Remaining);
}
if (AttachmentsToSave > 0)
{
ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0);
}
+ ReportMessage(OptionalContext,
+ fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {}",
+ AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(),
+ BlockAttachmentCountToUpload,
+ LargeAttachmentCountToUpload,
+ BulkAttachmentCountToUpload,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs())));
}
RemoteProjectStore::Result
@@ -1346,6 +1675,7 @@ SaveOplog(CidStore& ChunkStore,
ProjectStore::Oplog& Oplog,
size_t MaxBlockSize,
size_t MaxChunkEmbedSize,
+ size_t ChunkFileSizeLimit,
bool EmbedLooseFiles,
bool ForceUpload,
bool IgnoreMissingAttachments,
@@ -1409,8 +1739,8 @@ SaveOplog(CidStore& ChunkStore,
ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()));
};
- std::vector<std::vector<IoHash>> BlockChunks;
- auto OnBlockChunks = [&BlockChunks](const std::unordered_set<IoHash, IoHash::Hasher>& Chunks) {
+ std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>> BlockChunks;
+ auto OnBlockChunks = [&BlockChunks](std::vector<std::pair<IoHash, FetchChunkFunc>>&& Chunks) {
BlockChunks.push_back({Chunks.begin(), Chunks.end()});
ZEN_DEBUG("Found {} block chunks", Chunks.size());
};
@@ -1480,11 +1810,16 @@ SaveOplog(CidStore& ChunkStore,
}
}
+ // TODO: We need to check if remote store actually *has* all KnownBlocks
+ // We can't reconstruct known blocks on demand as they may contain chunks that we don't have
+ // and we don't care about :(
+
CbObject OplogContainerObject = BuildContainer(ChunkStore,
Project,
Oplog,
MaxBlockSize,
MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
RemoteStoreInfo.CreateBlocks,
IgnoreMissingAttachments,
KnownBlocks,
@@ -1504,6 +1839,7 @@ SaveOplog(CidStore& ChunkStore,
RemoteProjectStore::Result Result = {.ErrorCode = 0,
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
.Text = "Operation cancelled"};
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return Result;
}
@@ -1550,6 +1886,7 @@ SaveOplog(CidStore& ChunkStore,
RemoteProjectStore::Result Result = {.ErrorCode = 0,
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
.Text = "Operation cancelled"};
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Text));
return Result;
}
@@ -1629,6 +1966,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
const std::function<bool(const IoHash& RawHash)>& HasAttachment,
const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
JobContext* OptionalContext)
{
using namespace std::literals;
@@ -1657,10 +1995,11 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView();
+
+ std::vector<IoHash> NeededChunks;
+ NeededChunks.reserve(ChunksArray.Num());
if (BlockHash == IoHash::Zero)
{
- std::vector<IoHash> NeededChunks;
- NeededChunks.reserve(ChunksArray.GetSize());
for (CbFieldView ChunkField : ChunksArray)
{
IoHash ChunkHash = ChunkField.AsBinaryAttachment();
@@ -1670,27 +2009,55 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
}
NeededChunks.emplace_back(ChunkHash);
}
-
- if (!NeededChunks.empty())
+ }
+ else
+ {
+ for (CbFieldView ChunkField : ChunksArray)
{
- OnNeedBlock(IoHash::Zero, std::move(NeededChunks));
+ const IoHash ChunkHash = ChunkField.AsHash();
+ if (HasAttachment(ChunkHash))
+ {
+ continue;
+ }
+ NeededChunks.emplace_back(ChunkHash);
}
- continue;
}
- for (CbFieldView ChunkField : ChunksArray)
+ if (!NeededChunks.empty())
{
- IoHash ChunkHash = ChunkField.AsHash();
- if (HasAttachment(ChunkHash))
+ OnNeedBlock(BlockHash, std::move(NeededChunks));
+ if (BlockHash != IoHash::Zero)
{
- continue;
+ NeedBlockCount++;
}
+ }
+ }
- OnNeedBlock(BlockHash, {});
- NeedBlockCount++;
- break;
+ CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
+ for (CbFieldView ChunkedFileField : ChunkedFilesArray)
+ {
+ CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView();
+ ChunkedInfo Chunked;
+ Chunked.RawHash = ChunkedFileView["rawhash"sv].AsHash();
+ Chunked.RawSize = ChunkedFileView["rawsize"sv].AsUInt64();
+ CbArrayView ChunksArray = ChunkedFileView["chunks"sv].AsArrayView();
+ Chunked.ChunkHashes.reserve(ChunksArray.Num());
+ for (CbFieldView ChunkField : ChunksArray)
+ {
+ const IoHash ChunkHash = ChunkField.AsHash();
+ Chunked.ChunkHashes.emplace_back(ChunkHash);
}
- };
+ CbArrayView SequenceArray = ChunkedFileView["sequence"sv].AsArrayView();
+ Chunked.ChunkSequence.reserve(SequenceArray.Num());
+ for (CbFieldView SequenceField : SequenceArray)
+ {
+ uint32_t SequenceIndex = SequenceField.AsUInt32();
+ ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size());
+ Chunked.ChunkSequence.push_back(SequenceIndex);
+ }
+ OnChunkedAttachment(Chunked);
+ }
+
ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
@@ -1788,7 +2155,6 @@ LoadOplog(CidStore& ChunkStore,
WorkerThreadPool& WorkerPool = GetSmallWorkerPool();
std::unordered_set<IoHash, IoHash::Hasher> Attachments;
- std::vector<std::vector<IoHash>> ChunksInBlocks;
RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo();
ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName));
@@ -1815,12 +2181,19 @@ LoadOplog(CidStore& ChunkStore,
std::atomic_size_t AttachmentCount = 0;
auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) {
- return !ForceDownload && ChunkStore.ContainsChunk(RawHash);
+ if (ForceDownload)
+ {
+ return false;
+ }
+ if (ChunkStore.ContainsChunk(RawHash))
+ {
+ return true;
+ }
+ return false;
};
auto OnNeedBlock = [&RemoteStore,
&ChunkStore,
&WorkerPool,
- &ChunksInBlocks,
&AttachmentsWorkLatch,
&AttachmentCount,
&RemoteResult,
@@ -1896,6 +2269,7 @@ LoadOplog(CidStore& ChunkStore,
&RemoteStore,
BlockHash,
&RemoteResult,
+ Chunks = std::move(Chunks),
&Info,
IgnoreMissingAttachments,
OptionalContext]() {
@@ -1922,30 +2296,36 @@ LoadOplog(CidStore& ChunkStore,
}
return;
}
- Info.AttachmentBlocksDownloaded.fetch_add(1);
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
uint64_t BlockSize = BlockResult.Bytes.GetSize();
+ Info.AttachmentBlocksDownloaded.fetch_add(1);
ZEN_INFO("Loaded block attachment '{}' in {} ({})",
BlockHash,
NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
NiceBytes(BlockSize));
- if (RemoteResult.IsError())
- {
- return;
- }
Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
-
+ std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
+ WantedChunks.reserve(Chunks.size());
+ WantedChunks.insert(Chunks.begin(), Chunks.end());
bool StoreChunksOK =
- IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
- uint64_t ChunkSize = Chunk.GetCompressedSize();
- CidStore::InsertResult InsertResult =
- ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
- if (InsertResult.New)
- {
- Info.AttachmentBytesStored.fetch_add(ChunkSize);
- Info.AttachmentsStored.fetch_add(1);
- }
- });
-
+ IterateBlock(std::move(BlockResult.Bytes),
+ [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
+ if (WantedChunks.contains(AttachmentRawHash))
+ {
+ uint64_t ChunkSize = Chunk.GetCompressedSize();
+ CidStore::InsertResult InsertResult =
+ ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(ChunkSize);
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ WantedChunks.erase(AttachmentRawHash);
+ }
+ });
if (!StoreChunksOK)
{
ReportMessage(OptionalContext,
@@ -1958,6 +2338,7 @@ LoadOplog(CidStore& ChunkStore,
{});
return;
}
+ ZEN_ASSERT(WantedChunks.empty());
});
};
@@ -2027,8 +2408,22 @@ LoadOplog(CidStore& ChunkStore,
});
};
- RemoteProjectStore::Result Result =
- SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OptionalContext);
+ std::vector<ChunkedInfo> FilesToDechunk;
+ auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) {
+ if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash))
+ {
+ Oplog.CaptureAddedAttachments(Chunked.ChunkHashes);
+ FilesToDechunk.push_back(Chunked);
+ }
+ };
+
+ RemoteProjectStore::Result Result = SaveOplogContainer(Oplog,
+ LoadContainerResult.ContainerObject,
+ HasAttachment,
+ OnNeedBlock,
+ OnNeedAttachment,
+ OnChunkedAttachment,
+ OptionalContext);
if (Result.ErrorCode != 0)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
@@ -2057,8 +2452,101 @@ LoadOplog(CidStore& ChunkStore,
}
if (Result.ErrorCode == 0)
{
+ if (!FilesToDechunk.empty())
+ {
+ ReportMessage(OptionalContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size()));
+
+ Latch DechunkLatch(1);
+ std::filesystem::path TempFilePath = Oplog.TempPath();
+ for (const ChunkedInfo& Chunked : FilesToDechunk)
+ {
+ std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString();
+ DechunkLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&ChunkStore, &DechunkLatch, TempFileName, &Chunked, &RemoteResult, OptionalContext]() {
+ auto _ = MakeGuard([&DechunkLatch] { DechunkLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ Stopwatch Timer;
+ IoBuffer TmpBuffer;
+ {
+ BasicFile TmpFile;
+ TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate);
+ {
+ BasicFileWriter TmpWriter(TmpFile, 64u * 1024u);
+
+ uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder();
+ BLAKE3Stream HashingStream;
+ for (std::uint32_t SequenceIndex : Chunked.ChunkSequence)
+ {
+ const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex];
+ IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash);
+ if (!Chunk)
+ {
+ RemoteResult.SetError(
+ gsl::narrow<int>(HttpResponseCode::NotFound),
+ "Missing chunk",
+ fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
+ ReportMessage(OptionalContext,
+ fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
+ return;
+ }
+ CompositeBuffer Decompressed =
+ CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite();
+ for (const SharedBuffer& Segment : Decompressed.GetSegments())
+ {
+ MemoryView SegmentData = Segment.GetView();
+ HashingStream.Append(SegmentData);
+ TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset);
+ Offset += SegmentData.GetSize();
+ }
+ }
+ BLAKE3 RawHash = HashingStream.GetHash();
+ ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash));
+ UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash);
+ TmpWriter.Write(Header.GetData(), Header.GetSize(), 0);
+ }
+ TmpFile.Flush();
+ uint64_t TmpFileSize = TmpFile.FileSize();
+ TmpBuffer = IoBuffer(IoBuffer::File, TmpFile.Detach(), 0, TmpFileSize, /*IsWholeFile*/ true);
+ IoHash ValidateRawHash;
+ uint64_t ValidateRawSize = 0;
+ ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(TmpBuffer, ValidateRawHash, ValidateRawSize));
+ ZEN_ASSERT(ValidateRawHash == Chunked.RawHash);
+ ZEN_ASSERT(ValidateRawSize == Chunked.RawSize);
+ }
+ ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace);
+ ZEN_INFO("Dechunked attachment {} ({}) in {}",
+ Chunked.RawHash,
+ NiceBytes(Chunked.RawSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ }
+ DechunkLatch.CountDown();
+
+ while (!DechunkLatch.Wait(1000))
+ {
+ ptrdiff_t Remaining = DechunkLatch.Remaining();
+ if (IsCancelled(OptionalContext))
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ ReportMessage(OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ }
+ }
+ ReportProgress(OptionalContext,
+ fmt::format("Dechunking attachments, {} remaining...", Remaining),
+ FilesToDechunk.size(),
+ Remaining);
+ }
+ ReportProgress(OptionalContext, fmt::format("Dechunking attachments, {} remaining...", 0), FilesToDechunk.size(), 0);
+ }
Result = RemoteResult.ConvertResult();
}
+
    Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
ReportMessage(OptionalContext,
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index 7254b9d3f..da93f0a27 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -11,6 +11,7 @@ namespace zen {
class CidStore;
class WorkerThreadPool;
+struct ChunkedInfo;
class RemoteProjectStore
{
@@ -84,14 +85,17 @@ public:
struct RemoteStoreOptions
{
- static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u;
- static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u;
+ static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u;
+ static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u;
+ static const size_t DefaultChunkFileSizeLimit = 256u * 1024u * 1024u;
- size_t MaxBlockSize = DefaultMaxBlockSize;
- size_t MaxChunkEmbedSize = DefaultMaxChunkEmbedSize;
+ size_t MaxBlockSize = DefaultMaxBlockSize;
+ size_t MaxChunkEmbedSize = DefaultMaxChunkEmbedSize;
+ size_t ChunkFileSizeLimit = DefaultChunkFileSizeLimit;
};
typedef std::function<IoBuffer(const IoHash& AttachmentHash)> TGetAttachmentBufferFunc;
+typedef std::function<CompositeBuffer(const IoHash& RawHash)> FetchChunkFunc;
RemoteProjectStore::LoadContainerResult BuildContainer(
CidStore& ChunkStore,
@@ -99,11 +103,12 @@ RemoteProjectStore::LoadContainerResult BuildContainer(
ProjectStore::Oplog& Oplog,
size_t MaxBlockSize,
size_t MaxChunkEmbedSize,
+ size_t ChunkFileSizeLimit,
bool BuildBlocks,
bool IgnoreMissingAttachments,
const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
- const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
+ const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles);
class JobContext;
@@ -112,8 +117,9 @@ RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog,
const CbObject& ContainerObject,
const std::function<bool(const IoHash& RawHash)>& HasAttachment,
const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
- JobContext* OptionalContext);
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ const std::function<void(const ChunkedInfo& Chunked)>& OnChunkedAttachment,
+ JobContext* OptionalContext);
RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
@@ -121,6 +127,7 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
ProjectStore::Oplog& Oplog,
size_t MaxBlockSize,
size_t MaxChunkEmbedSize,
+ size_t ChunkFileSizeLimit,
bool EmbedLooseFiles,
bool ForceUpload,
bool IgnoreMissingAttachments,
@@ -133,7 +140,7 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
bool IgnoreMissingAttachments,
JobContext* OptionalContext);
-CompressedBuffer GenerateBlock(std::vector<SharedBuffer>&& Chunks);
+CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks);
bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
} // namespace zen
diff --git a/src/zenstore/chunkedfile.cpp b/src/zenstore/chunkedfile.cpp
new file mode 100644
index 000000000..0b66c7b9b
--- /dev/null
+++ b/src/zenstore/chunkedfile.cpp
@@ -0,0 +1,505 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenstore/chunkedfile.h>
+#include <zenutil/basicfile.h>
+
+#include "chunking.h"
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+namespace {
+ struct ChunkedHeader
+ {
+ static constexpr uint32_t ExpectedMagic = 0x646b6863; // chkd
+ static constexpr uint32_t CurrentVersion = 1;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ uint32_t ChunkSequenceLength;
+ uint32_t ChunkHashCount;
+ uint64_t ChunkSequenceOffset;
+ uint64_t ChunkHashesOffset;
+ uint64_t RawSize = 0;
+ IoHash RawHash;
+ };
+} // namespace
+
+IoBuffer
+SerializeChunkedInfo(const ChunkedInfo& Info)
+{
+ size_t HeaderSize = RoundUp(sizeof(ChunkedHeader), 16) + RoundUp(sizeof(uint32_t) * Info.ChunkSequence.size(), 16) +
+ RoundUp(sizeof(IoHash) * Info.ChunkHashes.size(), 16);
+ IoBuffer HeaderData(HeaderSize);
+
+ ChunkedHeader Header;
+ Header.ChunkSequenceLength = gsl::narrow<uint32_t>(Info.ChunkSequence.size());
+ Header.ChunkHashCount = gsl::narrow<uint32_t>(Info.ChunkHashes.size());
+ Header.ChunkSequenceOffset = RoundUp(sizeof(ChunkedHeader), 16);
+ Header.ChunkHashesOffset = RoundUp(Header.ChunkSequenceOffset + sizeof(uint32_t) * Header.ChunkSequenceLength, 16);
+ Header.RawSize = Info.RawSize;
+ Header.RawHash = Info.RawHash;
+
+ MutableMemoryView WriteView = HeaderData.GetMutableView();
+ {
+ MutableMemoryView HeaderWriteView = WriteView.Left(sizeof(Header));
+ HeaderWriteView.CopyFrom(MemoryView(&Header, sizeof(Header)));
+ }
+ {
+ MutableMemoryView ChunkSequenceWriteView = WriteView.Mid(Header.ChunkSequenceOffset, sizeof(uint32_t) * Header.ChunkSequenceLength);
+ ChunkSequenceWriteView.CopyFrom(MemoryView(Info.ChunkSequence.data(), ChunkSequenceWriteView.GetSize()));
+ }
+ {
+ MutableMemoryView ChunksWriteView = WriteView.Mid(Header.ChunkHashesOffset, sizeof(IoHash) * Header.ChunkHashCount);
+ ChunksWriteView.CopyFrom(MemoryView(Info.ChunkHashes.data(), ChunksWriteView.GetSize()));
+ }
+
+ return HeaderData;
+}
+
+ChunkedInfo
+DeserializeChunkedInfo(IoBuffer& Buffer)
+{
+ MemoryView View = Buffer.GetView();
+ ChunkedHeader Header;
+ {
+ MutableMemoryView HeaderWriteView(&Header, sizeof(Header));
+ HeaderWriteView.CopyFrom(View.Left(sizeof(Header)));
+ }
+ if (Header.Magic != ChunkedHeader::ExpectedMagic)
+ {
+ return {};
+ }
+ if (Header.Version != ChunkedHeader::CurrentVersion)
+ {
+ return {};
+ }
+ ChunkedInfo Info;
+ Info.RawSize = Header.RawSize;
+ Info.RawHash = Header.RawHash;
+ Info.ChunkSequence.resize(Header.ChunkSequenceLength);
+ Info.ChunkHashes.resize(Header.ChunkHashCount);
+ {
+ MutableMemoryView ChunkSequenceWriteView(Info.ChunkSequence.data(), sizeof(uint32_t) * Header.ChunkSequenceLength);
+ ChunkSequenceWriteView.CopyFrom(View.Mid(Header.ChunkSequenceOffset, ChunkSequenceWriteView.GetSize()));
+ }
+ {
+ MutableMemoryView ChunksWriteView(Info.ChunkHashes.data(), sizeof(IoHash) * Header.ChunkHashCount);
+ ChunksWriteView.CopyFrom(View.Mid(Header.ChunkHashesOffset, ChunksWriteView.GetSize()));
+ }
+
+ return Info;
+}
+
+void
+Reconstruct(const ChunkedInfo& Info, const std::filesystem::path& TargetPath, std::function<IoBuffer(const IoHash& ChunkHash)> GetChunk)
+{
+ BasicFile Reconstructed;
+ Reconstructed.Open(TargetPath, BasicFile::Mode::kTruncate);
+ BasicFileWriter ReconstructedWriter(Reconstructed, 64 * 1024);
+ uint64_t Offset = 0;
+ for (uint32_t SequenceIndex : Info.ChunkSequence)
+ {
+ IoBuffer Chunk = GetChunk(Info.ChunkHashes[SequenceIndex]);
+ ReconstructedWriter.Write(Chunk.GetData(), Chunk.GetSize(), Offset);
+ Offset += Chunk.GetSize();
+ }
+}
+
+ChunkedInfoWithSource
+ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Params)
+{
+ ChunkedInfoWithSource Result;
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> FoundChunks;
+
+ ZenChunkHelper Chunker;
+ Chunker.SetUseThreshold(Params.UseThreshold);
+ Chunker.SetChunkSize(Params.MinSize, Params.MaxSize, Params.AvgSize);
+ size_t End = Offset + Size;
+ const size_t ScanBufferSize = 1u * 1024 * 1024; // (Params.MaxSize * 9) / 3;//1 * 1024 * 1024;
+ BasicFileBuffer RawBuffer(RawData, ScanBufferSize);
+ MemoryView SliceView = RawBuffer.MakeView(Min(End - Offset, ScanBufferSize), Offset);
+ ZEN_ASSERT(!SliceView.IsEmpty());
+ size_t SliceSize = SliceView.GetSize();
+ IoHashStream RawHashStream;
+ while (Offset < End)
+ {
+ size_t ScanLength = Chunker.ScanChunk(SliceView.GetData(), SliceSize);
+ if (ScanLength == ZenChunkHelper::kNoBoundaryFound)
+ {
+ if (Offset + SliceSize == End)
+ {
+ ScanLength = SliceSize;
+ }
+ else
+ {
+ SliceView = RawBuffer.MakeView(Min(End - Offset, ScanBufferSize), Offset);
+ SliceSize = SliceView.GetSize();
+ Chunker.Reset();
+ continue;
+ }
+ }
+ uint32_t ChunkLength = gsl::narrow<uint32_t>(ScanLength); // +HashedLength);
+ MemoryView ChunkView = SliceView.Left(ScanLength);
+ RawHashStream.Append(ChunkView);
+ IoHash ChunkHash = IoHash::HashBuffer(ChunkView);
+ SliceView.RightChopInline(ScanLength);
+ if (auto It = FoundChunks.find(ChunkHash); It != FoundChunks.end())
+ {
+ Result.Info.ChunkSequence.push_back(It->second);
+ }
+ else
+ {
+ uint32_t ChunkIndex = gsl::narrow<uint32_t>(Result.Info.ChunkHashes.size());
+ FoundChunks.insert_or_assign(ChunkHash, ChunkIndex);
+ Result.Info.ChunkHashes.push_back(ChunkHash);
+ Result.ChunkSources.push_back(ChunkSource{.Offset = Offset, .Size = ChunkLength});
+ Result.Info.ChunkSequence.push_back(ChunkIndex);
+ }
+
+ SliceSize = SliceView.GetSize();
+ Offset += ChunkLength;
+ }
+ Result.Info.RawSize = Size;
+ Result.Info.RawHash = RawHashStream.GetHash();
+ return Result;
+}
+
+} // namespace zen
+
+#if ZEN_WITH_TESTS
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iohash.h>
+# include <zencore/logging.h>
+# include <zencore/scopeguard.h>
+# include <zencore/timer.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
+
+# include "chunking.h"
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <tsl/robin_map.h>
+# include <tsl/robin_set.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+# if 0
+TEST_CASE("chunkedfile.findparams")
+{
+# if 1
+ DirectoryContent SourceContent1;
+ GetDirectoryContent("E:\\Temp\\ChunkingTestData\\31379208", DirectoryContent::IncludeFilesFlag, SourceContent1);
+ const std::vector<std::filesystem::path>& SourceFiles1 = SourceContent1.Files;
+ DirectoryContent SourceContent2;
+ GetDirectoryContent("E:\\Temp\\ChunkingTestData\\31379208_2", DirectoryContent::IncludeFilesFlag, SourceContent2);
+ const std::vector<std::filesystem::path>& SourceFiles2 = SourceContent2.Files;
+# else
+ std::filesystem::path SourcePath1 =
+ "E:\\Temp\\ChunkingTestData\\31375996\\ShaderArchive-FortniteGame_Chunk10-PCD3D_SM6-PCD3D_SM6.ushaderbytecode";
+ std::filesystem::path SourcePath2 =
+ "E:\\Temp\\ChunkingTestData\\31379208\\ShaderArchive-FortniteGame_Chunk10-PCD3D_SM6-PCD3D_SM6.ushaderbytecode";
+ const std::vector<std::filesystem::path>& SourceFiles1 = {SourcePath1};
+ const std::vector<std::filesystem::path>& SourceFiles2 = {SourcePath2};
+# endif
+ ChunkedParams Params[] = {ChunkedParams{.UseThreshold = false, .MinSize = 17280, .MaxSize = 139264, .AvgSize = 36340},
+ ChunkedParams{.UseThreshold = false, .MinSize = 15456, .MaxSize = 122880, .AvgSize = 35598},
+ ChunkedParams{.UseThreshold = false, .MinSize = 16848, .MaxSize = 135168, .AvgSize = 39030},
+ ChunkedParams{.UseThreshold = false, .MinSize = 14256, .MaxSize = 114688, .AvgSize = 36222},
+ ChunkedParams{.UseThreshold = false, .MinSize = 15744, .MaxSize = 126976, .AvgSize = 36600},
+ ChunkedParams{.UseThreshold = false, .MinSize = 15264, .MaxSize = 122880, .AvgSize = 35442},
+ ChunkedParams{.UseThreshold = false, .MinSize = 16464, .MaxSize = 131072, .AvgSize = 37950},
+ ChunkedParams{.UseThreshold = false, .MinSize = 15408, .MaxSize = 122880, .AvgSize = 38914},
+ ChunkedParams{.UseThreshold = false, .MinSize = 15408, .MaxSize = 122880, .AvgSize = 35556},
+ ChunkedParams{.UseThreshold = false, .MinSize = 15360, .MaxSize = 122880, .AvgSize = 35520},
+ ChunkedParams{.UseThreshold = false, .MinSize = 15312, .MaxSize = 122880, .AvgSize = 35478},
+ ChunkedParams{.UseThreshold = false, .MinSize = 16896, .MaxSize = 135168, .AvgSize = 39072},
+ ChunkedParams{.UseThreshold = false, .MinSize = 15360, .MaxSize = 122880, .AvgSize = 38880},
+ ChunkedParams{.UseThreshold = false, .MinSize = 15840, .MaxSize = 126976, .AvgSize = 36678},
+ ChunkedParams{.UseThreshold = false, .MinSize = 16800, .MaxSize = 135168, .AvgSize = 38994},
+ ChunkedParams{.UseThreshold = false, .MinSize = 15888, .MaxSize = 126976, .AvgSize = 36714},
+ ChunkedParams{.UseThreshold = false, .MinSize = 15792, .MaxSize = 126976, .AvgSize = 36636},
+ ChunkedParams{.UseThreshold = false, .MinSize = 14880, .MaxSize = 118784, .AvgSize = 37609},
+ ChunkedParams{.UseThreshold = false, .MinSize = 15936, .MaxSize = 126976, .AvgSize = 36756},
+ ChunkedParams{.UseThreshold = false, .MinSize = 15456, .MaxSize = 122880, .AvgSize = 38955},
+ ChunkedParams{.UseThreshold = false, .MinSize = 15984, .MaxSize = 126976, .AvgSize = 36792},
+ ChunkedParams{.UseThreshold = false, .MinSize = 14400, .MaxSize = 114688, .AvgSize = 36338},
+ ChunkedParams{.UseThreshold = false, .MinSize = 14832, .MaxSize = 118784, .AvgSize = 37568},
+ ChunkedParams{.UseThreshold = false, .MinSize = 16944, .MaxSize = 135168, .AvgSize = 39108},
+ ChunkedParams{.UseThreshold = false, .MinSize = 14352, .MaxSize = 114688, .AvgSize = 36297},
+ ChunkedParams{.UseThreshold = false, .MinSize = 14208, .MaxSize = 114688, .AvgSize = 36188},
+ ChunkedParams{.UseThreshold = false, .MinSize = 14448, .MaxSize = 114688, .AvgSize = 36372},
+ ChunkedParams{.UseThreshold = false, .MinSize = 13296, .MaxSize = 106496, .AvgSize = 36592},
+ ChunkedParams{.UseThreshold = false, .MinSize = 15264, .MaxSize = 122880, .AvgSize = 38805},
+ ChunkedParams{.UseThreshold = false, .MinSize = 14304, .MaxSize = 114688, .AvgSize = 36263},
+ ChunkedParams{.UseThreshold = false, .MinSize = 14784, .MaxSize = 118784, .AvgSize = 37534},
+ ChunkedParams{.UseThreshold = false, .MinSize = 15312, .MaxSize = 122880, .AvgSize = 38839},
+ ChunkedParams{.UseThreshold = false, .MinSize = 14256, .MaxSize = 114688, .AvgSize = 39360},
+ ChunkedParams{.UseThreshold = false, .MinSize = 13776, .MaxSize = 110592, .AvgSize = 37976},
+ ChunkedParams{.UseThreshold = false, .MinSize = 14736, .MaxSize = 118784, .AvgSize = 37493},
+ ChunkedParams{.UseThreshold = false, .MinSize = 14928, .MaxSize = 118784, .AvgSize = 37643},
+ ChunkedParams{.UseThreshold = false, .MinSize = 14448, .MaxSize = 114688, .AvgSize = 39504},
+ ChunkedParams{.UseThreshold = false, .MinSize = 13392, .MaxSize = 106496, .AvgSize = 36664},
+ ChunkedParams{.UseThreshold = false, .MinSize = 13872, .MaxSize = 110592, .AvgSize = 38048},
+ ChunkedParams{.UseThreshold = false, .MinSize = 14352, .MaxSize = 114688, .AvgSize = 39432},
+ ChunkedParams{.UseThreshold = false, .MinSize = 13200, .MaxSize = 106496, .AvgSize = 36520},
+ ChunkedParams{.UseThreshold = false, .MinSize = 17328, .MaxSize = 139264, .AvgSize = 36378},
+ ChunkedParams{.UseThreshold = false, .MinSize = 17376, .MaxSize = 139264, .AvgSize = 36421},
+ ChunkedParams{.UseThreshold = false, .MinSize = 17424, .MaxSize = 139264, .AvgSize = 36459},
+ ChunkedParams{.UseThreshold = false, .MinSize = 17472, .MaxSize = 139264, .AvgSize = 36502},
+ ChunkedParams{.UseThreshold = false, .MinSize = 17520, .MaxSize = 139264, .AvgSize = 36540},
+ ChunkedParams{.UseThreshold = false, .MinSize = 17808, .MaxSize = 143360, .AvgSize = 37423},
+ ChunkedParams{.UseThreshold = false, .MinSize = 17856, .MaxSize = 143360, .AvgSize = 37466},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18000, .MaxSize = 143360, .AvgSize = 25834},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18000, .MaxSize = 143360, .AvgSize = 21917},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18000, .MaxSize = 143360, .AvgSize = 29751},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18000, .MaxSize = 143360, .AvgSize = 33668},
+ ChunkedParams{.UseThreshold = false, .MinSize = 17952, .MaxSize = 143360, .AvgSize = 37547},
+ ChunkedParams{.UseThreshold = false, .MinSize = 17904, .MaxSize = 143360, .AvgSize = 37504},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18336, .MaxSize = 147456, .AvgSize = 22371},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18000, .MaxSize = 143360, .AvgSize = 37585},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18336, .MaxSize = 147456, .AvgSize = 26406},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18384, .MaxSize = 147456, .AvgSize = 26450},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18528, .MaxSize = 147456, .AvgSize = 30615},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18336, .MaxSize = 147456, .AvgSize = 30441},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18384, .MaxSize = 147456, .AvgSize = 22417},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18528, .MaxSize = 147456, .AvgSize = 22557},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18432, .MaxSize = 147456, .AvgSize = 30528},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18816, .MaxSize = 151552, .AvgSize = 27112},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18528, .MaxSize = 147456, .AvgSize = 34644},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18336, .MaxSize = 147456, .AvgSize = 34476},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18816, .MaxSize = 151552, .AvgSize = 35408},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18432, .MaxSize = 147456, .AvgSize = 38592},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18384, .MaxSize = 147456, .AvgSize = 30483},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18528, .MaxSize = 147456, .AvgSize = 26586},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18432, .MaxSize = 147456, .AvgSize = 26496},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18864, .MaxSize = 151552, .AvgSize = 31302},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18384, .MaxSize = 147456, .AvgSize = 34516},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18816, .MaxSize = 151552, .AvgSize = 22964},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18864, .MaxSize = 151552, .AvgSize = 35448},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18480, .MaxSize = 147456, .AvgSize = 38630},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18864, .MaxSize = 151552, .AvgSize = 23010},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18816, .MaxSize = 151552, .AvgSize = 31260},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18480, .MaxSize = 147456, .AvgSize = 34600},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18864, .MaxSize = 151552, .AvgSize = 27156},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18480, .MaxSize = 147456, .AvgSize = 30570},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18384, .MaxSize = 147456, .AvgSize = 38549},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18480, .MaxSize = 147456, .AvgSize = 22510},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18528, .MaxSize = 147456, .AvgSize = 38673},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18432, .MaxSize = 147456, .AvgSize = 34560},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18432, .MaxSize = 147456, .AvgSize = 22464},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18480, .MaxSize = 147456, .AvgSize = 26540},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18336, .MaxSize = 147456, .AvgSize = 38511},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18912, .MaxSize = 151552, .AvgSize = 23057},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18912, .MaxSize = 151552, .AvgSize = 27202},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18912, .MaxSize = 151552, .AvgSize = 31347},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18912, .MaxSize = 151552, .AvgSize = 35492},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18960, .MaxSize = 151552, .AvgSize = 31389},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18960, .MaxSize = 151552, .AvgSize = 27246},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18960, .MaxSize = 151552, .AvgSize = 23103},
+ ChunkedParams{.UseThreshold = false, .MinSize = 18960, .MaxSize = 151552, .AvgSize = 35532},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19008, .MaxSize = 151552, .AvgSize = 23150},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19008, .MaxSize = 151552, .AvgSize = 27292},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19008, .MaxSize = 151552, .AvgSize = 31434},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19008, .MaxSize = 151552, .AvgSize = 35576},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19056, .MaxSize = 151552, .AvgSize = 27336},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19056, .MaxSize = 151552, .AvgSize = 23196},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19056, .MaxSize = 151552, .AvgSize = 31476},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19056, .MaxSize = 151552, .AvgSize = 35616},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19344, .MaxSize = 155648, .AvgSize = 27862},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19344, .MaxSize = 155648, .AvgSize = 32121},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19344, .MaxSize = 155648, .AvgSize = 23603},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19344, .MaxSize = 155648, .AvgSize = 36380},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19392, .MaxSize = 155648, .AvgSize = 27908},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19392, .MaxSize = 155648, .AvgSize = 23650},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19392, .MaxSize = 155648, .AvgSize = 32166},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19392, .MaxSize = 155648, .AvgSize = 36424},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19440, .MaxSize = 155648, .AvgSize = 23696},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19488, .MaxSize = 155648, .AvgSize = 32253},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19440, .MaxSize = 155648, .AvgSize = 32208},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19488, .MaxSize = 155648, .AvgSize = 23743},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19536, .MaxSize = 155648, .AvgSize = 36548},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19536, .MaxSize = 155648, .AvgSize = 28042},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19536, .MaxSize = 155648, .AvgSize = 23789},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19536, .MaxSize = 155648, .AvgSize = 32295},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19488, .MaxSize = 155648, .AvgSize = 36508},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19440, .MaxSize = 155648, .AvgSize = 27952},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19488, .MaxSize = 155648, .AvgSize = 27998},
+ ChunkedParams{.UseThreshold = false, .MinSize = 19440, .MaxSize = 155648, .AvgSize = 36464}};
+
+ static const size_t ParamsCount = sizeof(Params) / sizeof(ChunkedParams);
+ std::vector<ChunkedInfoWithSource> Infos1(SourceFiles1.size());
+ std::vector<ChunkedInfoWithSource> Infos2(SourceFiles2.size());
+
+ WorkerThreadPool WorkerPool(32);
+
+ for (size_t I = 0; I < ParamsCount; I++)
+ {
+ for (int UseThreshold = 0; UseThreshold < 2; UseThreshold++)
+ {
+ Latch WorkLatch(1);
+ ChunkedParams Param = Params[I];
+ Param.UseThreshold = UseThreshold == 1;
+ Stopwatch Timer;
+ for (size_t F = 0; F < SourceFiles1.size(); F++)
+ {
+ WorkLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&WorkLatch, F, Param, &SourceFiles1, &Infos1]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ BasicFile SourceData1;
+ SourceData1.Open(SourceFiles1[F], BasicFile::Mode::kRead);
+ Infos1[F] = ChunkData(SourceData1, 0, SourceData1.FileSize(), Param);
+ });
+ }
+ for (size_t F = 0; F < SourceFiles2.size(); F++)
+ {
+ WorkLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&WorkLatch, F, Param, &SourceFiles2, &Infos2]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ BasicFile SourceData2;
+ SourceData2.Open(SourceFiles2[F], BasicFile::Mode::kRead);
+ Infos2[F] = ChunkData(SourceData2, 0, SourceData2.FileSize(), Param);
+ });
+ }
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+ uint64_t ChunkTimeMS = Timer.GetElapsedTimeMs();
+
+ uint64_t Raw1Size = 0;
+ tsl::robin_set<IoHash> Chunks1;
+ size_t ChunkedSize1 = 0;
+ for (size_t F = 0; F < SourceFiles1.size(); F++)
+ {
+ const ChunkedInfoWithSource& Info = Infos1[F];
+ Raw1Size += Info.Info.RawSize;
+ for (uint32_t Chunk1Index = 0; Chunk1Index < Info.Info.ChunkHashes.size(); ++Chunk1Index)
+ {
+ const IoHash ChunkHash = Info.Info.ChunkHashes[Chunk1Index];
+ if (Chunks1.insert(ChunkHash).second)
+ {
+ ChunkedSize1 += Info.ChunkSources[Chunk1Index].Size;
+ }
+ }
+ }
+
+ uint64_t Raw2Size = 0;
+ tsl::robin_set<IoHash> Chunks2;
+ size_t ChunkedSize2 = 0;
+ size_t DiffSize = 0;
+ for (size_t F = 0; F < SourceFiles2.size(); F++)
+ {
+ const ChunkedInfoWithSource& Info = Infos2[F];
+ Raw2Size += Info.Info.RawSize;
+ for (uint32_t Chunk2Index = 0; Chunk2Index < Info.Info.ChunkHashes.size(); ++Chunk2Index)
+ {
+ const IoHash ChunkHash = Info.Info.ChunkHashes[Chunk2Index];
+ if (Chunks2.insert(ChunkHash).second)
+ {
+ ChunkedSize2 += Info.ChunkSources[Chunk2Index].Size;
+ if (!Chunks1.contains(ChunkHash))
+ {
+ DiffSize += Info.ChunkSources[Chunk2Index].Size;
+ }
+ }
+ }
+ }
+
+ ZEN_INFO(
+ "Diff = {}, Chunks1 = {}, Chunks2 = {}, .UseThreshold = {}, .MinSize = {}, .MaxSize = {}, .AvgSize = {}, RawSize(1) = {}, "
+ "RawSize(2) = {}, "
+ "Saved(1) = {}, Saved(2) = {} in {}",
+ NiceBytes(DiffSize),
+ Chunks1.size(),
+ Chunks2.size(),
+ Param.UseThreshold,
+ Param.MinSize,
+ Param.MaxSize,
+ Param.AvgSize,
+ NiceBytes(Raw1Size),
+ NiceBytes(Raw2Size),
+ NiceBytes(Raw1Size - ChunkedSize1),
+ NiceBytes(Raw2Size - ChunkedSize2),
+ NiceTimeSpanMs(ChunkTimeMS));
+ }
+ }
+
+# if 0
+ for (int64_t MinSizeBase = (12u * 1024u); MinSizeBase <= (32u * 1024u); MinSizeBase += 512)
+ {
+ for (int64_t Wiggle = -132; Wiggle < 126; Wiggle += 2)
+ {
+ // size_t MinSize = 7 * 1024 - 61; // (size_t)(MinSizeBase + Wiggle);
+ // size_t MaxSize = 16 * (7 * 1024); // 8 * 7 * 1024;// MinSizeBase * 6;
+ // size_t AvgSize = MaxSize / 2; // 4 * 7 * 1024;// MinSizeBase * 3;
+ size_t MinSize = (size_t)(MinSizeBase + Wiggle);
+ //for (size_t MaxSize = (MinSize * 4) - 768; MaxSize < (MinSize * 5) + 768; MaxSize += 64)
+ size_t MaxSize = 8u * MinSizeBase;
+ {
+ for (size_t AvgSize = (MaxSize - MinSize) / 32 + MinSize; AvgSize < (MaxSize - MinSize) / 4 + MinSize; AvgSize += (MaxSize - MinSize) / 32)
+// size_t AvgSize = (MaxSize - MinSize) / 4 + MinSize;
+ {
+ WorkLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&WorkLatch, MinSize, MaxSize, AvgSize, SourcePath1, SourcePath2]()
+ {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ ChunkedParams Params{ .UseThreshold = true, .MinSize = MinSize, .MaxSize = MaxSize, .AvgSize = AvgSize };
+ BasicFile SourceData1;
+ SourceData1.Open(SourcePath1, BasicFile::Mode::kRead);
+ BasicFile SourceData2;
+ SourceData2.Open(SourcePath2, BasicFile::Mode::kRead);
+ ChunkedInfoWithSource Info1 = ChunkData(SourceData1, Params);
+ ChunkedInfoWithSource Info2 = ChunkData(SourceData2, Params);
+
+ tsl::robin_set<IoHash> Chunks1;
+ Chunks1.reserve(Info1.Info.ChunkHashes.size());
+ Chunks1.insert(Info1.Info.ChunkHashes.begin(), Info1.Info.ChunkHashes.end());
+ size_t ChunkedSize1 = 0;
+ for (uint32_t Chunk1Index = 0; Chunk1Index < Info1.Info.ChunkHashes.size(); ++Chunk1Index)
+ {
+ ChunkedSize1 += Info1.ChunkSources[Chunk1Index].Size;
+ }
+ size_t DiffSavedSize = 0;
+ size_t ChunkedSize2 = 0;
+ for (uint32_t Chunk2Index = 0; Chunk2Index < Info2.Info.ChunkHashes.size(); ++Chunk2Index)
+ {
+ ChunkedSize2 += Info2.ChunkSources[Chunk2Index].Size;
+ if (Chunks1.find(Info2.Info.ChunkHashes[Chunk2Index]) == Chunks1.end())
+ {
+ DiffSavedSize += Info2.ChunkSources[Chunk2Index].Size;
+ }
+ }
+ ZEN_INFO("Diff {}, Chunks1: {}, Chunks2: {}, Min: {}, Max: {}, Avg: {}, Saved(1) {}, Saved(2) {}",
+ NiceBytes(DiffSavedSize),
+ Info1.Info.ChunkHashes.size(),
+ Info2.Info.ChunkHashes.size(),
+ MinSize,
+ MaxSize,
+ AvgSize,
+ NiceBytes(Info1.Info.RawSize - ChunkedSize1),
+ NiceBytes(Info2.Info.RawSize - ChunkedSize2));
+ });
+ }
+ }
+ }
+ }
+# endif // 0
+
+ // WorkLatch.CountDown();
+ // WorkLatch.Wait();
+}
+# endif // 0
+
+void
+chunkedfile_forcelink()
+{
+}
+
+} // namespace zen
+
+#endif
diff --git a/src/zenstore/chunking.cpp b/src/zenstore/chunking.cpp
index 80674de0a..30edd322a 100644
--- a/src/zenstore/chunking.cpp
+++ b/src/zenstore/chunking.cpp
@@ -36,7 +36,7 @@ static const uint32_t BuzhashTable[] = {
};
// ROL operation (compiler turns this into a ROL when optimizing)
-static inline uint32_t
+ZEN_FORCEINLINE static uint32_t
Rotate32(uint32_t Value, size_t RotateCount)
{
RotateCount &= 31;
diff --git a/src/zenstore/include/zenstore/chunkedfile.h b/src/zenstore/include/zenstore/chunkedfile.h
new file mode 100644
index 000000000..c6330bdbd
--- /dev/null
+++ b/src/zenstore/include/zenstore/chunkedfile.h
@@ -0,0 +1,54 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/zencore.h>
+
+#include <functional>
+#include <vector>
+
+namespace zen {
+
+class BasicFile;
+
+struct ChunkedInfo
+{
+ uint64_t RawSize = 0;
+ IoHash RawHash;
+ std::vector<uint32_t> ChunkSequence;
+ std::vector<IoHash> ChunkHashes;
+};
+
+struct ChunkSource
+{
+ uint64_t Offset; // 8
+ uint32_t Size; // 4
+};
+
+struct ChunkedInfoWithSource
+{
+ ChunkedInfo Info;
+ std::vector<ChunkSource> ChunkSources;
+};
+
+struct ChunkedParams
+{
+ bool UseThreshold = true;
+ size_t MinSize = (2u * 1024u) - 128u;
+ size_t MaxSize = (16u * 1024u);
+ size_t AvgSize = (3u * 1024u);
+};
+
+static const ChunkedParams UShaderByteCodeParams = {.UseThreshold = true, .MinSize = 17280, .MaxSize = 139264, .AvgSize = 36340};
+
+ChunkedInfoWithSource ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Params = {});
+void Reconstruct(const ChunkedInfo& Info,
+ const std::filesystem::path& TargetPath,
+ std::function<IoBuffer(const IoHash& ChunkHash)> GetChunk);
+IoBuffer SerializeChunkedInfo(const ChunkedInfo& Info);
+ChunkedInfo DeserializeChunkedInfo(IoBuffer& Buffer);
+
+void chunkedfile_forcelink();
+} // namespace zen
diff --git a/src/zenutil/basicfile.cpp b/src/zenutil/basicfile.cpp
index ad98bf652..f553fe5a0 100644
--- a/src/zenutil/basicfile.cpp
+++ b/src/zenutil/basicfile.cpp
@@ -272,7 +272,7 @@ BasicFile::Write(CompositeBuffer Data, uint64_t FileOffset, std::error_code& Ec)
for (const SharedBuffer& Buffer : Data.GetSegments())
{
MemoryView BlockView = Buffer.GetView();
- Write(BlockView, FileOffset, Ec);
+ Write(BlockView, FileOffset + WrittenBytes, Ec);
if (Ec)
{
@@ -490,6 +490,14 @@ BasicFile::SetFileSize(uint64_t FileSize)
#endif
}
+void
+BasicFile::Attach(void* Handle)
+{
+ ZEN_ASSERT(Handle != nullptr);
+ ZEN_ASSERT(m_FileHandle == nullptr);
+ m_FileHandle = Handle;
+}
+
void*
BasicFile::Detach()
{
@@ -716,7 +724,7 @@ BasicFileWriter::~BasicFileWriter()
}
void
-BasicFileWriter::Write(void* Data, uint64_t Size, uint64_t FileOffset)
+BasicFileWriter::Write(const void* Data, uint64_t Size, uint64_t FileOffset)
{
if (m_Buffer == nullptr || (Size >= m_BufferSize))
{
diff --git a/src/zenutil/include/zenutil/basicfile.h b/src/zenutil/include/zenutil/basicfile.h
index f25d9f23c..0e4295ee3 100644
--- a/src/zenutil/include/zenutil/basicfile.h
+++ b/src/zenutil/include/zenutil/basicfile.h
@@ -65,6 +65,7 @@ public:
void SetFileSize(uint64_t FileSize);
IoBuffer ReadAll();
void WriteAll(IoBuffer Data, std::error_code& Ec);
+ void Attach(void* Handle);
void* Detach();
inline void* Handle() { return m_FileHandle; }
@@ -165,7 +166,7 @@ public:
BasicFileWriter(BasicFile& Base, uint64_t BufferSize);
~BasicFileWriter();
- void Write(void* Data, uint64_t Size, uint64_t FileOffset);
+ void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
void Flush();
private: