diff options
| -rw-r--r-- | CHANGELOG.md | 2 | ||||
| -rw-r--r-- | src/zenremotestore/builds/buildstoragecache.cpp | 63 | ||||
| -rw-r--r-- | src/zenremotestore/builds/buildstorageoperations.cpp | 264 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h | 4 |
4 files changed, 154 insertions, 179 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index c3ae9e1d8..b452cecfb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ ## - Bugfix: OAuth client credentials token request now sends correct `application/x-www-form-urlencoded` content type - Improvement: HTTP client Content-Type in additional headers now overrides the payload content type +- Bugfix: `builds download` with cache upload enabled no longer holds downloaded blobs in memory when boost-worker-memory is active; blobs are written to disk before upload +- Bugfix: Removed obsolete `--cache-prime-only` flag from `builds download` ## 5.8.4 - Feature: Hub bulk deprovision endpoint (`POST /hub/deprovision`) tears down all provisioned and hibernated modules in a single request diff --git a/src/zenremotestore/builds/buildstoragecache.cpp b/src/zenremotestore/builds/buildstoragecache.cpp index 0e0b14dca..40ea757eb 100644 --- a/src/zenremotestore/builds/buildstoragecache.cpp +++ b/src/zenremotestore/builds/buildstoragecache.cpp @@ -96,7 +96,8 @@ public: ZEN_ASSERT(!IsFlushed); ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); - // Move all segments in Payload to be file handle based so if Payload is materialized it does not affect buffers in queue + // Move all segments in Payload to be file handle based unless they are very small so if Payload is materialized it does not affect + // buffers in queue std::vector<SharedBuffer> FileBasedSegments; std::span<const SharedBuffer> Segments = Payload.GetSegments(); FileBasedSegments.reserve(Segments.size()); @@ -104,42 +105,56 @@ public: tsl::robin_map<void*, std::filesystem::path> HandleToPath; for (const SharedBuffer& Segment : Segments) { - std::filesystem::path FilePath; - IoBufferFileReference Ref; - if (Segment.AsIoBuffer().GetFileReference(Ref)) + const uint64_t SegmentSize = Segment.GetSize(); + if (SegmentSize < 16u * 1024u) { - if (auto It = HandleToPath.find(Ref.FileHandle); It != HandleToPath.end()) - { - FilePath = It->second; - } - else + FileBasedSegments.push_back(Segment); + } + else + { + std::filesystem::path FilePath; + IoBufferFileReference Ref; + if (Segment.AsIoBuffer().GetFileReference(Ref)) { - std::error_code Ec; - std::filesystem::path Path = PathFromHandle(Ref.FileHandle, Ec); - if (!Ec && !Path.empty()) + if (auto It = HandleToPath.find(Ref.FileHandle); It != HandleToPath.end()) { - HandleToPath.insert_or_assign(Ref.FileHandle, Path); - FilePath = std::move(Path); + FilePath = It->second; + } + else + { + std::error_code Ec; + std::filesystem::path Path = PathFromHandle(Ref.FileHandle, Ec); + if (!Ec && !Path.empty()) + { + HandleToPath.insert_or_assign(Ref.FileHandle, Path); + FilePath = std::move(Path); + } + else + { + ZEN_WARN("Failed getting path for chunk to upload to cache. Skipping upload."); + return; + } } } - } - if (!FilePath.empty()) - { - IoBuffer BufferFromFile = IoBufferBuilder::MakeFromFile(FilePath, Ref.FileChunkOffset, Ref.FileChunkSize); - if (BufferFromFile) + if (!FilePath.empty()) { - FileBasedSegments.push_back(SharedBuffer(std::move(BufferFromFile))); + IoBuffer BufferFromFile = IoBufferBuilder::MakeFromFile(FilePath, Ref.FileChunkOffset, Ref.FileChunkSize); + if (BufferFromFile) + { + FileBasedSegments.push_back(SharedBuffer(std::move(BufferFromFile))); + } + else + { + ZEN_WARN("Failed opening file '{}' to upload to cache. Skipping upload.", FilePath); + return; + } } else { FileBasedSegments.push_back(Segment); } } - else - { - FileBasedSegments.push_back(Segment); - } } } diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index 21d693faa..389f8614d 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -382,6 +382,46 @@ namespace { return CompositeBuffer{}; } + std::filesystem::path TryMoveDownloadedChunk(IoBuffer& BlockBuffer, const std::filesystem::path& Path, bool ForceDiskBased) + { + uint64_t BlockSize = BlockBuffer.GetSize(); + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize)) + { + ZEN_TRACE_CPU("MoveTempFullBlock"); + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + RenameFile(TempBlobPath, Path, Ec); + if (Ec) + { + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); + } + else + { + return Path; + } + } + } + + if (ForceDiskBased) + { + // Could not be moved and rather large, lets store it on disk + ZEN_TRACE_CPU("WriteTempFullBlock"); + TemporaryFile::SafeWriteFile(Path, BlockBuffer); + BlockBuffer = {}; + return Path; + } + + return {}; + } + } // namespace class ReadFileCache @@ -1645,14 +1685,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } - if (m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - BlockDescription.BlockHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(BlockBuffer))); - } - uint64_t BlockSize = BlockBuffer.GetSize(); m_DownloadStats.DownloadedBlockCount++; m_DownloadStats.DownloadedBlockByteCount += BlockSize; @@ -1661,46 +1693,27 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) FilteredDownloadedBytesPerSecond.Stop(); } - { - std::filesystem::path BlockChunkPath; + const bool PutInCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache; - // Check if the dowloaded block is file based and we can move it directly without rewriting it - { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) - { - ZEN_TRACE_CPU("MoveTempFullBlock"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - RenameFile(TempBlobPath, BlockChunkPath, Ec); - if (Ec) - { - BlockChunkPath = std::filesystem::path{}; + std::filesystem::path BlockChunkPath = TryMoveDownloadedChunk( + BlockBuffer, + m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(), + /* ForceDiskBased */ PutInCache || (BlockSize > m_Options.MaximumInMemoryPayloadSize)); - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); - } - } - } - } - - if (BlockChunkPath.empty() && (BlockSize > m_Options.MaximumInMemoryPayloadSize)) + if (PutInCache) + { + ZEN_ASSERT(!BlockChunkPath.empty()); + IoBuffer CacheBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (CacheBuffer) { - ZEN_TRACE_CPU("WriteTempFullBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + BlockDescription.BlockHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(CacheBuffer))); } + } + { if (!m_AbortFlag) { Work.ScheduleWork( @@ -3161,10 +3174,9 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd RemoteChunkIndex, &FilteredWrittenBytesPerSecond, ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable { - IoBufferFileReference FileRef; - bool EnableBacklog = Payload.GetFileReference(FileRef); AsyncWriteDownloadedChunk(m_Options.ZenFolderPath, RemoteChunkIndex, + ExistsResult, std::move(ChunkTargetPtrs), WriteCache, Work, @@ -3172,8 +3184,7 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd SequenceIndexChunksLeftToWriteCounters, WritePartsComplete, TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - EnableBacklog); + FilteredWrittenBytesPerSecond); }); } }); @@ -3234,14 +3245,6 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde FilteredDownloadedBytesPerSecond.Stop(); } - if (Payload && m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(Payload))); - } - OnDownloaded(std::move(Payload)); }); } @@ -3261,17 +3264,11 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde } if (!m_AbortFlag) { - if (BuildBlob && m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(BuildBlob))); - } if (!BuildBlob) { throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); } + if (!m_AbortFlag) { uint64_t BlobSize = BuildBlob.GetSize(); @@ -3326,61 +3323,17 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( FilteredDownloadedBytesPerSecond.Stop(); } - std::filesystem::path BlockChunkPath; - - // Check if the dowloaded block is file based and we can move it directly without rewriting it + IoHashStream RangeId; + for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths) { - IoBufferFileReference FileRef; - if (BlockRangeBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockRangeBufferSize)) - { - ZEN_TRACE_CPU("MoveTempPartialBlock"); - - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - BlockRangeBuffer.SetDeleteOnClose(false); - BlockRangeBuffer = {}; - - IoHashStream RangeId; - for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths) - { - RangeId.Append(&Range.first, sizeof(uint64_t)); - RangeId.Append(&Range.second, sizeof(uint64_t)); - } - - BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()); - RenameFile(TempBlobPath, BlockChunkPath, Ec); - if (Ec) - { - BlockChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockRangeBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockRangeBufferSize, true); - BlockRangeBuffer.SetDeleteOnClose(true); - } - } - } + RangeId.Append(&Range.first, sizeof(uint64_t)); + RangeId.Append(&Range.second, sizeof(uint64_t)); } + std::filesystem::path BlockChunkPath = + TryMoveDownloadedChunk(BlockRangeBuffer, + m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()), + /* ForceDiskBased */ BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize); - if (BlockChunkPath.empty() && (BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize)) - { - ZEN_TRACE_CPU("WriteTempPartialBlock"); - - IoHashStream RangeId; - for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths) - { - RangeId.Append(&Range.first, sizeof(uint64_t)); - RangeId.Append(&Range.second, sizeof(uint64_t)); - } - - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockRangeBuffer); - BlockRangeBuffer = {}; - } if (!m_AbortFlag) { OnDownloaded(std::move(BlockRangeBuffer), std::move(BlockChunkPath), BlockRangeStartIndex, BlockOffsetAndLengths); @@ -3508,16 +3461,36 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3 // Upload to cache (if enabled) and use the whole payload for the remaining ranges - if (m_Storage.CacheStorage && m_Options.PopulateCache) + const uint64_t Size = RangeBuffers.PayloadBuffer.GetSize(); + + const bool PopulateCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache; + + std::filesystem::path BlockPath = + TryMoveDownloadedChunk(RangeBuffers.PayloadBuffer, + m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(), + /* ForceDiskBased */ PopulateCache || Size > m_Options.MaximumInMemoryPayloadSize); + if (!BlockPath.empty()) + { + RangeBuffers.PayloadBuffer = IoBufferBuilder::MakeFromFile(BlockPath); + if (!RangeBuffers.PayloadBuffer) + { + throw std::runtime_error( + fmt::format("Failed to read block {} from temporary path '{}'", BlockDescription.BlockHash, BlockPath)); + } + RangeBuffers.PayloadBuffer.SetDeleteOnClose(true); + } + + if (PopulateCache) { m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlockDescription.BlockHash, ZenContentType::kCompressedBinary, - CompositeBuffer(std::vector<IoBuffer>{RangeBuffers.PayloadBuffer})); - if (m_AbortFlag) - { - break; - } + CompositeBuffer(SharedBuffer(RangeBuffers.PayloadBuffer))); + } + + if (m_AbortFlag) + { + break; } SubRangeCount = Ranges.size() - SubRangeCountComplete; @@ -4269,6 +4242,7 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc void BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath, uint32_t RemoteChunkIndex, + const BlobsExistsResult& ExistsResult, std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs, BufferedWriteFileCache& WriteCache, ParallelWork& Work, @@ -4276,8 +4250,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, std::atomic<uint64_t>& WritePartsComplete, const uint64_t TotalPartWriteCount, - FilteredRate& FilteredWrittenBytesPerSecond, - bool EnableBacklog) + FilteredRate& FilteredWrittenBytesPerSecond) { ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); @@ -4285,43 +4258,28 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa const uint64_t Size = Payload.GetSize(); - std::filesystem::path CompressedChunkPath; + const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); + + const bool PopulateCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache; - // Check if the dowloaded chunk is file based and we can move it directly without rewriting it + std::filesystem::path CompressedChunkPath = + TryMoveDownloadedChunk(Payload, + m_TempDownloadFolderPath / ChunkHash.ToHexString(), + /* ForceDiskBased */ PopulateCache || Size > m_Options.MaximumInMemoryPayloadSize); + if (PopulateCache) { - IoBufferFileReference FileRef; - if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == Size)) + IoBuffer CacheBlob = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (CacheBlob) { - ZEN_TRACE_CPU("MoveTempChunk"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - Payload.SetDeleteOnClose(false); - Payload = {}; - CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); - RenameFile(TempBlobPath, CompressedChunkPath, Ec); - if (Ec) - { - CompressedChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, Size, true); - Payload.SetDeleteOnClose(true); - } - } + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + ChunkHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(CacheBlob))); } } - if (CompressedChunkPath.empty() && (Size > m_Options.MaximumInMemoryPayloadSize)) - { - ZEN_TRACE_CPU("WriteTempChunk"); - // Could not be moved and rather large, lets store it on disk - CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); - TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload); - Payload = {}; - } + IoBufferFileReference FileRef; + bool EnableBacklog = !CompressedChunkPath.empty() || Payload.GetFileReference(FileRef); Work.ScheduleWork( m_IOWorkerPool, diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index 5f052b265..b35231776 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -328,6 +328,7 @@ private: void AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath, uint32_t RemoteChunkIndex, + const BlobsExistsResult& ExistsResult, std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs, BufferedWriteFileCache& WriteCache, ParallelWork& Work, @@ -335,8 +336,7 @@ private: std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, std::atomic<uint64_t>& WritePartsComplete, const uint64_t TotalPartWriteCount, - FilteredRate& FilteredWrittenBytesPerSecond, - bool EnableBacklog); + FilteredRate& FilteredWrittenBytesPerSecond); void VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, ParallelWork& Work); bool CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters); |