diff options
| author | Stefan Boberg <[email protected]> | 2025-03-07 15:23:36 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2025-03-07 15:23:36 +0100 |
| commit | adbf61a119e5aabbcec274b846f1109de0093bee (patch) | |
| tree | e320053d112b4e56218d7e29310c76b43f442697 | |
| parent | Tactical check-in to simplify merge from old branch (diff) | |
| parent | partial block fetch (#298) (diff) | |
| download | zen-adbf61a119e5aabbcec274b846f1109de0093bee.tar.xz zen-adbf61a119e5aabbcec274b846f1109de0093bee.zip | |
Merge remote-tracking branch 'origin/main' into sb/build-cache
36 files changed, 1293 insertions, 601 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index f8858e4b9..b2559ddaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,10 @@ ## +- **EXPERIMENTAL** `zen builds` + - Improvement: Do partial requests of blocks if not all of the block is needed + - Improvement: Better progress/statistics on download + - Bugfix: Ensure that temporary folder for Jupiter downloads exists during verify phase + +## 5.6.0 - Feature: Added support for `--trace`, `--tracehost` and `--tracefile` options to zen CLI command - Improvement: When logging HTTP responses, the body is now sanity checked to ensure it is human readable, and the length of the output is capped to prevent inadvertent log bloat - Improvement: Instrumented `zen builds download` command code so we get more useful Insights output diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index eb0650c4d..1c9476b96 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -231,7 +231,10 @@ namespace { return AuthToken; } - CompositeBuffer WriteToTempFileIfNeeded(const CompositeBuffer& Buffer, const std::filesystem::path& TempFolderPath, const IoHash& Hash) + CompositeBuffer WriteToTempFileIfNeeded(const CompositeBuffer& Buffer, + const std::filesystem::path& TempFolderPath, + const IoHash& Hash, + const std::string& Suffix = {}) { // If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory std::span<const SharedBuffer> Segments = Buffer.GetSegments(); @@ -241,7 +244,7 @@ namespace { { return Buffer; } - std::filesystem::path TempFilePath = (TempFolderPath / Hash.ToHexString()).make_preferred(); + std::filesystem::path TempFilePath = (TempFolderPath / (Hash.ToHexString() + Suffix)).make_preferred(); return CompositeBuffer(WriteToTempFile(Buffer, TempFilePath)); } @@ -302,7 +305,7 @@ namespace { return FilteredPerSecond; } - uint64_t GetElapsedTime() const + uint64_t GetElapsedTimeUS() const { if (StartTimeUS == (uint64_t)-1) { @@ -1738,8 +1741,8 @@ namespace { ProgressBar.Finish(); - GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTime(); - UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTime(); + GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS(); + UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); } } @@ -2069,9 +2072,9 @@ namespace { }); ProgressBar.Finish(); - UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTime(); - GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTime(); - LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTime(); + UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); + GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTimeUS(); + LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS(); } } @@ -3358,17 +3361,8 @@ namespace { return ChunkTargetPtrs; }; - bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath, - const ChunkedFolderContent& RemoteContent, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - const CompositeBuffer& DecompressedBlockBuffer, - const ChunkedContentLookup& Lookup, - std::atomic<bool>* RemoteChunkIndexNeedsCopyFromSourceFlags, - std::atomic<uint32_t>& OutChunksComplete, - std::atomic<uint64_t>& OutBytesWritten) + struct BlockWriteOps { - ZEN_TRACE_CPU("WriteBlockToDisk"); - std::vector<CompositeBuffer> ChunkBuffers; struct WriteOpData { @@ -3376,7 +3370,79 @@ namespace { size_t ChunkBufferIndex; }; std::vector<WriteOpData> WriteOps; + }; + void WriteBlockChunkOps(const std::filesystem::path& CacheFolderPath, + const ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& Lookup, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + const BlockWriteOps Ops, + std::atomic<uint32_t>& OutChunksComplete, + std::atomic<uint64_t>& OutBytesWritten) + { + ZEN_TRACE_CPU("WriteBlockChunkOps"); + { + WriteFileCache OpenFileCache; + for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) + { + if (AbortFlag) + { + break; + } + const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex]; + const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex; + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <= + RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]); + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0); + const uint64_t ChunkSize = Chunk.GetSize(); + const uint64_t FileOffset = WriteOp.Target->Offset; + const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; + ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]); + + OpenFileCache.WriteToFile<CompositeBuffer>( + SequenceIndex, + [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { + return GetTempChunkedSequenceFileName(CacheFolderPath, + RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + }, + Chunk, + FileOffset, + RemoteContent.RawSizes[PathIndex]); + OutBytesWritten += ChunkSize; + } + } + if (!AbortFlag) + { + // Write tracking, updating this must be done without any files open (WriteFileCache) + for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) + { + const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex; + if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) + { + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + const IoHash VerifyChunkHash = + IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); + if (VerifyChunkHash != SequenceRawHash) + { + throw std::runtime_error( + fmt::format("Written hunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); + } + std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), + GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); + } + } + OutChunksComplete += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size()); + } + } + + bool GetBlockWriteOps(const ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& Lookup, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + const CompositeBuffer& DecompressedBlockBuffer, + BlockWriteOps& OutOps) + { + ZEN_TRACE_CPU("GetBlockWriteOps"); SharedBuffer BlockBuffer = DecompressedBlockBuffer.Flatten(); uint64_t HeaderSize = 0; if (IterateChunkBlock( @@ -3402,91 +3468,207 @@ namespace { ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs) { - WriteOps.push_back(WriteOpData{.Target = Target, .ChunkBufferIndex = ChunkBuffers.size()}); + OutOps.WriteOps.push_back( + BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()}); } - ChunkBuffers.emplace_back(std::move(Decompressed)); + OutOps.ChunkBuffers.emplace_back(std::move(Decompressed)); } } } }, HeaderSize)) { - if (!WriteOps.empty()) + std::sort(OutOps.WriteOps.begin(), + OutOps.WriteOps.end(), + [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) { + if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) + { + return true; + } + if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) + { + return false; + } + return Lhs.Target->Offset < Rhs.Target->Offset; + }); + + return true; + } + else + { + return false; + } + } + + bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath, + const ChunkedFolderContent& RemoteContent, + const ChunkBlockDescription& BlockDescription, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + const CompositeBuffer& BlockBuffer, + const ChunkedContentLookup& Lookup, + std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + std::atomic<uint32_t>& OutChunksComplete, + std::atomic<uint64_t>& OutBytesWritten) + { + ZEN_TRACE_CPU("WriteBlockToDisk"); + + IoHash BlockRawHash; + uint64_t BlockRawSize; + CompressedBuffer CompressedBlockBuffer = CompressedBuffer::FromCompressed(BlockBuffer, BlockRawHash, BlockRawSize); + if (!CompressedBlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", BlockDescription.BlockHash)); + } + + if (BlockRawHash != BlockDescription.BlockHash) + { + throw std::runtime_error( + fmt::format("Block {} header has a mismatching raw hash {}", BlockDescription.BlockHash, BlockRawHash)); + } + + CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite(); + if (!DecompressedBlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} failed to decompress", BlockDescription.BlockHash)); + } + + ZEN_ASSERT_SLOW(BlockDescription.BlockHash == IoHash::HashBuffer(DecompressedBlockBuffer)); + + BlockWriteOps Ops; + if (GetBlockWriteOps(RemoteContent, + Lookup, + SequenceIndexChunksLeftToWriteCounters, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DecompressedBlockBuffer, + Ops)) + { + WriteBlockChunkOps(CacheFolderPath, + RemoteContent, + Lookup, + SequenceIndexChunksLeftToWriteCounters, + Ops, + OutChunksComplete, + OutBytesWritten); + return true; + } + return false; + } + + bool GetPartialBlockWriteOps(const ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& Lookup, + const ChunkBlockDescription& BlockDescription, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + const CompositeBuffer& PartialBlockBuffer, + uint32_t FirstIncludedBlockChunkIndex, + uint32_t LastIncludedBlockChunkIndex, + BlockWriteOps& OutOps) + { + ZEN_TRACE_CPU("GetPartialBlockWriteOps"); + uint32_t OffsetInBlock = 0; + for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++) + { + const uint32_t ChunkCompressedSize = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; + if (auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); It != Lookup.ChunkHashToChunkIndex.end()) { - if (!AbortFlag) + const uint32_t ChunkIndex = It->second; + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, Lookup, ChunkIndex); + + if (!ChunkTargetPtrs.empty()) { - std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOpData& Lhs, const WriteOpData& Rhs) { - if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) + bool NeedsWrite = true; + if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false)) + { + CompositeBuffer Chunk = PartialBlockBuffer.Mid(OffsetInBlock, ChunkCompressedSize); + IoHash VerifyChunkHash; + uint64_t VerifyRawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, VerifyChunkHash, VerifyRawSize); + if (!Compressed) { - return true; + ZEN_ASSERT(false); } - if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) + if (VerifyChunkHash != ChunkHash) { - return false; + ZEN_ASSERT(false); } - return Lhs.Target->Offset < Rhs.Target->Offset; - }); - - { - WriteFileCache OpenFileCache; - for (const WriteOpData& WriteOp : WriteOps) + if (VerifyRawSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex]) { - if (AbortFlag) - { - break; - } - const CompositeBuffer& Chunk = ChunkBuffers[WriteOp.ChunkBufferIndex]; - const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex; - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <= - RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]); - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0); - const uint64_t ChunkSize = Chunk.GetSize(); - const uint64_t FileOffset = WriteOp.Target->Offset; - const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; - ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]); - - OpenFileCache.WriteToFile<CompositeBuffer>( - SequenceIndex, - [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { - return GetTempChunkedSequenceFileName(CacheFolderPath, - RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - }, - Chunk, - FileOffset, - RemoteContent.RawSizes[PathIndex]); - OutBytesWritten += ChunkSize; + ZEN_ASSERT(false); } - } - if (!AbortFlag) - { - ZEN_TRACE_CPU("WriteBlockToDisk_VerifyHash"); - - // Write tracking, updating this must be done without any files open (WriteFileCache) - for (const WriteOpData& WriteOp : WriteOps) + CompositeBuffer Decompressed = Compressed.DecompressToComposite(); + if (!Decompressed) { - const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex; - if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) - { - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - const IoHash VerifyChunkHash = IoHash::HashBuffer( - IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); - if (VerifyChunkHash != SequenceRawHash) - { - throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}", - VerifyChunkHash, - SequenceRawHash)); - } - std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), - GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); - } + throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash)); } + ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed)); + ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs) + { + OutOps.WriteOps.push_back( + BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()}); + } + OutOps.ChunkBuffers.emplace_back(std::move(Decompressed)); } - OutChunksComplete += gsl::narrow<uint32_t>(ChunkBuffers.size()); } } + + OffsetInBlock += ChunkCompressedSize; + } + std::sort(OutOps.WriteOps.begin(), + OutOps.WriteOps.end(), + [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) { + if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) + { + return true; + } + if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) + { + return false; + } + return Lhs.Target->Offset < Rhs.Target->Offset; + }); + return true; + } + + bool WritePartialBlockToDisk(const std::filesystem::path& CacheFolderPath, + const ChunkedFolderContent& RemoteContent, + const ChunkBlockDescription& BlockDescription, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + const CompositeBuffer& PartialBlockBuffer, + uint32_t FirstIncludedBlockChunkIndex, + uint32_t LastIncludedBlockChunkIndex, + const ChunkedContentLookup& Lookup, + std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + std::atomic<uint32_t>& OutChunksComplete, + std::atomic<uint64_t>& OutBytesWritten) + { + ZEN_TRACE_CPU("WritePartialBlockToDisk"); + BlockWriteOps Ops; + if (GetPartialBlockWriteOps(RemoteContent, + Lookup, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + RemoteChunkIndexNeedsCopyFromSourceFlags, + PartialBlockBuffer, + FirstIncludedBlockChunkIndex, + LastIncludedBlockChunkIndex, + Ops)) + { + WriteBlockChunkOps(CacheFolderPath, + RemoteContent, + Lookup, + SequenceIndexChunksLeftToWriteCounters, + Ops, + OutChunksComplete, + OutBytesWritten); return true; } - return false; + else + { + return false; + } } SharedBuffer Decompress(const CompositeBuffer& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize) @@ -3571,6 +3753,7 @@ namespace { CompositeBuffer&& CompressedPart, std::atomic<uint64_t>& WriteToDiskBytes) { + ZEN_TRACE_CPU("StreamDecompress"); const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash); TemporaryFile DecompressedTemp; std::error_code Ec; @@ -3633,10 +3816,9 @@ namespace { WorkerThreadPool& NetworkPool, std::atomic<uint64_t>& WriteToDiskBytes, std::atomic<uint64_t>& BytesDownloaded, - std::atomic<uint64_t>& LooseChunksBytes, - std::atomic<uint64_t>& DownloadedChunks, - std::atomic<uint32_t>& ChunksComplete, - std::atomic<uint64_t>& MultipartAttachmentCount) + std::atomic<uint64_t>& MultipartAttachmentCount, + std::function<void(uint64_t DowloadedBytes)>&& OnDownloadComplete, + std::function<void()>&& OnWriteComplete) { ZEN_TRACE_CPU("DownloadLargeBlob"); @@ -3665,22 +3847,20 @@ namespace { Workload, ChunkHash, &BytesDownloaded, - &LooseChunksBytes, + OnDownloadComplete = std::move(OnDownloadComplete), + OnWriteComplete = std::move(OnWriteComplete), &WriteToDiskBytes, - &DownloadedChunks, - &ChunksComplete, SequenceIndexChunksLeftToWriteCounters, ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>( ChunkTargetPtrs)](uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining) { BytesDownloaded += Chunk.GetSize(); - LooseChunksBytes += Chunk.GetSize(); if (!AbortFlag.load()) { Workload->TempFile.Write(Chunk.GetView(), Offset); if (Chunk.GetSize() == BytesRemaining) { - DownloadedChunks++; + OnDownloadComplete(Workload->TempFile.FileSize()); Work.ScheduleWork( WritePool, // GetSyncWorkerPool(),// @@ -3690,8 +3870,7 @@ namespace { ChunkHash, Workload, Offset, - BytesRemaining, - &ChunksComplete, + OnWriteComplete = std::move(OnWriteComplete), &WriteToDiskBytes, SequenceIndexChunksLeftToWriteCounters, ChunkTargetPtrs](std::atomic<bool>&) { @@ -3725,7 +3904,7 @@ namespace { CompositeBuffer(std::move(CompressedPart)), WriteToDiskBytes); NeedHashVerify = false; - ChunksComplete++; + OnWriteComplete(); } else { @@ -3748,7 +3927,7 @@ namespace { CompositeBuffer(Chunk), OpenFileCache, WriteToDiskBytes); - ChunksComplete++; + OnWriteComplete(); } } @@ -3760,12 +3939,12 @@ namespace { const uint32_t RemoteSequenceIndex = Location->SequenceIndex; if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) { - ZEN_TRACE_CPU("VerifyChunkHash"); - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; if (NeedHashVerify) { + ZEN_TRACE_CPU("VerifyChunkHash"); + const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); if (VerifyChunkHash != ChunkHash) @@ -3816,6 +3995,7 @@ namespace { const ChunkedFolderContent& RemoteContent, const std::vector<ChunkBlockDescription>& BlockDescriptions, const std::vector<IoHash>& LooseChunkHashes, + bool AllowPartialBlockRequests, bool WipeTargetFolder, FolderContent& OutLocalFolderState) { @@ -3967,11 +4147,17 @@ namespace { } } - std::atomic<uint32_t> ChunkCountWritten = 0; + uint64_t TotalRequestCount = 0; + std::atomic<uint64_t> RequestsComplete = 0; + std::atomic<uint32_t> ChunkCountWritten = 0; + std::atomic<size_t> TotalPartWriteCount = 0; + std::atomic<size_t> WritePartsComplete = 0; { ZEN_TRACE_CPU("HandleChunks"); + Stopwatch WriteTimer; + FilteredRate FilteredDownloadedBytesPerSecond; FilteredRate FilteredWrittenBytesPerSecond; @@ -4010,6 +4196,8 @@ namespace { } else { + TotalRequestCount++; + TotalPartWriteCount++; Work.ScheduleWork( NetworkPool, // GetSyncWorkerPool(),// [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) { @@ -4020,25 +4208,39 @@ namespace { FilteredDownloadedBytesPerSecond.Start(); if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) { - DownloadLargeBlob(Storage, - Path / ZenTempChunkFolderName, - CacheFolderPath, - RemoteContent, - RemoteLookup, - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - ChunkTargetPtrs, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - NetworkPool, - WriteToDiskBytes, - BytesDownloaded, - LooseChunksBytes, - DownloadedChunks, - ChunkCountWritten, - MultipartAttachmentCount); + DownloadLargeBlob( + Storage, + Path / ZenTempChunkFolderName, + CacheFolderPath, + RemoteContent, + RemoteLookup, + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + ChunkTargetPtrs, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + NetworkPool, + WriteToDiskBytes, + BytesDownloaded, + MultipartAttachmentCount, + [&](uint64_t BytesDownloaded) { + LooseChunksBytes += BytesDownloaded; + RequestsComplete++; + if (RequestsComplete == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + }, + [&]() { + ChunkCountWritten++; + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + }); } else { @@ -4049,6 +4251,11 @@ namespace { } BytesDownloaded += CompressedPart.GetSize(); LooseChunksBytes += CompressedPart.GetSize(); + RequestsComplete++; + if (RequestsComplete == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(CompressedPart)), Path / ZenTempChunkFolderName, ChunkHash); @@ -4077,6 +4284,11 @@ namespace { CompositeBuffer(CompressedPart), WriteToDiskBytes); ChunkCountWritten++; + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } NeedHashVerify = false; } else @@ -4096,10 +4308,17 @@ namespace { OpenFileCache, WriteToDiskBytes); ChunkCountWritten++; + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } } if (!AbortFlag) { + WritePartsComplete++; + // Write tracking, updating this must be done without any files open // (WriteFileCache) for (const ChunkedContentLookup::ChunkSequenceLocation* Location : @@ -4109,12 +4328,12 @@ namespace { if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub( 1) == 1) { - ZEN_TRACE_CPU("UpdateFolder_VerifyHash"); - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; if (NeedHashVerify) { + ZEN_TRACE_CPU("UpdateFolder_VerifyHash"); + const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( GetTempChunkedSequenceFileName(CacheFolderPath, @@ -4155,6 +4374,8 @@ namespace { break; } + TotalPartWriteCount++; + Work.ScheduleWork( WritePool, // GetSyncWorkerPool(),// [&, CopyDataIndex](std::atomic<bool>&) { @@ -4166,245 +4387,428 @@ namespace { const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex]; const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.LocalSequenceIndex]; const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); - if (!CopyData.TargetChunkLocationPtrs.empty()) - { - uint64_t CacheLocalFileBytesRead = 0; + ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty()); - size_t TargetStart = 0; - const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets( - CopyData.TargetChunkLocationPtrs); + uint64_t CacheLocalFileBytesRead = 0; - struct WriteOp - { - const ChunkedContentLookup::ChunkSequenceLocation* Target; - uint64_t CacheFileOffset; - uint64_t ChunkSize; - }; + size_t TargetStart = 0; + const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets( + CopyData.TargetChunkLocationPtrs); + + struct WriteOp + { + const ChunkedContentLookup::ChunkSequenceLocation* Target; + uint64_t CacheFileOffset; + uint64_t ChunkSize; + }; - std::vector<WriteOp> WriteOps; - WriteOps.reserve(AllTargets.size()); + std::vector<WriteOp> WriteOps; + WriteOps.reserve(AllTargets.size()); - for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) + for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) + { + std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange = + AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); + for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) { - std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange = - AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); - for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) - { - WriteOps.push_back(WriteOp{.Target = Target, - .CacheFileOffset = ChunkTarget.CacheFileOffset, - .ChunkSize = ChunkTarget.ChunkRawSize}); - } - TargetStart += ChunkTarget.TargetChunkLocationCount; + WriteOps.push_back(WriteOp{.Target = Target, + .CacheFileOffset = ChunkTarget.CacheFileOffset, + .ChunkSize = ChunkTarget.ChunkRawSize}); } + TargetStart += ChunkTarget.TargetChunkLocationCount; + } - std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { - if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) - { - return true; - } - else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) - { - return false; - } - if (Lhs.Target->Offset < Rhs.Target->Offset) - { - return true; - } + std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { + if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) + { + return true; + } + else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) + { return false; - }); + } + if (Lhs.Target->Offset < Rhs.Target->Offset) + { + return true; + } + return false; + }); - if (!AbortFlag) + if (!AbortFlag) + { + BufferedOpenFile SourceFile(LocalFilePath); + WriteFileCache OpenFileCache; + for (const WriteOp& Op : WriteOps) { - BufferedOpenFile SourceFile(LocalFilePath); - WriteFileCache OpenFileCache; - for (const WriteOp& Op : WriteOps) + if (AbortFlag) { - if (AbortFlag) - { - break; - } - const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <= - RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]); - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0); - const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; - const uint64_t ChunkSize = Op.ChunkSize; - CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize); - - ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); - - OpenFileCache.WriteToFile<CompositeBuffer>( - RemoteSequenceIndex, - [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { - return GetTempChunkedSequenceFileName( - CacheFolderPath, - RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - }, - ChunkSource, - Op.Target->Offset, - RemoteContent.RawSizes[RemotePathIndex]); - WriteToDiskBytes += ChunkSize; - CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes? + break; } + const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <= + RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]); + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0); + const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; + const uint64_t ChunkSize = Op.ChunkSize; + CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize); + + ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); + + OpenFileCache.WriteToFile<CompositeBuffer>( + RemoteSequenceIndex, + [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { + return GetTempChunkedSequenceFileName( + CacheFolderPath, + RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + }, + ChunkSource, + Op.Target->Offset, + RemoteContent.RawSizes[RemotePathIndex]); + WriteToDiskBytes += ChunkSize; + CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes? } - if (!AbortFlag) + } + if (!AbortFlag) + { + // Write tracking, updating this must be done without any files open (WriteFileCache) + for (const WriteOp& Op : WriteOps) { - if (!AbortFlag) + ZEN_TRACE_CPU("UpdateFolder_Copy_VerifyHash"); + + const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; + if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) { - // Write tracking, updating this must be done without any files open (WriteFileCache) - for (const WriteOp& Op : WriteOps) + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( + GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); + if (VerifyChunkHash != SequenceRawHash) { - ZEN_TRACE_CPU("UpdateFolder_Copy_VerifyHash"); - - const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; - if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) - { - const IoHash& SequenceRawHash = - RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( - GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); - if (VerifyChunkHash != SequenceRawHash) - { - throw std::runtime_error( - fmt::format("Written chunk sequence {} hash does not match expected hash {}", - VerifyChunkHash, - SequenceRawHash)); - } - - ZEN_TRACE_CPU("UpdateFolder_Copy_rename"); - std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), - GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); - } + throw std::runtime_error( + fmt::format("Written chunk sequence {} hash does not match expected hash {}", + VerifyChunkHash, + SequenceRawHash)); } - } - ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size()); - ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]); + ZEN_TRACE_CPU("UpdateFolder_Copy_rename"); + std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), + GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); + } } + + ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size()); + ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]); + } + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); } } }, Work.DefaultErrorFunction()); } - size_t BlockCount = BlockDescriptions.size(); - std::atomic<size_t> BlocksComplete = 0; - - auto IsBlockNeeded = [&RemoteContent, &RemoteLookup, &RemoteChunkIndexNeedsCopyFromSourceFlags]( - const ChunkBlockDescription& BlockDescription) -> bool { - for (const IoHash& ChunkHash : BlockDescription.ChunkRawHashes) - { - if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end()) - { - const uint32_t RemoteChunkIndex = It->second; - if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]) - { - return true; - } - } - } - return false; + size_t BlockCount = BlockDescriptions.size(); + + std::vector<bool> ChunkIsPickedUpByBlock(RemoteContent.ChunkedContent.ChunkHashes.size(), false); + auto GetNeededChunkBlockIndexes = [&RemoteContent, + &RemoteLookup, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &ChunkIsPickedUpByBlock](const ChunkBlockDescription& BlockDescription) { + std::vector<uint32_t> NeededBlockChunkIndexes; + for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++) + { + const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; + if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end()) + { + const uint32_t RemoteChunkIndex = It->second; + if (!ChunkIsPickedUpByBlock[RemoteChunkIndex]) + { + if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]) + { + ChunkIsPickedUpByBlock[RemoteChunkIndex] = true; + NeededBlockChunkIndexes.push_back(ChunkBlockIndex); + } + } + } + } + return NeededBlockChunkIndexes; }; - size_t BlocksNeededCount = 0; + size_t BlocksNeededCount = 0; + uint64_t AllBlocksSize = 0; + uint64_t AllBlocksFetch = 0; + uint64_t AllBlocksSlack = 0; + uint64_t AllBlockRequests = 0; + uint64_t AllBlockChunksSize = 0; for (size_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++) { if (Work.IsAborted()) { break; } - if (IsBlockNeeded(BlockDescriptions[BlockIndex])) + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + const std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription); + if (!BlockChunkIndexNeeded.empty()) { - BlocksNeededCount++; - Work.ScheduleWork( - NetworkPool, - [&, BlockIndex](std::atomic<bool>&) { - if (!AbortFlag) + bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size(); + bool CanDoPartialBlockDownload = (BlockDescription.HeaderSize > 0) && (BlockDescription.ChunkCompressedLengths.size() == + BlockDescription.ChunkRawHashes.size()); + if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) + { + struct BlockRangeDescriptor + { + uint64_t RangeStart = 0; + uint64_t RangeLength = 0; + uint32_t ChunkBlockIndexStart = 0; + uint32_t ChunkBlockIndexCount = 0; + }; + std::vector<BlockRangeDescriptor> BlockRanges; + + ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalisys"); + + uint32_t NeedBlockChunkIndexOffset = 0; + uint32_t ChunkBlockIndex = 0; + uint32_t CurrentOffset = + gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + + BlockRangeDescriptor NextRange; + while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && + ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) + { + const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) { - ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Read"); - - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescriptions[BlockIndex].BlockHash); - if (!BlockBuffer) + if (NextRange.RangeLength > 0) { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescriptions[BlockIndex].BlockHash)); + BlockRanges.push_back(NextRange); + NextRange = {}; } - BytesDownloaded += BlockBuffer.GetSize(); - BlockBytes += BlockBuffer.GetSize(); - DownloadedBlocks++; - CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)), - Path / ZenTempBlockFolderName, - BlockDescriptions[BlockIndex].BlockHash); - - if (!AbortFlag) + ChunkBlockIndex++; + CurrentOffset += ChunkCompressedLength; + } + else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) + { + AllBlockChunksSize += ChunkCompressedLength; + if (NextRange.RangeLength == 0) { - Work.ScheduleWork( - WritePool, - [&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Write"); - - FilteredWrittenBytesPerSecond.Start(); - IoHash BlockRawHash; - uint64_t BlockRawSize; - CompressedBuffer CompressedBlockBuffer = - CompressedBuffer::FromCompressed(std::move(BlockBuffer), BlockRawHash, BlockRawSize); - if (!CompressedBlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", - BlockDescriptions[BlockIndex].BlockHash)); - } + NextRange.RangeStart = CurrentOffset; + NextRange.ChunkBlockIndexStart = ChunkBlockIndex; + } + NextRange.RangeLength += ChunkCompressedLength; + NextRange.ChunkBlockIndexCount++; + ChunkBlockIndex++; + CurrentOffset += ChunkCompressedLength; + NeedBlockChunkIndexOffset++; + } + else + { + ZEN_ASSERT(false); + } + } + AllBlocksSize += CurrentOffset; + if (NextRange.RangeLength > 0) + { + BlockRanges.push_back(NextRange); + NextRange = {}; + } - if (BlockRawHash != BlockDescriptions[BlockIndex].BlockHash) - { - throw std::runtime_error(fmt::format("Block {} header has a mismatching raw hash {}", - BlockDescriptions[BlockIndex].BlockHash, - BlockRawHash)); - } + ZEN_ASSERT(!BlockRanges.empty()); + std::vector<BlockRangeDescriptor> CollapsedBlockRanges; + auto It = BlockRanges.begin(); + CollapsedBlockRanges.push_back(*It++); + uint64_t TotalSlack = 0; + while (It != BlockRanges.end()) + { + BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); + uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength); + uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength; + if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out + { + LastRange.ChunkBlockIndexCount = + (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; + LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart; + TotalSlack += Slack; + } + else + { + CollapsedBlockRanges.push_back(*It); + } + ++It; + } + + uint64_t TotalFetch = 0; + for (const BlockRangeDescriptor& Range : CollapsedBlockRanges) + { + TotalFetch += Range.RangeLength; + } + + AllBlocksFetch += TotalFetch; + AllBlocksSlack += TotalSlack; + BlocksNeededCount++; + AllBlockRequests += CollapsedBlockRanges.size(); + + for (size_t BlockRangeIndex = 0; BlockRangeIndex < CollapsedBlockRanges.size(); BlockRangeIndex++) + { + TotalRequestCount++; + TotalPartWriteCount++; + const BlockRangeDescriptor BlockRange = CollapsedBlockRanges[BlockRangeIndex]; + // Partial block schedule + Work.ScheduleWork( + NetworkPool, // NetworkPool, // GetSyncWorkerPool() + [&, BlockIndex, BlockRange](std::atomic<bool>&) { + ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialGet"); + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + } + BytesDownloaded += BlockBuffer.GetSize(); + BlockBytes += BlockBuffer.GetSize(); + DownloadedBlocks++; + CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)), + Path / ZenTempBlockFolderName, + BlockDescription.BlockHash, + fmt::format("_{}", BlockRange.RangeStart)); + RequestsComplete++; + if (RequestsComplete == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } - CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite(); - if (!DecompressedBlockBuffer) + if (!AbortFlag) + { + Work.ScheduleWork( + WritePool, // WritePool, // GetSyncWorkerPool(), + [&, BlockIndex, BlockRange, BlockPartialBuffer = std::move(Payload)](std::atomic<bool>&) { + if (!AbortFlag) { - throw std::runtime_error(fmt::format("Block {} failed to decompress", - BlockDescriptions[BlockIndex].BlockHash)); + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + + FilteredWrittenBytesPerSecond.Start(); + + if (!WritePartialBlockToDisk( + CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + BlockPartialBuffer, + BlockRange.ChunkBlockIndexStart, + BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + ChunkCountWritten, + WriteToDiskBytes)) + { + throw std::runtime_error( + fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); + } + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } + }, + [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) { + ZEN_ERROR("Failed writing block {}. Reason: {}", + BlockDescriptions[BlockIndex].BlockHash, + Ex.what()); + AbortFlag = true; + }); + } + }, + Work.DefaultErrorFunction()); + } + } + else + { + BlocksNeededCount++; + TotalRequestCount++; + TotalPartWriteCount++; + + Work.ScheduleWork( + NetworkPool, // GetSyncWorkerPool(), // NetworkPool, + [&, BlockIndex](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Get"); + + // Full block schedule - ZEN_ASSERT_SLOW(BlockDescriptions[BlockIndex].BlockHash == - IoHash::HashBuffer(DecompressedBlockBuffer)); - - if (!WriteBlockToDisk(CacheFolderPath, - RemoteContent, - SequenceIndexChunksLeftToWriteCounters, - DecompressedBlockBuffer, - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags.data(), - ChunkCountWritten, - WriteToDiskBytes)) + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + } + BytesDownloaded += BlockBuffer.GetSize(); + BlockBytes += BlockBuffer.GetSize(); + DownloadedBlocks++; + RequestsComplete++; + if (RequestsComplete == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + + CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)), + Path / ZenTempBlockFolderName, + BlockDescription.BlockHash); + if (!AbortFlag) + { + Work.ScheduleWork( + WritePool, + [&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic<bool>&) { + if (!AbortFlag) { - throw std::runtime_error( - fmt::format("Block {} is malformed", BlockDescriptions[BlockIndex].BlockHash)); + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + + FilteredWrittenBytesPerSecond.Start(); + if (!WriteBlockToDisk(CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + BlockBuffer, + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + ChunkCountWritten, + WriteToDiskBytes)) + { + throw std::runtime_error( + fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } - BlocksComplete++; - } - }, - [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) { - ZEN_ERROR("Failed writing block {}. Reason: {}", - BlockDescriptions[BlockIndex].BlockHash, - Ex.what()); - AbortFlag = true; - }); + }, + Work.DefaultErrorFunction()); + } } - } - }, - Work.DefaultErrorFunction()); + }, + Work.DefaultErrorFunction()); + } } else { ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash); } } - + ZEN_DEBUG("Fetching {} with {} slack (ideal {}) out of {} using {} requests for {} blocks", + NiceBytes(AllBlocksFetch), + NiceBytes(AllBlocksSlack), + NiceBytes(AllBlockChunksSize), + NiceBytes(AllBlocksSize), + AllBlockRequests, + BlocksNeededCount); ZEN_TRACE_CPU("HandleChunks_Wait"); Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { @@ -4412,13 +4816,13 @@ namespace { ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load()); FilteredWrittenBytesPerSecond.Update(WriteToDiskBytes.load()); FilteredDownloadedBytesPerSecond.Update(BytesDownloaded.load()); - std::string Details = fmt::format("{}/{} chunks. {}/{} blocks. {} {}bits/s downloaded. {} {}B/s written", - ChunkCountWritten.load(), - ChunkCountToWrite, - BlocksComplete.load(), - BlocksNeededCount, + std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.", + RequestsComplete.load(), + TotalRequestCount, NiceBytes(BytesDownloaded.load()), NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), + ChunkCountWritten.load(), + ChunkCountToWrite, NiceBytes(WriteToDiskBytes.load()), NiceNum(FilteredWrittenBytesPerSecond.GetCurrent())); WriteProgressBar.UpdateState({.Task = "Writing chunks ", @@ -4437,6 +4841,13 @@ namespace { } WriteProgressBar.Finish(); + + ZEN_CONSOLE("Downloaded {} ({}bits/s). Wrote {} ({}B/s). Completed in {}", + NiceBytes(BytesDownloaded.load()), + NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), BytesDownloaded * 8)), + NiceBytes(WriteToDiskBytes.load()), + NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), WriteToDiskBytes.load())), + NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs())); } for (const auto& SequenceIndexChunksLeftToWriteCounter : SequenceIndexChunksLeftToWriteCounters) @@ -4455,6 +4866,7 @@ namespace { }); // Move all files we will reuse to cache folder + // TODO: If WipeTargetFolder is false we could check which files are already correct and leave them in place for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++) { const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex]; @@ -5186,6 +5598,7 @@ namespace { std::span<const std::string> BuildPartNames, const std::filesystem::path& Path, bool AllowMultiparts, + bool AllowPartialBlockRequests, bool WipeTargetFolder, bool PostDownloadVerify) { @@ -5202,8 +5615,8 @@ namespace { CreateDirectories(Path / ZenTempBlockFolderName); CreateDirectories(Path / ZenTempChunkFolderName); // TODO: Don't clear this - pick up files -> chunks to use CreateDirectories(Path / - ZenTempCacheFolderName); // TODO: Don't clear this - pick up files and use as sequences (non .tmp extension) and - // delete .tmp (maybe?) - chunk them? How do we know the file is worth chunking? + ZenTempCacheFolderName); // TODO: Don't clear this - pick up files and use as sequences (non .tmp extension) + // and delete .tmp (maybe?) - chunk them? How do we know the file is worth chunking? std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; @@ -5311,6 +5724,7 @@ namespace { RemoteContent, BlockDescriptions, LooseChunkHashes, + AllowPartialBlockRequests, WipeTargetFolder, LocalFolderState); @@ -5705,6 +6119,12 @@ BuildsCommand::BuildsCommand() "Allow large attachments to be transfered using multipart protocol. Defaults to true.", cxxopts::value(m_AllowMultiparts), "<allowmultipart>"); + m_DownloadOptions.add_option("", + "", + "allow-partial-block-requests", + "Allow request for partial chunk blocks. Defaults to true.", + cxxopts::value(m_AllowPartialBlockRequests), + "<allowpartialblockrequests>"); m_DownloadOptions .add_option("", "", "verify", "Enable post download verify of all tracked files", cxxopts::value(m_PostDownloadVerify), "<verify>"); m_DownloadOptions.parse_positional({"local-path", "build-id", "build-part-name"}); @@ -5728,6 +6148,18 @@ BuildsCommand::BuildsCommand() AddOutputOptions(m_TestOptions); m_TestOptions.add_options()("h,help", "Print help"); m_TestOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>"); + m_TestOptions.add_option("", + "", + "allow-multipart", + "Allow large attachments to be transfered using multipart protocol. Defaults to true.", + cxxopts::value(m_AllowMultiparts), + "<allowmultipart>"); + m_TestOptions.add_option("", + "", + "allow-partial-block-requests", + "Allow request for partial chunk blocks. Defaults to true.", + cxxopts::value(m_AllowPartialBlockRequests), + "<allowpartialblockrequests>"); m_TestOptions.parse_positional({"local-path"}); m_TestOptions.positional_help("local-path"); @@ -6037,7 +6469,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_Bucket, GeneratedBuildId ? "Generated " : "", BuildId); - CreateDirectories(m_Path / ZenTempStorageFolderName); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); StorageName = "Cloud DDC"; } @@ -6175,7 +6606,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_Namespace, m_Bucket, BuildId); - CreateDirectories(m_Path / ZenTempStorageFolderName); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); StorageName = "Cloud DDC"; } @@ -6190,7 +6620,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } - DownloadFolder(*Storage, BuildId, BuildPartIds, m_BuildPartNames, m_Path, m_AllowMultiparts, m_Clean, m_PostDownloadVerify); + DownloadFolder(*Storage, + BuildId, + BuildPartIds, + m_BuildPartNames, + m_Path, + m_AllowMultiparts, + m_AllowPartialBlockRequests, + m_Clean, + m_PostDownloadVerify); if (false) { @@ -6275,7 +6713,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_Namespace, m_Bucket, BuildId); - CreateDirectories(m_Path / ZenTempStorageFolderName); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); StorageName = "Cloud DDC"; } @@ -6335,7 +6772,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) const std::filesystem::path DownloadPath = m_Path.parent_path() / (m_BuildPartName + "_download"); ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, true, true); + DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true); if (AbortFlag) { ZEN_CONSOLE("Download failed."); @@ -6347,7 +6784,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true); + DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed. (identical target)"); @@ -6449,7 +6886,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true); + DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed. (scrambled target)"); @@ -6487,7 +6924,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true); + DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); @@ -6495,7 +6932,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false, true); + DownloadFolder(*Storage, + BuildId2, + {BuildPartId2}, + {}, + DownloadPath, + m_AllowMultiparts, + m_AllowPartialBlockRequests, + false, + true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); @@ -6503,7 +6948,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false, true); + DownloadFolder(*Storage, + BuildId2, + {BuildPartId2}, + {}, + DownloadPath, + m_AllowMultiparts, + m_AllowPartialBlockRequests, + false, + true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); @@ -6549,7 +7002,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_Namespace, m_Bucket, BuildId); - CreateDirectories(m_Path / ZenTempStorageFolderName); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); StorageName = "Cloud DDC"; } @@ -6617,7 +7069,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_Namespace, m_Bucket, BuildId); - CreateDirectories(m_Path / ZenTempStorageFolderName); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); StorageName = "Cloud DDC"; } diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h index c54fb4db1..838a17807 100644 --- a/src/zen/cmds/builds_cmd.h +++ b/src/zen/cmds/builds_cmd.h @@ -49,6 +49,7 @@ private: bool m_Clean = false; uint8_t m_BlockReuseMinPercentLimit = 85; bool m_AllowMultiparts = true; + bool m_AllowPartialBlockRequests = true; std::filesystem::path m_ManifestPath; // Direct access token (may expire) diff --git a/src/zencore/compactbinary.cpp b/src/zencore/compactbinary.cpp index adccaba70..b43cc18f1 100644 --- a/src/zencore/compactbinary.cpp +++ b/src/zencore/compactbinary.cpp @@ -15,6 +15,8 @@ #include <zencore/testing.h> #include <zencore/uid.h> +#include <EASTL/fixed_vector.h> + #include <fmt/format.h> #include <string_view> @@ -1376,9 +1378,9 @@ TryMeasureCompactBinary(MemoryView View, CbFieldType& OutType, uint64_t& OutSize CbField LoadCompactBinary(BinaryReader& Ar, BufferAllocator Allocator) { - std::vector<uint8_t> HeaderBytes; - CbFieldType FieldType; - uint64_t FieldSize = 1; + eastl::fixed_vector<uint8_t, 32> HeaderBytes; + CbFieldType FieldType; + uint64_t FieldSize = 1; for (const int64_t StartPos = Ar.CurrentOffset(); FieldSize > 0;) { @@ -1393,7 +1395,7 @@ LoadCompactBinary(BinaryReader& Ar, BufferAllocator Allocator) HeaderBytes.resize(ReadOffset + ReadSize); Ar.Read(HeaderBytes.data() + ReadOffset, ReadSize); - if (TryMeasureCompactBinary(MakeMemoryView(HeaderBytes), FieldType, FieldSize)) + if (TryMeasureCompactBinary(MakeMemoryView(HeaderBytes.data(), HeaderBytes.size()), FieldType, FieldSize)) { if (FieldSize <= uint64_t(Ar.Size() - StartPos)) { diff --git a/src/zencore/compactbinarybuilder.cpp b/src/zencore/compactbinarybuilder.cpp index a60de023d..63c0b9c5c 100644 --- a/src/zencore/compactbinarybuilder.cpp +++ b/src/zencore/compactbinarybuilder.cpp @@ -15,23 +15,21 @@ namespace zen { -template<typename T> uint64_t -AddUninitialized(std::vector<T>& Vector, uint64_t Count) +AddUninitialized(CbWriter::CbWriterData_t& Vector, uint64_t Count) { const uint64_t Offset = Vector.size(); Vector.resize(Offset + Count); return Offset; } -template<typename T> uint64_t -Append(std::vector<T>& Vector, const T* Data, uint64_t Count) +Append(CbWriter::CbWriterData_t& Vector, const uint8_t* Data, uint64_t Count) { const uint64_t Offset = Vector.size(); Vector.resize(Offset + Count); - memcpy(Vector.data() + Offset, Data, sizeof(T) * Count); + memcpy(Vector.data() + Offset, Data, sizeof(uint8_t) * Count); return Offset; } @@ -76,7 +74,7 @@ IsUniformType(const CbFieldType Type) /** Append the payload from the compact binary value to the array and return its type. */ static inline CbFieldType -AppendCompactBinary(const CbFieldView& Value, std::vector<uint8_t>& OutData) +AppendCompactBinary(const CbFieldView& Value, CbWriter::CbWriterData_t& OutData) { struct FCopy : public CbFieldView { @@ -93,7 +91,6 @@ AppendCompactBinary(const CbFieldView& Value, std::vector<uint8_t>& OutData) CbWriter::CbWriter() { - States.reserve(4); States.emplace_back(); } diff --git a/src/zencore/compactbinarypackage.cpp b/src/zencore/compactbinarypackage.cpp index 7de161845..ffe64f2e9 100644 --- a/src/zencore/compactbinarypackage.cpp +++ b/src/zencore/compactbinarypackage.cpp @@ -3,10 +3,13 @@ #include "zencore/compactbinarypackage.h" #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryvalidation.h> +#include <zencore/eastlutil.h> #include <zencore/endian.h> #include <zencore/stream.h> #include <zencore/testing.h> +#include <EASTL/span.h> + namespace zen { /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -341,6 +344,12 @@ CbPackage::SetObject(CbObject InObject, const IoHash* InObjectHash, AttachmentRe } void +CbPackage::ReserveAttachments(size_t Count) +{ + Attachments.reserve(Count); +} + +void CbPackage::AddAttachment(const CbAttachment& Attachment, AttachmentResolver* Resolver) { if (!Attachment.IsNull()) @@ -374,17 +383,18 @@ CbPackage::AddAttachments(std::span<const CbAttachment> InAttachments) { ZEN_ASSERT(!Attachment.IsNull()); } + // Assume we have no duplicates! Attachments.insert(Attachments.end(), InAttachments.begin(), InAttachments.end()); std::sort(Attachments.begin(), Attachments.end()); - ZEN_ASSERT_SLOW(std::unique(Attachments.begin(), Attachments.end()) == Attachments.end()); + ZEN_ASSERT_SLOW(eastl::unique(Attachments.begin(), Attachments.end()) == Attachments.end()); } int32_t CbPackage::RemoveAttachment(const IoHash& Hash) { return gsl::narrow_cast<int32_t>( - std::erase_if(Attachments, [&Hash](const CbAttachment& Attachment) -> bool { return Attachment.GetHash() == Hash; })); + erase_if(Attachments, [&Hash](const CbAttachment& Attachment) -> bool { return Attachment.GetHash() == Hash; })); } bool diff --git a/src/zencore/include/zencore/compactbinarybuilder.h b/src/zencore/include/zencore/compactbinarybuilder.h index 1c625cacc..f11717453 100644 --- a/src/zencore/include/zencore/compactbinarybuilder.h +++ b/src/zencore/include/zencore/compactbinarybuilder.h @@ -18,6 +18,8 @@ #include <type_traits> #include <vector> +#include <EASTL/fixed_vector.h> + #include <gsl/gsl-lite.hpp> namespace zen { @@ -367,6 +369,8 @@ public: /** Private flags that are public to work with ENUM_CLASS_FLAGS. */ enum class StateFlags : uint8_t; + typedef eastl::fixed_vector<uint8_t, 2048> CbWriterData_t; + protected: /** Reserve the specified size up front until the format is optimized. */ ZENCORE_API explicit CbWriter(int64_t InitialSize); @@ -409,8 +413,8 @@ private: // provided externally, such as on the stack. That format will store the offsets that require // object or array sizes to be inserted and field types to be removed, and will perform those // operations only when saving to a buffer. - std::vector<uint8_t> Data; - std::vector<WriterState> States; + eastl::fixed_vector<WriterState, 4> States; + CbWriterData_t Data; }; /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/zencore/include/zencore/compactbinarypackage.h b/src/zencore/include/zencore/compactbinarypackage.h index 12fcc41b7..9ec12cb0f 100644 --- a/src/zencore/include/zencore/compactbinarypackage.h +++ b/src/zencore/include/zencore/compactbinarypackage.h @@ -12,6 +12,8 @@ #include <span> #include <variant> +#include <EASTL/fixed_vector.h> + #ifdef GetObject # error "windows.h pollution" # undef GetObject @@ -265,7 +267,10 @@ public: } /** Returns the attachments in this package. */ - inline std::span<const CbAttachment> GetAttachments() const { return Attachments; } + inline std::span<const CbAttachment> GetAttachments() const + { + return std::span<const CbAttachment>(begin(Attachments), end(Attachments)); + } /** * Find an attachment by its hash. @@ -286,6 +291,8 @@ public: void AddAttachments(std::span<const CbAttachment> Attachments); + void ReserveAttachments(size_t Count); + /** * Remove an attachment by hash. * @@ -324,9 +331,9 @@ private: void GatherAttachments(const CbObject& Object, AttachmentResolver Resolver); /** Attachments ordered by their hash. */ - std::vector<CbAttachment> Attachments; - CbObject Object; - IoHash ObjectHash; + eastl::fixed_vector<CbAttachment, 32> Attachments; + CbObject Object; + IoHash ObjectHash; }; namespace legacy { diff --git a/src/zencore/include/zencore/compositebuffer.h b/src/zencore/include/zencore/compositebuffer.h index b435c5e74..1e1611de9 100644 --- a/src/zencore/include/zencore/compositebuffer.h +++ b/src/zencore/include/zencore/compositebuffer.h @@ -2,6 +2,7 @@ #pragma once +#include <zencore/eastlutil.h> #include <zencore/sharedbuffer.h> #include <zencore/zencore.h> @@ -9,6 +10,8 @@ #include <span> #include <vector> +#include <EASTL/fixed_vector.h> + namespace zen { /** @@ -35,7 +38,7 @@ public: { m_Segments.reserve((GetBufferCount(std::forward<BufferTypes>(Buffers)) + ...)); (AppendBuffers(std::forward<BufferTypes>(Buffers)), ...); - std::erase_if(m_Segments, [](const SharedBuffer& It) { return It.IsNull(); }); + erase_if(m_Segments, [](const SharedBuffer& It) { return It.IsNull(); }); } } @@ -46,7 +49,10 @@ public: [[nodiscard]] ZENCORE_API uint64_t GetSize() const; /** Returns the segments that the buffer is composed from. */ - [[nodiscard]] inline std::span<const SharedBuffer> GetSegments() const { return std::span<const SharedBuffer>{m_Segments}; } + [[nodiscard]] inline std::span<const SharedBuffer> GetSegments() const + { + return std::span<const SharedBuffer>{begin(m_Segments), end(m_Segments)}; + } /** Returns true if the composite buffer is not null. */ [[nodiscard]] inline explicit operator bool() const { return !IsNull(); } @@ -120,6 +126,8 @@ public: static const CompositeBuffer Null; private: + typedef eastl::fixed_vector<SharedBuffer, 4> SharedBufferVector_t; + static inline size_t GetBufferCount(const CompositeBuffer& Buffer) { return Buffer.m_Segments.size(); } inline void AppendBuffers(const CompositeBuffer& Buffer) { @@ -134,12 +142,25 @@ private: inline void AppendBuffers(SharedBuffer&& Buffer) { m_Segments.push_back(std::move(Buffer)); } inline void AppendBuffers(IoBuffer&& Buffer) { m_Segments.push_back(SharedBuffer(std::move(Buffer))); } + static inline size_t GetBufferCount(std::span<IoBuffer>&& Container) { return Container.size(); } + inline void AppendBuffers(std::span<IoBuffer>&& Container) + { + m_Segments.reserve(m_Segments.size() + Container.size()); + for (IoBuffer& Buffer : Container) + { + m_Segments.emplace_back(SharedBuffer(std::move(Buffer))); + } + } + static inline size_t GetBufferCount(std::vector<SharedBuffer>&& Container) { return Container.size(); } static inline size_t GetBufferCount(std::vector<IoBuffer>&& Container) { return Container.size(); } inline void AppendBuffers(std::vector<SharedBuffer>&& Container) { m_Segments.reserve(m_Segments.size() + Container.size()); - m_Segments.insert(m_Segments.end(), std::make_move_iterator(Container.begin()), std::make_move_iterator(Container.end())); + for (SharedBuffer& Buffer : Container) + { + m_Segments.emplace_back(std::move(Buffer)); + } } inline void AppendBuffers(std::vector<IoBuffer>&& Container) { @@ -150,8 +171,17 @@ private: } } + inline void AppendBuffers(SharedBufferVector_t&& Container) + { + m_Segments.reserve(m_Segments.size() + Container.size()); + for (SharedBuffer& Buffer : Container) + { + m_Segments.emplace_back(std::move(Buffer)); + } + } + private: - std::vector<SharedBuffer> m_Segments; + SharedBufferVector_t m_Segments; }; void compositebuffer_forcelink(); // internal diff --git a/src/zencore/include/zencore/eastlutil.h b/src/zencore/include/zencore/eastlutil.h new file mode 100644 index 000000000..642321dae --- /dev/null +++ b/src/zencore/include/zencore/eastlutil.h @@ -0,0 +1,20 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <algorithm> + +namespace zen { + +size_t +erase_if(auto& _Cont, auto Predicate) +{ + auto _First = _Cont.begin(); + const auto _Last = _Cont.end(); + const auto _Old_size = _Cont.size(); + _First = std::remove_if(_First, _Last, Predicate); + _Cont.erase(_First, _Last); + return _Old_size - _Cont.size(); +} + +} // namespace zen diff --git a/src/zencore/include/zencore/memory/newdelete.h b/src/zencore/include/zencore/memory/newdelete.h index d22c8604f..059f1d5ea 100644 --- a/src/zencore/include/zencore/memory/newdelete.h +++ b/src/zencore/include/zencore/memory/newdelete.h @@ -153,3 +153,29 @@ operator new[](std::size_t n, std::align_val_t al, const std::nothrow_t&) noexce return zen_new_aligned_nothrow(n, static_cast<size_t>(al)); } #endif + +// EASTL operator new + +void* +operator new[](size_t size, const char* pName, int flags, unsigned debugFlags, const char* file, int line) +{ + ZEN_UNUSED(pName, flags, debugFlags, file, line); + return zen_new(size); +} + +void* +operator new[](size_t size, + size_t alignment, + size_t alignmentOffset, + const char* pName, + int flags, + unsigned debugFlags, + const char* file, + int line) +{ + ZEN_UNUSED(alignmentOffset, pName, flags, debugFlags, file, line); + + ZEN_ASSERT_SLOW(alignmentOffset == 0); // currently not supported + + return zen_new_aligned(size, alignment); +} diff --git a/src/zencore/xmake.lua b/src/zencore/xmake.lua index 2efa3fdb8..b8b14084c 100644 --- a/src/zencore/xmake.lua +++ b/src/zencore/xmake.lua @@ -55,6 +55,7 @@ target('zencore') add_packages( "vcpkg::doctest", + "vcpkg::eastl", "vcpkg::fmt", "vcpkg::gsl-lite", "vcpkg::lz4", diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index e4c6d243d..30711a432 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -1148,6 +1148,30 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold return true; }; + uint64_t RequestedContentLength = (uint64_t)-1; + if (auto RangeIt = AdditionalHeader.Entries.find("Range"); RangeIt != AdditionalHeader.Entries.end()) + { + if (RangeIt->second.starts_with("bytes")) + { + size_t RangeStartPos = RangeIt->second.find('=', 5); + if (RangeStartPos != std::string::npos) + { + RangeStartPos++; + size_t RangeSplitPos = RangeIt->second.find('-', RangeStartPos); + if (RangeSplitPos != std::string::npos) + { + std::optional<size_t> RequestedRangeStart = + ParseInt<size_t>(RangeIt->second.substr(RangeStartPos, RangeSplitPos - RangeStartPos)); + std::optional<size_t> RequestedRangeEnd = ParseInt<size_t>(RangeIt->second.substr(RangeStartPos + 1)); + if (RequestedRangeStart.has_value() && RequestedRangeEnd.has_value()) + { + RequestedContentLength = RequestedRangeEnd.value() - 1; + } + } + } + } + } + cpr::Response Response; { std::vector<std::pair<std::string, std::string>> ReceivedHeaders; @@ -1155,10 +1179,10 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold std::pair<std::string, std::string> Header = GetHeader(header); if (Header.first == "Content-Length"sv) { - std::optional<size_t> ContentSize = ParseInt<size_t>(Header.second); - if (ContentSize.has_value()) + std::optional<size_t> ContentLength = ParseInt<size_t>(Header.second); + if (ContentLength.has_value()) { - if (ContentSize.value() > 1024 * 1024) + if (ContentLength.value() > 1024 * 1024) { PayloadFile = std::make_unique<detail::TempPayloadFile>(); std::error_code Ec = PayloadFile->Open(TempFolderPath); @@ -1172,7 +1196,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold } else { - PayloadString.reserve(ContentSize.value()); + PayloadString.reserve(ContentLength.value()); } } } @@ -1218,85 +1242,90 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold auto It = Response.header.find("Content-Length"); if (It != Response.header.end()) { - std::optional<int64_t> ContentLength = ParseInt<int64_t>(It->second); - if (ContentLength) - { - std::vector<std::pair<std::string, std::string>> ReceivedHeaders; + std::vector<std::pair<std::string, std::string>> ReceivedHeaders; - auto HeaderCallback = [&](std::string header, intptr_t) { - std::pair<std::string, std::string> Header = GetHeader(header); - if (!Header.first.empty()) - { - ReceivedHeaders.emplace_back(std::move(Header)); - } + auto HeaderCallback = [&](std::string header, intptr_t) { + std::pair<std::string, std::string> Header = GetHeader(header); + if (!Header.first.empty()) + { + ReceivedHeaders.emplace_back(std::move(Header)); + } - if (Header.first == "Content-Range"sv) + if (Header.first == "Content-Range"sv) + { + if (Header.second.starts_with("bytes "sv)) { - if (Header.second.starts_with("bytes "sv)) + size_t RangeStartEnd = Header.second.find('-', 6); + if (RangeStartEnd != std::string::npos) { - size_t RangeStartEnd = Header.second.find('-', 6); - if (RangeStartEnd != std::string::npos) + const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6)); + if (Start) { - const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6)); - if (Start) + uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); + if (Start.value() == DownloadedSize) { - uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); - if (Start.value() == DownloadedSize) - { - return 1; - } - else if (Start.value() > DownloadedSize) - { - return 0; - } - if (PayloadFile) - { - PayloadFile->ResetWritePos(Start.value()); - } - else - { - PayloadString = PayloadString.substr(0, Start.value()); - } return 1; } + else if (Start.value() > DownloadedSize) + { + return 0; + } + if (PayloadFile) + { + PayloadFile->ResetWritePos(Start.value()); + } + else + { + PayloadString = PayloadString.substr(0, Start.value()); + } + return 1; } } - return 0; } - return 1; - }; + return 0; + } + return 1; + }; - KeyValueMap HeadersWithRange(AdditionalHeader); - do - { - uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); + KeyValueMap HeadersWithRange(AdditionalHeader); + do + { + uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); - std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength.value() - 1); - if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end()) + uint64_t ContentLength = RequestedContentLength; + if (ContentLength == uint64_t(-1)) + { + if (auto ParsedContentLength = ParseInt<int64_t>(It->second); ParsedContentLength.has_value()) { - if (RangeIt->second == Range) - { - // If we didn't make any progress, abort - break; - } + ContentLength = ParsedContentLength.value(); } - HeadersWithRange.Entries.insert_or_assign("Range", Range); - - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, - Url, - m_ConnectionSettings, - HeadersWithRange, - {}, - m_SessionId, - GetAccessToken()); - Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback}); - for (const std::pair<std::string, std::string>& H : ReceivedHeaders) + } + + std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength - 1); + if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end()) + { + if (RangeIt->second == Range) { - Response.header.insert_or_assign(H.first, H.second); + // If we didn't make any progress, abort + break; } - ReceivedHeaders.clear(); - } while (ShouldResume(Response)); - } + } + HeadersWithRange.Entries.insert_or_assign("Range", Range); + + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, + Url, + m_ConnectionSettings, + HeadersWithRange, + {}, + m_SessionId, + GetAccessToken()); + Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback}); + for (const std::pair<std::string, std::string>& H : ReceivedHeaders) + { + Response.header.insert_or_assign(H.first, H.second); + } + ReceivedHeaders.clear(); + } while (ShouldResume(Response)); } } } diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp index 1fbe22628..27a09f339 100644 --- a/src/zenhttp/httpserver.cpp +++ b/src/zenhttp/httpserver.cpp @@ -31,6 +31,8 @@ #include <span> #include <string_view> +#include <EASTL/fixed_vector.h> + namespace zen { using namespace std::literals; @@ -529,7 +531,7 @@ HttpServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType { std::span<const SharedBuffer> Segments = Payload.GetSegments(); - std::vector<IoBuffer> Buffers; + eastl::fixed_vector<IoBuffer, 64> Buffers; Buffers.reserve(Segments.size()); for (auto& Segment : Segments) @@ -537,7 +539,7 @@ HttpServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType Buffers.push_back(Segment.AsIoBuffer()); } - WriteResponse(ResponseCode, ContentType, Buffers); + WriteResponse(ResponseCode, ContentType, std::span<IoBuffer>(begin(Buffers), end(Buffers))); } std::string diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h index 7b87cb84b..217455dba 100644 --- a/src/zenhttp/include/zenhttp/httpserver.h +++ b/src/zenhttp/include/zenhttp/httpserver.h @@ -208,7 +208,7 @@ class HttpRouterRequest public: HttpRouterRequest(HttpServerRequest& Request) : m_HttpRequest(Request) {} - ZENCORE_API std::string GetCapture(uint32_t Index) const; + std::string_view GetCapture(uint32_t Index) const; inline HttpServerRequest& ServerRequest() { return m_HttpRequest; } private: @@ -220,12 +220,14 @@ private: friend class HttpRequestRouter; }; -inline std::string +inline std::string_view HttpRouterRequest::GetCapture(uint32_t Index) const { ZEN_ASSERT(Index < m_Match.size()); - return m_Match[Index]; + const auto& Match = m_Match[Index]; + + return std::string_view(&*Match.first, Match.second - Match.first); } /** HTTP request router helper diff --git a/src/zenhttp/packageformat.cpp b/src/zenhttp/packageformat.cpp index 676fc73fd..ae80851e4 100644 --- a/src/zenhttp/packageformat.cpp +++ b/src/zenhttp/packageformat.cpp @@ -19,6 +19,8 @@ #include <span> #include <vector> +#include <EASTL/fixed_vector.h> + #if ZEN_PLATFORM_WINDOWS # include <zencore/windows.h> #endif @@ -31,6 +33,10 @@ namespace zen { const std::string_view HandlePrefix(":?#:"); +typedef eastl::fixed_vector<IoBuffer, 16> IoBufferVec_t; + +IoBufferVec_t FormatPackageMessageInternal(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle); + std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, void* TargetProcessHandle) { @@ -42,10 +48,18 @@ FormatPackageMessageBuffer(const CbPackage& Data, void* TargetProcessHandle) return FormatPackageMessageBuffer(Data, FormatFlags::kDefault, TargetProcessHandle); } +std::vector<IoBuffer> +FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle) +{ + auto Vec = FormatPackageMessageInternal(Data, Flags, TargetProcessHandle); + return std::vector<IoBuffer>(begin(Vec), end(Vec)); +} + CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle) { - return CompositeBuffer(FormatPackageMessage(Data, Flags, TargetProcessHandle)); + auto Vec = FormatPackageMessageInternal(Data, Flags, TargetProcessHandle); + return CompositeBuffer(std::span{begin(Vec), end(Vec)}); } static void @@ -54,7 +68,7 @@ MarshalLocal(CbAttachmentEntry*& AttachmentInfo, CbAttachmentReferenceHeader& LocalRef, const IoHash& AttachmentHash, bool IsCompressed, - std::vector<IoBuffer>& ResponseBuffers) + IoBufferVec_t& ResponseBuffers) { IoBuffer RefBuffer(sizeof(CbAttachmentReferenceHeader) + Path8.size()); @@ -146,8 +160,8 @@ IsLocalRef(tsl::robin_map<void*, std::string>& FileNameMap, return true; }; -std::vector<IoBuffer> -FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle) +IoBufferVec_t +FormatPackageMessageInternal(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle) { ZEN_TRACE_CPU("FormatPackageMessage"); @@ -177,7 +191,7 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, void* TargetProce #endif // ZEN_PLATFORM_WINDOWS const std::span<const CbAttachment>& Attachments = Data.GetAttachments(); - std::vector<IoBuffer> ResponseBuffers; + IoBufferVec_t ResponseBuffers; ResponseBuffers.reserve(2 + Attachments.size()); // TODO: may want to use an additional fudge factor here to avoid growing since each // attachment is likely to consist of several buffers diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index 87128c0c9..3bdcdf098 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -16,6 +16,8 @@ #include <zencore/trace.h> #include <zenhttp/packageformat.h> +#include <EASTL/fixed_vector.h> + #if ZEN_WITH_HTTPSYS # define _WINSOCKAPI_ # include <zencore/windows.h> @@ -381,14 +383,14 @@ public: void SuppressResponseBody(); // typically used for HEAD requests private: - std::vector<HTTP_DATA_CHUNK> m_HttpDataChunks; - uint64_t m_TotalDataSize = 0; // Sum of all chunk sizes - uint16_t m_ResponseCode = 0; - uint32_t m_NextDataChunkOffset = 0; // Cursor used for very large chunk lists - uint32_t m_RemainingChunkCount = 0; // Backlog for multi-call sends - bool m_IsInitialResponse = true; - HttpContentType m_ContentType = HttpContentType::kBinary; - std::vector<IoBuffer> m_DataBuffers; + eastl::fixed_vector<HTTP_DATA_CHUNK, 16> m_HttpDataChunks; + uint64_t m_TotalDataSize = 0; // Sum of all chunk sizes + uint16_t m_ResponseCode = 0; + uint32_t m_NextDataChunkOffset = 0; // Cursor used for very large chunk lists + uint32_t m_RemainingChunkCount = 0; // Backlog for multi-call sends + bool m_IsInitialResponse = true; + HttpContentType m_ContentType = HttpContentType::kBinary; + eastl::fixed_vector<IoBuffer, 16> m_DataBuffers; void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> Blobs); }; diff --git a/src/zenserver/objectstore/objectstore.cpp b/src/zenserver/objectstore/objectstore.cpp index 5d96de225..e757ef84e 100644 --- a/src/zenserver/objectstore/objectstore.cpp +++ b/src/zenserver/objectstore/objectstore.cpp @@ -269,9 +269,9 @@ HttpObjectStoreService::Inititalize() m_Router.RegisterRoute( "bucket/{path}", [this](zen::HttpRouterRequest& Request) { - const std::string Path = Request.GetCapture(1); - const auto Sep = Path.find_last_of('.'); - const bool IsObject = Sep != std::string::npos && Path.size() - Sep > 0; + const std::string_view Path = Request.GetCapture(1); + const auto Sep = Path.find_last_of('.'); + const bool IsObject = Sep != std::string::npos && Path.size() - Sep > 0; if (IsObject) { @@ -337,18 +337,18 @@ HttpObjectStoreService::CreateBucket(zen::HttpRouterRequest& Request) } void -HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::string& Path) +HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::string_view Path) { namespace fs = std::filesystem; - const auto Sep = Path.find_first_of('/'); - const std::string BucketName = Sep == std::string::npos ? Path : Path.substr(0, Sep); + const auto Sep = Path.find_first_of('/'); + const std::string BucketName{Sep == std::string::npos ? Path : Path.substr(0, Sep)}; if (BucketName.empty()) { return Request.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); } - std::string BucketPrefix = Sep == std::string::npos || Sep == Path.size() - 1 ? std::string() : Path.substr(BucketName.size() + 1); + std::string BucketPrefix{Sep == std::string::npos || Sep == Path.size() - 1 ? std::string() : Path.substr(BucketName.size() + 1)}; if (BucketPrefix.empty()) { const auto QueryParms = Request.ServerRequest().GetQueryParams(); @@ -450,14 +450,13 @@ HttpObjectStoreService::DeleteBucket(zen::HttpRouterRequest& Request) } void -HttpObjectStoreService::GetObject(zen::HttpRouterRequest& Request, const std::string& Path) +HttpObjectStoreService::GetObject(zen::HttpRouterRequest& Request, const std::string_view Path) { namespace fs = std::filesystem; - const auto Sep = Path.find_first_of('/'); - const std::string BucketName = Sep == std::string::npos ? Path : Path.substr(0, Sep); - const std::string BucketPrefix = - Sep == std::string::npos || Sep == Path.size() - 1 ? std::string() : Path.substr(BucketName.size() + 1); + const auto Sep = Path.find_first_of('/'); + const std::string BucketName{Sep == std::string::npos ? Path : Path.substr(0, Sep)}; + const std::string BucketPrefix{Sep == std::string::npos || Sep == Path.size() - 1 ? std::string() : Path.substr(BucketName.size() + 1)}; const fs::path BucketDir = GetBucketDirectory(BucketName); @@ -554,8 +553,8 @@ HttpObjectStoreService::PutObject(zen::HttpRouterRequest& Request) { namespace fs = std::filesystem; - const std::string& BucketName = Request.GetCapture(1); - const fs::path BucketDir = GetBucketDirectory(BucketName); + const std::string_view BucketName = Request.GetCapture(1); + const fs::path BucketDir = GetBucketDirectory(BucketName); if (BucketDir.empty()) { diff --git a/src/zenserver/objectstore/objectstore.h b/src/zenserver/objectstore/objectstore.h index c905ceab3..dae979c4c 100644 --- a/src/zenserver/objectstore/objectstore.h +++ b/src/zenserver/objectstore/objectstore.h @@ -36,9 +36,9 @@ private: void Inititalize(); std::filesystem::path GetBucketDirectory(std::string_view BucketName); void CreateBucket(zen::HttpRouterRequest& Request); - void ListBucket(zen::HttpRouterRequest& Request, const std::string& Path); + void ListBucket(zen::HttpRouterRequest& Request, const std::string_view Path); void DeleteBucket(zen::HttpRouterRequest& Request); - void GetObject(zen::HttpRouterRequest& Request, const std::string& Path); + void GetObject(zen::HttpRouterRequest& Request, const std::string_view Path); void PutObject(zen::HttpRouterRequest& Request); ObjectStoreConfig m_Cfg; diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index 0b8e5f13b..47748dd90 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -983,15 +983,19 @@ HttpProjectService::HandleOplogOpPrepRequest(HttpRouterRequest& Req) IoBuffer Payload = HttpReq.ReadPayload(); CbObject RequestObject = LoadCompactBinaryObject(Payload); - std::vector<IoHash> ChunkList; - CbArrayView HaveList = RequestObject["have"sv].AsArrayView(); - ChunkList.reserve(HaveList.Num()); - for (auto& Entry : HaveList) + std::vector<IoHash> NeedList; + { - ChunkList.push_back(Entry.AsHash()); - } + eastl::fixed_vector<IoHash, 16> ChunkList; + CbArrayView HaveList = RequestObject["have"sv].AsArrayView(); + ChunkList.reserve(HaveList.Num()); + for (auto& Entry : HaveList) + { + ChunkList.push_back(Entry.AsHash()); + } - std::vector<IoHash> NeedList = FoundLog->CheckPendingChunkReferences(ChunkList, std::chrono::minutes(2)); + NeedList = FoundLog->CheckPendingChunkReferences(std::span(begin(ChunkList), end(ChunkList)), std::chrono::minutes(2)); + } CbObjectWriter Cbo(1 + 1 + 5 + NeedList.size() * (1 + sizeof(IoHash::Hash)) + 1); Cbo.BeginArray("need"); @@ -1151,7 +1155,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified"); } - std::vector<IoHash> ReferencedChunks; + eastl::fixed_vector<IoHash, 16> ReferencedChunks; Core.IterateAttachments([&ReferencedChunks](CbFieldView View) { ReferencedChunks.push_back(View.AsAttachment()); }); // Write core to oplog @@ -1169,7 +1173,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) // Once we stored the op, we no longer need to retain any chunks this op references if (!ReferencedChunks.empty()) { - FoundLog->RemovePendingChunkReferences(ReferencedChunks); + FoundLog->RemovePendingChunkReferences(std::span(begin(ReferencedChunks), end(ReferencedChunks))); } m_ProjectStats.OpWriteCount++; @@ -1301,9 +1305,9 @@ HttpProjectService::HandleOpLogOpRequest(HttpRouterRequest& Req) HttpServerRequest& HttpReq = Req.ServerRequest(); - const std::string& ProjectId = Req.GetCapture(1); - const std::string& OplogId = Req.GetCapture(2); - const std::string& OpIdString = Req.GetCapture(3); + const std::string_view ProjectId = Req.GetCapture(1); + const std::string_view OplogId = Req.GetCapture(2); + const std::string_view OpIdString = Req.GetCapture(3); Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) @@ -1690,8 +1694,8 @@ HttpProjectService::HandleProjectRequest(HttpRouterRequest& Req) using namespace std::literals; - HttpServerRequest& HttpReq = Req.ServerRequest(); - const std::string ProjectId = Req.GetCapture(1); + HttpServerRequest& HttpReq = Req.ServerRequest(); + const std::string_view ProjectId = Req.GetCapture(1); switch (HttpReq.RequestVerb()) { diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 53df12b14..86791e29a 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -423,9 +423,13 @@ ComputeOpKey(const CbObjectView& Op) { using namespace std::literals; - BinaryWriter KeyStream; + eastl::fixed_vector<uint8_t, 256> KeyData; - Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyStream.Write(Data, Size); }); + Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { + auto Begin = reinterpret_cast<const uint8_t*>(Data); + auto End = Begin + Size; + KeyData.insert(KeyData.end(), Begin, End); + }); XXH3_128 KeyHash128; @@ -434,15 +438,15 @@ ComputeOpKey(const CbObjectView& Op) // path but longer paths are evaluated properly. In the future all key lengths // should be evaluated using the proper path, this is a temporary workaround to // maintain compatibility with existing disk state. - if (KeyStream.GetSize() < 240) + if (KeyData.size() < 240) { XXH3_128Stream_deprecated KeyHasher; - KeyHasher.Append(KeyStream.Data(), KeyStream.Size()); + KeyHasher.Append(KeyData.data(), KeyData.size()); KeyHash128 = KeyHasher.GetHash(); } else { - KeyHash128 = XXH3_128::HashMemory(KeyStream.GetView()); + KeyHash128 = XXH3_128::HashMemory(KeyData.data(), KeyData.size()); } Oid KeyHash; @@ -2735,7 +2739,7 @@ ProjectStore::Oplog::CheckPendingChunkReferences(std::span<const IoHash> ChunkHa MissingChunks.reserve(ChunkHashes.size()); for (const IoHash& FileHash : ChunkHashes) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(FileHash); !Payload) + if (!m_CidStore.ContainsChunk(FileHash)) { MissingChunks.push_back(FileHash); } @@ -3359,7 +3363,6 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bo ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OpenOplog"); - std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); { RwLock::SharedLockScope ProjectLock(m_ProjectLock); @@ -3367,21 +3370,35 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bo if (OplogIt != m_Oplogs.end()) { - if (!VerifyPathOnDisk || Oplog::ExistsAt(OplogBasePath)) + bool ReOpen = false; + + if (VerifyPathOnDisk) { - return OplogIt->second.get(); + std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); + + if (!Oplog::ExistsAt(OplogBasePath)) + { + // Somebody deleted the oplog on disk behind our back + ProjectLock.ReleaseNow(); + std::filesystem::path DeletePath; + if (!RemoveOplog(OplogId, DeletePath)) + { + ZEN_WARN("Failed to clean up deleted oplog {}/{}", Identifier, OplogId, OplogBasePath); + } + + ReOpen = true; + } } - // Somebody deleted the oplog on disk behind our back - ProjectLock.ReleaseNow(); - std::filesystem::path DeletePath; - if (!RemoveOplog(OplogId, DeletePath)) + if (!ReOpen) { - ZEN_WARN("Failed to clean up deleted oplog {}/{}", Identifier, OplogId, OplogBasePath); + return OplogIt->second.get(); } } } + std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); + RwLock::ExclusiveLockScope Lock(m_ProjectLock); if (auto It = m_Oplogs.find(std::string{OplogId}); It != m_Oplogs.end()) { diff --git a/src/zenserver/workspaces/httpworkspaces.cpp b/src/zenserver/workspaces/httpworkspaces.cpp index 905ba5ab2..8a4b977ad 100644 --- a/src/zenserver/workspaces/httpworkspaces.cpp +++ b/src/zenserver/workspaces/httpworkspaces.cpp @@ -589,7 +589,7 @@ void HttpWorkspacesService::ShareAliasFilesRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, @@ -608,7 +608,7 @@ void HttpWorkspacesService::ShareAliasChunkInfoRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, @@ -635,7 +635,7 @@ void HttpWorkspacesService::ShareAliasBatchRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, @@ -654,7 +654,7 @@ void HttpWorkspacesService::ShareAliasEntriesRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, @@ -673,7 +673,7 @@ void HttpWorkspacesService::ShareAliasChunkRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, @@ -700,7 +700,7 @@ void HttpWorkspacesService::ShareAliasRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 25f68330a..61552fafc 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -708,11 +708,11 @@ namespace zen { ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, - std::string BucketName, + std::string_view BucketName, const BucketConfiguration& Config) : m_Gc(Gc) , m_OuterCacheMemoryUsage(OuterCacheMemoryUsage) -, m_BucketName(std::move(BucketName)) +, m_BucketName(BucketName) , m_Configuration(Config) , m_BucketId(Oid::Zero) { @@ -1329,7 +1329,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle { - GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults) + GetBatchHandle(ZenCacheValueVec_t& OutResults) : OutResults(OutResults) { Keys.reserve(OutResults.capacity()); ResultIndexes.reserve(OutResults.capacity()); @@ -1340,11 +1340,11 @@ struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle std::vector<IoHash> Keys; std::vector<size_t> ResultIndexes; - std::vector<ZenCacheValue>& OutResults; + ZenCacheValueVec_t& OutResults; }; ZenCacheDiskLayer::CacheBucket::GetBatchHandle* -ZenCacheDiskLayer::CacheBucket::BeginGetBatch(std::vector<ZenCacheValue>& OutResult) +ZenCacheDiskLayer::CacheBucket::BeginGetBatch(ZenCacheValueVec_t& OutResult) { ZEN_TRACE_CPU("Z$::Bucket::BeginGetBatch"); return new GetBatchHandle(OutResult); @@ -1364,13 +1364,13 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept if (!Batch->ResultIndexes.empty()) { - std::vector<DiskLocation> StandaloneDiskLocations; - std::vector<size_t> StandaloneKeyIndexes; - std::vector<size_t> MemCachedKeyIndexes; - std::vector<DiskLocation> InlineDiskLocations; - std::vector<BlockStoreLocation> InlineBlockLocations; - std::vector<size_t> InlineKeyIndexes; - std::vector<bool> FillRawHashAndRawSize(Batch->Keys.size(), false); + eastl::fixed_vector<DiskLocation, 16> StandaloneDiskLocations; + eastl::fixed_vector<size_t, 16> StandaloneKeyIndexes; + eastl::fixed_vector<size_t, 16> MemCachedKeyIndexes; + eastl::fixed_vector<DiskLocation, 16> InlineDiskLocations; + eastl::fixed_vector<BlockStoreLocation, 16> InlineBlockLocations; + eastl::fixed_vector<size_t, 16> InlineKeyIndexes; + eastl::fixed_vector<bool, 16> FillRawHashAndRawSize(Batch->Keys.size(), false); { RwLock::SharedLockScope IndexLock(m_IndexLock); for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++) @@ -1526,33 +1526,35 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept if (!InlineDiskLocations.empty()) { ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline"); - m_BlockStore.IterateChunks(InlineBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool { - // Only read into memory the IoBuffers we could potentially add to memcache - const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 1u * 1024u); - m_BlockStore.IterateBlock( - InlineBlockLocations, - ChunkIndexes, - [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, - const void* Data, - uint64_t Size) -> bool { - if (Data != nullptr) - { - FillOne(InlineDiskLocations[ChunkIndex], - InlineKeyIndexes[ChunkIndex], - IoBufferBuilder::MakeCloneFromMemory(Data, Size)); - } - return true; - }, - [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, - BlockStoreFile& File, - uint64_t Offset, - uint64_t Size) -> bool { - FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size)); - return true; - }, - LargeChunkSizeLimit); - return true; - }); + m_BlockStore.IterateChunks( + std::span{begin(InlineBlockLocations), end(InlineBlockLocations)}, + [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool { + // Only read into memory the IoBuffers we could potentially add to memcache + const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 1u * 1024u); + m_BlockStore.IterateBlock( + std::span{begin(InlineBlockLocations), end(InlineBlockLocations)}, + ChunkIndexes, + [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, + const void* Data, + uint64_t Size) -> bool { + if (Data != nullptr) + { + FillOne(InlineDiskLocations[ChunkIndex], + InlineKeyIndexes[ChunkIndex], + IoBufferBuilder::MakeCloneFromMemory(Data, Size)); + } + return true; + }, + [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex, + BlockStoreFile& File, + uint64_t Offset, + uint64_t Size) -> bool { + FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size)); + return true; + }, + LargeChunkSizeLimit); + return true; + }); } if (!StandaloneDiskLocations.empty()) @@ -3581,15 +3583,29 @@ ZenCacheDiskLayer::~ZenCacheDiskLayer() } } +template<typename T, typename U> +struct equal_to_2 : public eastl::binary_function<T, U, bool> +{ + constexpr bool operator()(const T& a, const U& b) const { return a == b; } + + template<typename T_ = T, + typename U_ = U, + typename = eastl::enable_if_t<!eastl::is_same_v<eastl::remove_const_t<T_>, eastl::remove_const_t<U_>>>> + constexpr bool operator()(const U& b, const T& a) const + { + return b == a; + } +}; + ZenCacheDiskLayer::CacheBucket* ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) { ZEN_TRACE_CPU("Z$::GetOrCreateBucket"); - const auto BucketName = std::string(InBucket); { RwLock::SharedLockScope SharedLock(m_Lock); - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) + if (auto It = m_Buckets.find_as(InBucket, std::hash<std::string_view>(), equal_to_2<std::string, std::string_view>()); + It != m_Buckets.end()) { return It->second.get(); } @@ -3597,31 +3613,32 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) // We create the bucket without holding a lock since contructor calls GcManager::AddGcReferencer which takes an exclusive lock. // This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock - std::unique_ptr<CacheBucket> Bucket( - std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); + std::unique_ptr<CacheBucket> Bucket(std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, InBucket, m_Configuration.BucketConfig)); RwLock::ExclusiveLockScope Lock(m_Lock); - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) + if (auto It = m_Buckets.find_as(InBucket, std::hash<std::string_view>(), equal_to_2<std::string, std::string_view>()); + It != m_Buckets.end()) { return It->second.get(); } std::filesystem::path BucketPath = m_RootDir; - BucketPath /= BucketName; + BucketPath /= InBucket; try { if (!Bucket->OpenOrCreate(BucketPath)) { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", InBucket, m_RootDir); return nullptr; } } catch (const std::exception& Err) { - ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); + ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", InBucket, BucketPath, Err.what()); throw; } + std::string BucketName{InBucket}; CacheBucket* Result = Bucket.get(); m_Buckets.emplace(BucketName, std::move(Bucket)); if (m_CapturedBuckets) @@ -3720,7 +3737,7 @@ ZenCacheDiskLayer::EndPutBatch(PutBatchHandle* Batch) noexcept struct ZenCacheDiskLayer::GetBatchHandle { - GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults) {} + GetBatchHandle(ZenCacheValueVec_t& OutResults) : OutResults(OutResults) {} struct BucketHandle { CacheBucket* Bucket; @@ -3780,13 +3797,13 @@ struct ZenCacheDiskLayer::GetBatchHandle return NewBucketHandle; } - RwLock Lock; - std::vector<BucketHandle> BucketHandles; - std::vector<ZenCacheValue>& OutResults; + RwLock Lock; + eastl::fixed_vector<BucketHandle, 4> BucketHandles; + ZenCacheValueVec_t& OutResults; }; ZenCacheDiskLayer::GetBatchHandle* -ZenCacheDiskLayer::BeginGetBatch(std::vector<ZenCacheValue>& OutResults) +ZenCacheDiskLayer::BeginGetBatch(ZenCacheValueVec_t& OutResults) { return new GetBatchHandle(OutResults); } diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index cca51e63e..97e26a38d 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -20,6 +20,8 @@ #include <zencore/memory/llm.h> +#include <EASTL/fixed_vector.h> + ////////////////////////////////////////////////////////////////////////// namespace zen { @@ -89,7 +91,7 @@ GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key) return false; } IoHash Hash = HashField.AsHash(); - Key = CacheKey::Create(*Bucket, Hash); + Key = CacheKey::CreateValidated(std::move(*Bucket), Hash); return true; } @@ -305,7 +307,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co } DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; - std::vector<bool> Results; + eastl::fixed_vector<bool, 32> Results; CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); for (CbFieldView RequestField : RequestsArray) @@ -481,16 +483,15 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb bool Exists = false; bool ReadFromUpstream = false; }; - struct RecordRequestData + struct RecordRequestData : public CacheKeyRequest { - CacheKeyRequest Upstream; - CbObjectView RecordObject; - IoBuffer RecordCacheValue; - CacheRecordPolicy DownstreamPolicy; - std::vector<ValueRequestData> Values; - bool Complete = false; - const UpstreamEndpointInfo* Source = nullptr; - uint64_t ElapsedTimeUs; + CbObjectView RecordObject; + IoBuffer RecordCacheValue; + CacheRecordPolicy DownstreamPolicy; + eastl::fixed_vector<ValueRequestData, 4> Values; + bool Complete = false; + const UpstreamEndpointInfo* Source = nullptr; + uint64_t ElapsedTimeUs; }; std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); @@ -503,8 +504,8 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb const bool HasUpstream = m_UpstreamCache.IsActive(); - std::vector<RecordRequestData> Requests; - std::vector<size_t> UpstreamIndexes; + eastl::fixed_vector<RecordRequestData, 16> Requests; + eastl::fixed_vector<size_t, 16> UpstreamIndexes; auto ParseValues = [](RecordRequestData& Request) { CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView(); @@ -535,7 +536,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - CacheKey& Key = Request.Upstream.Key; + CacheKey& Key = Request.Key; if (!GetRpcRequestCacheKey(KeyObject, Key)) { return CbPackage{}; @@ -707,7 +708,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb for (size_t Index : UpstreamIndexes) { RecordRequestData& Request = Requests[Index]; - UpstreamRequests.push_back(&Request.Upstream); + UpstreamRequests.push_back(&Request); if (Request.Values.size()) { @@ -721,13 +722,13 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None; Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy); } - Request.Upstream.Policy = Builder.Build(); + Request.Policy = Builder.Build(); } else { // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants, // and convert the CacheRecordPolicy to an upstream policy - Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream(); + Request.Policy = Request.DownstreamPolicy.ConvertToUpstream(); } } @@ -737,10 +738,9 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb return; } - RecordRequestData& Request = - *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream)); + RecordRequestData& Request = *static_cast<RecordRequestData*>(&Params.Request); Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); - const CacheKey& Key = Request.Upstream.Key; + const CacheKey& Key = Request.Key; Stopwatch Timer; auto TimeGuard = MakeGuard([&Timer, &Request]() { Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); }); if (!Request.RecordObject) @@ -832,10 +832,12 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb CbPackage ResponsePackage; CbObjectWriter ResponseObject{2048}; + ResponsePackage.ReserveAttachments(Requests.size()); + ResponseObject.BeginArray("Result"sv); for (RecordRequestData& Request : Requests) { - const CacheKey& Key = Request.Upstream.Key; + const CacheKey& Key = Request.Key; if (Request.Complete || (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) { @@ -910,11 +912,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector<bool> BatchResults; - std::vector<size_t> BatchResultIndexes; - std::vector<bool> Results; - std::vector<CacheKey> UpstreamCacheKeys; - uint64_t RequestCount = RequestsArray.Num(); + std::vector<bool> BatchResults; + eastl::fixed_vector<size_t, 32> BatchResultIndexes; + eastl::fixed_vector<bool, 32> Results; + eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys; + + uint64_t RequestCount = RequestsArray.Num(); { Results.reserve(RequestCount); std::unique_ptr<ZenCacheStore::PutBatch> Batch; @@ -1099,15 +1102,15 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO uint64_t RawSize = 0; CompressedBuffer Result; }; - std::vector<RequestData> Requests; + eastl::fixed_vector<RequestData, 16> Requests; - std::vector<size_t> RemoteRequestIndexes; + eastl::fixed_vector<size_t, 16> RemoteRequestIndexes; const bool HasUpstream = m_UpstreamCache.IsActive(); - CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector<ZenCacheValue> CacheValues; - const uint64_t RequestCount = RequestsArray.Num(); + CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); + ZenCacheValueVec_t CacheValues; + const uint64_t RequestCount = RequestsArray.Num(); CacheValues.reserve(RequestCount); { std::unique_ptr<ZenCacheStore::GetBatch> Batch; @@ -1136,7 +1139,6 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO CacheKey& Key = Request.Key; CachePolicy Policy = Request.Policy; - ZenCacheValue CacheValue; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { if (Batch) @@ -1276,6 +1278,9 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Response"); CbPackage RpcResponse; CbObjectWriter ResponseObject{1024}; + + RpcResponse.ReserveAttachments(Requests.size()); + ResponseObject.BeginArray("Result"sv); for (const RequestData& Request : Requests) { @@ -1642,7 +1647,7 @@ CacheRpcHandler::GetLocalCacheValues(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); - std::vector<ZenCacheValue> Chunks; + ZenCacheValueVec_t Chunks; Chunks.reserve(ValueRequests.size()); { std::unique_ptr<ZenCacheStore::GetBatch> Batch; @@ -1796,6 +1801,8 @@ CacheRpcHandler::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequest CbPackage RpcResponse; CbObjectWriter Writer{1024}; + RpcResponse.ReserveAttachments(Requests.size()); + Writer.BeginArray("Result"sv); for (ChunkRequest& Request : Requests) { diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index 133cb42d7..7d277329e 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -178,13 +178,13 @@ ZenCacheNamespace::EndPutBatch(PutBatchHandle* Batch) noexcept struct ZenCacheNamespace::GetBatchHandle { - GetBatchHandle(std::vector<ZenCacheValue>& OutResult) : Results(OutResult) {} - std::vector<ZenCacheValue>& Results; + GetBatchHandle(ZenCacheValueVec_t& OutResult) : Results(OutResult) {} + ZenCacheValueVec_t& Results; ZenCacheDiskLayer::GetBatchHandle* DiskLayerHandle = nullptr; }; ZenCacheNamespace::GetBatchHandle* -ZenCacheNamespace::BeginGetBatch(std::vector<ZenCacheValue>& OutResult) +ZenCacheNamespace::BeginGetBatch(ZenCacheValueVec_t& OutResult) { ZenCacheNamespace::GetBatchHandle* Handle = new ZenCacheNamespace::GetBatchHandle(OutResult); Handle->DiskLayerHandle = m_DiskLayer.BeginGetBatch(OutResult); @@ -580,7 +580,7 @@ ZenCacheStore::PutBatch::~PutBatch() } } -ZenCacheStore::GetBatch::GetBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<ZenCacheValue>& OutResult) +ZenCacheStore::GetBatch::GetBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, ZenCacheValueVec_t& OutResult) : m_CacheStore(CacheStore) , Results(OutResult) { diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index b0b4f22cb..05400c784 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -12,8 +12,9 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> ZEN_THIRD_PARTY_INCLUDES_END +#include <EASTL/string.h> +#include <EASTL/unordered_map.h> #include <filesystem> -#include <unordered_map> namespace zen { @@ -169,7 +170,7 @@ public: ~ZenCacheDiskLayer(); struct GetBatchHandle; - GetBatchHandle* BeginGetBatch(std::vector<ZenCacheValue>& OutResult); + GetBatchHandle* BeginGetBatch(ZenCacheValueVec_t& OutResult); void EndGetBatch(GetBatchHandle* Batch) noexcept; bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& BatchHandle); @@ -216,13 +217,16 @@ public: */ struct CacheBucket : public GcReferencer { - CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config); + CacheBucket(GcManager& Gc, + std::atomic_uint64_t& OuterCacheMemoryUsage, + std::string_view BucketName, + const BucketConfiguration& Config); ~CacheBucket(); bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); struct GetBatchHandle; - GetBatchHandle* BeginGetBatch(std::vector<ZenCacheValue>& OutResult); + GetBatchHandle* BeginGetBatch(ZenCacheValueVec_t& OutResult); void EndGetBatch(GetBatchHandle* Batch) noexcept; bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle); @@ -486,18 +490,20 @@ private: bool StartAsyncMemCacheTrim(); void MemCacheTrim(); - GcManager& m_Gc; - JobQueue& m_JobQueue; - std::filesystem::path m_RootDir; - Configuration m_Configuration; - std::atomic_uint64_t m_TotalMemCachedSize{}; - std::atomic_bool m_IsMemCacheTrimming = false; - std::atomic<GcClock::Tick> m_NextAllowedTrimTick; - mutable RwLock m_Lock; - std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; - std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; - uint32_t m_UpdateCaptureRefCounter = 0; - std::unique_ptr<std::vector<std::string>> m_CapturedBuckets; + typedef eastl::unordered_map<std::string, std::unique_ptr<CacheBucket>, std::hash<std::string>, std::equal_to<std::string>> BucketMap_t; + + GcManager& m_Gc; + JobQueue& m_JobQueue; + std::filesystem::path m_RootDir; + Configuration m_Configuration; + std::atomic_uint64_t m_TotalMemCachedSize{}; + std::atomic_bool m_IsMemCacheTrimming = false; + std::atomic<GcClock::Tick> m_NextAllowedTrimTick; + mutable RwLock m_Lock; + BucketMap_t m_Buckets; + std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; + uint32_t m_UpdateCaptureRefCounter = 0; + std::unique_ptr<std::vector<std::string>> m_CapturedBuckets; ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete; ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete; diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h index 9b45c7b21..521c78bb1 100644 --- a/src/zenstore/include/zenstore/cache/cacheshared.h +++ b/src/zenstore/include/zenstore/cache/cacheshared.h @@ -6,6 +6,8 @@ #include <zencore/iohash.h> #include <zenstore/gc.h> +#include <EASTL/fixed_vector.h> + #include <gsl/gsl-lite.hpp> #include <unordered_map> @@ -32,6 +34,8 @@ struct ZenCacheValue IoHash RawHash = IoHash::Zero; }; +typedef eastl::fixed_vector<ZenCacheValue, 16> ZenCacheValueVec_t; + struct CacheValueDetails { struct ValueDetails diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h index 82fec9b0e..5e056cf2d 100644 --- a/src/zenstore/include/zenstore/cache/structuredcachestore.h +++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h @@ -86,7 +86,7 @@ public: void EndPutBatch(PutBatchHandle* Batch) noexcept; struct GetBatchHandle; - GetBatchHandle* BeginGetBatch(std::vector<ZenCacheValue>& OutResults); + GetBatchHandle* BeginGetBatch(ZenCacheValueVec_t& OutResults); void EndGetBatch(GetBatchHandle* Batch) noexcept; bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); @@ -220,14 +220,14 @@ public: class GetBatch { public: - GetBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<ZenCacheValue>& OutResult); + GetBatch(ZenCacheStore& CacheStore, std::string_view Namespace, ZenCacheValueVec_t& OutResult); ~GetBatch(); private: ZenCacheStore& m_CacheStore; ZenCacheNamespace* m_Store = nullptr; ZenCacheNamespace::GetBatchHandle* m_NamespaceBatchHandle = nullptr; - std::vector<ZenCacheValue>& Results; + ZenCacheValueVec_t& Results; friend class ZenCacheStore; }; diff --git a/src/zenstore/xmake.lua b/src/zenstore/xmake.lua index f0bd64d2e..031a66829 100644 --- a/src/zenstore/xmake.lua +++ b/src/zenstore/xmake.lua @@ -8,3 +8,4 @@ target('zenstore') add_includedirs("include", {public=true}) add_deps("zencore", "zenutil") add_packages("vcpkg::robin-map") + add_packages("vcpkg::eastl", {public=true}); diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp index a4bb759e7..e57109006 100644 --- a/src/zenutil/filebuildstorage.cpp +++ b/src/zenutil/filebuildstorage.cpp @@ -325,7 +325,7 @@ public: return {}; } - virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash) override + virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override { ZEN_UNUSED(BuildId); SimulateLatency(0, 0); @@ -337,10 +337,19 @@ public: if (std::filesystem::is_regular_file(BlockPath)) { BasicFile File(BlockPath, BasicFile::Mode::kRead); - IoBuffer Payload = File.ReadAll(); - ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload)))); - m_Stats.TotalBytesRead += Payload.GetSize(); + IoBuffer Payload; + if (RangeOffset != 0 || RangeBytes != (uint64_t)-1) + { + Payload = IoBuffer(RangeBytes); + File.Read(Payload.GetMutableView().GetData(), RangeBytes, RangeOffset); + } + else + { + Payload = File.ReadAll(); + ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload)))); + } Payload.SetContentType(ZenContentType::kCompressedBinary); + m_Stats.TotalBytesRead += Payload.GetSize(); SimulateLatency(0, Payload.GetSize()); return Payload; } diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h index 9c236310f..9d2bab170 100644 --- a/src/zenutil/include/zenutil/buildstorage.h +++ b/src/zenutil/include/zenutil/buildstorage.h @@ -40,7 +40,10 @@ public: std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, std::function<void(uint64_t, bool)>&& OnSentBytes) = 0; - virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash) = 0; + virtual IoBuffer GetBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + uint64_t RangeOffset = 0, + uint64_t RangeBytes = (uint64_t)-1) = 0; virtual std::vector<std::function<void()>> GetLargeBuildBlob( const Oid& BuildId, const IoHash& RawHash, diff --git a/src/zenutil/include/zenutil/cache/cachekey.h b/src/zenutil/include/zenutil/cache/cachekey.h index 741375946..0ab05f4f1 100644 --- a/src/zenutil/include/zenutil/cache/cachekey.h +++ b/src/zenutil/include/zenutil/cache/cachekey.h @@ -17,6 +17,12 @@ struct CacheKey static CacheKey Create(std::string_view Bucket, const IoHash& Hash) { return {.Bucket = ToLower(Bucket), .Hash = Hash}; } + // This should be used whenever the bucket name has already been validated to avoid redundant ToLower calls + static CacheKey CreateValidated(std::string&& BucketValidated, const IoHash& Hash) + { + return {.Bucket = std::move(BucketValidated), .Hash = Hash}; + } + auto operator<=>(const CacheKey& that) const { if (auto b = caseSensitiveCompareStrings(Bucket, that.Bucket); b != std::strong_ordering::equal) diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h index 852271868..2c5fc73b8 100644 --- a/src/zenutil/include/zenutil/jupiter/jupitersession.h +++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h @@ -123,7 +123,9 @@ public: std::string_view BucketId, const Oid& BuildId, const IoHash& Hash, - std::filesystem::path TempFolderPath); + std::filesystem::path TempFolderPath, + uint64_t Offset = 0, + uint64_t Size = (uint64_t)-1); JupiterResult PutMultipartBuildBlob(std::string_view Namespace, std::string_view BucketId, diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp index 309885b05..bf89ce785 100644 --- a/src/zenutil/jupiter/jupiterbuildstorage.cpp +++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp @@ -217,13 +217,15 @@ public: return WorkList; } - virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash) override + virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override { ZEN_TRACE_CPU("Jupiter::GetBuildBlob"); - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult GetBuildBlobResult = m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath); + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + CreateDirectories(m_TempFolderPath); + JupiterResult GetBuildBlobResult = + m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath, RangeOffset, RangeBytes); AddStatistic(GetBuildBlobResult); if (!GetBuildBlobResult.Success) { diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp index 06ac6ae36..68f214c06 100644 --- a/src/zenutil/jupiter/jupitersession.cpp +++ b/src/zenutil/jupiter/jupitersession.cpp @@ -698,11 +698,19 @@ JupiterSession::GetBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoHash& Hash, - std::filesystem::path TempFolderPath) + std::filesystem::path TempFolderPath, + uint64_t Offset, + uint64_t Size) { + HttpClient::KeyValueMap Headers; + if (Offset != 0 || Size != (uint64_t)-1) + { + Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", Offset, Offset + Size - 1)}); + } HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()), - TempFolderPath); + TempFolderPath, + Headers); return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv); } @@ -9,6 +9,7 @@ add_requires( "vcpkg::curl", "vcpkg::cxxopts", "vcpkg::doctest", + "vcpkg::eastl", "vcpkg::fmt", "vcpkg::gsl-lite", "vcpkg::http-parser", @@ -24,6 +25,8 @@ add_requires( "vcpkg::zlib" ) +add_defines("EASTL_STD_ITERATOR_CATEGORY_ENABLED") + set_policy("build.ccache", false) if is_plat("windows") then |