diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-05 18:53:44 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-06 00:53:44 +0200 |
| commit | 832a1b464633ec7a31a8aad386520e1990d0b6cb (patch) | |
| tree | a07ba97f28fbe90e5aac8ea5d086f687e7aa38bd /src/zenserver/projectstore/remoteprojectstore.cpp | |
| parent | retry file create (#383) (diff) | |
| download | zen-832a1b464633ec7a31a8aad386520e1990d0b6cb.tar.xz zen-832a1b464633ec7a31a8aad386520e1990d0b6cb.zip | |
stream oplog attachments from jupiter (#384)
* stream large downloads from jupiter to temporary file
* rework DeleteOnClose - top level marks file for delete and if lower level parts wants to keep it it clears that flag
* changelog
* log number of attachments to download
* add delay on jupiter request failure when retrying
* make sure we upload all attachments even if Needs are empty when ForceUpload is true
release TempAttachment as soon as it is used
* sort attachments so we get predictable blocks for the same oplog
Diffstat (limited to 'src/zenserver/projectstore/remoteprojectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 131 |
1 files changed, 75 insertions, 56 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index bbf3a9f32..f10d7da63 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -298,9 +298,9 @@ BuildContainer(CidStore& ChunkStore, Offset += Buffer.GetSize(); } void* FileHandle = BlockFile.Detach(); - AttachmentBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset); + AttachmentBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true); - AttachmentBuffer.MarkAsDeleteOnClose(); + AttachmentBuffer.SetDeleteOnClose(true); ZEN_DEBUG("Saved temp attachment {}, {}", DataHash, NiceBytes(PayloadSize)); } OutLooseAttachments->insert_or_assign(DataHash, AttachmentBuffer); @@ -389,13 +389,24 @@ BuildContainer(CidStore& ChunkStore, return ChunkStore.FindChunkByCid(AttachmentHash); }; - for (const auto& It : Attachments) + // Sort attachments so we get predictable blocks for the same oplog upload + std::vector<IoHash> SortedAttachments; + SortedAttachments.reserve(Attachments.size()); + for (const auto It : Attachments) { - const IoHash& AttachmentHash(It.first); - IoBuffer Payload = GetPayload(AttachmentHash); + SortedAttachments.push_back(It.first); + } + std::sort(SortedAttachments.begin(), SortedAttachments.end()); + + for (const IoHash& AttachmentHash : SortedAttachments) + { + IoBuffer Payload = GetPayload(AttachmentHash); if (!Payload) { - std::optional<CbObject> Op = Oplog.GetOpByIndex(It.second); + auto It = Attachments.find(AttachmentHash); + ZEN_ASSERT(It != Attachments.end()); + + std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second); ZEN_ASSERT(Op.has_value()); ExtendableStringBuilder<1024> Sb; Sb.Append("Failed to find attachment '"); @@ -672,7 +683,7 @@ SaveOplog(CidStore& ChunkStore, Offset += Buffer.GetSize(); } void* FileHandle = BlockFile.Detach(); - BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset); + BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true); } catch (std::exception& Ex) { @@ -682,7 +693,7 @@ SaveOplog(CidStore& ChunkStore, return; } - BlockBuffer.MarkAsDeleteOnClose(); + BlockBuffer.SetDeleteOnClose(true); { RwLock::ExclusiveLockScope __(AttachmentsLock); CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)}); @@ -757,7 +768,7 @@ SaveOplog(CidStore& ChunkStore, ZEN_DEBUG("Saved container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000))); } - if (!ContainerSaveResult.Needs.empty()) + if (!ContainerSaveResult.Needs.empty() || ForceUpload) { ZEN_INFO("Filtering needed attachments..."); std::vector<IoHash> NeededLargeAttachments; @@ -766,6 +777,7 @@ SaveOplog(CidStore& ChunkStore, NeededOtherAttachments.reserve(CreatedBlocks.size()); if (ForceUpload) { + // TODO: Check ForceUpload - it should add all attachments and blocks regardless if Needs is empty or not NeededLargeAttachments.insert(NeededLargeAttachments.end(), LargeAttachments.begin(), LargeAttachments.end()); } else @@ -791,59 +803,63 @@ SaveOplog(CidStore& ChunkStore, { break; } - SaveAttachmentsLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks, &TempAttachments]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } + IoBuffer Payload; + if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) + { + Payload = std::move(BlockIt->second); + } + else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end()) + { + Payload = LooseTmpFileIt->second; + TempAttachments.erase(LooseTmpFileIt); + } - IoBuffer Payload; - if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) - { - Payload = std::move(BlockIt->second); - } - else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end()) - { - Payload = std::move(LooseTmpFileIt->second); - } - else - { - Payload = ChunkStore.FindChunkByCid(RawHash); - } - if (!Payload) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to find attachment {}", RawHash), - {}); - ZEN_ERROR("Failed to build container ({}). Reason: '{}'", - RemoteResult.GetErrorReason(), - RemoteResult.GetError()); - return; - } + SaveAttachmentsLatch.AddCount(1); + WorkerPool.ScheduleWork([&ChunkStore, + &RemoteStore, + &SaveAttachmentsLatch, + &RemoteResult, + RawHash, + &CreatedBlocks, + TempPayload = std::move(Payload)]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash); + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {}", RawHash), + {}); + ZEN_ERROR("Failed to build container ({}). Reason: '{}'", + RemoteResult.GetErrorReason(), + RemoteResult.GetError()); + return; + } - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", - RawHash, - NiceBytes(Payload.GetSize()), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - ZEN_DEBUG("Saved attachment {}, {} in {}", + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", RawHash, NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); return; - }); + } + ZEN_DEBUG("Saved attachment {}, {} in {}", + RawHash, + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return; + }); } } + TempAttachments.clear(); if (!CreatedBlocks.empty()) { @@ -967,7 +983,6 @@ SaveOplog(CidStore& ChunkStore, ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining()); } SaveAttachmentsLatch.Wait(); - TempAttachments.clear(); } if (!RemoteResult.IsError()) @@ -1224,6 +1239,10 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O RemoteProjectStore::Result Result = SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); + if (!Attachments.empty()) + { + ZEN_INFO("Found {} attachments to download", Attachments.size()); + } AttachmentsWorkLatch.CountDown(); while (!AttachmentsWorkLatch.Wait(1000)) |