aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/remoteprojectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-05 18:53:44 -0400
committerGitHub <[email protected]>2023-09-06 00:53:44 +0200
commit832a1b464633ec7a31a8aad386520e1990d0b6cb (patch)
treea07ba97f28fbe90e5aac8ea5d086f687e7aa38bd /src/zenserver/projectstore/remoteprojectstore.cpp
parentretry file create (#383) (diff)
downloadzen-832a1b464633ec7a31a8aad386520e1990d0b6cb.tar.xz
zen-832a1b464633ec7a31a8aad386520e1990d0b6cb.zip
stream oplog attachments from jupiter (#384)
* stream large downloads from jupiter to temporary file * rework DeleteOnClose - top level marks file for delete and if lower level parts wants to keep it it clears that flag * changelog * log number of attachments to download * add delay on jupiter request failure when retrying * make sure we upload all attachments even if Needs are empty when ForceUpload is true release TempAttachment as soon as it is used * sort attachments so we get predictable blocks for the same oplog
Diffstat (limited to 'src/zenserver/projectstore/remoteprojectstore.cpp')
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp131
1 files changed, 75 insertions, 56 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index bbf3a9f32..f10d7da63 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -298,9 +298,9 @@ BuildContainer(CidStore& ChunkStore,
Offset += Buffer.GetSize();
}
void* FileHandle = BlockFile.Detach();
- AttachmentBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset);
+ AttachmentBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true);
- AttachmentBuffer.MarkAsDeleteOnClose();
+ AttachmentBuffer.SetDeleteOnClose(true);
ZEN_DEBUG("Saved temp attachment {}, {}", DataHash, NiceBytes(PayloadSize));
}
OutLooseAttachments->insert_or_assign(DataHash, AttachmentBuffer);
@@ -389,13 +389,24 @@ BuildContainer(CidStore& ChunkStore,
return ChunkStore.FindChunkByCid(AttachmentHash);
};
- for (const auto& It : Attachments)
+ // Sort attachments so we get predictable blocks for the same oplog upload
+ std::vector<IoHash> SortedAttachments;
+ SortedAttachments.reserve(Attachments.size());
+ for (const auto It : Attachments)
{
- const IoHash& AttachmentHash(It.first);
- IoBuffer Payload = GetPayload(AttachmentHash);
+ SortedAttachments.push_back(It.first);
+ }
+ std::sort(SortedAttachments.begin(), SortedAttachments.end());
+
+ for (const IoHash& AttachmentHash : SortedAttachments)
+ {
+ IoBuffer Payload = GetPayload(AttachmentHash);
if (!Payload)
{
- std::optional<CbObject> Op = Oplog.GetOpByIndex(It.second);
+ auto It = Attachments.find(AttachmentHash);
+ ZEN_ASSERT(It != Attachments.end());
+
+ std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second);
ZEN_ASSERT(Op.has_value());
ExtendableStringBuilder<1024> Sb;
Sb.Append("Failed to find attachment '");
@@ -672,7 +683,7 @@ SaveOplog(CidStore& ChunkStore,
Offset += Buffer.GetSize();
}
void* FileHandle = BlockFile.Detach();
- BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset);
+ BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true);
}
catch (std::exception& Ex)
{
@@ -682,7 +693,7 @@ SaveOplog(CidStore& ChunkStore,
return;
}
- BlockBuffer.MarkAsDeleteOnClose();
+ BlockBuffer.SetDeleteOnClose(true);
{
RwLock::ExclusiveLockScope __(AttachmentsLock);
CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)});
@@ -757,7 +768,7 @@ SaveOplog(CidStore& ChunkStore,
ZEN_DEBUG("Saved container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000)));
}
- if (!ContainerSaveResult.Needs.empty())
+ if (!ContainerSaveResult.Needs.empty() || ForceUpload)
{
ZEN_INFO("Filtering needed attachments...");
std::vector<IoHash> NeededLargeAttachments;
@@ -766,6 +777,7 @@ SaveOplog(CidStore& ChunkStore,
NeededOtherAttachments.reserve(CreatedBlocks.size());
if (ForceUpload)
{
+ // TODO: Check ForceUpload - it should add all attachments and blocks regardless if Needs is empty or not
NeededLargeAttachments.insert(NeededLargeAttachments.end(), LargeAttachments.begin(), LargeAttachments.end());
}
else
@@ -791,59 +803,63 @@ SaveOplog(CidStore& ChunkStore,
{
break;
}
- SaveAttachmentsLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks, &TempAttachments]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
+ IoBuffer Payload;
+ if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
+ {
+ Payload = std::move(BlockIt->second);
+ }
+ else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end())
+ {
+ Payload = LooseTmpFileIt->second;
+ TempAttachments.erase(LooseTmpFileIt);
+ }
- IoBuffer Payload;
- if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
- {
- Payload = std::move(BlockIt->second);
- }
- else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end())
- {
- Payload = std::move(LooseTmpFileIt->second);
- }
- else
- {
- Payload = ChunkStore.FindChunkByCid(RawHash);
- }
- if (!Payload)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to find attachment {}", RawHash),
- {});
- ZEN_ERROR("Failed to build container ({}). Reason: '{}'",
- RemoteResult.GetErrorReason(),
- RemoteResult.GetError());
- return;
- }
+ SaveAttachmentsLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&ChunkStore,
+ &RemoteStore,
+ &SaveAttachmentsLatch,
+ &RemoteResult,
+ RawHash,
+ &CreatedBlocks,
+ TempPayload = std::move(Payload)]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash);
+ if (!Payload)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to find attachment {}", RawHash),
+ {});
+ ZEN_ERROR("Failed to build container ({}). Reason: '{}'",
+ RemoteResult.GetErrorReason(),
+ RemoteResult.GetError());
+ return;
+ }
- RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
- if (Result.ErrorCode)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'",
- RawHash,
- NiceBytes(Payload.GetSize()),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
- return;
- }
- ZEN_DEBUG("Saved attachment {}, {} in {}",
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'",
RawHash,
NiceBytes(Payload.GetSize()),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
return;
- });
+ }
+ ZEN_DEBUG("Saved attachment {}, {} in {}",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ return;
+ });
}
}
+ TempAttachments.clear();
if (!CreatedBlocks.empty())
{
@@ -967,7 +983,6 @@ SaveOplog(CidStore& ChunkStore,
ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining());
}
SaveAttachmentsLatch.Wait();
- TempAttachments.clear();
}
if (!RemoteResult.IsError())
@@ -1224,6 +1239,10 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O
RemoteProjectStore::Result Result =
SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment);
+ if (!Attachments.empty())
+ {
+ ZEN_INFO("Found {} attachments to download", Attachments.size());
+ }
AttachmentsWorkLatch.CountDown();
while (!AttachmentsWorkLatch.Wait(1000))