aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md2
-rw-r--r--src/zenremotestore/builds/buildstoragecache.cpp63
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp264
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h4
4 files changed, 154 insertions, 179 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c3ae9e1d8..b452cecfb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,8 @@
##
- Bugfix: OAuth client credentials token request now sends correct `application/x-www-form-urlencoded` content type
- Improvement: HTTP client Content-Type in additional headers now overrides the payload content type
+- Bugfix: `builds download` with cache upload enabled no longer holds downloaded blobs in memory when boost-worker-memory is active; blobs are written to disk before upload
+- Bugfix: Removed obsolete `--cache-prime-only` flag from `builds download`
## 5.8.4
- Feature: Hub bulk deprovision endpoint (`POST /hub/deprovision`) tears down all provisioned and hibernated modules in a single request
diff --git a/src/zenremotestore/builds/buildstoragecache.cpp b/src/zenremotestore/builds/buildstoragecache.cpp
index 0e0b14dca..40ea757eb 100644
--- a/src/zenremotestore/builds/buildstoragecache.cpp
+++ b/src/zenremotestore/builds/buildstoragecache.cpp
@@ -96,7 +96,8 @@ public:
ZEN_ASSERT(!IsFlushed);
ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
- // Move all segments in Payload to be file handle based so if Payload is materialized it does not affect buffers in queue
+ // Move all segments in Payload to be file handle based unless they are very small so if Payload is materialized it does not affect
+ // buffers in queue
std::vector<SharedBuffer> FileBasedSegments;
std::span<const SharedBuffer> Segments = Payload.GetSegments();
FileBasedSegments.reserve(Segments.size());
@@ -104,42 +105,56 @@ public:
tsl::robin_map<void*, std::filesystem::path> HandleToPath;
for (const SharedBuffer& Segment : Segments)
{
- std::filesystem::path FilePath;
- IoBufferFileReference Ref;
- if (Segment.AsIoBuffer().GetFileReference(Ref))
+ const uint64_t SegmentSize = Segment.GetSize();
+ if (SegmentSize < 16u * 1024u)
{
- if (auto It = HandleToPath.find(Ref.FileHandle); It != HandleToPath.end())
- {
- FilePath = It->second;
- }
- else
+ FileBasedSegments.push_back(Segment);
+ }
+ else
+ {
+ std::filesystem::path FilePath;
+ IoBufferFileReference Ref;
+ if (Segment.AsIoBuffer().GetFileReference(Ref))
{
- std::error_code Ec;
- std::filesystem::path Path = PathFromHandle(Ref.FileHandle, Ec);
- if (!Ec && !Path.empty())
+ if (auto It = HandleToPath.find(Ref.FileHandle); It != HandleToPath.end())
{
- HandleToPath.insert_or_assign(Ref.FileHandle, Path);
- FilePath = std::move(Path);
+ FilePath = It->second;
+ }
+ else
+ {
+ std::error_code Ec;
+ std::filesystem::path Path = PathFromHandle(Ref.FileHandle, Ec);
+ if (!Ec && !Path.empty())
+ {
+ HandleToPath.insert_or_assign(Ref.FileHandle, Path);
+ FilePath = std::move(Path);
+ }
+ else
+ {
+ ZEN_WARN("Failed getting path for chunk to upload to cache. Skipping upload.");
+ return;
+ }
}
}
- }
- if (!FilePath.empty())
- {
- IoBuffer BufferFromFile = IoBufferBuilder::MakeFromFile(FilePath, Ref.FileChunkOffset, Ref.FileChunkSize);
- if (BufferFromFile)
+ if (!FilePath.empty())
{
- FileBasedSegments.push_back(SharedBuffer(std::move(BufferFromFile)));
+ IoBuffer BufferFromFile = IoBufferBuilder::MakeFromFile(FilePath, Ref.FileChunkOffset, Ref.FileChunkSize);
+ if (BufferFromFile)
+ {
+ FileBasedSegments.push_back(SharedBuffer(std::move(BufferFromFile)));
+ }
+ else
+ {
+ ZEN_WARN("Failed opening file '{}' to upload to cache. Skipping upload.", FilePath);
+ return;
+ }
}
else
{
FileBasedSegments.push_back(Segment);
}
}
- else
- {
- FileBasedSegments.push_back(Segment);
- }
}
}
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index 21d693faa..389f8614d 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -382,6 +382,46 @@ namespace {
return CompositeBuffer{};
}
+ std::filesystem::path TryMoveDownloadedChunk(IoBuffer& BlockBuffer, const std::filesystem::path& Path, bool ForceDiskBased)
+ {
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize))
+ {
+ ZEN_TRACE_CPU("MoveTempFullBlock");
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ RenameFile(TempBlobPath, Path, Ec);
+ if (Ec)
+ {
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
+ }
+ else
+ {
+ return Path;
+ }
+ }
+ }
+
+ if (ForceDiskBased)
+ {
+ // Could not be moved and rather large, lets store it on disk
+ ZEN_TRACE_CPU("WriteTempFullBlock");
+ TemporaryFile::SafeWriteFile(Path, BlockBuffer);
+ BlockBuffer = {};
+ return Path;
+ }
+
+ return {};
+ }
+
} // namespace
class ReadFileCache
@@ -1645,14 +1685,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
- if (m_Storage.CacheStorage && m_Options.PopulateCache)
- {
- m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
- BlockDescription.BlockHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(BlockBuffer)));
- }
-
uint64_t BlockSize = BlockBuffer.GetSize();
m_DownloadStats.DownloadedBlockCount++;
m_DownloadStats.DownloadedBlockByteCount += BlockSize;
@@ -1661,46 +1693,27 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
FilteredDownloadedBytesPerSecond.Stop();
}
- {
- std::filesystem::path BlockChunkPath;
+ const bool PutInCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache;
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
- {
- IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockSize))
- {
- ZEN_TRACE_CPU("MoveTempFullBlock");
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
- {
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
- RenameFile(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
- {
- BlockChunkPath = std::filesystem::path{};
+ std::filesystem::path BlockChunkPath = TryMoveDownloadedChunk(
+ BlockBuffer,
+ m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(),
+ /* ForceDiskBased */ PutInCache || (BlockSize > m_Options.MaximumInMemoryPayloadSize));
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
- }
- }
- }
- }
-
- if (BlockChunkPath.empty() && (BlockSize > m_Options.MaximumInMemoryPayloadSize))
+ if (PutInCache)
+ {
+ ZEN_ASSERT(!BlockChunkPath.empty());
+ IoBuffer CacheBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (CacheBuffer)
{
- ZEN_TRACE_CPU("WriteTempFullBlock");
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ BlockDescription.BlockHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(CacheBuffer)));
}
+ }
+ {
if (!m_AbortFlag)
{
Work.ScheduleWork(
@@ -3161,10 +3174,9 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
RemoteChunkIndex,
&FilteredWrittenBytesPerSecond,
ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable {
- IoBufferFileReference FileRef;
- bool EnableBacklog = Payload.GetFileReference(FileRef);
AsyncWriteDownloadedChunk(m_Options.ZenFolderPath,
RemoteChunkIndex,
+ ExistsResult,
std::move(ChunkTargetPtrs),
WriteCache,
Work,
@@ -3172,8 +3184,7 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
SequenceIndexChunksLeftToWriteCounters,
WritePartsComplete,
TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- EnableBacklog);
+ FilteredWrittenBytesPerSecond);
});
}
});
@@ -3234,14 +3245,6 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde
FilteredDownloadedBytesPerSecond.Stop();
}
- if (Payload && m_Storage.CacheStorage && m_Options.PopulateCache)
- {
- m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
- ChunkHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(Payload)));
- }
-
OnDownloaded(std::move(Payload));
});
}
@@ -3261,17 +3264,11 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde
}
if (!m_AbortFlag)
{
- if (BuildBlob && m_Storage.CacheStorage && m_Options.PopulateCache)
- {
- m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
- ChunkHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(BuildBlob)));
- }
if (!BuildBlob)
{
throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
}
+
if (!m_AbortFlag)
{
uint64_t BlobSize = BuildBlob.GetSize();
@@ -3326,61 +3323,17 @@ BuildsOperationUpdateFolder::DownloadPartialBlock(
FilteredDownloadedBytesPerSecond.Stop();
}
- std::filesystem::path BlockChunkPath;
-
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ IoHashStream RangeId;
+ for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths)
{
- IoBufferFileReference FileRef;
- if (BlockRangeBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockRangeBufferSize))
- {
- ZEN_TRACE_CPU("MoveTempPartialBlock");
-
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
- {
- BlockRangeBuffer.SetDeleteOnClose(false);
- BlockRangeBuffer = {};
-
- IoHashStream RangeId;
- for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths)
- {
- RangeId.Append(&Range.first, sizeof(uint64_t));
- RangeId.Append(&Range.second, sizeof(uint64_t));
- }
-
- BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash());
- RenameFile(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
- {
- BlockChunkPath = std::filesystem::path{};
-
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockRangeBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockRangeBufferSize, true);
- BlockRangeBuffer.SetDeleteOnClose(true);
- }
- }
- }
+ RangeId.Append(&Range.first, sizeof(uint64_t));
+ RangeId.Append(&Range.second, sizeof(uint64_t));
}
+ std::filesystem::path BlockChunkPath =
+ TryMoveDownloadedChunk(BlockRangeBuffer,
+ m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()),
+ /* ForceDiskBased */ BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize);
- if (BlockChunkPath.empty() && (BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize))
- {
- ZEN_TRACE_CPU("WriteTempPartialBlock");
-
- IoHashStream RangeId;
- for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths)
- {
- RangeId.Append(&Range.first, sizeof(uint64_t));
- RangeId.Append(&Range.second, sizeof(uint64_t));
- }
-
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash());
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockRangeBuffer);
- BlockRangeBuffer = {};
- }
if (!m_AbortFlag)
{
OnDownloaded(std::move(BlockRangeBuffer), std::move(BlockChunkPath), BlockRangeStartIndex, BlockOffsetAndLengths);
@@ -3508,16 +3461,36 @@ BuildsOperationUpdateFolder::DownloadPartialBlock(
// Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3
// Upload to cache (if enabled) and use the whole payload for the remaining ranges
- if (m_Storage.CacheStorage && m_Options.PopulateCache)
+ const uint64_t Size = RangeBuffers.PayloadBuffer.GetSize();
+
+ const bool PopulateCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache;
+
+ std::filesystem::path BlockPath =
+ TryMoveDownloadedChunk(RangeBuffers.PayloadBuffer,
+ m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(),
+ /* ForceDiskBased */ PopulateCache || Size > m_Options.MaximumInMemoryPayloadSize);
+ if (!BlockPath.empty())
+ {
+ RangeBuffers.PayloadBuffer = IoBufferBuilder::MakeFromFile(BlockPath);
+ if (!RangeBuffers.PayloadBuffer)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed to read block {} from temporary path '{}'", BlockDescription.BlockHash, BlockPath));
+ }
+ RangeBuffers.PayloadBuffer.SetDeleteOnClose(true);
+ }
+
+ if (PopulateCache)
{
m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
BlockDescription.BlockHash,
ZenContentType::kCompressedBinary,
- CompositeBuffer(std::vector<IoBuffer>{RangeBuffers.PayloadBuffer}));
- if (m_AbortFlag)
- {
- break;
- }
+ CompositeBuffer(SharedBuffer(RangeBuffers.PayloadBuffer)));
+ }
+
+ if (m_AbortFlag)
+ {
+ break;
}
SubRangeCount = Ranges.size() - SubRangeCountComplete;
@@ -4269,6 +4242,7 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc
void
BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath,
uint32_t RemoteChunkIndex,
+ const BlobsExistsResult& ExistsResult,
std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs,
BufferedWriteFileCache& WriteCache,
ParallelWork& Work,
@@ -4276,8 +4250,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
std::atomic<uint64_t>& WritePartsComplete,
const uint64_t TotalPartWriteCount,
- FilteredRate& FilteredWrittenBytesPerSecond,
- bool EnableBacklog)
+ FilteredRate& FilteredWrittenBytesPerSecond)
{
ZEN_TRACE_CPU("AsyncWriteDownloadedChunk");
@@ -4285,43 +4258,28 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
const uint64_t Size = Payload.GetSize();
- std::filesystem::path CompressedChunkPath;
+ const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
+
+ const bool PopulateCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache;
- // Check if the dowloaded chunk is file based and we can move it directly without rewriting it
+ std::filesystem::path CompressedChunkPath =
+ TryMoveDownloadedChunk(Payload,
+ m_TempDownloadFolderPath / ChunkHash.ToHexString(),
+ /* ForceDiskBased */ PopulateCache || Size > m_Options.MaximumInMemoryPayloadSize);
+ if (PopulateCache)
{
- IoBufferFileReference FileRef;
- if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == Size))
+ IoBuffer CacheBlob = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (CacheBlob)
{
- ZEN_TRACE_CPU("MoveTempChunk");
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
- {
- Payload.SetDeleteOnClose(false);
- Payload = {};
- CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString();
- RenameFile(TempBlobPath, CompressedChunkPath, Ec);
- if (Ec)
- {
- CompressedChunkPath = std::filesystem::path{};
-
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, Size, true);
- Payload.SetDeleteOnClose(true);
- }
- }
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(CacheBlob)));
}
}
- if (CompressedChunkPath.empty() && (Size > m_Options.MaximumInMemoryPayloadSize))
- {
- ZEN_TRACE_CPU("WriteTempChunk");
- // Could not be moved and rather large, lets store it on disk
- CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString();
- TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload);
- Payload = {};
- }
+ IoBufferFileReference FileRef;
+ bool EnableBacklog = !CompressedChunkPath.empty() || Payload.GetFileReference(FileRef);
Work.ScheduleWork(
m_IOWorkerPool,
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
index 5f052b265..b35231776 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
@@ -328,6 +328,7 @@ private:
void AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath,
uint32_t RemoteChunkIndex,
+ const BlobsExistsResult& ExistsResult,
std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs,
BufferedWriteFileCache& WriteCache,
ParallelWork& Work,
@@ -335,8 +336,7 @@ private:
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
std::atomic<uint64_t>& WritePartsComplete,
const uint64_t TotalPartWriteCount,
- FilteredRate& FilteredWrittenBytesPerSecond,
- bool EnableBacklog);
+ FilteredRate& FilteredWrittenBytesPerSecond);
void VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, ParallelWork& Work);
bool CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters);