aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore/builds/buildstorageoperations.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenremotestore/builds/buildstorageoperations.cpp')
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp504
1 files changed, 200 insertions, 304 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index 3a41cd7eb..389f8614d 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -382,6 +382,46 @@ namespace {
return CompositeBuffer{};
}
+ std::filesystem::path TryMoveDownloadedChunk(IoBuffer& BlockBuffer, const std::filesystem::path& Path, bool ForceDiskBased)
+ {
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize))
+ {
+ ZEN_TRACE_CPU("MoveTempFullBlock");
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ RenameFile(TempBlobPath, Path, Ec);
+ if (Ec)
+ {
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
+ }
+ else
+ {
+ return Path;
+ }
+ }
+ }
+
+ if (ForceDiskBased)
+ {
+ // Could not be moved and rather large, lets store it on disk
+ ZEN_TRACE_CPU("WriteTempFullBlock");
+ TemporaryFile::SafeWriteFile(Path, BlockBuffer);
+ BlockBuffer = {};
+ return Path;
+ }
+
+ return {};
+ }
+
} // namespace
class ReadFileCache
@@ -673,9 +713,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
auto EndProgress =
MakeGuard([&]() { m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); });
- ZEN_ASSERT((!m_Options.PrimeCacheOnly) ||
- (m_Options.PrimeCacheOnly && (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off)));
-
m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::ScanExistingData, (uint32_t)TaskSteps::StepCount);
CreateDirectories(m_CacheFolderPath);
@@ -690,20 +727,14 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound;
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound;
- if (!m_Options.PrimeCacheOnly)
- {
- ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound);
- }
+ ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound);
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound;
- if (!m_Options.PrimeCacheOnly)
- {
- ScanTempBlocksFolder(CachedBlocksFound);
- }
+ ScanTempBlocksFolder(CachedBlocksFound);
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceIndexesLeftToFindToRemoteIndex;
- if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging)
+ if (m_Options.EnableTargetFolderScavenging)
{
// Pick up all whole files we can use from current local state
ZEN_TRACE_CPU("GetLocalSequences");
@@ -738,7 +769,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
std::vector<ScavengedSequenceCopyOperation> ScavengedSequenceCopyOperations;
uint64_t ScavengedPathsCount = 0;
- if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging)
+ if (m_Options.EnableOtherDownloadsScavenging)
{
ZEN_TRACE_CPU("GetScavengedSequences");
@@ -882,7 +913,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCopyChunkDataIndex;
std::vector<CopyChunkData> CopyChunkDatas;
- if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging)
+ if (m_Options.EnableTargetFolderScavenging)
{
ZEN_TRACE_CPU("GetLocalChunks");
@@ -902,7 +933,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs();
}
- if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging)
+ if (m_Options.EnableOtherDownloadsScavenging)
{
ZEN_TRACE_CPU("GetScavengeChunks");
@@ -1025,30 +1056,23 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
ZEN_TRACE_CPU("BlockCacheFileExists");
for (const ChunkBlockAnalyser::NeededBlock& NeededBlock : NeededBlocks)
{
- if (m_Options.PrimeCacheOnly)
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex];
+ bool UsingCachedBlock = false;
+ if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end())
{
- FetchBlockIndexes.push_back(NeededBlock.BlockIndex);
- }
- else
- {
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex];
- bool UsingCachedBlock = false;
- if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end())
- {
- TotalPartWriteCount++;
+ TotalPartWriteCount++;
- std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
- if (IsFile(BlockPath))
- {
- CachedChunkBlockIndexes.push_back(NeededBlock.BlockIndex);
- UsingCachedBlock = true;
- }
- }
- if (!UsingCachedBlock)
+ std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
+ if (IsFile(BlockPath))
{
- FetchBlockIndexes.push_back(NeededBlock.BlockIndex);
+ CachedChunkBlockIndexes.push_back(NeededBlock.BlockIndex);
+ UsingCachedBlock = true;
}
}
+ if (!UsingCachedBlock)
+ {
+ FetchBlockIndexes.push_back(NeededBlock.BlockIndex);
+ }
}
}
@@ -1229,10 +1253,9 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
FilteredRate FilteredDownloadedBytesPerSecond;
FilteredRate FilteredWrittenBytesPerSecond;
- std::unique_ptr<OperationLogOutput::ProgressBar> WriteProgressBarPtr(
- m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing"));
- OperationLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr);
- ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ std::unique_ptr<OperationLogOutput::ProgressBar> WriteProgressBarPtr(m_LogOutput.CreateProgressBar("Writing"));
+ OperationLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr);
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
TotalPartWriteCount += CopyChunkDatas.size();
TotalPartWriteCount += ScavengedSequenceCopyOperations.size();
@@ -1245,37 +1268,34 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
{
break;
}
- if (!m_Options.PrimeCacheOnly)
- {
- Work.ScheduleWork(
- m_IOWorkerPool,
- [this,
- &ScavengedPaths,
- &ScavengedSequenceCopyOperations,
- &ScavengedContents,
- &FilteredWrittenBytesPerSecond,
- ScavengeOpIndex,
- &WritePartsComplete,
- TotalPartWriteCount](std::atomic<bool>&) mutable {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_WriteScavenged");
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &ScavengedPaths,
+ &ScavengedSequenceCopyOperations,
+ &ScavengedContents,
+ &FilteredWrittenBytesPerSecond,
+ ScavengeOpIndex,
+ &WritePartsComplete,
+ TotalPartWriteCount](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WriteScavenged");
- FilteredWrittenBytesPerSecond.Start();
+ FilteredWrittenBytesPerSecond.Start();
- const ScavengedSequenceCopyOperation& ScavengeOp = ScavengedSequenceCopyOperations[ScavengeOpIndex];
- const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex];
- const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex];
+ const ScavengedSequenceCopyOperation& ScavengeOp = ScavengedSequenceCopyOperations[ScavengeOpIndex];
+ const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex];
+ const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex];
- WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp);
+ WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp);
- if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
+ if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
}
- });
- }
+ }
+ });
}
for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++)
@@ -1285,16 +1305,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
break;
}
- if (m_Options.PrimeCacheOnly)
- {
- const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex;
- if (ExistsResult.ExistingBlobs.contains(m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]))
- {
- m_DownloadStats.RequestsCompleteCount++;
- continue;
- }
- }
-
Work.ScheduleWork(
m_IOWorkerPool,
[this,
@@ -1338,7 +1348,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
for (size_t CopyDataIndex = 0; CopyDataIndex < CopyChunkDatas.size(); CopyDataIndex++)
{
- ZEN_ASSERT(!m_Options.PrimeCacheOnly);
if (m_AbortFlag)
{
break;
@@ -1397,7 +1406,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
for (uint32_t BlockIndex : CachedChunkBlockIndexes)
{
- ZEN_ASSERT(!m_Options.PrimeCacheOnly);
if (m_AbortFlag)
{
break;
@@ -1464,7 +1472,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocks.BlockRanges.size();)
{
- ZEN_ASSERT(!m_Options.PrimeCacheOnly);
if (m_AbortFlag)
{
break;
@@ -1627,12 +1634,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
break;
}
- if (m_Options.PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash))
- {
- m_DownloadStats.RequestsCompleteCount++;
- continue;
- }
-
Work.ScheduleWork(
m_NetworkPool,
[this,
@@ -1684,14 +1685,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
- if (m_Storage.CacheStorage && m_Options.PopulateCache)
- {
- m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
- BlockDescription.BlockHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(BlockBuffer)));
- }
-
uint64_t BlockSize = BlockBuffer.GetSize();
m_DownloadStats.DownloadedBlockCount++;
m_DownloadStats.DownloadedBlockByteCount += BlockSize;
@@ -1700,47 +1693,27 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
FilteredDownloadedBytesPerSecond.Stop();
}
- if (!m_Options.PrimeCacheOnly)
- {
- std::filesystem::path BlockChunkPath;
+ const bool PutInCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache;
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
- {
- IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockSize))
- {
- ZEN_TRACE_CPU("MoveTempFullBlock");
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
- {
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
- RenameFile(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
- {
- BlockChunkPath = std::filesystem::path{};
+ std::filesystem::path BlockChunkPath = TryMoveDownloadedChunk(
+ BlockBuffer,
+ m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(),
+ /* ForceDiskBased */ PutInCache || (BlockSize > m_Options.MaximumInMemoryPayloadSize));
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
- }
- }
- }
- }
-
- if (BlockChunkPath.empty() && (BlockSize > m_Options.MaximumInMemoryPayloadSize))
+ if (PutInCache)
+ {
+ ZEN_ASSERT(!BlockChunkPath.empty());
+ IoBuffer CacheBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (CacheBuffer)
{
- ZEN_TRACE_CPU("WriteTempFullBlock");
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ BlockDescription.BlockHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(CacheBuffer)));
}
+ }
+ {
if (!m_AbortFlag)
{
Work.ScheduleWork(
@@ -1840,12 +1813,11 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
{
CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load()));
}
- std::string WriteDetails = m_Options.PrimeCacheOnly ? ""
- : fmt::format(" {}/{} ({}B/s) written{}",
- NiceBytes(m_WrittenChunkByteCount.load()),
- NiceBytes(BytesToWrite),
- NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()),
- CloneDetails);
+ std::string WriteDetails = fmt::format(" {}/{} ({}B/s) written{}",
+ NiceBytes(m_WrittenChunkByteCount.load()),
+ NiceBytes(BytesToWrite),
+ NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()),
+ CloneDetails);
std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}",
m_DownloadStats.RequestsCompleteCount.load(),
@@ -1855,11 +1827,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
WriteDetails);
std::string Task;
- if (m_Options.PrimeCacheOnly)
- {
- Task = "Downloading ";
- }
- else if ((m_WrittenChunkByteCount < BytesToWrite) || (BytesToValidate == 0))
+ if ((m_WrittenChunkByteCount < BytesToWrite) || (BytesToValidate == 0))
{
Task = "Writing chunks ";
}
@@ -1868,15 +1836,13 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
Task = "Verifying chunks ";
}
- WriteProgressBar.UpdateState(
- {.Task = Task,
- .Details = Details,
- .TotalCount = m_Options.PrimeCacheOnly ? TotalRequestCount : (BytesToWrite + BytesToValidate),
- .RemainingCount = m_Options.PrimeCacheOnly ? (TotalRequestCount - m_DownloadStats.RequestsCompleteCount.load())
- : ((BytesToWrite + BytesToValidate) -
- (m_WrittenChunkByteCount.load() + m_ValidatedChunkByteCount.load())),
- .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
- false);
+ WriteProgressBar.UpdateState({.Task = Task,
+ .Details = Details,
+ .TotalCount = (BytesToWrite + BytesToValidate),
+ .RemainingCount = ((BytesToWrite + BytesToValidate) -
+ (m_WrittenChunkByteCount.load() + m_ValidatedChunkByteCount.load())),
+ .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
});
}
@@ -1891,34 +1857,31 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
return;
}
- if (!m_Options.PrimeCacheOnly)
+ uint32_t RawSequencesMissingWriteCount = 0;
+ for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++)
{
- uint32_t RawSequencesMissingWriteCount = 0;
- for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++)
+ const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex];
+ if (SequenceIndexChunksLeftToWriteCounter.load() != 0)
{
- const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex];
- if (SequenceIndexChunksLeftToWriteCounter.load() != 0)
+ RawSequencesMissingWriteCount++;
+ const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const std::filesystem::path& IncompletePath = m_RemoteContent.Paths[PathIndex];
+ ZEN_ASSERT(!IncompletePath.empty());
+ const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex];
+ if (!m_Options.IsQuiet)
{
- RawSequencesMissingWriteCount++;
- const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
- const std::filesystem::path& IncompletePath = m_RemoteContent.Paths[PathIndex];
- ZEN_ASSERT(!IncompletePath.empty());
- const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex];
- if (!m_Options.IsQuiet)
- {
- ZEN_OPERATION_LOG_INFO(m_LogOutput,
- "{}: Max count {}, Current count {}",
- IncompletePath,
- ExpectedSequenceCount,
- SequenceIndexChunksLeftToWriteCounter.load());
- }
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount);
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "{}: Max count {}, Current count {}",
+ IncompletePath,
+ ExpectedSequenceCount,
+ SequenceIndexChunksLeftToWriteCounter.load());
}
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount);
}
- ZEN_ASSERT(RawSequencesMissingWriteCount == 0);
- ZEN_ASSERT(m_WrittenChunkByteCount == BytesToWrite);
- ZEN_ASSERT(m_ValidatedChunkByteCount == BytesToValidate);
}
+ ZEN_ASSERT(RawSequencesMissingWriteCount == 0);
+ ZEN_ASSERT(m_WrittenChunkByteCount == BytesToWrite);
+ ZEN_ASSERT(m_ValidatedChunkByteCount == BytesToValidate);
const uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() +
m_DownloadStats.DownloadedBlockByteCount.load() +
@@ -1948,11 +1911,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
m_WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS();
}
- if (m_Options.PrimeCacheOnly)
- {
- return;
- }
-
m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::PrepareTarget, (uint32_t)TaskSteps::StepCount);
tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex;
@@ -3104,17 +3062,13 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
FilteredRate& FilteredDownloadedBytesPerSecond,
FilteredRate& FilteredWrittenBytesPerSecond)
{
- std::filesystem::path ExistingCompressedChunkPath;
- if (!m_Options.PrimeCacheOnly)
+ const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ std::filesystem::path ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash);
+ if (!ExistingCompressedChunkPath.empty())
{
- const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash);
- if (!ExistingCompressedChunkPath.empty())
+ if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount)
{
- if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
+ FilteredDownloadedBytesPerSecond.Stop();
}
}
if (!m_AbortFlag)
@@ -3220,10 +3174,9 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
RemoteChunkIndex,
&FilteredWrittenBytesPerSecond,
ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable {
- IoBufferFileReference FileRef;
- bool EnableBacklog = Payload.GetFileReference(FileRef);
AsyncWriteDownloadedChunk(m_Options.ZenFolderPath,
RemoteChunkIndex,
+ ExistsResult,
std::move(ChunkTargetPtrs),
WriteCache,
Work,
@@ -3231,8 +3184,7 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
SequenceIndexChunksLeftToWriteCounters,
WritePartsComplete,
TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- EnableBacklog);
+ FilteredWrittenBytesPerSecond);
});
}
});
@@ -3293,14 +3245,6 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde
FilteredDownloadedBytesPerSecond.Stop();
}
- if (Payload && m_Storage.CacheStorage && m_Options.PopulateCache)
- {
- m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
- ChunkHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(Payload)));
- }
-
OnDownloaded(std::move(Payload));
});
}
@@ -3320,31 +3264,22 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde
}
if (!m_AbortFlag)
{
- if (BuildBlob && m_Storage.CacheStorage && m_Options.PopulateCache)
- {
- m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
- ChunkHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(BuildBlob)));
- }
if (!BuildBlob)
{
throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
}
- if (!m_Options.PrimeCacheOnly)
+
+ if (!m_AbortFlag)
{
- if (!m_AbortFlag)
+ uint64_t BlobSize = BuildBlob.GetSize();
+ m_DownloadStats.DownloadedChunkCount++;
+ m_DownloadStats.DownloadedChunkByteCount += BlobSize;
+ if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount)
{
- uint64_t BlobSize = BuildBlob.GetSize();
- m_DownloadStats.DownloadedChunkCount++;
- m_DownloadStats.DownloadedChunkByteCount += BlobSize;
- if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
-
- OnDownloaded(std::move(BuildBlob));
+ FilteredDownloadedBytesPerSecond.Stop();
}
+
+ OnDownloaded(std::move(BuildBlob));
}
}
}
@@ -3388,61 +3323,17 @@ BuildsOperationUpdateFolder::DownloadPartialBlock(
FilteredDownloadedBytesPerSecond.Stop();
}
- std::filesystem::path BlockChunkPath;
-
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ IoHashStream RangeId;
+ for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths)
{
- IoBufferFileReference FileRef;
- if (BlockRangeBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockRangeBufferSize))
- {
- ZEN_TRACE_CPU("MoveTempPartialBlock");
-
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
- {
- BlockRangeBuffer.SetDeleteOnClose(false);
- BlockRangeBuffer = {};
-
- IoHashStream RangeId;
- for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths)
- {
- RangeId.Append(&Range.first, sizeof(uint64_t));
- RangeId.Append(&Range.second, sizeof(uint64_t));
- }
-
- BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash());
- RenameFile(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
- {
- BlockChunkPath = std::filesystem::path{};
-
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockRangeBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockRangeBufferSize, true);
- BlockRangeBuffer.SetDeleteOnClose(true);
- }
- }
- }
+ RangeId.Append(&Range.first, sizeof(uint64_t));
+ RangeId.Append(&Range.second, sizeof(uint64_t));
}
+ std::filesystem::path BlockChunkPath =
+ TryMoveDownloadedChunk(BlockRangeBuffer,
+ m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()),
+ /* ForceDiskBased */ BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize);
- if (BlockChunkPath.empty() && (BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize))
- {
- ZEN_TRACE_CPU("WriteTempPartialBlock");
-
- IoHashStream RangeId;
- for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths)
- {
- RangeId.Append(&Range.first, sizeof(uint64_t));
- RangeId.Append(&Range.second, sizeof(uint64_t));
- }
-
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash());
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockRangeBuffer);
- BlockRangeBuffer = {};
- }
if (!m_AbortFlag)
{
OnDownloaded(std::move(BlockRangeBuffer), std::move(BlockChunkPath), BlockRangeStartIndex, BlockOffsetAndLengths);
@@ -3570,16 +3461,36 @@ BuildsOperationUpdateFolder::DownloadPartialBlock(
// Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3
// Upload to cache (if enabled) and use the whole payload for the remaining ranges
- if (m_Storage.CacheStorage && m_Options.PopulateCache)
+ const uint64_t Size = RangeBuffers.PayloadBuffer.GetSize();
+
+ const bool PopulateCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache;
+
+ std::filesystem::path BlockPath =
+ TryMoveDownloadedChunk(RangeBuffers.PayloadBuffer,
+ m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(),
+ /* ForceDiskBased */ PopulateCache || Size > m_Options.MaximumInMemoryPayloadSize);
+ if (!BlockPath.empty())
+ {
+ RangeBuffers.PayloadBuffer = IoBufferBuilder::MakeFromFile(BlockPath);
+ if (!RangeBuffers.PayloadBuffer)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed to read block {} from temporary path '{}'", BlockDescription.BlockHash, BlockPath));
+ }
+ RangeBuffers.PayloadBuffer.SetDeleteOnClose(true);
+ }
+
+ if (PopulateCache)
{
m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
BlockDescription.BlockHash,
ZenContentType::kCompressedBinary,
- CompositeBuffer(std::vector<IoBuffer>{RangeBuffers.PayloadBuffer}));
- if (m_AbortFlag)
- {
- break;
- }
+ CompositeBuffer(SharedBuffer(RangeBuffers.PayloadBuffer)));
+ }
+
+ if (m_AbortFlag)
+ {
+ break;
}
SubRangeCount = Ranges.size() - SubRangeCountComplete;
@@ -4331,6 +4242,7 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc
void
BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath,
uint32_t RemoteChunkIndex,
+ const BlobsExistsResult& ExistsResult,
std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs,
BufferedWriteFileCache& WriteCache,
ParallelWork& Work,
@@ -4338,8 +4250,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
std::atomic<uint64_t>& WritePartsComplete,
const uint64_t TotalPartWriteCount,
- FilteredRate& FilteredWrittenBytesPerSecond,
- bool EnableBacklog)
+ FilteredRate& FilteredWrittenBytesPerSecond)
{
ZEN_TRACE_CPU("AsyncWriteDownloadedChunk");
@@ -4347,43 +4258,28 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
const uint64_t Size = Payload.GetSize();
- std::filesystem::path CompressedChunkPath;
+ const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
+
+ const bool PopulateCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache;
- // Check if the dowloaded chunk is file based and we can move it directly without rewriting it
+ std::filesystem::path CompressedChunkPath =
+ TryMoveDownloadedChunk(Payload,
+ m_TempDownloadFolderPath / ChunkHash.ToHexString(),
+ /* ForceDiskBased */ PopulateCache || Size > m_Options.MaximumInMemoryPayloadSize);
+ if (PopulateCache)
{
- IoBufferFileReference FileRef;
- if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == Size))
+ IoBuffer CacheBlob = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (CacheBlob)
{
- ZEN_TRACE_CPU("MoveTempChunk");
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
- {
- Payload.SetDeleteOnClose(false);
- Payload = {};
- CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString();
- RenameFile(TempBlobPath, CompressedChunkPath, Ec);
- if (Ec)
- {
- CompressedChunkPath = std::filesystem::path{};
-
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, Size, true);
- Payload.SetDeleteOnClose(true);
- }
- }
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(CacheBlob)));
}
}
- if (CompressedChunkPath.empty() && (Size > m_Options.MaximumInMemoryPayloadSize))
- {
- ZEN_TRACE_CPU("WriteTempChunk");
- // Could not be moved and rather large, lets store it on disk
- CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString();
- TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload);
- Payload = {};
- }
+ IoBufferFileReference FileRef;
+ bool EnableBacklog = !CompressedChunkPath.empty() || Payload.GetFileReference(FileRef);
Work.ScheduleWork(
m_IOWorkerPool,