aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2025-03-07 15:23:36 +0100
committerStefan Boberg <[email protected]>2025-03-07 15:23:36 +0100
commitadbf61a119e5aabbcec274b846f1109de0093bee (patch)
treee320053d112b4e56218d7e29310c76b43f442697
parentTactical check-in to simplify merge from old branch (diff)
parentpartial block fetch (#298) (diff)
downloadzen-adbf61a119e5aabbcec274b846f1109de0093bee.tar.xz
zen-adbf61a119e5aabbcec274b846f1109de0093bee.zip
Merge remote-tracking branch 'origin/main' into sb/build-cache
-rw-r--r--CHANGELOG.md6
-rw-r--r--src/zen/cmds/builds_cmd.cpp1099
-rw-r--r--src/zen/cmds/builds_cmd.h1
-rw-r--r--src/zencore/compactbinary.cpp10
-rw-r--r--src/zencore/compactbinarybuilder.cpp11
-rw-r--r--src/zencore/compactbinarypackage.cpp14
-rw-r--r--src/zencore/include/zencore/compactbinarybuilder.h8
-rw-r--r--src/zencore/include/zencore/compactbinarypackage.h15
-rw-r--r--src/zencore/include/zencore/compositebuffer.h38
-rw-r--r--src/zencore/include/zencore/eastlutil.h20
-rw-r--r--src/zencore/include/zencore/memory/newdelete.h26
-rw-r--r--src/zencore/xmake.lua1
-rw-r--r--src/zenhttp/httpclient.cpp161
-rw-r--r--src/zenhttp/httpserver.cpp6
-rw-r--r--src/zenhttp/include/zenhttp/httpserver.h8
-rw-r--r--src/zenhttp/packageformat.cpp24
-rw-r--r--src/zenhttp/servers/httpsys.cpp18
-rw-r--r--src/zenserver/objectstore/objectstore.cpp27
-rw-r--r--src/zenserver/objectstore/objectstore.h4
-rw-r--r--src/zenserver/projectstore/httpprojectstore.cpp32
-rw-r--r--src/zenserver/projectstore/projectstore.cpp45
-rw-r--r--src/zenserver/workspaces/httpworkspaces.cpp12
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp121
-rw-r--r--src/zenstore/cache/cacherpc.cpp73
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp8
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h38
-rw-r--r--src/zenstore/include/zenstore/cache/cacheshared.h4
-rw-r--r--src/zenstore/include/zenstore/cache/structuredcachestore.h6
-rw-r--r--src/zenstore/xmake.lua1
-rw-r--r--src/zenutil/filebuildstorage.cpp17
-rw-r--r--src/zenutil/include/zenutil/buildstorage.h5
-rw-r--r--src/zenutil/include/zenutil/cache/cachekey.h6
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupitersession.h4
-rw-r--r--src/zenutil/jupiter/jupiterbuildstorage.cpp10
-rw-r--r--src/zenutil/jupiter/jupitersession.cpp12
-rw-r--r--xmake.lua3
36 files changed, 1293 insertions, 601 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f8858e4b9..b2559ddaa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,10 @@
##
+- **EXPERIMENTAL** `zen builds`
+ - Improvement: Do partial requests of blocks if not all of the block is needed
+ - Improvement: Better progress/statistics on download
+ - Bugfix: Ensure that temporary folder for Jupiter downloads exists during verify phase
+
+## 5.6.0
- Feature: Added support for `--trace`, `--tracehost` and `--tracefile` options to zen CLI command
- Improvement: When logging HTTP responses, the body is now sanity checked to ensure it is human readable, and the length of the output is capped to prevent inadvertent log bloat
- Improvement: Instrumented `zen builds download` command code so we get more useful Insights output
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index eb0650c4d..1c9476b96 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -231,7 +231,10 @@ namespace {
return AuthToken;
}
- CompositeBuffer WriteToTempFileIfNeeded(const CompositeBuffer& Buffer, const std::filesystem::path& TempFolderPath, const IoHash& Hash)
+ CompositeBuffer WriteToTempFileIfNeeded(const CompositeBuffer& Buffer,
+ const std::filesystem::path& TempFolderPath,
+ const IoHash& Hash,
+ const std::string& Suffix = {})
{
// If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory
std::span<const SharedBuffer> Segments = Buffer.GetSegments();
@@ -241,7 +244,7 @@ namespace {
{
return Buffer;
}
- std::filesystem::path TempFilePath = (TempFolderPath / Hash.ToHexString()).make_preferred();
+ std::filesystem::path TempFilePath = (TempFolderPath / (Hash.ToHexString() + Suffix)).make_preferred();
return CompositeBuffer(WriteToTempFile(Buffer, TempFilePath));
}
@@ -302,7 +305,7 @@ namespace {
return FilteredPerSecond;
}
- uint64_t GetElapsedTime() const
+ uint64_t GetElapsedTimeUS() const
{
if (StartTimeUS == (uint64_t)-1)
{
@@ -1738,8 +1741,8 @@ namespace {
ProgressBar.Finish();
- GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTime();
- UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTime();
+ GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS();
+ UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
}
}
@@ -2069,9 +2072,9 @@ namespace {
});
ProgressBar.Finish();
- UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTime();
- GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTime();
- LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTime();
+ UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
+ GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTimeUS();
+ LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS();
}
}
@@ -3358,17 +3361,8 @@ namespace {
return ChunkTargetPtrs;
};
- bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath,
- const ChunkedFolderContent& RemoteContent,
- std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- const CompositeBuffer& DecompressedBlockBuffer,
- const ChunkedContentLookup& Lookup,
- std::atomic<bool>* RemoteChunkIndexNeedsCopyFromSourceFlags,
- std::atomic<uint32_t>& OutChunksComplete,
- std::atomic<uint64_t>& OutBytesWritten)
+ struct BlockWriteOps
{
- ZEN_TRACE_CPU("WriteBlockToDisk");
-
std::vector<CompositeBuffer> ChunkBuffers;
struct WriteOpData
{
@@ -3376,7 +3370,79 @@ namespace {
size_t ChunkBufferIndex;
};
std::vector<WriteOpData> WriteOps;
+ };
+ void WriteBlockChunkOps(const std::filesystem::path& CacheFolderPath,
+ const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& Lookup,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ const BlockWriteOps Ops,
+ std::atomic<uint32_t>& OutChunksComplete,
+ std::atomic<uint64_t>& OutBytesWritten)
+ {
+ ZEN_TRACE_CPU("WriteBlockChunkOps");
+ {
+ WriteFileCache OpenFileCache;
+ for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
+ {
+ if (AbortFlag)
+ {
+ break;
+ }
+ const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex];
+ const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex;
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <=
+ RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
+ const uint64_t ChunkSize = Chunk.GetSize();
+ const uint64_t FileOffset = WriteOp.Target->Offset;
+ const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]);
+
+ OpenFileCache.WriteToFile<CompositeBuffer>(
+ SequenceIndex,
+ [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
+ return GetTempChunkedSequenceFileName(CacheFolderPath,
+ RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ },
+ Chunk,
+ FileOffset,
+ RemoteContent.RawSizes[PathIndex]);
+ OutBytesWritten += ChunkSize;
+ }
+ }
+ if (!AbortFlag)
+ {
+ // Write tracking, updating this must be done without any files open (WriteFileCache)
+ for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
+ {
+ const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
+ if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
+ {
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ const IoHash VerifyChunkHash =
+ IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Written hunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
+ }
+ std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
+ GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
+ }
+ }
+ OutChunksComplete += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size());
+ }
+ }
+
+ bool GetBlockWriteOps(const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& Lookup,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ const CompositeBuffer& DecompressedBlockBuffer,
+ BlockWriteOps& OutOps)
+ {
+ ZEN_TRACE_CPU("GetBlockWriteOps");
SharedBuffer BlockBuffer = DecompressedBlockBuffer.Flatten();
uint64_t HeaderSize = 0;
if (IterateChunkBlock(
@@ -3402,91 +3468,207 @@ namespace {
ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
{
- WriteOps.push_back(WriteOpData{.Target = Target, .ChunkBufferIndex = ChunkBuffers.size()});
+ OutOps.WriteOps.push_back(
+ BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()});
}
- ChunkBuffers.emplace_back(std::move(Decompressed));
+ OutOps.ChunkBuffers.emplace_back(std::move(Decompressed));
}
}
}
},
HeaderSize))
{
- if (!WriteOps.empty())
+ std::sort(OutOps.WriteOps.begin(),
+ OutOps.WriteOps.end(),
+ [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
+ return false;
+ }
+ return Lhs.Target->Offset < Rhs.Target->Offset;
+ });
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath,
+ const ChunkedFolderContent& RemoteContent,
+ const ChunkBlockDescription& BlockDescription,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ const CompositeBuffer& BlockBuffer,
+ const ChunkedContentLookup& Lookup,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ std::atomic<uint32_t>& OutChunksComplete,
+ std::atomic<uint64_t>& OutBytesWritten)
+ {
+ ZEN_TRACE_CPU("WriteBlockToDisk");
+
+ IoHash BlockRawHash;
+ uint64_t BlockRawSize;
+ CompressedBuffer CompressedBlockBuffer = CompressedBuffer::FromCompressed(BlockBuffer, BlockRawHash, BlockRawSize);
+ if (!CompressedBlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", BlockDescription.BlockHash));
+ }
+
+ if (BlockRawHash != BlockDescription.BlockHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Block {} header has a mismatching raw hash {}", BlockDescription.BlockHash, BlockRawHash));
+ }
+
+ CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite();
+ if (!DecompressedBlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} failed to decompress", BlockDescription.BlockHash));
+ }
+
+ ZEN_ASSERT_SLOW(BlockDescription.BlockHash == IoHash::HashBuffer(DecompressedBlockBuffer));
+
+ BlockWriteOps Ops;
+ if (GetBlockWriteOps(RemoteContent,
+ Lookup,
+ SequenceIndexChunksLeftToWriteCounters,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DecompressedBlockBuffer,
+ Ops))
+ {
+ WriteBlockChunkOps(CacheFolderPath,
+ RemoteContent,
+ Lookup,
+ SequenceIndexChunksLeftToWriteCounters,
+ Ops,
+ OutChunksComplete,
+ OutBytesWritten);
+ return true;
+ }
+ return false;
+ }
+
+ bool GetPartialBlockWriteOps(const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& Lookup,
+ const ChunkBlockDescription& BlockDescription,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ const CompositeBuffer& PartialBlockBuffer,
+ uint32_t FirstIncludedBlockChunkIndex,
+ uint32_t LastIncludedBlockChunkIndex,
+ BlockWriteOps& OutOps)
+ {
+ ZEN_TRACE_CPU("GetPartialBlockWriteOps");
+ uint32_t OffsetInBlock = 0;
+ for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++)
+ {
+ const uint32_t ChunkCompressedSize = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
+ const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
+ if (auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); It != Lookup.ChunkHashToChunkIndex.end())
{
- if (!AbortFlag)
+ const uint32_t ChunkIndex = It->second;
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, Lookup, ChunkIndex);
+
+ if (!ChunkTargetPtrs.empty())
{
- std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOpData& Lhs, const WriteOpData& Rhs) {
- if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ bool NeedsWrite = true;
+ if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false))
+ {
+ CompositeBuffer Chunk = PartialBlockBuffer.Mid(OffsetInBlock, ChunkCompressedSize);
+ IoHash VerifyChunkHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, VerifyChunkHash, VerifyRawSize);
+ if (!Compressed)
{
- return true;
+ ZEN_ASSERT(false);
}
- if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ if (VerifyChunkHash != ChunkHash)
{
- return false;
+ ZEN_ASSERT(false);
}
- return Lhs.Target->Offset < Rhs.Target->Offset;
- });
-
- {
- WriteFileCache OpenFileCache;
- for (const WriteOpData& WriteOp : WriteOps)
+ if (VerifyRawSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex])
{
- if (AbortFlag)
- {
- break;
- }
- const CompositeBuffer& Chunk = ChunkBuffers[WriteOp.ChunkBufferIndex];
- const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex;
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <=
- RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
- const uint64_t ChunkSize = Chunk.GetSize();
- const uint64_t FileOffset = WriteOp.Target->Offset;
- const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
- ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]);
-
- OpenFileCache.WriteToFile<CompositeBuffer>(
- SequenceIndex,
- [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- Chunk,
- FileOffset,
- RemoteContent.RawSizes[PathIndex]);
- OutBytesWritten += ChunkSize;
+ ZEN_ASSERT(false);
}
- }
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("WriteBlockToDisk_VerifyHash");
-
- // Write tracking, updating this must be done without any files open (WriteFileCache)
- for (const WriteOpData& WriteOp : WriteOps)
+ CompositeBuffer Decompressed = Compressed.DecompressToComposite();
+ if (!Decompressed)
{
- const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
- {
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- const IoHash VerifyChunkHash = IoHash::HashBuffer(
- IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
- }
- std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
- GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
- }
+ throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash));
}
+ ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));
+ ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
+ {
+ OutOps.WriteOps.push_back(
+ BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()});
+ }
+ OutOps.ChunkBuffers.emplace_back(std::move(Decompressed));
}
- OutChunksComplete += gsl::narrow<uint32_t>(ChunkBuffers.size());
}
}
+
+ OffsetInBlock += ChunkCompressedSize;
+ }
+ std::sort(OutOps.WriteOps.begin(),
+ OutOps.WriteOps.end(),
+ [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
+ return false;
+ }
+ return Lhs.Target->Offset < Rhs.Target->Offset;
+ });
+ return true;
+ }
+
+ bool WritePartialBlockToDisk(const std::filesystem::path& CacheFolderPath,
+ const ChunkedFolderContent& RemoteContent,
+ const ChunkBlockDescription& BlockDescription,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ const CompositeBuffer& PartialBlockBuffer,
+ uint32_t FirstIncludedBlockChunkIndex,
+ uint32_t LastIncludedBlockChunkIndex,
+ const ChunkedContentLookup& Lookup,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ std::atomic<uint32_t>& OutChunksComplete,
+ std::atomic<uint64_t>& OutBytesWritten)
+ {
+ ZEN_TRACE_CPU("WritePartialBlockToDisk");
+ BlockWriteOps Ops;
+ if (GetPartialBlockWriteOps(RemoteContent,
+ Lookup,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ PartialBlockBuffer,
+ FirstIncludedBlockChunkIndex,
+ LastIncludedBlockChunkIndex,
+ Ops))
+ {
+ WriteBlockChunkOps(CacheFolderPath,
+ RemoteContent,
+ Lookup,
+ SequenceIndexChunksLeftToWriteCounters,
+ Ops,
+ OutChunksComplete,
+ OutBytesWritten);
return true;
}
- return false;
+ else
+ {
+ return false;
+ }
}
SharedBuffer Decompress(const CompositeBuffer& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize)
@@ -3571,6 +3753,7 @@ namespace {
CompositeBuffer&& CompressedPart,
std::atomic<uint64_t>& WriteToDiskBytes)
{
+ ZEN_TRACE_CPU("StreamDecompress");
const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash);
TemporaryFile DecompressedTemp;
std::error_code Ec;
@@ -3633,10 +3816,9 @@ namespace {
WorkerThreadPool& NetworkPool,
std::atomic<uint64_t>& WriteToDiskBytes,
std::atomic<uint64_t>& BytesDownloaded,
- std::atomic<uint64_t>& LooseChunksBytes,
- std::atomic<uint64_t>& DownloadedChunks,
- std::atomic<uint32_t>& ChunksComplete,
- std::atomic<uint64_t>& MultipartAttachmentCount)
+ std::atomic<uint64_t>& MultipartAttachmentCount,
+ std::function<void(uint64_t DowloadedBytes)>&& OnDownloadComplete,
+ std::function<void()>&& OnWriteComplete)
{
ZEN_TRACE_CPU("DownloadLargeBlob");
@@ -3665,22 +3847,20 @@ namespace {
Workload,
ChunkHash,
&BytesDownloaded,
- &LooseChunksBytes,
+ OnDownloadComplete = std::move(OnDownloadComplete),
+ OnWriteComplete = std::move(OnWriteComplete),
&WriteToDiskBytes,
- &DownloadedChunks,
- &ChunksComplete,
SequenceIndexChunksLeftToWriteCounters,
ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>(
ChunkTargetPtrs)](uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining) {
BytesDownloaded += Chunk.GetSize();
- LooseChunksBytes += Chunk.GetSize();
if (!AbortFlag.load())
{
Workload->TempFile.Write(Chunk.GetView(), Offset);
if (Chunk.GetSize() == BytesRemaining)
{
- DownloadedChunks++;
+ OnDownloadComplete(Workload->TempFile.FileSize());
Work.ScheduleWork(
WritePool, // GetSyncWorkerPool(),//
@@ -3690,8 +3870,7 @@ namespace {
ChunkHash,
Workload,
Offset,
- BytesRemaining,
- &ChunksComplete,
+ OnWriteComplete = std::move(OnWriteComplete),
&WriteToDiskBytes,
SequenceIndexChunksLeftToWriteCounters,
ChunkTargetPtrs](std::atomic<bool>&) {
@@ -3725,7 +3904,7 @@ namespace {
CompositeBuffer(std::move(CompressedPart)),
WriteToDiskBytes);
NeedHashVerify = false;
- ChunksComplete++;
+ OnWriteComplete();
}
else
{
@@ -3748,7 +3927,7 @@ namespace {
CompositeBuffer(Chunk),
OpenFileCache,
WriteToDiskBytes);
- ChunksComplete++;
+ OnWriteComplete();
}
}
@@ -3760,12 +3939,12 @@ namespace {
const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
{
- ZEN_TRACE_CPU("VerifyChunkHash");
-
const IoHash& SequenceRawHash =
RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
if (NeedHashVerify)
{
+ ZEN_TRACE_CPU("VerifyChunkHash");
+
const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
if (VerifyChunkHash != ChunkHash)
@@ -3816,6 +3995,7 @@ namespace {
const ChunkedFolderContent& RemoteContent,
const std::vector<ChunkBlockDescription>& BlockDescriptions,
const std::vector<IoHash>& LooseChunkHashes,
+ bool AllowPartialBlockRequests,
bool WipeTargetFolder,
FolderContent& OutLocalFolderState)
{
@@ -3967,11 +4147,17 @@ namespace {
}
}
- std::atomic<uint32_t> ChunkCountWritten = 0;
+ uint64_t TotalRequestCount = 0;
+ std::atomic<uint64_t> RequestsComplete = 0;
+ std::atomic<uint32_t> ChunkCountWritten = 0;
+ std::atomic<size_t> TotalPartWriteCount = 0;
+ std::atomic<size_t> WritePartsComplete = 0;
{
ZEN_TRACE_CPU("HandleChunks");
+ Stopwatch WriteTimer;
+
FilteredRate FilteredDownloadedBytesPerSecond;
FilteredRate FilteredWrittenBytesPerSecond;
@@ -4010,6 +4196,8 @@ namespace {
}
else
{
+ TotalRequestCount++;
+ TotalPartWriteCount++;
Work.ScheduleWork(
NetworkPool, // GetSyncWorkerPool(),//
[&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) {
@@ -4020,25 +4208,39 @@ namespace {
FilteredDownloadedBytesPerSecond.Start();
if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
{
- DownloadLargeBlob(Storage,
- Path / ZenTempChunkFolderName,
- CacheFolderPath,
- RemoteContent,
- RemoteLookup,
- BuildId,
- ChunkHash,
- PreferredMultipartChunkSize,
- ChunkTargetPtrs,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- NetworkPool,
- WriteToDiskBytes,
- BytesDownloaded,
- LooseChunksBytes,
- DownloadedChunks,
- ChunkCountWritten,
- MultipartAttachmentCount);
+ DownloadLargeBlob(
+ Storage,
+ Path / ZenTempChunkFolderName,
+ CacheFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ BuildId,
+ ChunkHash,
+ PreferredMultipartChunkSize,
+ ChunkTargetPtrs,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ NetworkPool,
+ WriteToDiskBytes,
+ BytesDownloaded,
+ MultipartAttachmentCount,
+ [&](uint64_t BytesDownloaded) {
+ LooseChunksBytes += BytesDownloaded;
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ },
+ [&]() {
+ ChunkCountWritten++;
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ });
}
else
{
@@ -4049,6 +4251,11 @@ namespace {
}
BytesDownloaded += CompressedPart.GetSize();
LooseChunksBytes += CompressedPart.GetSize();
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(CompressedPart)),
Path / ZenTempChunkFolderName,
ChunkHash);
@@ -4077,6 +4284,11 @@ namespace {
CompositeBuffer(CompressedPart),
WriteToDiskBytes);
ChunkCountWritten++;
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
NeedHashVerify = false;
}
else
@@ -4096,10 +4308,17 @@ namespace {
OpenFileCache,
WriteToDiskBytes);
ChunkCountWritten++;
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
}
if (!AbortFlag)
{
+ WritePartsComplete++;
+
// Write tracking, updating this must be done without any files open
// (WriteFileCache)
for (const ChunkedContentLookup::ChunkSequenceLocation* Location :
@@ -4109,12 +4328,12 @@ namespace {
if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(
1) == 1)
{
- ZEN_TRACE_CPU("UpdateFolder_VerifyHash");
-
const IoHash& SequenceRawHash =
RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
if (NeedHashVerify)
{
+ ZEN_TRACE_CPU("UpdateFolder_VerifyHash");
+
const IoHash VerifyChunkHash =
IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
GetTempChunkedSequenceFileName(CacheFolderPath,
@@ -4155,6 +4374,8 @@ namespace {
break;
}
+ TotalPartWriteCount++;
+
Work.ScheduleWork(
WritePool, // GetSyncWorkerPool(),//
[&, CopyDataIndex](std::atomic<bool>&) {
@@ -4166,245 +4387,428 @@ namespace {
const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.LocalSequenceIndex];
const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
- if (!CopyData.TargetChunkLocationPtrs.empty())
- {
- uint64_t CacheLocalFileBytesRead = 0;
+ ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty());
- size_t TargetStart = 0;
- const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(
- CopyData.TargetChunkLocationPtrs);
+ uint64_t CacheLocalFileBytesRead = 0;
- struct WriteOp
- {
- const ChunkedContentLookup::ChunkSequenceLocation* Target;
- uint64_t CacheFileOffset;
- uint64_t ChunkSize;
- };
+ size_t TargetStart = 0;
+ const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(
+ CopyData.TargetChunkLocationPtrs);
+
+ struct WriteOp
+ {
+ const ChunkedContentLookup::ChunkSequenceLocation* Target;
+ uint64_t CacheFileOffset;
+ uint64_t ChunkSize;
+ };
- std::vector<WriteOp> WriteOps;
- WriteOps.reserve(AllTargets.size());
+ std::vector<WriteOp> WriteOps;
+ WriteOps.reserve(AllTargets.size());
- for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
+ for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
+ {
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
+ AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
{
- std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
- AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
- for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
- {
- WriteOps.push_back(WriteOp{.Target = Target,
- .CacheFileOffset = ChunkTarget.CacheFileOffset,
- .ChunkSize = ChunkTarget.ChunkRawSize});
- }
- TargetStart += ChunkTarget.TargetChunkLocationCount;
+ WriteOps.push_back(WriteOp{.Target = Target,
+ .CacheFileOffset = ChunkTarget.CacheFileOffset,
+ .ChunkSize = ChunkTarget.ChunkRawSize});
}
+ TargetStart += ChunkTarget.TargetChunkLocationCount;
+ }
- std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
- if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
- {
- return true;
- }
- else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
- {
- return false;
- }
- if (Lhs.Target->Offset < Rhs.Target->Offset)
- {
- return true;
- }
+ std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
return false;
- });
+ }
+ if (Lhs.Target->Offset < Rhs.Target->Offset)
+ {
+ return true;
+ }
+ return false;
+ });
- if (!AbortFlag)
+ if (!AbortFlag)
+ {
+ BufferedOpenFile SourceFile(LocalFilePath);
+ WriteFileCache OpenFileCache;
+ for (const WriteOp& Op : WriteOps)
{
- BufferedOpenFile SourceFile(LocalFilePath);
- WriteFileCache OpenFileCache;
- for (const WriteOp& Op : WriteOps)
+ if (AbortFlag)
{
- if (AbortFlag)
- {
- break;
- }
- const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <=
- RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
- const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
- const uint64_t ChunkSize = Op.ChunkSize;
- CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize);
-
- ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
-
- OpenFileCache.WriteToFile<CompositeBuffer>(
- RemoteSequenceIndex,
- [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(
- CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- ChunkSource,
- Op.Target->Offset,
- RemoteContent.RawSizes[RemotePathIndex]);
- WriteToDiskBytes += ChunkSize;
- CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes?
+ break;
}
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <=
+ RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
+ const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
+ const uint64_t ChunkSize = Op.ChunkSize;
+ CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize);
+
+ ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
+
+ OpenFileCache.WriteToFile<CompositeBuffer>(
+ RemoteSequenceIndex,
+ [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
+ return GetTempChunkedSequenceFileName(
+ CacheFolderPath,
+ RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ },
+ ChunkSource,
+ Op.Target->Offset,
+ RemoteContent.RawSizes[RemotePathIndex]);
+ WriteToDiskBytes += ChunkSize;
+ CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes?
}
- if (!AbortFlag)
+ }
+ if (!AbortFlag)
+ {
+ // Write tracking, updating this must be done without any files open (WriteFileCache)
+ for (const WriteOp& Op : WriteOps)
{
- if (!AbortFlag)
+ ZEN_TRACE_CPU("UpdateFolder_Copy_VerifyHash");
+
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
{
- // Write tracking, updating this must be done without any files open (WriteFileCache)
- for (const WriteOp& Op : WriteOps)
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
+ GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
+ if (VerifyChunkHash != SequenceRawHash)
{
- ZEN_TRACE_CPU("UpdateFolder_Copy_VerifyHash");
-
- const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
- {
- const IoHash& SequenceRawHash =
- RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
- GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
- }
-
- ZEN_TRACE_CPU("UpdateFolder_Copy_rename");
- std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
- GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
- }
+ throw std::runtime_error(
+ fmt::format("Written chunk sequence {} hash does not match expected hash {}",
+ VerifyChunkHash,
+ SequenceRawHash));
}
- }
- ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size());
- ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]);
+ ZEN_TRACE_CPU("UpdateFolder_Copy_rename");
+ std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
+ GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
+ }
}
+
+ ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size());
+ ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]);
+ }
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
}
}
},
Work.DefaultErrorFunction());
}
- size_t BlockCount = BlockDescriptions.size();
- std::atomic<size_t> BlocksComplete = 0;
-
- auto IsBlockNeeded = [&RemoteContent, &RemoteLookup, &RemoteChunkIndexNeedsCopyFromSourceFlags](
- const ChunkBlockDescription& BlockDescription) -> bool {
- for (const IoHash& ChunkHash : BlockDescription.ChunkRawHashes)
- {
- if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t RemoteChunkIndex = It->second;
- if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex])
- {
- return true;
- }
- }
- }
- return false;
+ size_t BlockCount = BlockDescriptions.size();
+
+ std::vector<bool> ChunkIsPickedUpByBlock(RemoteContent.ChunkedContent.ChunkHashes.size(), false);
+ auto GetNeededChunkBlockIndexes = [&RemoteContent,
+ &RemoteLookup,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &ChunkIsPickedUpByBlock](const ChunkBlockDescription& BlockDescription) {
+ std::vector<uint32_t> NeededBlockChunkIndexes;
+ for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++)
+ {
+ const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
+ if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t RemoteChunkIndex = It->second;
+ if (!ChunkIsPickedUpByBlock[RemoteChunkIndex])
+ {
+ if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex])
+ {
+ ChunkIsPickedUpByBlock[RemoteChunkIndex] = true;
+ NeededBlockChunkIndexes.push_back(ChunkBlockIndex);
+ }
+ }
+ }
+ }
+ return NeededBlockChunkIndexes;
};
- size_t BlocksNeededCount = 0;
+ size_t BlocksNeededCount = 0;
+ uint64_t AllBlocksSize = 0;
+ uint64_t AllBlocksFetch = 0;
+ uint64_t AllBlocksSlack = 0;
+ uint64_t AllBlockRequests = 0;
+ uint64_t AllBlockChunksSize = 0;
for (size_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++)
{
if (Work.IsAborted())
{
break;
}
- if (IsBlockNeeded(BlockDescriptions[BlockIndex]))
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ const std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription);
+ if (!BlockChunkIndexNeeded.empty())
{
- BlocksNeededCount++;
- Work.ScheduleWork(
- NetworkPool,
- [&, BlockIndex](std::atomic<bool>&) {
- if (!AbortFlag)
+ bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size();
+ bool CanDoPartialBlockDownload = (BlockDescription.HeaderSize > 0) && (BlockDescription.ChunkCompressedLengths.size() ==
+ BlockDescription.ChunkRawHashes.size());
+ if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload)
+ {
+ struct BlockRangeDescriptor
+ {
+ uint64_t RangeStart = 0;
+ uint64_t RangeLength = 0;
+ uint32_t ChunkBlockIndexStart = 0;
+ uint32_t ChunkBlockIndexCount = 0;
+ };
+ std::vector<BlockRangeDescriptor> BlockRanges;
+
+ ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalisys");
+
+ uint32_t NeedBlockChunkIndexOffset = 0;
+ uint32_t ChunkBlockIndex = 0;
+ uint32_t CurrentOffset =
+ gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+
+ BlockRangeDescriptor NextRange;
+ while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() &&
+ ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
+ {
+ const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
+ if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
{
- ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Read");
-
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescriptions[BlockIndex].BlockHash);
- if (!BlockBuffer)
+ if (NextRange.RangeLength > 0)
{
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescriptions[BlockIndex].BlockHash));
+ BlockRanges.push_back(NextRange);
+ NextRange = {};
}
- BytesDownloaded += BlockBuffer.GetSize();
- BlockBytes += BlockBuffer.GetSize();
- DownloadedBlocks++;
- CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)),
- Path / ZenTempBlockFolderName,
- BlockDescriptions[BlockIndex].BlockHash);
-
- if (!AbortFlag)
+ ChunkBlockIndex++;
+ CurrentOffset += ChunkCompressedLength;
+ }
+ else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
+ {
+ AllBlockChunksSize += ChunkCompressedLength;
+ if (NextRange.RangeLength == 0)
{
- Work.ScheduleWork(
- WritePool,
- [&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Write");
-
- FilteredWrittenBytesPerSecond.Start();
- IoHash BlockRawHash;
- uint64_t BlockRawSize;
- CompressedBuffer CompressedBlockBuffer =
- CompressedBuffer::FromCompressed(std::move(BlockBuffer), BlockRawHash, BlockRawSize);
- if (!CompressedBlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} is not a compressed buffer",
- BlockDescriptions[BlockIndex].BlockHash));
- }
+ NextRange.RangeStart = CurrentOffset;
+ NextRange.ChunkBlockIndexStart = ChunkBlockIndex;
+ }
+ NextRange.RangeLength += ChunkCompressedLength;
+ NextRange.ChunkBlockIndexCount++;
+ ChunkBlockIndex++;
+ CurrentOffset += ChunkCompressedLength;
+ NeedBlockChunkIndexOffset++;
+ }
+ else
+ {
+ ZEN_ASSERT(false);
+ }
+ }
+ AllBlocksSize += CurrentOffset;
+ if (NextRange.RangeLength > 0)
+ {
+ BlockRanges.push_back(NextRange);
+ NextRange = {};
+ }
- if (BlockRawHash != BlockDescriptions[BlockIndex].BlockHash)
- {
- throw std::runtime_error(fmt::format("Block {} header has a mismatching raw hash {}",
- BlockDescriptions[BlockIndex].BlockHash,
- BlockRawHash));
- }
+ ZEN_ASSERT(!BlockRanges.empty());
+ std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
+ auto It = BlockRanges.begin();
+ CollapsedBlockRanges.push_back(*It++);
+ uint64_t TotalSlack = 0;
+ while (It != BlockRanges.end())
+ {
+ BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
+ uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength);
+ uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength;
+ if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out
+ {
+ LastRange.ChunkBlockIndexCount =
+ (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
+ LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart;
+ TotalSlack += Slack;
+ }
+ else
+ {
+ CollapsedBlockRanges.push_back(*It);
+ }
+ ++It;
+ }
+
+ uint64_t TotalFetch = 0;
+ for (const BlockRangeDescriptor& Range : CollapsedBlockRanges)
+ {
+ TotalFetch += Range.RangeLength;
+ }
+
+ AllBlocksFetch += TotalFetch;
+ AllBlocksSlack += TotalSlack;
+ BlocksNeededCount++;
+ AllBlockRequests += CollapsedBlockRanges.size();
+
+ for (size_t BlockRangeIndex = 0; BlockRangeIndex < CollapsedBlockRanges.size(); BlockRangeIndex++)
+ {
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+ const BlockRangeDescriptor BlockRange = CollapsedBlockRanges[BlockRangeIndex];
+ // Partial block schedule
+ Work.ScheduleWork(
+ NetworkPool, // NetworkPool, // GetSyncWorkerPool()
+ [&, BlockIndex, BlockRange](std::atomic<bool>&) {
+ ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialGet");
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ }
+ BytesDownloaded += BlockBuffer.GetSize();
+ BlockBytes += BlockBuffer.GetSize();
+ DownloadedBlocks++;
+ CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)),
+ Path / ZenTempBlockFolderName,
+ BlockDescription.BlockHash,
+ fmt::format("_{}", BlockRange.RangeStart));
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
- CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite();
- if (!DecompressedBlockBuffer)
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ WritePool, // WritePool, // GetSyncWorkerPool(),
+ [&, BlockIndex, BlockRange, BlockPartialBuffer = std::move(Payload)](std::atomic<bool>&) {
+ if (!AbortFlag)
{
- throw std::runtime_error(fmt::format("Block {} failed to decompress",
- BlockDescriptions[BlockIndex].BlockHash));
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ FilteredWrittenBytesPerSecond.Start();
+
+ if (!WritePartialBlockToDisk(
+ CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ BlockPartialBuffer,
+ BlockRange.ChunkBlockIndexStart,
+ BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ ChunkCountWritten,
+ WriteToDiskBytes))
+ {
+ throw std::runtime_error(
+ fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
+ }
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
+ },
+ [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) {
+ ZEN_ERROR("Failed writing block {}. Reason: {}",
+ BlockDescriptions[BlockIndex].BlockHash,
+ Ex.what());
+ AbortFlag = true;
+ });
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ }
+ else
+ {
+ BlocksNeededCount++;
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ Work.ScheduleWork(
+ NetworkPool, // GetSyncWorkerPool(), // NetworkPool,
+ [&, BlockIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Get");
+
+ // Full block schedule
- ZEN_ASSERT_SLOW(BlockDescriptions[BlockIndex].BlockHash ==
- IoHash::HashBuffer(DecompressedBlockBuffer));
-
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- SequenceIndexChunksLeftToWriteCounters,
- DecompressedBlockBuffer,
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags.data(),
- ChunkCountWritten,
- WriteToDiskBytes))
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ }
+ BytesDownloaded += BlockBuffer.GetSize();
+ BlockBytes += BlockBuffer.GetSize();
+ DownloadedBlocks++;
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+
+ CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)),
+ Path / ZenTempBlockFolderName,
+ BlockDescription.BlockHash);
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ WritePool,
+ [&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic<bool>&) {
+ if (!AbortFlag)
{
- throw std::runtime_error(
- fmt::format("Block {} is malformed", BlockDescriptions[BlockIndex].BlockHash));
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ FilteredWrittenBytesPerSecond.Start();
+ if (!WriteBlockToDisk(CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ BlockBuffer,
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ ChunkCountWritten,
+ WriteToDiskBytes))
+ {
+ throw std::runtime_error(
+ fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
- BlocksComplete++;
- }
- },
- [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) {
- ZEN_ERROR("Failed writing block {}. Reason: {}",
- BlockDescriptions[BlockIndex].BlockHash,
- Ex.what());
- AbortFlag = true;
- });
+ },
+ Work.DefaultErrorFunction());
+ }
}
- }
- },
- Work.DefaultErrorFunction());
+ },
+ Work.DefaultErrorFunction());
+ }
}
else
{
ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash);
}
}
-
+ ZEN_DEBUG("Fetching {} with {} slack (ideal {}) out of {} using {} requests for {} blocks",
+ NiceBytes(AllBlocksFetch),
+ NiceBytes(AllBlocksSlack),
+ NiceBytes(AllBlockChunksSize),
+ NiceBytes(AllBlocksSize),
+ AllBlockRequests,
+ BlocksNeededCount);
ZEN_TRACE_CPU("HandleChunks_Wait");
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
@@ -4412,13 +4816,13 @@ namespace {
ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load());
FilteredWrittenBytesPerSecond.Update(WriteToDiskBytes.load());
FilteredDownloadedBytesPerSecond.Update(BytesDownloaded.load());
- std::string Details = fmt::format("{}/{} chunks. {}/{} blocks. {} {}bits/s downloaded. {} {}B/s written",
- ChunkCountWritten.load(),
- ChunkCountToWrite,
- BlocksComplete.load(),
- BlocksNeededCount,
+ std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.",
+ RequestsComplete.load(),
+ TotalRequestCount,
NiceBytes(BytesDownloaded.load()),
NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
+ ChunkCountWritten.load(),
+ ChunkCountToWrite,
NiceBytes(WriteToDiskBytes.load()),
NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()));
WriteProgressBar.UpdateState({.Task = "Writing chunks ",
@@ -4437,6 +4841,13 @@ namespace {
}
WriteProgressBar.Finish();
+
+ ZEN_CONSOLE("Downloaded {} ({}bits/s). Wrote {} ({}B/s). Completed in {}",
+ NiceBytes(BytesDownloaded.load()),
+ NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), BytesDownloaded * 8)),
+ NiceBytes(WriteToDiskBytes.load()),
+ NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), WriteToDiskBytes.load())),
+ NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs()));
}
for (const auto& SequenceIndexChunksLeftToWriteCounter : SequenceIndexChunksLeftToWriteCounters)
@@ -4455,6 +4866,7 @@ namespace {
});
// Move all files we will reuse to cache folder
+ // TODO: If WipeTargetFolder is false we could check which files are already correct and leave them in place
for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++)
{
const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex];
@@ -5186,6 +5598,7 @@ namespace {
std::span<const std::string> BuildPartNames,
const std::filesystem::path& Path,
bool AllowMultiparts,
+ bool AllowPartialBlockRequests,
bool WipeTargetFolder,
bool PostDownloadVerify)
{
@@ -5202,8 +5615,8 @@ namespace {
CreateDirectories(Path / ZenTempBlockFolderName);
CreateDirectories(Path / ZenTempChunkFolderName); // TODO: Don't clear this - pick up files -> chunks to use
CreateDirectories(Path /
- ZenTempCacheFolderName); // TODO: Don't clear this - pick up files and use as sequences (non .tmp extension) and
- // delete .tmp (maybe?) - chunk them? How do we know the file is worth chunking?
+ ZenTempCacheFolderName); // TODO: Don't clear this - pick up files and use as sequences (non .tmp extension)
+ // and delete .tmp (maybe?) - chunk them? How do we know the file is worth chunking?
std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
@@ -5311,6 +5724,7 @@ namespace {
RemoteContent,
BlockDescriptions,
LooseChunkHashes,
+ AllowPartialBlockRequests,
WipeTargetFolder,
LocalFolderState);
@@ -5705,6 +6119,12 @@ BuildsCommand::BuildsCommand()
"Allow large attachments to be transfered using multipart protocol. Defaults to true.",
cxxopts::value(m_AllowMultiparts),
"<allowmultipart>");
+ m_DownloadOptions.add_option("",
+ "",
+ "allow-partial-block-requests",
+ "Allow request for partial chunk blocks. Defaults to true.",
+ cxxopts::value(m_AllowPartialBlockRequests),
+ "<allowpartialblockrequests>");
m_DownloadOptions
.add_option("", "", "verify", "Enable post download verify of all tracked files", cxxopts::value(m_PostDownloadVerify), "<verify>");
m_DownloadOptions.parse_positional({"local-path", "build-id", "build-part-name"});
@@ -5728,6 +6148,18 @@ BuildsCommand::BuildsCommand()
AddOutputOptions(m_TestOptions);
m_TestOptions.add_options()("h,help", "Print help");
m_TestOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>");
+ m_TestOptions.add_option("",
+ "",
+ "allow-multipart",
+ "Allow large attachments to be transfered using multipart protocol. Defaults to true.",
+ cxxopts::value(m_AllowMultiparts),
+ "<allowmultipart>");
+ m_TestOptions.add_option("",
+ "",
+ "allow-partial-block-requests",
+ "Allow request for partial chunk blocks. Defaults to true.",
+ cxxopts::value(m_AllowPartialBlockRequests),
+ "<allowpartialblockrequests>");
m_TestOptions.parse_positional({"local-path"});
m_TestOptions.positional_help("local-path");
@@ -6037,7 +6469,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Bucket,
GeneratedBuildId ? "Generated " : "",
BuildId);
- CreateDirectories(m_Path / ZenTempStorageFolderName);
Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}
@@ -6175,7 +6606,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Namespace,
m_Bucket,
BuildId);
- CreateDirectories(m_Path / ZenTempStorageFolderName);
Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}
@@ -6190,7 +6620,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
}
- DownloadFolder(*Storage, BuildId, BuildPartIds, m_BuildPartNames, m_Path, m_AllowMultiparts, m_Clean, m_PostDownloadVerify);
+ DownloadFolder(*Storage,
+ BuildId,
+ BuildPartIds,
+ m_BuildPartNames,
+ m_Path,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ m_Clean,
+ m_PostDownloadVerify);
if (false)
{
@@ -6275,7 +6713,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Namespace,
m_Bucket,
BuildId);
- CreateDirectories(m_Path / ZenTempStorageFolderName);
Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}
@@ -6335,7 +6772,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
const std::filesystem::path DownloadPath = m_Path.parent_path() / (m_BuildPartName + "_download");
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, true, true);
+ DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true);
if (AbortFlag)
{
ZEN_CONSOLE("Download failed.");
@@ -6347,7 +6784,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildPartId,
m_BuildPartName,
DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true);
+ DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed. (identical target)");
@@ -6449,7 +6886,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildPartId,
m_BuildPartName,
DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true);
+ DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed. (scrambled target)");
@@ -6487,7 +6924,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true);
+ DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -6495,7 +6932,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false, true);
+ DownloadFolder(*Storage,
+ BuildId2,
+ {BuildPartId2},
+ {},
+ DownloadPath,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ false,
+ true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -6503,7 +6948,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false, true);
+ DownloadFolder(*Storage,
+ BuildId2,
+ {BuildPartId2},
+ {},
+ DownloadPath,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ false,
+ true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -6549,7 +7002,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Namespace,
m_Bucket,
BuildId);
- CreateDirectories(m_Path / ZenTempStorageFolderName);
Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}
@@ -6617,7 +7069,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Namespace,
m_Bucket,
BuildId);
- CreateDirectories(m_Path / ZenTempStorageFolderName);
Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}
diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h
index c54fb4db1..838a17807 100644
--- a/src/zen/cmds/builds_cmd.h
+++ b/src/zen/cmds/builds_cmd.h
@@ -49,6 +49,7 @@ private:
bool m_Clean = false;
uint8_t m_BlockReuseMinPercentLimit = 85;
bool m_AllowMultiparts = true;
+ bool m_AllowPartialBlockRequests = true;
std::filesystem::path m_ManifestPath;
// Direct access token (may expire)
diff --git a/src/zencore/compactbinary.cpp b/src/zencore/compactbinary.cpp
index adccaba70..b43cc18f1 100644
--- a/src/zencore/compactbinary.cpp
+++ b/src/zencore/compactbinary.cpp
@@ -15,6 +15,8 @@
#include <zencore/testing.h>
#include <zencore/uid.h>
+#include <EASTL/fixed_vector.h>
+
#include <fmt/format.h>
#include <string_view>
@@ -1376,9 +1378,9 @@ TryMeasureCompactBinary(MemoryView View, CbFieldType& OutType, uint64_t& OutSize
CbField
LoadCompactBinary(BinaryReader& Ar, BufferAllocator Allocator)
{
- std::vector<uint8_t> HeaderBytes;
- CbFieldType FieldType;
- uint64_t FieldSize = 1;
+ eastl::fixed_vector<uint8_t, 32> HeaderBytes;
+ CbFieldType FieldType;
+ uint64_t FieldSize = 1;
for (const int64_t StartPos = Ar.CurrentOffset(); FieldSize > 0;)
{
@@ -1393,7 +1395,7 @@ LoadCompactBinary(BinaryReader& Ar, BufferAllocator Allocator)
HeaderBytes.resize(ReadOffset + ReadSize);
Ar.Read(HeaderBytes.data() + ReadOffset, ReadSize);
- if (TryMeasureCompactBinary(MakeMemoryView(HeaderBytes), FieldType, FieldSize))
+ if (TryMeasureCompactBinary(MakeMemoryView(HeaderBytes.data(), HeaderBytes.size()), FieldType, FieldSize))
{
if (FieldSize <= uint64_t(Ar.Size() - StartPos))
{
diff --git a/src/zencore/compactbinarybuilder.cpp b/src/zencore/compactbinarybuilder.cpp
index a60de023d..63c0b9c5c 100644
--- a/src/zencore/compactbinarybuilder.cpp
+++ b/src/zencore/compactbinarybuilder.cpp
@@ -15,23 +15,21 @@
namespace zen {
-template<typename T>
uint64_t
-AddUninitialized(std::vector<T>& Vector, uint64_t Count)
+AddUninitialized(CbWriter::CbWriterData_t& Vector, uint64_t Count)
{
const uint64_t Offset = Vector.size();
Vector.resize(Offset + Count);
return Offset;
}
-template<typename T>
uint64_t
-Append(std::vector<T>& Vector, const T* Data, uint64_t Count)
+Append(CbWriter::CbWriterData_t& Vector, const uint8_t* Data, uint64_t Count)
{
const uint64_t Offset = Vector.size();
Vector.resize(Offset + Count);
- memcpy(Vector.data() + Offset, Data, sizeof(T) * Count);
+ memcpy(Vector.data() + Offset, Data, sizeof(uint8_t) * Count);
return Offset;
}
@@ -76,7 +74,7 @@ IsUniformType(const CbFieldType Type)
/** Append the payload from the compact binary value to the array and return its type. */
static inline CbFieldType
-AppendCompactBinary(const CbFieldView& Value, std::vector<uint8_t>& OutData)
+AppendCompactBinary(const CbFieldView& Value, CbWriter::CbWriterData_t& OutData)
{
struct FCopy : public CbFieldView
{
@@ -93,7 +91,6 @@ AppendCompactBinary(const CbFieldView& Value, std::vector<uint8_t>& OutData)
CbWriter::CbWriter()
{
- States.reserve(4);
States.emplace_back();
}
diff --git a/src/zencore/compactbinarypackage.cpp b/src/zencore/compactbinarypackage.cpp
index 7de161845..ffe64f2e9 100644
--- a/src/zencore/compactbinarypackage.cpp
+++ b/src/zencore/compactbinarypackage.cpp
@@ -3,10 +3,13 @@
#include "zencore/compactbinarypackage.h"
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinaryvalidation.h>
+#include <zencore/eastlutil.h>
#include <zencore/endian.h>
#include <zencore/stream.h>
#include <zencore/testing.h>
+#include <EASTL/span.h>
+
namespace zen {
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -341,6 +344,12 @@ CbPackage::SetObject(CbObject InObject, const IoHash* InObjectHash, AttachmentRe
}
void
+CbPackage::ReserveAttachments(size_t Count)
+{
+ Attachments.reserve(Count);
+}
+
+void
CbPackage::AddAttachment(const CbAttachment& Attachment, AttachmentResolver* Resolver)
{
if (!Attachment.IsNull())
@@ -374,17 +383,18 @@ CbPackage::AddAttachments(std::span<const CbAttachment> InAttachments)
{
ZEN_ASSERT(!Attachment.IsNull());
}
+
// Assume we have no duplicates!
Attachments.insert(Attachments.end(), InAttachments.begin(), InAttachments.end());
std::sort(Attachments.begin(), Attachments.end());
- ZEN_ASSERT_SLOW(std::unique(Attachments.begin(), Attachments.end()) == Attachments.end());
+ ZEN_ASSERT_SLOW(eastl::unique(Attachments.begin(), Attachments.end()) == Attachments.end());
}
int32_t
CbPackage::RemoveAttachment(const IoHash& Hash)
{
return gsl::narrow_cast<int32_t>(
- std::erase_if(Attachments, [&Hash](const CbAttachment& Attachment) -> bool { return Attachment.GetHash() == Hash; }));
+ erase_if(Attachments, [&Hash](const CbAttachment& Attachment) -> bool { return Attachment.GetHash() == Hash; }));
}
bool
diff --git a/src/zencore/include/zencore/compactbinarybuilder.h b/src/zencore/include/zencore/compactbinarybuilder.h
index 1c625cacc..f11717453 100644
--- a/src/zencore/include/zencore/compactbinarybuilder.h
+++ b/src/zencore/include/zencore/compactbinarybuilder.h
@@ -18,6 +18,8 @@
#include <type_traits>
#include <vector>
+#include <EASTL/fixed_vector.h>
+
#include <gsl/gsl-lite.hpp>
namespace zen {
@@ -367,6 +369,8 @@ public:
/** Private flags that are public to work with ENUM_CLASS_FLAGS. */
enum class StateFlags : uint8_t;
+ typedef eastl::fixed_vector<uint8_t, 2048> CbWriterData_t;
+
protected:
/** Reserve the specified size up front until the format is optimized. */
ZENCORE_API explicit CbWriter(int64_t InitialSize);
@@ -409,8 +413,8 @@ private:
// provided externally, such as on the stack. That format will store the offsets that require
// object or array sizes to be inserted and field types to be removed, and will perform those
// operations only when saving to a buffer.
- std::vector<uint8_t> Data;
- std::vector<WriterState> States;
+ eastl::fixed_vector<WriterState, 4> States;
+ CbWriterData_t Data;
};
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/zencore/include/zencore/compactbinarypackage.h b/src/zencore/include/zencore/compactbinarypackage.h
index 12fcc41b7..9ec12cb0f 100644
--- a/src/zencore/include/zencore/compactbinarypackage.h
+++ b/src/zencore/include/zencore/compactbinarypackage.h
@@ -12,6 +12,8 @@
#include <span>
#include <variant>
+#include <EASTL/fixed_vector.h>
+
#ifdef GetObject
# error "windows.h pollution"
# undef GetObject
@@ -265,7 +267,10 @@ public:
}
/** Returns the attachments in this package. */
- inline std::span<const CbAttachment> GetAttachments() const { return Attachments; }
+ inline std::span<const CbAttachment> GetAttachments() const
+ {
+ return std::span<const CbAttachment>(begin(Attachments), end(Attachments));
+ }
/**
* Find an attachment by its hash.
@@ -286,6 +291,8 @@ public:
void AddAttachments(std::span<const CbAttachment> Attachments);
+ void ReserveAttachments(size_t Count);
+
/**
* Remove an attachment by hash.
*
@@ -324,9 +331,9 @@ private:
void GatherAttachments(const CbObject& Object, AttachmentResolver Resolver);
/** Attachments ordered by their hash. */
- std::vector<CbAttachment> Attachments;
- CbObject Object;
- IoHash ObjectHash;
+ eastl::fixed_vector<CbAttachment, 32> Attachments;
+ CbObject Object;
+ IoHash ObjectHash;
};
namespace legacy {
diff --git a/src/zencore/include/zencore/compositebuffer.h b/src/zencore/include/zencore/compositebuffer.h
index b435c5e74..1e1611de9 100644
--- a/src/zencore/include/zencore/compositebuffer.h
+++ b/src/zencore/include/zencore/compositebuffer.h
@@ -2,6 +2,7 @@
#pragma once
+#include <zencore/eastlutil.h>
#include <zencore/sharedbuffer.h>
#include <zencore/zencore.h>
@@ -9,6 +10,8 @@
#include <span>
#include <vector>
+#include <EASTL/fixed_vector.h>
+
namespace zen {
/**
@@ -35,7 +38,7 @@ public:
{
m_Segments.reserve((GetBufferCount(std::forward<BufferTypes>(Buffers)) + ...));
(AppendBuffers(std::forward<BufferTypes>(Buffers)), ...);
- std::erase_if(m_Segments, [](const SharedBuffer& It) { return It.IsNull(); });
+ erase_if(m_Segments, [](const SharedBuffer& It) { return It.IsNull(); });
}
}
@@ -46,7 +49,10 @@ public:
[[nodiscard]] ZENCORE_API uint64_t GetSize() const;
/** Returns the segments that the buffer is composed from. */
- [[nodiscard]] inline std::span<const SharedBuffer> GetSegments() const { return std::span<const SharedBuffer>{m_Segments}; }
+ [[nodiscard]] inline std::span<const SharedBuffer> GetSegments() const
+ {
+ return std::span<const SharedBuffer>{begin(m_Segments), end(m_Segments)};
+ }
/** Returns true if the composite buffer is not null. */
[[nodiscard]] inline explicit operator bool() const { return !IsNull(); }
@@ -120,6 +126,8 @@ public:
static const CompositeBuffer Null;
private:
+ typedef eastl::fixed_vector<SharedBuffer, 4> SharedBufferVector_t;
+
static inline size_t GetBufferCount(const CompositeBuffer& Buffer) { return Buffer.m_Segments.size(); }
inline void AppendBuffers(const CompositeBuffer& Buffer)
{
@@ -134,12 +142,25 @@ private:
inline void AppendBuffers(SharedBuffer&& Buffer) { m_Segments.push_back(std::move(Buffer)); }
inline void AppendBuffers(IoBuffer&& Buffer) { m_Segments.push_back(SharedBuffer(std::move(Buffer))); }
+ static inline size_t GetBufferCount(std::span<IoBuffer>&& Container) { return Container.size(); }
+ inline void AppendBuffers(std::span<IoBuffer>&& Container)
+ {
+ m_Segments.reserve(m_Segments.size() + Container.size());
+ for (IoBuffer& Buffer : Container)
+ {
+ m_Segments.emplace_back(SharedBuffer(std::move(Buffer)));
+ }
+ }
+
static inline size_t GetBufferCount(std::vector<SharedBuffer>&& Container) { return Container.size(); }
static inline size_t GetBufferCount(std::vector<IoBuffer>&& Container) { return Container.size(); }
inline void AppendBuffers(std::vector<SharedBuffer>&& Container)
{
m_Segments.reserve(m_Segments.size() + Container.size());
- m_Segments.insert(m_Segments.end(), std::make_move_iterator(Container.begin()), std::make_move_iterator(Container.end()));
+ for (SharedBuffer& Buffer : Container)
+ {
+ m_Segments.emplace_back(std::move(Buffer));
+ }
}
inline void AppendBuffers(std::vector<IoBuffer>&& Container)
{
@@ -150,8 +171,17 @@ private:
}
}
+ inline void AppendBuffers(SharedBufferVector_t&& Container)
+ {
+ m_Segments.reserve(m_Segments.size() + Container.size());
+ for (SharedBuffer& Buffer : Container)
+ {
+ m_Segments.emplace_back(std::move(Buffer));
+ }
+ }
+
private:
- std::vector<SharedBuffer> m_Segments;
+ SharedBufferVector_t m_Segments;
};
void compositebuffer_forcelink(); // internal
diff --git a/src/zencore/include/zencore/eastlutil.h b/src/zencore/include/zencore/eastlutil.h
new file mode 100644
index 000000000..642321dae
--- /dev/null
+++ b/src/zencore/include/zencore/eastlutil.h
@@ -0,0 +1,20 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <algorithm>
+
+namespace zen {
+
+size_t
+erase_if(auto& _Cont, auto Predicate)
+{
+ auto _First = _Cont.begin();
+ const auto _Last = _Cont.end();
+ const auto _Old_size = _Cont.size();
+ _First = std::remove_if(_First, _Last, Predicate);
+ _Cont.erase(_First, _Last);
+ return _Old_size - _Cont.size();
+}
+
+} // namespace zen
diff --git a/src/zencore/include/zencore/memory/newdelete.h b/src/zencore/include/zencore/memory/newdelete.h
index d22c8604f..059f1d5ea 100644
--- a/src/zencore/include/zencore/memory/newdelete.h
+++ b/src/zencore/include/zencore/memory/newdelete.h
@@ -153,3 +153,29 @@ operator new[](std::size_t n, std::align_val_t al, const std::nothrow_t&) noexce
return zen_new_aligned_nothrow(n, static_cast<size_t>(al));
}
#endif
+
+// EASTL operator new
+
+void*
+operator new[](size_t size, const char* pName, int flags, unsigned debugFlags, const char* file, int line)
+{
+ ZEN_UNUSED(pName, flags, debugFlags, file, line);
+ return zen_new(size);
+}
+
+void*
+operator new[](size_t size,
+ size_t alignment,
+ size_t alignmentOffset,
+ const char* pName,
+ int flags,
+ unsigned debugFlags,
+ const char* file,
+ int line)
+{
+ ZEN_UNUSED(alignmentOffset, pName, flags, debugFlags, file, line);
+
+ ZEN_ASSERT_SLOW(alignmentOffset == 0); // currently not supported
+
+ return zen_new_aligned(size, alignment);
+}
diff --git a/src/zencore/xmake.lua b/src/zencore/xmake.lua
index 2efa3fdb8..b8b14084c 100644
--- a/src/zencore/xmake.lua
+++ b/src/zencore/xmake.lua
@@ -55,6 +55,7 @@ target('zencore')
add_packages(
"vcpkg::doctest",
+ "vcpkg::eastl",
"vcpkg::fmt",
"vcpkg::gsl-lite",
"vcpkg::lz4",
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index e4c6d243d..30711a432 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -1148,6 +1148,30 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
return true;
};
+ uint64_t RequestedContentLength = (uint64_t)-1;
+ if (auto RangeIt = AdditionalHeader.Entries.find("Range"); RangeIt != AdditionalHeader.Entries.end())
+ {
+ if (RangeIt->second.starts_with("bytes"))
+ {
+ size_t RangeStartPos = RangeIt->second.find('=', 5);
+ if (RangeStartPos != std::string::npos)
+ {
+ RangeStartPos++;
+ size_t RangeSplitPos = RangeIt->second.find('-', RangeStartPos);
+ if (RangeSplitPos != std::string::npos)
+ {
+ std::optional<size_t> RequestedRangeStart =
+ ParseInt<size_t>(RangeIt->second.substr(RangeStartPos, RangeSplitPos - RangeStartPos));
+ std::optional<size_t> RequestedRangeEnd = ParseInt<size_t>(RangeIt->second.substr(RangeStartPos + 1));
+ if (RequestedRangeStart.has_value() && RequestedRangeEnd.has_value())
+ {
+ RequestedContentLength = RequestedRangeEnd.value() - 1;
+ }
+ }
+ }
+ }
+ }
+
cpr::Response Response;
{
std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
@@ -1155,10 +1179,10 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
std::pair<std::string, std::string> Header = GetHeader(header);
if (Header.first == "Content-Length"sv)
{
- std::optional<size_t> ContentSize = ParseInt<size_t>(Header.second);
- if (ContentSize.has_value())
+ std::optional<size_t> ContentLength = ParseInt<size_t>(Header.second);
+ if (ContentLength.has_value())
{
- if (ContentSize.value() > 1024 * 1024)
+ if (ContentLength.value() > 1024 * 1024)
{
PayloadFile = std::make_unique<detail::TempPayloadFile>();
std::error_code Ec = PayloadFile->Open(TempFolderPath);
@@ -1172,7 +1196,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
}
else
{
- PayloadString.reserve(ContentSize.value());
+ PayloadString.reserve(ContentLength.value());
}
}
}
@@ -1218,85 +1242,90 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
auto It = Response.header.find("Content-Length");
if (It != Response.header.end())
{
- std::optional<int64_t> ContentLength = ParseInt<int64_t>(It->second);
- if (ContentLength)
- {
- std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
+ std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
- auto HeaderCallback = [&](std::string header, intptr_t) {
- std::pair<std::string, std::string> Header = GetHeader(header);
- if (!Header.first.empty())
- {
- ReceivedHeaders.emplace_back(std::move(Header));
- }
+ auto HeaderCallback = [&](std::string header, intptr_t) {
+ std::pair<std::string, std::string> Header = GetHeader(header);
+ if (!Header.first.empty())
+ {
+ ReceivedHeaders.emplace_back(std::move(Header));
+ }
- if (Header.first == "Content-Range"sv)
+ if (Header.first == "Content-Range"sv)
+ {
+ if (Header.second.starts_with("bytes "sv))
{
- if (Header.second.starts_with("bytes "sv))
+ size_t RangeStartEnd = Header.second.find('-', 6);
+ if (RangeStartEnd != std::string::npos)
{
- size_t RangeStartEnd = Header.second.find('-', 6);
- if (RangeStartEnd != std::string::npos)
+ const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6));
+ if (Start)
{
- const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6));
- if (Start)
+ uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
+ if (Start.value() == DownloadedSize)
{
- uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
- if (Start.value() == DownloadedSize)
- {
- return 1;
- }
- else if (Start.value() > DownloadedSize)
- {
- return 0;
- }
- if (PayloadFile)
- {
- PayloadFile->ResetWritePos(Start.value());
- }
- else
- {
- PayloadString = PayloadString.substr(0, Start.value());
- }
return 1;
}
+ else if (Start.value() > DownloadedSize)
+ {
+ return 0;
+ }
+ if (PayloadFile)
+ {
+ PayloadFile->ResetWritePos(Start.value());
+ }
+ else
+ {
+ PayloadString = PayloadString.substr(0, Start.value());
+ }
+ return 1;
}
}
- return 0;
}
- return 1;
- };
+ return 0;
+ }
+ return 1;
+ };
- KeyValueMap HeadersWithRange(AdditionalHeader);
- do
- {
- uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
+ KeyValueMap HeadersWithRange(AdditionalHeader);
+ do
+ {
+ uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
- std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength.value() - 1);
- if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end())
+ uint64_t ContentLength = RequestedContentLength;
+ if (ContentLength == uint64_t(-1))
+ {
+ if (auto ParsedContentLength = ParseInt<int64_t>(It->second); ParsedContentLength.has_value())
{
- if (RangeIt->second == Range)
- {
- // If we didn't make any progress, abort
- break;
- }
+ ContentLength = ParsedContentLength.value();
}
- HeadersWithRange.Entries.insert_or_assign("Range", Range);
-
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri,
- Url,
- m_ConnectionSettings,
- HeadersWithRange,
- {},
- m_SessionId,
- GetAccessToken());
- Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback});
- for (const std::pair<std::string, std::string>& H : ReceivedHeaders)
+ }
+
+ std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength - 1);
+ if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end())
+ {
+ if (RangeIt->second == Range)
{
- Response.header.insert_or_assign(H.first, H.second);
+ // If we didn't make any progress, abort
+ break;
}
- ReceivedHeaders.clear();
- } while (ShouldResume(Response));
- }
+ }
+ HeadersWithRange.Entries.insert_or_assign("Range", Range);
+
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri,
+ Url,
+ m_ConnectionSettings,
+ HeadersWithRange,
+ {},
+ m_SessionId,
+ GetAccessToken());
+ Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback});
+ for (const std::pair<std::string, std::string>& H : ReceivedHeaders)
+ {
+ Response.header.insert_or_assign(H.first, H.second);
+ }
+ ReceivedHeaders.clear();
+ } while (ShouldResume(Response));
}
}
}
diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp
index 1fbe22628..27a09f339 100644
--- a/src/zenhttp/httpserver.cpp
+++ b/src/zenhttp/httpserver.cpp
@@ -31,6 +31,8 @@
#include <span>
#include <string_view>
+#include <EASTL/fixed_vector.h>
+
namespace zen {
using namespace std::literals;
@@ -529,7 +531,7 @@ HttpServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType
{
std::span<const SharedBuffer> Segments = Payload.GetSegments();
- std::vector<IoBuffer> Buffers;
+ eastl::fixed_vector<IoBuffer, 64> Buffers;
Buffers.reserve(Segments.size());
for (auto& Segment : Segments)
@@ -537,7 +539,7 @@ HttpServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType
Buffers.push_back(Segment.AsIoBuffer());
}
- WriteResponse(ResponseCode, ContentType, Buffers);
+ WriteResponse(ResponseCode, ContentType, std::span<IoBuffer>(begin(Buffers), end(Buffers)));
}
std::string
diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h
index 7b87cb84b..217455dba 100644
--- a/src/zenhttp/include/zenhttp/httpserver.h
+++ b/src/zenhttp/include/zenhttp/httpserver.h
@@ -208,7 +208,7 @@ class HttpRouterRequest
public:
HttpRouterRequest(HttpServerRequest& Request) : m_HttpRequest(Request) {}
- ZENCORE_API std::string GetCapture(uint32_t Index) const;
+ std::string_view GetCapture(uint32_t Index) const;
inline HttpServerRequest& ServerRequest() { return m_HttpRequest; }
private:
@@ -220,12 +220,14 @@ private:
friend class HttpRequestRouter;
};
-inline std::string
+inline std::string_view
HttpRouterRequest::GetCapture(uint32_t Index) const
{
ZEN_ASSERT(Index < m_Match.size());
- return m_Match[Index];
+ const auto& Match = m_Match[Index];
+
+ return std::string_view(&*Match.first, Match.second - Match.first);
}
/** HTTP request router helper
diff --git a/src/zenhttp/packageformat.cpp b/src/zenhttp/packageformat.cpp
index 676fc73fd..ae80851e4 100644
--- a/src/zenhttp/packageformat.cpp
+++ b/src/zenhttp/packageformat.cpp
@@ -19,6 +19,8 @@
#include <span>
#include <vector>
+#include <EASTL/fixed_vector.h>
+
#if ZEN_PLATFORM_WINDOWS
# include <zencore/windows.h>
#endif
@@ -31,6 +33,10 @@ namespace zen {
const std::string_view HandlePrefix(":?#:");
+typedef eastl::fixed_vector<IoBuffer, 16> IoBufferVec_t;
+
+IoBufferVec_t FormatPackageMessageInternal(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle);
+
std::vector<IoBuffer>
FormatPackageMessage(const CbPackage& Data, void* TargetProcessHandle)
{
@@ -42,10 +48,18 @@ FormatPackageMessageBuffer(const CbPackage& Data, void* TargetProcessHandle)
return FormatPackageMessageBuffer(Data, FormatFlags::kDefault, TargetProcessHandle);
}
+std::vector<IoBuffer>
+FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle)
+{
+ auto Vec = FormatPackageMessageInternal(Data, Flags, TargetProcessHandle);
+ return std::vector<IoBuffer>(begin(Vec), end(Vec));
+}
+
CompositeBuffer
FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle)
{
- return CompositeBuffer(FormatPackageMessage(Data, Flags, TargetProcessHandle));
+ auto Vec = FormatPackageMessageInternal(Data, Flags, TargetProcessHandle);
+ return CompositeBuffer(std::span{begin(Vec), end(Vec)});
}
static void
@@ -54,7 +68,7 @@ MarshalLocal(CbAttachmentEntry*& AttachmentInfo,
CbAttachmentReferenceHeader& LocalRef,
const IoHash& AttachmentHash,
bool IsCompressed,
- std::vector<IoBuffer>& ResponseBuffers)
+ IoBufferVec_t& ResponseBuffers)
{
IoBuffer RefBuffer(sizeof(CbAttachmentReferenceHeader) + Path8.size());
@@ -146,8 +160,8 @@ IsLocalRef(tsl::robin_map<void*, std::string>& FileNameMap,
return true;
};
-std::vector<IoBuffer>
-FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle)
+IoBufferVec_t
+FormatPackageMessageInternal(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle)
{
ZEN_TRACE_CPU("FormatPackageMessage");
@@ -177,7 +191,7 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, void* TargetProce
#endif // ZEN_PLATFORM_WINDOWS
const std::span<const CbAttachment>& Attachments = Data.GetAttachments();
- std::vector<IoBuffer> ResponseBuffers;
+ IoBufferVec_t ResponseBuffers;
ResponseBuffers.reserve(2 + Attachments.size()); // TODO: may want to use an additional fudge factor here to avoid growing since each
// attachment is likely to consist of several buffers
diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp
index 87128c0c9..3bdcdf098 100644
--- a/src/zenhttp/servers/httpsys.cpp
+++ b/src/zenhttp/servers/httpsys.cpp
@@ -16,6 +16,8 @@
#include <zencore/trace.h>
#include <zenhttp/packageformat.h>
+#include <EASTL/fixed_vector.h>
+
#if ZEN_WITH_HTTPSYS
# define _WINSOCKAPI_
# include <zencore/windows.h>
@@ -381,14 +383,14 @@ public:
void SuppressResponseBody(); // typically used for HEAD requests
private:
- std::vector<HTTP_DATA_CHUNK> m_HttpDataChunks;
- uint64_t m_TotalDataSize = 0; // Sum of all chunk sizes
- uint16_t m_ResponseCode = 0;
- uint32_t m_NextDataChunkOffset = 0; // Cursor used for very large chunk lists
- uint32_t m_RemainingChunkCount = 0; // Backlog for multi-call sends
- bool m_IsInitialResponse = true;
- HttpContentType m_ContentType = HttpContentType::kBinary;
- std::vector<IoBuffer> m_DataBuffers;
+ eastl::fixed_vector<HTTP_DATA_CHUNK, 16> m_HttpDataChunks;
+ uint64_t m_TotalDataSize = 0; // Sum of all chunk sizes
+ uint16_t m_ResponseCode = 0;
+ uint32_t m_NextDataChunkOffset = 0; // Cursor used for very large chunk lists
+ uint32_t m_RemainingChunkCount = 0; // Backlog for multi-call sends
+ bool m_IsInitialResponse = true;
+ HttpContentType m_ContentType = HttpContentType::kBinary;
+ eastl::fixed_vector<IoBuffer, 16> m_DataBuffers;
void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> Blobs);
};
diff --git a/src/zenserver/objectstore/objectstore.cpp b/src/zenserver/objectstore/objectstore.cpp
index 5d96de225..e757ef84e 100644
--- a/src/zenserver/objectstore/objectstore.cpp
+++ b/src/zenserver/objectstore/objectstore.cpp
@@ -269,9 +269,9 @@ HttpObjectStoreService::Inititalize()
m_Router.RegisterRoute(
"bucket/{path}",
[this](zen::HttpRouterRequest& Request) {
- const std::string Path = Request.GetCapture(1);
- const auto Sep = Path.find_last_of('.');
- const bool IsObject = Sep != std::string::npos && Path.size() - Sep > 0;
+ const std::string_view Path = Request.GetCapture(1);
+ const auto Sep = Path.find_last_of('.');
+ const bool IsObject = Sep != std::string::npos && Path.size() - Sep > 0;
if (IsObject)
{
@@ -337,18 +337,18 @@ HttpObjectStoreService::CreateBucket(zen::HttpRouterRequest& Request)
}
void
-HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::string& Path)
+HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::string_view Path)
{
namespace fs = std::filesystem;
- const auto Sep = Path.find_first_of('/');
- const std::string BucketName = Sep == std::string::npos ? Path : Path.substr(0, Sep);
+ const auto Sep = Path.find_first_of('/');
+ const std::string BucketName{Sep == std::string::npos ? Path : Path.substr(0, Sep)};
if (BucketName.empty())
{
return Request.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
}
- std::string BucketPrefix = Sep == std::string::npos || Sep == Path.size() - 1 ? std::string() : Path.substr(BucketName.size() + 1);
+ std::string BucketPrefix{Sep == std::string::npos || Sep == Path.size() - 1 ? std::string() : Path.substr(BucketName.size() + 1)};
if (BucketPrefix.empty())
{
const auto QueryParms = Request.ServerRequest().GetQueryParams();
@@ -450,14 +450,13 @@ HttpObjectStoreService::DeleteBucket(zen::HttpRouterRequest& Request)
}
void
-HttpObjectStoreService::GetObject(zen::HttpRouterRequest& Request, const std::string& Path)
+HttpObjectStoreService::GetObject(zen::HttpRouterRequest& Request, const std::string_view Path)
{
namespace fs = std::filesystem;
- const auto Sep = Path.find_first_of('/');
- const std::string BucketName = Sep == std::string::npos ? Path : Path.substr(0, Sep);
- const std::string BucketPrefix =
- Sep == std::string::npos || Sep == Path.size() - 1 ? std::string() : Path.substr(BucketName.size() + 1);
+ const auto Sep = Path.find_first_of('/');
+ const std::string BucketName{Sep == std::string::npos ? Path : Path.substr(0, Sep)};
+ const std::string BucketPrefix{Sep == std::string::npos || Sep == Path.size() - 1 ? std::string() : Path.substr(BucketName.size() + 1)};
const fs::path BucketDir = GetBucketDirectory(BucketName);
@@ -554,8 +553,8 @@ HttpObjectStoreService::PutObject(zen::HttpRouterRequest& Request)
{
namespace fs = std::filesystem;
- const std::string& BucketName = Request.GetCapture(1);
- const fs::path BucketDir = GetBucketDirectory(BucketName);
+ const std::string_view BucketName = Request.GetCapture(1);
+ const fs::path BucketDir = GetBucketDirectory(BucketName);
if (BucketDir.empty())
{
diff --git a/src/zenserver/objectstore/objectstore.h b/src/zenserver/objectstore/objectstore.h
index c905ceab3..dae979c4c 100644
--- a/src/zenserver/objectstore/objectstore.h
+++ b/src/zenserver/objectstore/objectstore.h
@@ -36,9 +36,9 @@ private:
void Inititalize();
std::filesystem::path GetBucketDirectory(std::string_view BucketName);
void CreateBucket(zen::HttpRouterRequest& Request);
- void ListBucket(zen::HttpRouterRequest& Request, const std::string& Path);
+ void ListBucket(zen::HttpRouterRequest& Request, const std::string_view Path);
void DeleteBucket(zen::HttpRouterRequest& Request);
- void GetObject(zen::HttpRouterRequest& Request, const std::string& Path);
+ void GetObject(zen::HttpRouterRequest& Request, const std::string_view Path);
void PutObject(zen::HttpRouterRequest& Request);
ObjectStoreConfig m_Cfg;
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp
index 0b8e5f13b..47748dd90 100644
--- a/src/zenserver/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/projectstore/httpprojectstore.cpp
@@ -983,15 +983,19 @@ HttpProjectService::HandleOplogOpPrepRequest(HttpRouterRequest& Req)
IoBuffer Payload = HttpReq.ReadPayload();
CbObject RequestObject = LoadCompactBinaryObject(Payload);
- std::vector<IoHash> ChunkList;
- CbArrayView HaveList = RequestObject["have"sv].AsArrayView();
- ChunkList.reserve(HaveList.Num());
- for (auto& Entry : HaveList)
+ std::vector<IoHash> NeedList;
+
{
- ChunkList.push_back(Entry.AsHash());
- }
+ eastl::fixed_vector<IoHash, 16> ChunkList;
+ CbArrayView HaveList = RequestObject["have"sv].AsArrayView();
+ ChunkList.reserve(HaveList.Num());
+ for (auto& Entry : HaveList)
+ {
+ ChunkList.push_back(Entry.AsHash());
+ }
- std::vector<IoHash> NeedList = FoundLog->CheckPendingChunkReferences(ChunkList, std::chrono::minutes(2));
+ NeedList = FoundLog->CheckPendingChunkReferences(std::span(begin(ChunkList), end(ChunkList)), std::chrono::minutes(2));
+ }
CbObjectWriter Cbo(1 + 1 + 5 + NeedList.size() * (1 + sizeof(IoHash::Hash)) + 1);
Cbo.BeginArray("need");
@@ -1151,7 +1155,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req)
return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified");
}
- std::vector<IoHash> ReferencedChunks;
+ eastl::fixed_vector<IoHash, 16> ReferencedChunks;
Core.IterateAttachments([&ReferencedChunks](CbFieldView View) { ReferencedChunks.push_back(View.AsAttachment()); });
// Write core to oplog
@@ -1169,7 +1173,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req)
// Once we stored the op, we no longer need to retain any chunks this op references
if (!ReferencedChunks.empty())
{
- FoundLog->RemovePendingChunkReferences(ReferencedChunks);
+ FoundLog->RemovePendingChunkReferences(std::span(begin(ReferencedChunks), end(ReferencedChunks)));
}
m_ProjectStats.OpWriteCount++;
@@ -1301,9 +1305,9 @@ HttpProjectService::HandleOpLogOpRequest(HttpRouterRequest& Req)
HttpServerRequest& HttpReq = Req.ServerRequest();
- const std::string& ProjectId = Req.GetCapture(1);
- const std::string& OplogId = Req.GetCapture(2);
- const std::string& OpIdString = Req.GetCapture(3);
+ const std::string_view ProjectId = Req.GetCapture(1);
+ const std::string_view OplogId = Req.GetCapture(2);
+ const std::string_view OpIdString = Req.GetCapture(3);
Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
if (!Project)
@@ -1690,8 +1694,8 @@ HttpProjectService::HandleProjectRequest(HttpRouterRequest& Req)
using namespace std::literals;
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const std::string ProjectId = Req.GetCapture(1);
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const std::string_view ProjectId = Req.GetCapture(1);
switch (HttpReq.RequestVerb())
{
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 53df12b14..86791e29a 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -423,9 +423,13 @@ ComputeOpKey(const CbObjectView& Op)
{
using namespace std::literals;
- BinaryWriter KeyStream;
+ eastl::fixed_vector<uint8_t, 256> KeyData;
- Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyStream.Write(Data, Size); });
+ Op["key"sv].WriteToStream([&](const void* Data, size_t Size) {
+ auto Begin = reinterpret_cast<const uint8_t*>(Data);
+ auto End = Begin + Size;
+ KeyData.insert(KeyData.end(), Begin, End);
+ });
XXH3_128 KeyHash128;
@@ -434,15 +438,15 @@ ComputeOpKey(const CbObjectView& Op)
// path but longer paths are evaluated properly. In the future all key lengths
// should be evaluated using the proper path, this is a temporary workaround to
// maintain compatibility with existing disk state.
- if (KeyStream.GetSize() < 240)
+ if (KeyData.size() < 240)
{
XXH3_128Stream_deprecated KeyHasher;
- KeyHasher.Append(KeyStream.Data(), KeyStream.Size());
+ KeyHasher.Append(KeyData.data(), KeyData.size());
KeyHash128 = KeyHasher.GetHash();
}
else
{
- KeyHash128 = XXH3_128::HashMemory(KeyStream.GetView());
+ KeyHash128 = XXH3_128::HashMemory(KeyData.data(), KeyData.size());
}
Oid KeyHash;
@@ -2735,7 +2739,7 @@ ProjectStore::Oplog::CheckPendingChunkReferences(std::span<const IoHash> ChunkHa
MissingChunks.reserve(ChunkHashes.size());
for (const IoHash& FileHash : ChunkHashes)
{
- if (IoBuffer Payload = m_CidStore.FindChunkByCid(FileHash); !Payload)
+ if (!m_CidStore.ContainsChunk(FileHash))
{
MissingChunks.push_back(FileHash);
}
@@ -3359,7 +3363,6 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bo
ZEN_MEMSCOPE(GetProjectstoreTag());
ZEN_TRACE_CPU("Store::OpenOplog");
- std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
{
RwLock::SharedLockScope ProjectLock(m_ProjectLock);
@@ -3367,21 +3370,35 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bo
if (OplogIt != m_Oplogs.end())
{
- if (!VerifyPathOnDisk || Oplog::ExistsAt(OplogBasePath))
+ bool ReOpen = false;
+
+ if (VerifyPathOnDisk)
{
- return OplogIt->second.get();
+ std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
+
+ if (!Oplog::ExistsAt(OplogBasePath))
+ {
+ // Somebody deleted the oplog on disk behind our back
+ ProjectLock.ReleaseNow();
+ std::filesystem::path DeletePath;
+ if (!RemoveOplog(OplogId, DeletePath))
+ {
+ ZEN_WARN("Failed to clean up deleted oplog {}/{}", Identifier, OplogId, OplogBasePath);
+ }
+
+ ReOpen = true;
+ }
}
- // Somebody deleted the oplog on disk behind our back
- ProjectLock.ReleaseNow();
- std::filesystem::path DeletePath;
- if (!RemoveOplog(OplogId, DeletePath))
+ if (!ReOpen)
{
- ZEN_WARN("Failed to clean up deleted oplog {}/{}", Identifier, OplogId, OplogBasePath);
+ return OplogIt->second.get();
}
}
}
+ std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
+
RwLock::ExclusiveLockScope Lock(m_ProjectLock);
if (auto It = m_Oplogs.find(std::string{OplogId}); It != m_Oplogs.end())
{
diff --git a/src/zenserver/workspaces/httpworkspaces.cpp b/src/zenserver/workspaces/httpworkspaces.cpp
index 905ba5ab2..8a4b977ad 100644
--- a/src/zenserver/workspaces/httpworkspaces.cpp
+++ b/src/zenserver/workspaces/httpworkspaces.cpp
@@ -589,7 +589,7 @@ void
HttpWorkspacesService::ShareAliasFilesRequest(HttpRouterRequest& Req)
{
HttpServerRequest& ServerRequest = Req.ServerRequest();
- std::string Alias = Req.GetCapture(1);
+ std::string_view Alias = Req.GetCapture(1);
if (Alias.empty())
{
return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
@@ -608,7 +608,7 @@ void
HttpWorkspacesService::ShareAliasChunkInfoRequest(HttpRouterRequest& Req)
{
HttpServerRequest& ServerRequest = Req.ServerRequest();
- std::string Alias = Req.GetCapture(1);
+ std::string_view Alias = Req.GetCapture(1);
if (Alias.empty())
{
return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
@@ -635,7 +635,7 @@ void
HttpWorkspacesService::ShareAliasBatchRequest(HttpRouterRequest& Req)
{
HttpServerRequest& ServerRequest = Req.ServerRequest();
- std::string Alias = Req.GetCapture(1);
+ std::string_view Alias = Req.GetCapture(1);
if (Alias.empty())
{
return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
@@ -654,7 +654,7 @@ void
HttpWorkspacesService::ShareAliasEntriesRequest(HttpRouterRequest& Req)
{
HttpServerRequest& ServerRequest = Req.ServerRequest();
- std::string Alias = Req.GetCapture(1);
+ std::string_view Alias = Req.GetCapture(1);
if (Alias.empty())
{
return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
@@ -673,7 +673,7 @@ void
HttpWorkspacesService::ShareAliasChunkRequest(HttpRouterRequest& Req)
{
HttpServerRequest& ServerRequest = Req.ServerRequest();
- std::string Alias = Req.GetCapture(1);
+ std::string_view Alias = Req.GetCapture(1);
if (Alias.empty())
{
return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
@@ -700,7 +700,7 @@ void
HttpWorkspacesService::ShareAliasRequest(HttpRouterRequest& Req)
{
HttpServerRequest& ServerRequest = Req.ServerRequest();
- std::string Alias = Req.GetCapture(1);
+ std::string_view Alias = Req.GetCapture(1);
if (Alias.empty())
{
return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 25f68330a..61552fafc 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -708,11 +708,11 @@ namespace zen {
ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
std::atomic_uint64_t& OuterCacheMemoryUsage,
- std::string BucketName,
+ std::string_view BucketName,
const BucketConfiguration& Config)
: m_Gc(Gc)
, m_OuterCacheMemoryUsage(OuterCacheMemoryUsage)
-, m_BucketName(std::move(BucketName))
+, m_BucketName(BucketName)
, m_Configuration(Config)
, m_BucketId(Oid::Zero)
{
@@ -1329,7 +1329,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle
{
- GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults)
+ GetBatchHandle(ZenCacheValueVec_t& OutResults) : OutResults(OutResults)
{
Keys.reserve(OutResults.capacity());
ResultIndexes.reserve(OutResults.capacity());
@@ -1340,11 +1340,11 @@ struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle
std::vector<IoHash> Keys;
std::vector<size_t> ResultIndexes;
- std::vector<ZenCacheValue>& OutResults;
+ ZenCacheValueVec_t& OutResults;
};
ZenCacheDiskLayer::CacheBucket::GetBatchHandle*
-ZenCacheDiskLayer::CacheBucket::BeginGetBatch(std::vector<ZenCacheValue>& OutResult)
+ZenCacheDiskLayer::CacheBucket::BeginGetBatch(ZenCacheValueVec_t& OutResult)
{
ZEN_TRACE_CPU("Z$::Bucket::BeginGetBatch");
return new GetBatchHandle(OutResult);
@@ -1364,13 +1364,13 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
if (!Batch->ResultIndexes.empty())
{
- std::vector<DiskLocation> StandaloneDiskLocations;
- std::vector<size_t> StandaloneKeyIndexes;
- std::vector<size_t> MemCachedKeyIndexes;
- std::vector<DiskLocation> InlineDiskLocations;
- std::vector<BlockStoreLocation> InlineBlockLocations;
- std::vector<size_t> InlineKeyIndexes;
- std::vector<bool> FillRawHashAndRawSize(Batch->Keys.size(), false);
+ eastl::fixed_vector<DiskLocation, 16> StandaloneDiskLocations;
+ eastl::fixed_vector<size_t, 16> StandaloneKeyIndexes;
+ eastl::fixed_vector<size_t, 16> MemCachedKeyIndexes;
+ eastl::fixed_vector<DiskLocation, 16> InlineDiskLocations;
+ eastl::fixed_vector<BlockStoreLocation, 16> InlineBlockLocations;
+ eastl::fixed_vector<size_t, 16> InlineKeyIndexes;
+ eastl::fixed_vector<bool, 16> FillRawHashAndRawSize(Batch->Keys.size(), false);
{
RwLock::SharedLockScope IndexLock(m_IndexLock);
for (size_t KeyIndex = 0; KeyIndex < Batch->Keys.size(); KeyIndex++)
@@ -1526,33 +1526,35 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
if (!InlineDiskLocations.empty())
{
ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline");
- m_BlockStore.IterateChunks(InlineBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
- // Only read into memory the IoBuffers we could potentially add to memcache
- const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 1u * 1024u);
- m_BlockStore.IterateBlock(
- InlineBlockLocations,
- ChunkIndexes,
- [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
- const void* Data,
- uint64_t Size) -> bool {
- if (Data != nullptr)
- {
- FillOne(InlineDiskLocations[ChunkIndex],
- InlineKeyIndexes[ChunkIndex],
- IoBufferBuilder::MakeCloneFromMemory(Data, Size));
- }
- return true;
- },
- [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
- BlockStoreFile& File,
- uint64_t Offset,
- uint64_t Size) -> bool {
- FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size));
- return true;
- },
- LargeChunkSizeLimit);
- return true;
- });
+ m_BlockStore.IterateChunks(
+ std::span{begin(InlineBlockLocations), end(InlineBlockLocations)},
+ [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
+ // Only read into memory the IoBuffers we could potentially add to memcache
+ const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 1u * 1024u);
+ m_BlockStore.IterateBlock(
+ std::span{begin(InlineBlockLocations), end(InlineBlockLocations)},
+ ChunkIndexes,
+ [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
+ const void* Data,
+ uint64_t Size) -> bool {
+ if (Data != nullptr)
+ {
+ FillOne(InlineDiskLocations[ChunkIndex],
+ InlineKeyIndexes[ChunkIndex],
+ IoBufferBuilder::MakeCloneFromMemory(Data, Size));
+ }
+ return true;
+ },
+ [this, &FillOne, &InlineDiskLocations, &InlineKeyIndexes](size_t ChunkIndex,
+ BlockStoreFile& File,
+ uint64_t Offset,
+ uint64_t Size) -> bool {
+ FillOne(InlineDiskLocations[ChunkIndex], InlineKeyIndexes[ChunkIndex], File.GetChunk(Offset, Size));
+ return true;
+ },
+ LargeChunkSizeLimit);
+ return true;
+ });
}
if (!StandaloneDiskLocations.empty())
@@ -3581,15 +3583,29 @@ ZenCacheDiskLayer::~ZenCacheDiskLayer()
}
}
+template<typename T, typename U>
+struct equal_to_2 : public eastl::binary_function<T, U, bool>
+{
+ constexpr bool operator()(const T& a, const U& b) const { return a == b; }
+
+ template<typename T_ = T,
+ typename U_ = U,
+ typename = eastl::enable_if_t<!eastl::is_same_v<eastl::remove_const_t<T_>, eastl::remove_const_t<U_>>>>
+ constexpr bool operator()(const U& b, const T& a) const
+ {
+ return b == a;
+ }
+};
+
ZenCacheDiskLayer::CacheBucket*
ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
{
ZEN_TRACE_CPU("Z$::GetOrCreateBucket");
- const auto BucketName = std::string(InBucket);
{
RwLock::SharedLockScope SharedLock(m_Lock);
- if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
+ if (auto It = m_Buckets.find_as(InBucket, std::hash<std::string_view>(), equal_to_2<std::string, std::string_view>());
+ It != m_Buckets.end())
{
return It->second.get();
}
@@ -3597,31 +3613,32 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
// We create the bucket without holding a lock since contructor calls GcManager::AddGcReferencer which takes an exclusive lock.
// This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock
- std::unique_ptr<CacheBucket> Bucket(
- std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
+ std::unique_ptr<CacheBucket> Bucket(std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, InBucket, m_Configuration.BucketConfig));
RwLock::ExclusiveLockScope Lock(m_Lock);
- if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
+ if (auto It = m_Buckets.find_as(InBucket, std::hash<std::string_view>(), equal_to_2<std::string, std::string_view>());
+ It != m_Buckets.end())
{
return It->second.get();
}
std::filesystem::path BucketPath = m_RootDir;
- BucketPath /= BucketName;
+ BucketPath /= InBucket;
try
{
if (!Bucket->OpenOrCreate(BucketPath))
{
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", InBucket, m_RootDir);
return nullptr;
}
}
catch (const std::exception& Err)
{
- ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
+ ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", InBucket, BucketPath, Err.what());
throw;
}
+ std::string BucketName{InBucket};
CacheBucket* Result = Bucket.get();
m_Buckets.emplace(BucketName, std::move(Bucket));
if (m_CapturedBuckets)
@@ -3720,7 +3737,7 @@ ZenCacheDiskLayer::EndPutBatch(PutBatchHandle* Batch) noexcept
struct ZenCacheDiskLayer::GetBatchHandle
{
- GetBatchHandle(std::vector<ZenCacheValue>& OutResults) : OutResults(OutResults) {}
+ GetBatchHandle(ZenCacheValueVec_t& OutResults) : OutResults(OutResults) {}
struct BucketHandle
{
CacheBucket* Bucket;
@@ -3780,13 +3797,13 @@ struct ZenCacheDiskLayer::GetBatchHandle
return NewBucketHandle;
}
- RwLock Lock;
- std::vector<BucketHandle> BucketHandles;
- std::vector<ZenCacheValue>& OutResults;
+ RwLock Lock;
+ eastl::fixed_vector<BucketHandle, 4> BucketHandles;
+ ZenCacheValueVec_t& OutResults;
};
ZenCacheDiskLayer::GetBatchHandle*
-ZenCacheDiskLayer::BeginGetBatch(std::vector<ZenCacheValue>& OutResults)
+ZenCacheDiskLayer::BeginGetBatch(ZenCacheValueVec_t& OutResults)
{
return new GetBatchHandle(OutResults);
}
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index cca51e63e..97e26a38d 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -20,6 +20,8 @@
#include <zencore/memory/llm.h>
+#include <EASTL/fixed_vector.h>
+
//////////////////////////////////////////////////////////////////////////
namespace zen {
@@ -89,7 +91,7 @@ GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key)
return false;
}
IoHash Hash = HashField.AsHash();
- Key = CacheKey::Create(*Bucket, Hash);
+ Key = CacheKey::CreateValidated(std::move(*Bucket), Hash);
return true;
}
@@ -305,7 +307,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
}
DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::vector<bool> Results;
+ eastl::fixed_vector<bool, 32> Results;
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
for (CbFieldView RequestField : RequestsArray)
@@ -481,16 +483,15 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
bool Exists = false;
bool ReadFromUpstream = false;
};
- struct RecordRequestData
+ struct RecordRequestData : public CacheKeyRequest
{
- CacheKeyRequest Upstream;
- CbObjectView RecordObject;
- IoBuffer RecordCacheValue;
- CacheRecordPolicy DownstreamPolicy;
- std::vector<ValueRequestData> Values;
- bool Complete = false;
- const UpstreamEndpointInfo* Source = nullptr;
- uint64_t ElapsedTimeUs;
+ CbObjectView RecordObject;
+ IoBuffer RecordCacheValue;
+ CacheRecordPolicy DownstreamPolicy;
+ eastl::fixed_vector<ValueRequestData, 4> Values;
+ bool Complete = false;
+ const UpstreamEndpointInfo* Source = nullptr;
+ uint64_t ElapsedTimeUs;
};
std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
@@ -503,8 +504,8 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
const bool HasUpstream = m_UpstreamCache.IsActive();
- std::vector<RecordRequestData> Requests;
- std::vector<size_t> UpstreamIndexes;
+ eastl::fixed_vector<RecordRequestData, 16> Requests;
+ eastl::fixed_vector<size_t, 16> UpstreamIndexes;
auto ParseValues = [](RecordRequestData& Request) {
CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView();
@@ -535,7 +536,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
CbObjectView RequestObject = RequestField.AsObjectView();
CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
- CacheKey& Key = Request.Upstream.Key;
+ CacheKey& Key = Request.Key;
if (!GetRpcRequestCacheKey(KeyObject, Key))
{
return CbPackage{};
@@ -707,7 +708,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
for (size_t Index : UpstreamIndexes)
{
RecordRequestData& Request = Requests[Index];
- UpstreamRequests.push_back(&Request.Upstream);
+ UpstreamRequests.push_back(&Request);
if (Request.Values.size())
{
@@ -721,13 +722,13 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None;
Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy);
}
- Request.Upstream.Policy = Builder.Build();
+ Request.Policy = Builder.Build();
}
else
{
// We don't know which Values exist in the Record; ask the upstrem for all values that the client wants,
// and convert the CacheRecordPolicy to an upstream policy
- Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream();
+ Request.Policy = Request.DownstreamPolicy.ConvertToUpstream();
}
}
@@ -737,10 +738,9 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
return;
}
- RecordRequestData& Request =
- *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream));
+ RecordRequestData& Request = *static_cast<RecordRequestData*>(&Params.Request);
Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- const CacheKey& Key = Request.Upstream.Key;
+ const CacheKey& Key = Request.Key;
Stopwatch Timer;
auto TimeGuard = MakeGuard([&Timer, &Request]() { Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); });
if (!Request.RecordObject)
@@ -832,10 +832,12 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
CbPackage ResponsePackage;
CbObjectWriter ResponseObject{2048};
+ ResponsePackage.ReserveAttachments(Requests.size());
+
ResponseObject.BeginArray("Result"sv);
for (RecordRequestData& Request : Requests)
{
- const CacheKey& Key = Request.Upstream.Key;
+ const CacheKey& Key = Request.Key;
if (Request.Complete ||
(Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)))
{
@@ -910,11 +912,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
const bool HasUpstream = m_UpstreamCache.IsActive();
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- std::vector<bool> BatchResults;
- std::vector<size_t> BatchResultIndexes;
- std::vector<bool> Results;
- std::vector<CacheKey> UpstreamCacheKeys;
- uint64_t RequestCount = RequestsArray.Num();
+ std::vector<bool> BatchResults;
+ eastl::fixed_vector<size_t, 32> BatchResultIndexes;
+ eastl::fixed_vector<bool, 32> Results;
+ eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys;
+
+ uint64_t RequestCount = RequestsArray.Num();
{
Results.reserve(RequestCount);
std::unique_ptr<ZenCacheStore::PutBatch> Batch;
@@ -1099,15 +1102,15 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
uint64_t RawSize = 0;
CompressedBuffer Result;
};
- std::vector<RequestData> Requests;
+ eastl::fixed_vector<RequestData, 16> Requests;
- std::vector<size_t> RemoteRequestIndexes;
+ eastl::fixed_vector<size_t, 16> RemoteRequestIndexes;
const bool HasUpstream = m_UpstreamCache.IsActive();
- CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- std::vector<ZenCacheValue> CacheValues;
- const uint64_t RequestCount = RequestsArray.Num();
+ CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+ ZenCacheValueVec_t CacheValues;
+ const uint64_t RequestCount = RequestsArray.Num();
CacheValues.reserve(RequestCount);
{
std::unique_ptr<ZenCacheStore::GetBatch> Batch;
@@ -1136,7 +1139,6 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
CacheKey& Key = Request.Key;
CachePolicy Policy = Request.Policy;
- ZenCacheValue CacheValue;
if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
{
if (Batch)
@@ -1276,6 +1278,9 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Response");
CbPackage RpcResponse;
CbObjectWriter ResponseObject{1024};
+
+ RpcResponse.ReserveAttachments(Requests.size());
+
ResponseObject.BeginArray("Result"sv);
for (const RequestData& Request : Requests)
{
@@ -1642,7 +1647,7 @@ CacheRpcHandler::GetLocalCacheValues(const CacheRequestContext& Context,
using namespace cache::detail;
const bool HasUpstream = m_UpstreamCache.IsActive();
- std::vector<ZenCacheValue> Chunks;
+ ZenCacheValueVec_t Chunks;
Chunks.reserve(ValueRequests.size());
{
std::unique_ptr<ZenCacheStore::GetBatch> Batch;
@@ -1796,6 +1801,8 @@ CacheRpcHandler::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequest
CbPackage RpcResponse;
CbObjectWriter Writer{1024};
+ RpcResponse.ReserveAttachments(Requests.size());
+
Writer.BeginArray("Result"sv);
for (ChunkRequest& Request : Requests)
{
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index 133cb42d7..7d277329e 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -178,13 +178,13 @@ ZenCacheNamespace::EndPutBatch(PutBatchHandle* Batch) noexcept
struct ZenCacheNamespace::GetBatchHandle
{
- GetBatchHandle(std::vector<ZenCacheValue>& OutResult) : Results(OutResult) {}
- std::vector<ZenCacheValue>& Results;
+ GetBatchHandle(ZenCacheValueVec_t& OutResult) : Results(OutResult) {}
+ ZenCacheValueVec_t& Results;
ZenCacheDiskLayer::GetBatchHandle* DiskLayerHandle = nullptr;
};
ZenCacheNamespace::GetBatchHandle*
-ZenCacheNamespace::BeginGetBatch(std::vector<ZenCacheValue>& OutResult)
+ZenCacheNamespace::BeginGetBatch(ZenCacheValueVec_t& OutResult)
{
ZenCacheNamespace::GetBatchHandle* Handle = new ZenCacheNamespace::GetBatchHandle(OutResult);
Handle->DiskLayerHandle = m_DiskLayer.BeginGetBatch(OutResult);
@@ -580,7 +580,7 @@ ZenCacheStore::PutBatch::~PutBatch()
}
}
-ZenCacheStore::GetBatch::GetBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<ZenCacheValue>& OutResult)
+ZenCacheStore::GetBatch::GetBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, ZenCacheValueVec_t& OutResult)
: m_CacheStore(CacheStore)
, Results(OutResult)
{
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index b0b4f22cb..05400c784 100644
--- a/src/zenstore/include/zenstore/cache/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
@@ -12,8 +12,9 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <EASTL/string.h>
+#include <EASTL/unordered_map.h>
#include <filesystem>
-#include <unordered_map>
namespace zen {
@@ -169,7 +170,7 @@ public:
~ZenCacheDiskLayer();
struct GetBatchHandle;
- GetBatchHandle* BeginGetBatch(std::vector<ZenCacheValue>& OutResult);
+ GetBatchHandle* BeginGetBatch(ZenCacheValueVec_t& OutResult);
void EndGetBatch(GetBatchHandle* Batch) noexcept;
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& BatchHandle);
@@ -216,13 +217,16 @@ public:
*/
struct CacheBucket : public GcReferencer
{
- CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config);
+ CacheBucket(GcManager& Gc,
+ std::atomic_uint64_t& OuterCacheMemoryUsage,
+ std::string_view BucketName,
+ const BucketConfiguration& Config);
~CacheBucket();
bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
struct GetBatchHandle;
- GetBatchHandle* BeginGetBatch(std::vector<ZenCacheValue>& OutResult);
+ GetBatchHandle* BeginGetBatch(ZenCacheValueVec_t& OutResult);
void EndGetBatch(GetBatchHandle* Batch) noexcept;
bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle);
@@ -486,18 +490,20 @@ private:
bool StartAsyncMemCacheTrim();
void MemCacheTrim();
- GcManager& m_Gc;
- JobQueue& m_JobQueue;
- std::filesystem::path m_RootDir;
- Configuration m_Configuration;
- std::atomic_uint64_t m_TotalMemCachedSize{};
- std::atomic_bool m_IsMemCacheTrimming = false;
- std::atomic<GcClock::Tick> m_NextAllowedTrimTick;
- mutable RwLock m_Lock;
- std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets;
- std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets;
- uint32_t m_UpdateCaptureRefCounter = 0;
- std::unique_ptr<std::vector<std::string>> m_CapturedBuckets;
+ typedef eastl::unordered_map<std::string, std::unique_ptr<CacheBucket>, std::hash<std::string>, std::equal_to<std::string>> BucketMap_t;
+
+ GcManager& m_Gc;
+ JobQueue& m_JobQueue;
+ std::filesystem::path m_RootDir;
+ Configuration m_Configuration;
+ std::atomic_uint64_t m_TotalMemCachedSize{};
+ std::atomic_bool m_IsMemCacheTrimming = false;
+ std::atomic<GcClock::Tick> m_NextAllowedTrimTick;
+ mutable RwLock m_Lock;
+ BucketMap_t m_Buckets;
+ std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets;
+ uint32_t m_UpdateCaptureRefCounter = 0;
+ std::unique_ptr<std::vector<std::string>> m_CapturedBuckets;
ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete;
ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete;
diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h
index 9b45c7b21..521c78bb1 100644
--- a/src/zenstore/include/zenstore/cache/cacheshared.h
+++ b/src/zenstore/include/zenstore/cache/cacheshared.h
@@ -6,6 +6,8 @@
#include <zencore/iohash.h>
#include <zenstore/gc.h>
+#include <EASTL/fixed_vector.h>
+
#include <gsl/gsl-lite.hpp>
#include <unordered_map>
@@ -32,6 +34,8 @@ struct ZenCacheValue
IoHash RawHash = IoHash::Zero;
};
+typedef eastl::fixed_vector<ZenCacheValue, 16> ZenCacheValueVec_t;
+
struct CacheValueDetails
{
struct ValueDetails
diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h
index 82fec9b0e..5e056cf2d 100644
--- a/src/zenstore/include/zenstore/cache/structuredcachestore.h
+++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h
@@ -86,7 +86,7 @@ public:
void EndPutBatch(PutBatchHandle* Batch) noexcept;
struct GetBatchHandle;
- GetBatchHandle* BeginGetBatch(std::vector<ZenCacheValue>& OutResults);
+ GetBatchHandle* BeginGetBatch(ZenCacheValueVec_t& OutResults);
void EndGetBatch(GetBatchHandle* Batch) noexcept;
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
@@ -220,14 +220,14 @@ public:
class GetBatch
{
public:
- GetBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<ZenCacheValue>& OutResult);
+ GetBatch(ZenCacheStore& CacheStore, std::string_view Namespace, ZenCacheValueVec_t& OutResult);
~GetBatch();
private:
ZenCacheStore& m_CacheStore;
ZenCacheNamespace* m_Store = nullptr;
ZenCacheNamespace::GetBatchHandle* m_NamespaceBatchHandle = nullptr;
- std::vector<ZenCacheValue>& Results;
+ ZenCacheValueVec_t& Results;
friend class ZenCacheStore;
};
diff --git a/src/zenstore/xmake.lua b/src/zenstore/xmake.lua
index f0bd64d2e..031a66829 100644
--- a/src/zenstore/xmake.lua
+++ b/src/zenstore/xmake.lua
@@ -8,3 +8,4 @@ target('zenstore')
add_includedirs("include", {public=true})
add_deps("zencore", "zenutil")
add_packages("vcpkg::robin-map")
+ add_packages("vcpkg::eastl", {public=true});
diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp
index a4bb759e7..e57109006 100644
--- a/src/zenutil/filebuildstorage.cpp
+++ b/src/zenutil/filebuildstorage.cpp
@@ -325,7 +325,7 @@ public:
return {};
}
- virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash) override
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override
{
ZEN_UNUSED(BuildId);
SimulateLatency(0, 0);
@@ -337,10 +337,19 @@ public:
if (std::filesystem::is_regular_file(BlockPath))
{
BasicFile File(BlockPath, BasicFile::Mode::kRead);
- IoBuffer Payload = File.ReadAll();
- ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload))));
- m_Stats.TotalBytesRead += Payload.GetSize();
+ IoBuffer Payload;
+ if (RangeOffset != 0 || RangeBytes != (uint64_t)-1)
+ {
+ Payload = IoBuffer(RangeBytes);
+ File.Read(Payload.GetMutableView().GetData(), RangeBytes, RangeOffset);
+ }
+ else
+ {
+ Payload = File.ReadAll();
+ ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload))));
+ }
Payload.SetContentType(ZenContentType::kCompressedBinary);
+ m_Stats.TotalBytesRead += Payload.GetSize();
SimulateLatency(0, Payload.GetSize());
return Payload;
}
diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h
index 9c236310f..9d2bab170 100644
--- a/src/zenutil/include/zenutil/buildstorage.h
+++ b/src/zenutil/include/zenutil/buildstorage.h
@@ -40,7 +40,10 @@ public:
std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
std::function<void(uint64_t, bool)>&& OnSentBytes) = 0;
- virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash) = 0;
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t RangeOffset = 0,
+ uint64_t RangeBytes = (uint64_t)-1) = 0;
virtual std::vector<std::function<void()>> GetLargeBuildBlob(
const Oid& BuildId,
const IoHash& RawHash,
diff --git a/src/zenutil/include/zenutil/cache/cachekey.h b/src/zenutil/include/zenutil/cache/cachekey.h
index 741375946..0ab05f4f1 100644
--- a/src/zenutil/include/zenutil/cache/cachekey.h
+++ b/src/zenutil/include/zenutil/cache/cachekey.h
@@ -17,6 +17,12 @@ struct CacheKey
static CacheKey Create(std::string_view Bucket, const IoHash& Hash) { return {.Bucket = ToLower(Bucket), .Hash = Hash}; }
+ // This should be used whenever the bucket name has already been validated to avoid redundant ToLower calls
+ static CacheKey CreateValidated(std::string&& BucketValidated, const IoHash& Hash)
+ {
+ return {.Bucket = std::move(BucketValidated), .Hash = Hash};
+ }
+
auto operator<=>(const CacheKey& that) const
{
if (auto b = caseSensitiveCompareStrings(Bucket, that.Bucket); b != std::strong_ordering::equal)
diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h
index 852271868..2c5fc73b8 100644
--- a/src/zenutil/include/zenutil/jupiter/jupitersession.h
+++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h
@@ -123,7 +123,9 @@ public:
std::string_view BucketId,
const Oid& BuildId,
const IoHash& Hash,
- std::filesystem::path TempFolderPath);
+ std::filesystem::path TempFolderPath,
+ uint64_t Offset = 0,
+ uint64_t Size = (uint64_t)-1);
JupiterResult PutMultipartBuildBlob(std::string_view Namespace,
std::string_view BucketId,
diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp
index 309885b05..bf89ce785 100644
--- a/src/zenutil/jupiter/jupiterbuildstorage.cpp
+++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp
@@ -217,13 +217,15 @@ public:
return WorkList;
}
- virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash) override
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override
{
ZEN_TRACE_CPU("Jupiter::GetBuildBlob");
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- JupiterResult GetBuildBlobResult = m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath);
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ CreateDirectories(m_TempFolderPath);
+ JupiterResult GetBuildBlobResult =
+ m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath, RangeOffset, RangeBytes);
AddStatistic(GetBuildBlobResult);
if (!GetBuildBlobResult.Success)
{
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp
index 06ac6ae36..68f214c06 100644
--- a/src/zenutil/jupiter/jupitersession.cpp
+++ b/src/zenutil/jupiter/jupitersession.cpp
@@ -698,11 +698,19 @@ JupiterSession::GetBuildBlob(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
const IoHash& Hash,
- std::filesystem::path TempFolderPath)
+ std::filesystem::path TempFolderPath,
+ uint64_t Offset,
+ uint64_t Size)
{
+ HttpClient::KeyValueMap Headers;
+ if (Offset != 0 || Size != (uint64_t)-1)
+ {
+ Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", Offset, Offset + Size - 1)});
+ }
HttpClient::Response Response =
m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()),
- TempFolderPath);
+ TempFolderPath,
+ Headers);
return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv);
}
diff --git a/xmake.lua b/xmake.lua
index c49a71901..103ca6703 100644
--- a/xmake.lua
+++ b/xmake.lua
@@ -9,6 +9,7 @@ add_requires(
"vcpkg::curl",
"vcpkg::cxxopts",
"vcpkg::doctest",
+ "vcpkg::eastl",
"vcpkg::fmt",
"vcpkg::gsl-lite",
"vcpkg::http-parser",
@@ -24,6 +25,8 @@ add_requires(
"vcpkg::zlib"
)
+add_defines("EASTL_STD_ITERATOR_CATEGORY_ENABLED")
+
set_policy("build.ccache", false)
if is_plat("windows") then