aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/remoteprojectstore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/projectstore/remoteprojectstore.cpp')
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp162
1 files changed, 90 insertions, 72 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index e922fcf1c..0aa8df362 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -1928,7 +1928,6 @@ SaveOplog(CidStore& ChunkStore,
RemoteStoreInfo.CreateBlocks,
IgnoreMissingAttachments,
RemoteStoreInfo.AllowChunking,
-
KnownBlocks,
WorkerPool,
OnBlock,
@@ -2103,6 +2102,7 @@ SaveOplog(CidStore& ChunkStore,
RemoteProjectStore::Result
ParseOplogContainer(const CbObject& ContainerObject,
+ const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
const std::function<bool(const IoHash& RawHash)>& HasAttachment,
const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
@@ -2114,6 +2114,71 @@ ParseOplogContainer(const CbObject& ContainerObject,
Stopwatch Timer;
+ MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
+ IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
+ IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
+
+ {
+ CbObject SectionObject = LoadCompactBinaryObject(SectionPayload);
+ if (!SectionObject)
+ {
+ ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type"));
+ return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest),
+ Timer.GetElapsedTimeMs() / 1000.0,
+ "Section has unexpected data type",
+ "Failed to save oplog container"};
+ }
+ OutOplogSection = SectionObject;
+ }
+ std::unordered_set<IoHash, IoHash::Hasher> OpsAttachments;
+ {
+ CbArrayView OpsArray = OutOplogSection["ops"sv].AsArrayView();
+ for (CbFieldView OpEntry : OpsArray)
+ {
+ OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); });
+ }
+ }
+ {
+ std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end());
+ OnReferencedAttachments(ReferencedAttachments);
+ }
+ ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size()));
+
+ CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
+ for (CbFieldView ChunkedFileField : ChunkedFilesArray)
+ {
+ CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView();
+ IoHash RawHash = ChunkedFileView["rawhash"sv].AsHash();
+ if (OpsAttachments.contains(RawHash) && (!HasAttachment(RawHash)))
+ {
+ ChunkedInfo Chunked;
+ Chunked.RawHash = RawHash;
+ Chunked.RawSize = ChunkedFileView["rawsize"sv].AsUInt64();
+ CbArrayView ChunksArray = ChunkedFileView["chunks"sv].AsArrayView();
+ Chunked.ChunkHashes.reserve(ChunksArray.Num());
+ for (CbFieldView ChunkField : ChunksArray)
+ {
+ const IoHash ChunkHash = ChunkField.AsHash();
+ Chunked.ChunkHashes.emplace_back(ChunkHash);
+ }
+ OnReferencedAttachments(Chunked.ChunkHashes);
+ OpsAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end());
+ CbArrayView SequenceArray = ChunkedFileView["sequence"sv].AsArrayView();
+ Chunked.ChunkSequence.reserve(SequenceArray.Num());
+ for (CbFieldView SequenceField : SequenceArray)
+ {
+ uint32_t SequenceIndex = SequenceField.AsUInt32();
+ ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size());
+ Chunked.ChunkSequence.push_back(SequenceIndex);
+ }
+ OnChunkedAttachment(Chunked);
+ ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks",
+ Chunked.RawHash,
+ NiceBytes(Chunked.RawSize),
+ Chunked.ChunkHashes.size());
+ }
+ }
+
size_t NeedBlockCount = 0;
CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
for (CbFieldView BlockField : BlocksArray)
@@ -2130,11 +2195,10 @@ ParseOplogContainer(const CbObject& ContainerObject,
for (CbFieldView ChunkField : ChunksArray)
{
IoHash ChunkHash = ChunkField.AsBinaryAttachment();
- if (HasAttachment(ChunkHash))
+ if (OpsAttachments.contains(ChunkHash) && !HasAttachment(ChunkHash))
{
- continue;
+ NeededChunks.emplace_back(ChunkHash);
}
- NeededChunks.emplace_back(ChunkHash);
}
}
else
@@ -2142,11 +2206,10 @@ ParseOplogContainer(const CbObject& ContainerObject,
for (CbFieldView ChunkField : ChunksArray)
{
const IoHash ChunkHash = ChunkField.AsHash();
- if (HasAttachment(ChunkHash))
+ if (OpsAttachments.contains(ChunkHash) && !HasAttachment(ChunkHash))
{
- continue;
+ NeededChunks.emplace_back(ChunkHash);
}
- NeededChunks.emplace_back(ChunkHash);
}
}
@@ -2166,60 +2229,14 @@ ParseOplogContainer(const CbObject& ContainerObject,
for (CbFieldView LargeChunksField : LargeChunksArray)
{
IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
- if (HasAttachment(AttachmentHash))
+ if (OpsAttachments.contains(AttachmentHash) && !HasAttachment(AttachmentHash))
{
- continue;
+ OnNeedAttachment(AttachmentHash);
+ NeedAttachmentCount++;
}
- OnNeedAttachment(AttachmentHash);
- NeedAttachmentCount++;
};
ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num()));
- CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
- for (CbFieldView ChunkedFileField : ChunkedFilesArray)
- {
- CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView();
- ChunkedInfo Chunked;
- Chunked.RawHash = ChunkedFileView["rawhash"sv].AsHash();
- Chunked.RawSize = ChunkedFileView["rawsize"sv].AsUInt64();
- CbArrayView ChunksArray = ChunkedFileView["chunks"sv].AsArrayView();
- Chunked.ChunkHashes.reserve(ChunksArray.Num());
- for (CbFieldView ChunkField : ChunksArray)
- {
- const IoHash ChunkHash = ChunkField.AsHash();
- Chunked.ChunkHashes.emplace_back(ChunkHash);
- }
- CbArrayView SequenceArray = ChunkedFileView["sequence"sv].AsArrayView();
- Chunked.ChunkSequence.reserve(SequenceArray.Num());
- for (CbFieldView SequenceField : SequenceArray)
- {
- uint32_t SequenceIndex = SequenceField.AsUInt32();
- ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size());
- Chunked.ChunkSequence.push_back(SequenceIndex);
- }
- OnChunkedAttachment(Chunked);
- ZEN_INFO("Found chunked attachment '{}' ({}) built from {} chunks",
- Chunked.RawHash,
- NiceBytes(Chunked.RawSize),
- Chunked.ChunkHashes.size());
- }
-
- MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
- IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
- IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
-
- {
- CbObject SectionObject = LoadCompactBinaryObject(SectionPayload);
- if (!SectionObject)
- {
- ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type"));
- return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest),
- Timer.GetElapsedTimeMs() / 1000.0,
- "Section has unexpected data type",
- "Failed to save oplog container"};
- }
- OutOplogSection = SectionObject;
- }
return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0};
}
@@ -2283,6 +2300,7 @@ WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject,
RemoteProjectStore::Result
SaveOplogContainer(ProjectStore::Oplog& Oplog,
const CbObject& ContainerObject,
+ const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
const std::function<bool(const IoHash& RawHash)>& HasAttachment,
const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
@@ -2294,6 +2312,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
Stopwatch Timer;
CbObject OplogSection;
RemoteProjectStore::Result Result = ParseOplogContainer(ContainerObject,
+ OnReferencedAttachments,
HasAttachment,
OnNeedBlock,
OnNeedAttachment,
@@ -2374,7 +2393,7 @@ LoadOplog(CidStore& ChunkStore,
Stopwatch LoadAttachmentsTimer;
std::atomic_uint64_t DownloadStartMS = (std::uint64_t)-1;
- auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) {
+ auto HasAttachment = [&Oplog, &ChunkStore, ForceDownload](const IoHash& RawHash) {
if (ForceDownload)
{
return false;
@@ -2386,7 +2405,6 @@ LoadOplog(CidStore& ChunkStore,
return false;
};
auto OnNeedBlock = [&RemoteStore,
- &Oplog,
&ChunkStore,
&NetworkWorkerPool,
&WorkerPool,
@@ -2405,8 +2423,6 @@ LoadOplog(CidStore& ChunkStore,
return;
}
- Oplog.CaptureAddedAttachments(std::vector<IoHash>{Chunks});
-
BlockCountToDownload++;
if (BlockHash == IoHash::Zero)
{
@@ -2645,8 +2661,6 @@ LoadOplog(CidStore& ChunkStore,
return;
}
- Oplog.CaptureAddedAttachments(std::vector<IoHash>{RawHash});
-
AttachmentsDownloadLatch.AddCount(1);
AttachmentCount.fetch_add(1);
NetworkWorkerPool.ScheduleWork([&RemoteStore,
@@ -2721,23 +2735,27 @@ LoadOplog(CidStore& ChunkStore,
std::vector<ChunkedInfo> FilesToDechunk;
auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) {
- Oplog.CaptureAddedAttachments(std::vector<IoHash>{Chunked.RawHash});
if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash))
{
- Oplog.CaptureAddedAttachments(Chunked.ChunkHashes);
FilesToDechunk.push_back(Chunked);
}
};
+ auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog.CaptureAddedAttachments(RawHashes); };
+
+ // Make sure we retain any attachments we download before writing the oplog
+ Oplog.EnableUpdateCapture();
+ auto _ = MakeGuard([&Oplog]() { Oplog.DisableUpdateCapture(); });
+
CbObject OplogSection;
- RemoteProjectStore::Result Result = // SaveOplogContainer(Oplog,
- ParseOplogContainer(LoadContainerResult.ContainerObject,
- HasAttachment,
- OnNeedBlock,
- OnNeedAttachment,
- OnChunkedAttachment,
- OplogSection,
- OptionalContext);
+ RemoteProjectStore::Result Result = ParseOplogContainer(LoadContainerResult.ContainerObject,
+ OnReferencedAttachments,
+ HasAttachment,
+ OnNeedBlock,
+ OnNeedAttachment,
+ OnChunkedAttachment,
+ OplogSection,
+ OptionalContext);
if (Result.ErrorCode != 0)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);