aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-02-12 12:50:37 +0100
committerGitHub <[email protected]>2024-02-12 12:50:37 +0100
commit4a7e46cadb0c0665a88a01cbc0a679ce4027abc6 (patch)
treef43c747764009ead4628b0f17aaecaff1a5cf746 /src
parentminor docs updates (diff)
downloadzen-4a7e46cadb0c0665a88a01cbc0a679ce4027abc6.tar.xz
zen-4a7e46cadb0c0665a88a01cbc0a679ce4027abc6.zip
Save compressed large attachments to temporary files on disk (#650)
* Save large compressed large attachments to temporary files on disk * bump oplog block max size up to 64Mb again * Make sure CompositeBuffer::AppendBuffers actually moves inputs when it should * removed parallell execution of fetching payload for block assembly it was not actually helping and added complexity * make sure we move/release payload buffers as soon as possible * make sure we don't read in full large attachments to memory when computing hash
Diffstat (limited to 'src')
-rw-r--r--src/zencore/include/zencore/compositebuffer.h8
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp398
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h2
3 files changed, 219 insertions, 189 deletions
diff --git a/src/zencore/include/zencore/compositebuffer.h b/src/zencore/include/zencore/compositebuffer.h
index cc03dd156..1b6944fc0 100644
--- a/src/zencore/include/zencore/compositebuffer.h
+++ b/src/zencore/include/zencore/compositebuffer.h
@@ -125,11 +125,7 @@ private:
{
m_Segments.insert(m_Segments.end(), begin(Buffer.m_Segments), end(Buffer.m_Segments));
}
- inline void AppendBuffers(CompositeBuffer&& Buffer)
- {
- // TODO: this operates just like the by-reference version above
- m_Segments.insert(m_Segments.end(), begin(Buffer.m_Segments), end(Buffer.m_Segments));
- }
+ inline void AppendBuffers(CompositeBuffer&& Buffer) { AppendBuffers(std::move(Buffer.m_Segments)); }
static inline size_t GetBufferCount(const SharedBuffer&) { return 1; }
inline void AppendBuffers(const SharedBuffer& Buffer) { m_Segments.push_back(Buffer); }
@@ -138,7 +134,7 @@ private:
static inline size_t GetBufferCount(std::vector<SharedBuffer>&& Container) { return Container.size(); }
inline void AppendBuffers(std::vector<SharedBuffer>&& Container)
{
- m_Segments.insert(m_Segments.end(), begin(Container), end(Container));
+ m_Segments.insert(m_Segments.end(), std::make_move_iterator(Container.begin()), std::make_move_iterator(Container.end()));
}
private:
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index a8aa22526..672292290 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -163,10 +163,10 @@ GenerateBlock(std::vector<SharedBuffer>&& Chunks)
ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr);
SizeBuffer = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength)));
}
- CompositeBuffer AllBuffers(std::move(SizeBuffer), CompositeBuffer(std::move(Chunks)));
-
CompressedBuffer CompressedBlock =
- CompressedBuffer::Compress(std::move(AllBuffers), OodleCompressor::Mermaid, OodleCompressionLevel::None);
+ CompressedBuffer::Compress(CompositeBuffer(std::move(SizeBuffer), CompositeBuffer(std::move(Chunks))),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::None);
return CompressedBlock;
}
@@ -187,8 +187,6 @@ CreateBlock(WorkerThreadPool& WorkerPool,
const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
AsyncRemoteResult& RemoteResult)
{
- ZEN_INFO("Generating block with {} attachments", ChunksInBlock.size());
-
OpSectionsLatch.AddCount(1);
WorkerPool.ScheduleWork(
[&Blocks, &SectionsLock, &OpSectionsLatch, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, &RemoteResult]() mutable {
@@ -198,18 +196,25 @@ CreateBlock(WorkerThreadPool& WorkerPool,
return;
}
ZEN_ASSERT(!Chunks.empty());
- CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); // Move to callback and return IoHash
+ size_t ChunkCount = Chunks.size();
+ Stopwatch Timer;
+ CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks));
IoHash BlockHash = CompressedBlock.DecodeRawHash();
{
// We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
RwLock::SharedLockScope __(SectionsLock);
Blocks[BlockIndex].BlockHash = BlockHash;
}
+ uint64_t BlockSize = CompressedBlock.GetCompressedSize();
AsyncOnBlock(std::move(CompressedBlock), BlockHash);
+ ZEN_INFO("Generated block with {} attachments in {} ({})",
+ ChunkCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ NiceBytes(BlockSize));
});
}
-size_t
+static size_t
AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks)
{
size_t BlockIndex;
@@ -221,6 +226,44 @@ AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks)
return BlockIndex;
}
+static IoBuffer
+WriteToTempFile(CompressedBuffer&& CompressedBuffer, std::filesystem::path Path)
+{
+ if (std::filesystem::is_regular_file(Path))
+ {
+ IoBuffer ExistingTempFile = IoBuffer(IoBufferBuilder::MakeFromFile(Path));
+ if (ExistingTempFile && ExistingTempFile.GetSize() == CompressedBuffer.GetCompressedSize())
+ {
+ ExistingTempFile.SetDeleteOnClose(true);
+ return ExistingTempFile;
+ }
+ }
+ IoBuffer BlockBuffer;
+ BasicFile BlockFile;
+ uint32_t RetriesLeft = 3;
+ BlockFile.Open(Path, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) {
+ if (RetriesLeft == 0)
+ {
+ return false;
+ }
+ ZEN_WARN("Failed to create temporary oplog block '{}': '{}', retries left: {}.", Path, Ec.message(), RetriesLeft);
+ Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms
+ RetriesLeft--;
+ return true;
+ });
+
+ uint64_t Offset = 0;
+ for (const SharedBuffer& Buffer : CompressedBuffer.GetCompressed().GetSegments())
+ {
+ BlockFile.Write(Buffer.GetView(), Offset);
+ Offset += Buffer.GetSize();
+ }
+ void* FileHandle = BlockFile.Detach();
+ BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true);
+ BlockBuffer.SetDeleteOnClose(true);
+ return BlockBuffer;
+}
+
CbObject
BuildContainer(CidStore& ChunkStore,
ProjectStore::Project& Project,
@@ -255,6 +298,10 @@ BuildContainer(CidStore& ChunkStore,
std::unordered_map<IoHash, int, IoHash::Hasher> Attachments;
+ std::filesystem::path AttachmentTempPath = Oplog.TempPath();
+ AttachmentTempPath.append(".pending");
+ CreateDirectories(AttachmentTempPath);
+
auto RewriteOp = [&](int LSN, CbObjectView Op, const std::function<void(CbObjectView)>& CB) {
bool OpRewritten = false;
CbArrayView Files = Op["files"sv].AsArrayView();
@@ -298,16 +345,34 @@ BuildContainer(CidStore& ChunkStore,
}
}
SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath));
- // Large file, just hash it for now and leave compression for later via callback
- // We don't want to spend time compressing a large attachment if we don't need to upload it
- DataHash = IoHash::HashBuffer(DataBuffer);
- if (DataBuffer.GetSize() > MaxChunkEmbedSize)
+ // Loose file, just hash it for now and leave compression for later via callback
+
+ const uint64_t RawSize = DataBuffer.GetSize();
+ if (RawSize > MaxChunkEmbedSize)
{
+ IoHashStream Hasher;
+ CompositeBuffer RawData(DataBuffer);
+ UniqueBuffer RawBlockCopy;
+ CompositeBuffer::Iterator It = RawData.GetIterator(0);
+ const uint64_t BlockSize = MaxChunkEmbedSize;
+ for (uint64_t RawOffset = 0; RawOffset < RawSize;)
+ {
+ const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize);
+ const MemoryView RawBlock = RawData.ViewOrCopyRange(It, RawBlockSize, RawBlockCopy);
+ Hasher.Append(RawBlock);
+ RawOffset += RawBlockSize;
+ }
+
+ DataHash = Hasher.GetHash();
LargeChunkHashes.insert(DataHash);
}
+ else
+ {
+ DataHash = IoHash::HashBuffer(DataBuffer);
+ }
LooseAttachments.insert_or_assign(
DataHash,
- [AttachmentBuffer = std::move(DataBuffer)](const IoHash& DataHash) -> IoBuffer {
+ [AttachmentBuffer = std::move(DataBuffer), &Oplog, AttachmentTempPath](const IoHash& DataHash) -> IoBuffer {
Stopwatch AttachmentTimer;
uint64_t RawSize = AttachmentBuffer.GetSize();
CompressedBuffer Compressed =
@@ -319,7 +384,13 @@ BuildContainer(CidStore& ChunkStore,
NiceBytes(RawSize),
NiceBytes(PayloadSize),
NiceTimeSpanMs(static_cast<uint64_t>(AttachmentTimer.GetElapsedTimeMs())));
- return Compressed.GetCompressed().Flatten().AsIoBuffer();
+
+ std::filesystem::path AttachmentPath = AttachmentTempPath;
+ AttachmentPath.append(DataHash.ToHexString());
+
+ IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed), AttachmentPath);
+ ZEN_INFO("Saved temp attachment to '{}', {}", AttachmentPath, NiceBytes(TempAttachmentBuffer.GetSize()));
+ return TempAttachmentBuffer;
});
}
@@ -584,159 +655,138 @@ BuildContainer(CidStore& ChunkStore,
uint32_t ResolvedFailedCount = 0;
uint32_t ComposedBlocks = 0;
- for (auto HashIt = SortedAttachments.begin(); HashIt != SortedAttachments.end();)
+ uint64_t FetchAttachmentsStartMS = Timer.GetElapsedTimeMs();
+
+ for (auto HashIt = SortedAttachments.begin(); HashIt != SortedAttachments.end(); HashIt++)
{
if (IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
break;
}
- ReportProgress(OptionalContext,
- fmt::format("Resolving attachments: {} large, {} small, {} blocks built",
- ResolvedLargeCount,
- ResolvedSmallCount,
- ComposedBlocks),
- SortedAttachments.size(),
- SortedAttachments.size() - (ResolvedLargeCount + ResolvedSmallCount + ResolvedFailedCount));
-
- std::vector<IoHash> SmallChunkHashes;
- SmallChunkHashes.reserve(8);
- while (HashIt != SortedAttachments.end() && SmallChunkHashes.size() < 8)
+ if ((ResolvedLargeCount + ResolvedSmallCount) % 1000 == 0)
{
- if (LargeChunkHashes.contains(*HashIt))
- {
- ResolvedLargeCount++;
- }
- else
- {
- SmallChunkHashes.push_back(*HashIt);
- }
- ++HashIt;
+ ReportProgress(OptionalContext,
+ fmt::format("Resolving attachments: {} large, {} small, {} blocks assembled",
+ ResolvedLargeCount,
+ ResolvedSmallCount,
+ ComposedBlocks),
+ SortedAttachments.size(),
+ SortedAttachments.size() - (ResolvedLargeCount + ResolvedSmallCount + ResolvedFailedCount));
}
-
- std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> ResolvedSmallChunks;
- RwLock ResolvedSmallChunksLock;
- Latch ResolvedSmallChunksLatch(1);
- for (const IoHash& SmallChunkHash : SmallChunkHashes)
+ const IoHash& AttachmentHash(*HashIt);
+ if (LargeChunkHashes.contains(AttachmentHash))
{
- ResolvedSmallChunksLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&GetPayload, SmallChunkHash, &ResolvedSmallChunksLatch, &ResolvedSmallChunksLock, &ResolvedSmallChunks]() {
- auto _ = MakeGuard([&ResolvedSmallChunksLatch] { ResolvedSmallChunksLatch.CountDown(); });
-
- IoBuffer Payload = GetPayload(SmallChunkHash);
- {
- RwLock::ExclusiveLockScope __(ResolvedSmallChunksLock);
- ResolvedSmallChunks.insert_or_assign(SmallChunkHash, std::move(Payload));
- }
- });
+ ResolvedLargeCount++;
+ continue;
}
- ResolvedSmallChunksLatch.CountDown();
- ResolvedSmallChunksLatch.Wait();
-
- for (const IoHash& AttachmentHash : SmallChunkHashes)
+ IoBuffer Payload = GetPayload(AttachmentHash);
+ if (!Payload)
{
- auto ResolvedIt = ResolvedSmallChunks.find(AttachmentHash);
- ZEN_ASSERT(ResolvedIt != ResolvedSmallChunks.end());
- IoBuffer Payload = std::move(ResolvedIt->second);
- auto It = Attachments.find(AttachmentHash);
+ auto It = Attachments.find(AttachmentHash);
ZEN_ASSERT(It != Attachments.end());
- if (!Payload)
- {
- std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second);
- ZEN_ASSERT(Op.has_value());
- ExtendableStringBuilder<1024> Sb;
- Sb.Append("Failed to find attachment '");
- Sb.Append(AttachmentHash.ToHexString());
- Sb.Append("' for op: \n");
- Op.value().ToJson(Sb);
-
- ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView()));
+ std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second);
+ ZEN_ASSERT(Op.has_value());
+ ExtendableStringBuilder<1024> Sb;
+ Sb.Append("Failed to find attachment '");
+ Sb.Append(AttachmentHash.ToHexString());
+ Sb.Append("' for op: \n");
+ Op.value().ToJson(Sb);
- if (IgnoreMissingAttachments)
- {
- ResolvedFailedCount++;
- continue;
- }
- else
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
- BlockCreateLatch.CountDown();
- while (!BlockCreateLatch.Wait(1000))
- {
- ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
- }
- return {};
- }
- }
+ ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView()));
- uint64_t PayloadSize = Payload.GetSize();
- if (PayloadSize > MaxChunkEmbedSize)
+ if (IgnoreMissingAttachments)
{
- if (LargeChunkHashes.insert(AttachmentHash).second)
- {
- OnLargeAttachment(AttachmentHash, [Payload = std::move(Payload)](const IoHash&) { return std::move(Payload); });
- LargeAttachmentCount++;
- }
- ResolvedLargeCount++;
+ ResolvedFailedCount++;
continue;
}
else
{
- ResolvedSmallCount++;
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
+ }
+ return {};
}
+ }
- if (!BlockAttachmentHashes.insert(AttachmentHash).second)
+ uint64_t PayloadSize = Payload.GetSize();
+ if (PayloadSize > MaxChunkEmbedSize)
+ {
+ if (LargeChunkHashes.insert(AttachmentHash).second)
{
- continue;
+ OnLargeAttachment(AttachmentHash, [Payload = std::move(Payload)](const IoHash&) { return std::move(Payload); });
+ LargeAttachmentCount++;
}
+ ResolvedLargeCount++;
+ continue;
+ }
+ else
+ {
+ ResolvedSmallCount++;
+ }
- const int CurrentOpLSN = It->second;
+ if (!BlockAttachmentHashes.insert(AttachmentHash).second)
+ {
+ continue;
+ }
- BlockSize += PayloadSize;
+ auto It = Attachments.find(AttachmentHash);
+ const int CurrentOpLSN = It->second;
+
+ BlockSize += PayloadSize;
+ if (BuildBlocks)
+ {
+ ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload)));
+ }
+ else
+ {
+ Payload = {};
+ }
+
+ if (BlockSize >= MaxBlockSize && (CurrentOpLSN != LastLSNOp))
+ {
+ size_t BlockIndex = AddBlock(BlocksLock, Blocks);
+ size_t ChunkCount = ChunksInBlock.size();
if (BuildBlocks)
{
- ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload)));
+ CreateBlock(WorkerPool,
+ BlockCreateLatch,
+ std::move(ChunksInBlock),
+ BlocksLock,
+ Blocks,
+ BlockIndex,
+ AsyncOnBlock,
+ RemoteResult);
+ ComposedBlocks++;
}
else
{
- Payload = {};
+ ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
+ OnBlockChunks(BlockAttachmentHashes);
}
-
- if (BlockSize >= MaxBlockSize && (CurrentOpLSN != LastLSNOp))
{
- size_t BlockIndex = AddBlock(BlocksLock, Blocks);
- if (BuildBlocks)
- {
- CreateBlock(WorkerPool,
- BlockCreateLatch,
- std::move(ChunksInBlock),
- BlocksLock,
- Blocks,
- BlockIndex,
- AsyncOnBlock,
- RemoteResult);
- ComposedBlocks++;
- }
- else
- {
- ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
- OnBlockChunks(BlockAttachmentHashes);
- }
- {
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope _(BlocksLock);
- Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
- BlockAttachmentHashes.begin(),
- BlockAttachmentHashes.end());
- }
- BlockAttachmentHashes.clear();
- ChunksInBlock.clear();
- BlockSize = 0;
- GeneratedBlockCount++;
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope _(BlocksLock);
+ Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
+ BlockAttachmentHashes.begin(),
+ BlockAttachmentHashes.end());
}
- LastLSNOp = CurrentOpLSN;
+ uint64_t NowMS = Timer.GetElapsedTimeMs();
+ ZEN_INFO("Assembled block {} with {} chunks in {} ({})",
+ BlockIndex,
+ ChunkCount,
+ NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS),
+ NiceBytes(BlockSize));
+ FetchAttachmentsStartMS = NowMS;
+ BlockAttachmentHashes.clear();
+ ChunksInBlock.clear();
+ BlockSize = 0;
+ GeneratedBlockCount++;
}
+ LastLSNOp = CurrentOpLSN;
}
if (BlockSize > 0)
@@ -744,6 +794,7 @@ BuildContainer(CidStore& ChunkStore,
if (!IsCancelled(OptionalContext))
{
size_t BlockIndex = AddBlock(BlocksLock, Blocks);
+ size_t ChunkCount = ChunksInBlock.size();
if (BuildBlocks)
{
CreateBlock(WorkerPool,
@@ -768,6 +819,14 @@ BuildContainer(CidStore& ChunkStore,
BlockAttachmentHashes.begin(),
BlockAttachmentHashes.end());
}
+ uint64_t NowMS = Timer.GetElapsedTimeMs();
+ ZEN_INFO("Assembled block {} with {} chunks in {} ({})",
+ BlockIndex,
+ ChunkCount,
+ NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS),
+ NiceBytes(BlockSize));
+ FetchAttachmentsStartMS = NowMS;
+
BlockAttachmentHashes.clear();
ChunksInBlock.clear();
BlockSize = 0;
@@ -775,7 +834,7 @@ BuildContainer(CidStore& ChunkStore,
}
}
ReportProgress(OptionalContext,
- fmt::format("Resolving attachments: {} large, {} small, {} blocks built",
+ fmt::format("Resolving attachments: {} large, {} small, {} blocks assembled",
ResolvedLargeCount,
ResolvedSmallCount,
ComposedBlocks),
@@ -1088,25 +1147,26 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
ZEN_WARN("Failed to save attachment '{}' ({}): {}", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason());
return;
}
-
- RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
+ size_t PayloadSize = Payload.GetSize();
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash);
if (Result.ErrorCode)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
ReportMessage(OptionalContext,
fmt::format("Failed to save attachment '{}', {} ({}): {}",
RawHash,
- NiceBytes(Payload.GetSize()),
+ NiceBytes(PayloadSize),
RemoteResult.GetError(),
RemoteResult.GetErrorReason()));
return;
}
Info.AttachmentsUploaded.fetch_add(1);
- Info.AttachmentBytesUploaded.fetch_add(Payload.GetSize());
+ Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
ZEN_INFO("Saved large attachment '{}' in {} ({})",
RawHash,
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(Payload.GetSize()));
+ NiceBytes(PayloadSize));
return;
});
}
@@ -1149,26 +1209,28 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
return;
}
- RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
+ size_t PayloadSize = Payload.GetSize();
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash);
if (Result.ErrorCode)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
ReportMessage(OptionalContext,
fmt::format("Failed to save attachment '{}', {} ({}): {}",
RawHash,
- NiceBytes(Payload.GetSize()),
+ NiceBytes(PayloadSize),
RemoteResult.GetError(),
RemoteResult.GetErrorReason()));
return;
}
Info.AttachmentBlocksUploaded.fetch_add(1);
- Info.AttachmentBlockBytesUploaded.fetch_add(Payload.GetSize());
+ Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
ZEN_INFO("Saved block attachment '{}' in {} ({})",
RawHash,
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(Payload.GetSize()));
+ NiceBytes(PayloadSize));
return;
});
}
@@ -1317,47 +1379,19 @@ SaveOplog(CidStore& ChunkStore,
const IoHash& BlockHash) {
std::filesystem::path BlockPath = AttachmentTempPath;
BlockPath.append(BlockHash.ToHexString());
- if (!std::filesystem::exists(BlockPath))
+ try
{
- IoBuffer BlockBuffer;
- try
- {
- BasicFile BlockFile;
- uint32_t RetriesLeft = 3;
- BlockFile.Open(BlockPath, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) {
- if (RetriesLeft == 0)
- {
- return false;
- }
- ZEN_WARN("Failed to create temporary oplog block '{}': '{}', retries left: {}.", BlockPath, Ec.message(), RetriesLeft);
- Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms
- RetriesLeft--;
- return true;
- });
-
- uint64_t Offset = 0;
- for (const SharedBuffer& Buffer : CompressedBlock.GetCompressed().GetSegments())
- {
- BlockFile.Write(Buffer.GetView(), Offset);
- Offset += Buffer.GetSize();
- }
- void* FileHandle = BlockFile.Detach();
- BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true);
- }
- catch (std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- Ex.what(),
- "Unable to create temp block file");
- return;
- }
-
- BlockBuffer.SetDeleteOnClose(true);
- {
- RwLock::ExclusiveLockScope __(AttachmentsLock);
- CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)});
- }
- ZEN_DEBUG("Saved temp block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()));
+ IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock), BlockPath);
+ RwLock::ExclusiveLockScope __(AttachmentsLock);
+ CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)});
+ ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize()));
+ }
+ catch (std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ Ex.what(),
+ "Unable to create temp block file");
+ return;
}
};
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index c5ebca646..7254b9d3f 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -84,7 +84,7 @@ public:
struct RemoteStoreOptions
{
- static const size_t DefaultMaxBlockSize = 32u * 1024u * 1024u;
+ static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u;
static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u;
size_t MaxBlockSize = DefaultMaxBlockSize;