aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-04-07 16:53:55 +0200
committerGitHub Enterprise <[email protected]>2026-04-07 16:53:55 +0200
commit4d8fae7636ad45900f22253621b9f7d51d0b646e (patch)
tree37fdf97870f216d465b4cb66563c5c366262483d /src/zenutil
parentdisable zencompute in bundle step (diff)
downloadzen-4d8fae7636ad45900f22253621b9f7d51d0b646e.tar.xz
zen-4d8fae7636ad45900f22253621b9f7d51d0b646e.zip
incremental dehydrate (#921)
- Feature: Incremental CAS-based hydration/dehydration replacing the previous full-copy approach - Feature: S3 hydration backend with multipart upload/download support - Feature: Configurable thread pools for hub instance provisioning and hydration `--hub-instance-provision-threads` defaults to `max(cpu_count / 4, 2)`. Set to 0 for synchronous operation. `--hub-hydration-threads` defaults to `max(cpu_count / 4, 2)`. Set to 0 for synchronous operation. - Improvement: Hub triggers GC on instance before deprovisioning to compact storage before dehydration - Improvement: GC status now reports pending triggers as running - Improvement: S3 client debug logging gated behind verbose mode to reduce log noise at default verbosity - Improvement: Hub dashboard Resources tile now shows total memory - Improvement: `filesystemutils` moved from `zenremotestore` to `zenutil` for broader reuse - Improvement: Hub uses separate provision and hydration worker pools to avoid deadlocks - Improvement: Hibernate/wake/deprovision on non-existent or already-in-target-state modules are idempotent - Improvement: `ScopedTemporaryDirectory` with empty path now creates a temporary directory instead of asserting
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/cloud/s3client.cpp74
-rw-r--r--src/zenutil/filesystemutils.cpp721
-rw-r--r--src/zenutil/include/zenutil/cloud/s3client.h1
-rw-r--r--src/zenutil/include/zenutil/filesystemutils.h98
-rw-r--r--src/zenutil/zenutil.cpp2
5 files changed, 879 insertions, 17 deletions
diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp
index d9fde05d9..ef4b344ce 100644
--- a/src/zenutil/cloud/s3client.cpp
+++ b/src/zenutil/cloud/s3client.cpp
@@ -148,6 +148,7 @@ S3Client::S3Client(const S3ClientOptions& Options)
, m_Credentials(Options.Credentials)
, m_CredentialProvider(Options.CredentialProvider)
, m_HttpClient(BuildEndpoint(), Options.HttpSettings)
+, m_Verbose(Options.HttpSettings.Verbose)
{
m_Host = BuildHostHeader();
ZEN_INFO("S3 client configured for bucket '{}' in region '{}' (endpoint: {}, {})",
@@ -338,7 +339,10 @@ S3Client::PutObject(std::string_view Key, IoBuffer Content)
return S3Result{std::move(Err)};
}
- ZEN_DEBUG("S3 PUT '{}' succeeded ({} bytes)", Key, Content.GetSize());
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 PUT '{}' succeeded ({} bytes)", Key, Content.GetSize());
+ }
return {};
}
@@ -362,7 +366,10 @@ S3Client::GetObject(std::string_view Key, const std::filesystem::path& TempFileP
return S3GetObjectResult{S3Result{std::move(Err)}, {}};
}
- ZEN_DEBUG("S3 GET '{}' succeeded ({} bytes)", Key, Response.ResponsePayload.GetSize());
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 GET '{}' succeeded ({} bytes)", Key, Response.ResponsePayload.GetSize());
+ }
return S3GetObjectResult{{}, std::move(Response.ResponsePayload)};
}
@@ -403,11 +410,14 @@ S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t Ran
return S3GetObjectResult{S3Result{std::move(Err)}, {}};
}
- ZEN_DEBUG("S3 GET range '{}' [{}-{}] succeeded ({} bytes)",
- Key,
- RangeStart,
- RangeStart + RangeSize - 1,
- Response.ResponsePayload.GetSize());
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 GET range '{}' [{}-{}] succeeded ({} bytes)",
+ Key,
+ RangeStart,
+ RangeStart + RangeSize - 1,
+ Response.ResponsePayload.GetSize());
+ }
return S3GetObjectResult{{}, std::move(Response.ResponsePayload)};
}
@@ -426,7 +436,10 @@ S3Client::DeleteObject(std::string_view Key)
return S3Result{std::move(Err)};
}
- ZEN_DEBUG("S3 DELETE '{}' succeeded", Key);
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 DELETE '{}' succeeded", Key);
+ }
return {};
}
@@ -468,7 +481,10 @@ S3Client::HeadObject(std::string_view Key)
Info.LastModified = *V;
}
- ZEN_DEBUG("S3 HEAD '{}' succeeded (size={})", Key, Info.Size);
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 HEAD '{}' succeeded (size={})", Key, Info.Size);
+ }
return S3HeadObjectResult{{}, std::move(Info), HeadObjectResult::Found};
}
@@ -565,10 +581,16 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys)
}
ContinuationToken = std::string(NextToken);
- ZEN_DEBUG("S3 ListObjectsV2 prefix='{}' fetching next page ({} objects so far)", Prefix, Result.Objects.size());
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 ListObjectsV2 prefix='{}' fetching next page ({} objects so far)", Prefix, Result.Objects.size());
+ }
}
- ZEN_DEBUG("S3 ListObjectsV2 prefix='{}' returned {} objects", Prefix, Result.Objects.size());
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 ListObjectsV2 prefix='{}' returned {} objects", Prefix, Result.Objects.size());
+ }
return Result;
}
@@ -607,7 +629,10 @@ S3Client::CreateMultipartUpload(std::string_view Key)
return S3CreateMultipartUploadResult{S3Result{std::move(Err)}, {}};
}
- ZEN_DEBUG("S3 CreateMultipartUpload '{}' succeeded (uploadId={})", Key, UploadId);
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 CreateMultipartUpload '{}' succeeded (uploadId={})", Key, UploadId);
+ }
return S3CreateMultipartUploadResult{{}, std::string(UploadId)};
}
@@ -642,7 +667,10 @@ S3Client::UploadPart(std::string_view Key, std::string_view UploadId, uint32_t P
return S3UploadPartResult{S3Result{std::move(Err)}, {}};
}
- ZEN_DEBUG("S3 UploadPart '{}' part {} succeeded ({} bytes, etag={})", Key, PartNumber, Content.GetSize(), *ETag);
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 UploadPart '{}' part {} succeeded ({} bytes, etag={})", Key, PartNumber, Content.GetSize(), *ETag);
+ }
return S3UploadPartResult{{}, *ETag};
}
@@ -691,7 +719,10 @@ S3Client::CompleteMultipartUpload(std::string_view Key,
return S3Result{std::move(Err)};
}
- ZEN_DEBUG("S3 CompleteMultipartUpload '{}' succeeded ({} parts)", Key, PartETags.size());
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 CompleteMultipartUpload '{}' succeeded ({} parts)", Key, PartETags.size());
+ }
return {};
}
@@ -712,7 +743,10 @@ S3Client::AbortMultipartUpload(std::string_view Key, std::string_view UploadId)
return S3Result{std::move(Err)};
}
- ZEN_DEBUG("S3 AbortMultipartUpload '{}' succeeded (uploadId={})", Key, UploadId);
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 AbortMultipartUpload '{}' succeeded (uploadId={})", Key, UploadId);
+ }
return {};
}
@@ -755,7 +789,10 @@ S3Client::PutObjectMultipart(std::string_view Key,
return PutObject(Key, TotalSize > 0 ? FetchRange(0, TotalSize) : IoBuffer{});
}
- ZEN_DEBUG("S3 multipart upload '{}': {} bytes in ~{} parts", Key, TotalSize, (TotalSize + PartSize - 1) / PartSize);
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 multipart upload '{}': {} bytes in ~{} parts", Key, TotalSize, (TotalSize + PartSize - 1) / PartSize);
+ }
S3CreateMultipartUploadResult InitResult = CreateMultipartUpload(Key);
if (!InitResult)
@@ -803,7 +840,10 @@ S3Client::PutObjectMultipart(std::string_view Key,
throw;
}
- ZEN_DEBUG("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), TotalSize);
+ if (m_Verbose)
+ {
+ ZEN_INFO("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), TotalSize);
+ }
return {};
}
diff --git a/src/zenutil/filesystemutils.cpp b/src/zenutil/filesystemutils.cpp
new file mode 100644
index 000000000..9b7953f95
--- /dev/null
+++ b/src/zenutil/filesystemutils.cpp
@@ -0,0 +1,721 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/filesystemutils.h>
+
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/parallelwork.h>
+#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+#endif // ZEN_WITH_TESTS
+
+namespace zen {
+
+BufferedOpenFile::BufferedOpenFile(const std::filesystem::path Path,
+ std::atomic<uint64_t>& OpenReadCount,
+ std::atomic<uint64_t>& CurrentOpenFileCount,
+ std::atomic<uint64_t>& ReadCount,
+ std::atomic<uint64_t>& ReadByteCount)
+: m_Source(Path, BasicFile::Mode::kRead)
+, m_SourceSize(m_Source.FileSize())
+, m_OpenReadCount(OpenReadCount)
+, m_CurrentOpenFileCount(CurrentOpenFileCount)
+, m_ReadCount(ReadCount)
+, m_ReadByteCount(ReadByteCount)
+
+{
+ m_OpenReadCount++;
+ m_CurrentOpenFileCount++;
+}
+
+BufferedOpenFile::~BufferedOpenFile()
+{
+ m_CurrentOpenFileCount--;
+}
+
+CompositeBuffer
+BufferedOpenFile::GetRange(uint64_t Offset, uint64_t Size)
+{
+ ZEN_TRACE_CPU("BufferedOpenFile::GetRange");
+
+ ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache);
+ auto _ = MakeGuard([&]() { ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache); });
+
+ ZEN_ASSERT((Offset + Size) <= m_SourceSize);
+ const uint64_t BlockIndexStart = Offset / BlockSize;
+ const uint64_t BlockIndexEnd = (Offset + Size - 1) / BlockSize;
+
+ std::vector<SharedBuffer> BufferRanges;
+ BufferRanges.reserve(BlockIndexEnd - BlockIndexStart + 1);
+
+ uint64_t ReadOffset = Offset;
+ for (uint64_t BlockIndex = BlockIndexStart; BlockIndex <= BlockIndexEnd; BlockIndex++)
+ {
+ const uint64_t BlockStartOffset = BlockIndex * BlockSize;
+ if (m_CacheBlockIndex != BlockIndex)
+ {
+ uint64_t CacheSize = Min(BlockSize, m_SourceSize - BlockStartOffset);
+ ZEN_ASSERT(CacheSize > 0);
+ m_Cache = IoBuffer(CacheSize);
+ m_Source.Read(m_Cache.GetMutableView().GetData(), CacheSize, BlockStartOffset);
+ m_ReadCount++;
+ m_ReadByteCount += CacheSize;
+ m_CacheBlockIndex = BlockIndex;
+ }
+
+ const uint64_t BytesRead = ReadOffset - Offset;
+ ZEN_ASSERT(BlockStartOffset <= ReadOffset);
+ const uint64_t OffsetIntoBlock = ReadOffset - BlockStartOffset;
+ ZEN_ASSERT(OffsetIntoBlock < m_Cache.GetSize());
+ const uint64_t BlockBytes = Min(m_Cache.GetSize() - OffsetIntoBlock, Size - BytesRead);
+ BufferRanges.emplace_back(SharedBuffer(IoBuffer(m_Cache, OffsetIntoBlock, BlockBytes)));
+ ReadOffset += BlockBytes;
+ }
+ CompositeBuffer Result(std::move(BufferRanges));
+ ZEN_ASSERT(Result.GetSize() == Size);
+ return Result;
+}
+
+bool
+IsFileWithRetry(const std::filesystem::path& Path)
+{
+ std::error_code Ec;
+ bool Result = IsFile(Path, Ec);
+ for (size_t Retries = 0; Ec && Retries < 3; Retries++)
+ {
+ Sleep(100 + int(Retries * 50));
+ Ec.clear();
+ Result = IsFile(Path, Ec);
+ }
+ if (Ec)
+ {
+ throw std::system_error(std::error_code(Ec.value(), std::system_category()),
+ fmt::format("Failed to check path '{}' is file, reason: ({}) {}", Path, Ec.value(), Ec.message()));
+ }
+ return Result;
+}
+
+bool
+SetFileReadOnlyWithRetry(const std::filesystem::path& Path, bool ReadOnly)
+{
+ std::error_code Ec;
+ bool Result = SetFileReadOnly(Path, ReadOnly, Ec);
+ for (size_t Retries = 0; Ec && Retries < 3; Retries++)
+ {
+ if (!IsFileWithRetry(Path))
+ {
+ return false;
+ }
+ Sleep(100 + int(Retries * 50));
+ Ec.clear();
+ Result = SetFileReadOnly(Path, ReadOnly, Ec);
+ }
+ if (Ec)
+ {
+ throw std::system_error(std::error_code(Ec.value(), std::system_category()),
+ fmt::format("Failed {} read only flag for file '{}', reason: ({}) {}",
+ ReadOnly ? "setting" : "clearing",
+ Path,
+ Ec.value(),
+ Ec.message()));
+ }
+ return Result;
+}
+
+std::error_code
+RenameFileWithRetry(const std::filesystem::path& SourcePath, const std::filesystem::path& TargetPath)
+{
+ std::error_code Ec;
+ RenameFile(SourcePath, TargetPath, Ec);
+ for (size_t Retries = 0; Ec && Retries < 5; Retries++)
+ {
+ ZEN_ASSERT_SLOW(IsFile(SourcePath));
+ Sleep(50 + int(Retries * 150));
+ Ec.clear();
+ RenameFile(SourcePath, TargetPath, Ec);
+ }
+ return Ec;
+}
+
+std::error_code
+RenameDirectoryWithRetry(const std::filesystem::path& SourcePath, const std::filesystem::path& TargetPath)
+{
+ std::error_code Ec;
+ RenameDirectory(SourcePath, TargetPath, Ec);
+ for (size_t Retries = 0; Ec && Retries < 5; Retries++)
+ {
+ ZEN_ASSERT_SLOW(IsDir(SourcePath));
+ Sleep(50 + int(Retries * 150));
+ Ec.clear();
+ RenameDirectory(SourcePath, TargetPath, Ec);
+ }
+ return Ec;
+}
+
+std::error_code
+TryRemoveFile(const std::filesystem::path& Path)
+{
+ std::error_code Ec;
+ RemoveFile(Path, Ec);
+ if (Ec)
+ {
+ if (IsFile(Path, Ec))
+ {
+ Ec.clear();
+ RemoveFile(Path, Ec);
+ if (Ec)
+ {
+ return Ec;
+ }
+ }
+ }
+ return {};
+}
+
+void
+RemoveFileWithRetry(const std::filesystem::path& Path)
+{
+ std::error_code Ec;
+ RemoveFile(Path, Ec);
+ for (size_t Retries = 0; Ec && Retries < 6; Retries++)
+ {
+ if (!IsFileWithRetry(Path))
+ {
+ return;
+ }
+ Sleep(100 + int(Retries * 50));
+ Ec.clear();
+ RemoveFile(Path, Ec);
+ }
+ if (Ec)
+ {
+ throw std::system_error(std::error_code(Ec.value(), std::system_category()),
+ fmt::format("Failed removing file '{}', reason: ({}) {}", Path, Ec.value(), Ec.message()));
+ }
+}
+
+void
+FastCopyFile(bool AllowFileClone,
+ bool UseSparseFiles,
+ const std::filesystem::path& SourceFilePath,
+ const std::filesystem::path& TargetFilePath,
+ uint64_t RawSize,
+ std::atomic<uint64_t>& WriteCount,
+ std::atomic<uint64_t>& WriteByteCount,
+ std::atomic<uint64_t>& CloneCount,
+ std::atomic<uint64_t>& CloneByteCount)
+{
+ ZEN_TRACE_CPU("CopyFile");
+ if (AllowFileClone && TryCloneFile(SourceFilePath, TargetFilePath))
+ {
+ WriteCount += 1;
+ WriteByteCount += RawSize;
+ CloneCount += 1;
+ CloneByteCount += RawSize;
+ }
+ else
+ {
+ BasicFile TargetFile(TargetFilePath, BasicFile::Mode::kTruncate);
+ if (UseSparseFiles)
+ {
+ PrepareFileForScatteredWrite(TargetFile.Handle(), RawSize);
+ }
+ uint64_t Offset = 0;
+ if (!ScanFile(SourceFilePath, 512u * 1024u, [&](const void* Data, size_t Size) {
+ TargetFile.Write(Data, Size, Offset);
+ Offset += Size;
+ WriteCount++;
+ WriteByteCount += Size;
+ }))
+ {
+ throw std::runtime_error(fmt::format("Failed to copy file '{}' to '{}'", SourceFilePath, TargetFilePath));
+ }
+ }
+}
+
+void
+GetDirectoryContent(WorkerThreadPool& WorkerPool,
+ const std::filesystem::path& Path,
+ DirectoryContentFlags Flags,
+ DirectoryContent& OutContent)
+{
+ struct Visitor : public GetDirectoryContentVisitor
+ {
+ Visitor(zen::DirectoryContent& OutContent, const std::filesystem::path& InRootPath) : Content(OutContent), RootPath(InRootPath) {}
+ virtual bool AsyncAllowDirectory(const std::filesystem::path& Parent, const std::filesystem::path& DirectoryName) const
+ {
+ ZEN_UNUSED(Parent, DirectoryName);
+ return true;
+ }
+ virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& InContent)
+ {
+ std::vector<std::filesystem::path> Files;
+ std::vector<std::filesystem::path> Directories;
+
+ if (!InContent.FileNames.empty())
+ {
+ Files.reserve(InContent.FileNames.size());
+ for (const std::filesystem::path& FileName : InContent.FileNames)
+ {
+ if (RelativeRoot.empty())
+ {
+ Files.push_back(RootPath / FileName);
+ }
+ else
+ {
+ Files.push_back(RootPath / RelativeRoot / FileName);
+ }
+ }
+ }
+
+ if (!InContent.DirectoryNames.empty())
+ {
+ Directories.reserve(InContent.DirectoryNames.size());
+ for (const std::filesystem::path& DirName : InContent.DirectoryNames)
+ {
+ if (RelativeRoot.empty())
+ {
+ Directories.push_back(RootPath / DirName);
+ }
+ else
+ {
+ Directories.push_back(RootPath / RelativeRoot / DirName);
+ }
+ }
+ }
+
+ Lock.WithExclusiveLock([&]() {
+ if (!InContent.FileNames.empty())
+ {
+ for (const std::filesystem::path& FileName : InContent.FileNames)
+ {
+ if (RelativeRoot.empty())
+ {
+ Content.Files.push_back(RootPath / FileName);
+ }
+ else
+ {
+ Content.Files.push_back(RootPath / RelativeRoot / FileName);
+ }
+ }
+ }
+ if (!InContent.FileSizes.empty())
+ {
+ Content.FileSizes.insert(Content.FileSizes.end(), InContent.FileSizes.begin(), InContent.FileSizes.end());
+ }
+ if (!InContent.FileAttributes.empty())
+ {
+ Content.FileAttributes.insert(Content.FileAttributes.end(),
+ InContent.FileAttributes.begin(),
+ InContent.FileAttributes.end());
+ }
+ if (!InContent.FileModificationTicks.empty())
+ {
+ Content.FileModificationTicks.insert(Content.FileModificationTicks.end(),
+ InContent.FileModificationTicks.begin(),
+ InContent.FileModificationTicks.end());
+ }
+
+ if (!InContent.DirectoryNames.empty())
+ {
+ for (const std::filesystem::path& DirName : InContent.DirectoryNames)
+ {
+ if (RelativeRoot.empty())
+ {
+ Content.Directories.push_back(RootPath / DirName);
+ }
+ else
+ {
+ Content.Directories.push_back(RootPath / RelativeRoot / DirName);
+ }
+ }
+ }
+ if (!InContent.DirectoryAttributes.empty())
+ {
+ Content.DirectoryAttributes.insert(Content.DirectoryAttributes.end(),
+ InContent.DirectoryAttributes.begin(),
+ InContent.DirectoryAttributes.end());
+ }
+ });
+ }
+ RwLock Lock;
+ zen::DirectoryContent& Content;
+ const std::filesystem::path& RootPath;
+ };
+
+ Visitor RootVisitor(OutContent, Path);
+
+ Latch PendingWork(1);
+ GetDirectoryContent(Path, Flags, RootVisitor, WorkerPool, PendingWork);
+ PendingWork.CountDown();
+ PendingWork.Wait();
+}
+
+CleanDirectoryResult
+CleanDirectory(
+ WorkerThreadPool& IOWorkerPool,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ const std::filesystem::path& Path,
+ std::span<const std::string> ExcludeDirectories,
+ std::function<void(const std::string_view Details, uint64_t TotalCount, uint64_t RemainingCount, bool IsPaused, bool IsAborted)>&&
+ ProgressFunc,
+ uint32_t ProgressUpdateDelayMS)
+{
+ ZEN_TRACE_CPU("CleanDirectory");
+ Stopwatch Timer;
+
+ std::atomic<uint64_t> DiscoveredItemCount = 0;
+ std::atomic<uint64_t> DeletedItemCount = 0;
+ std::atomic<uint64_t> DeletedByteCount = 0;
+
+ std::vector<std::filesystem::path> DirectoriesToDelete;
+ CleanDirectoryResult Result;
+ RwLock ResultLock;
+ auto _ = MakeGuard([&]() {
+ Result.DeletedCount = DeletedItemCount.load();
+ Result.DeletedByteCount = DeletedByteCount.load();
+ Result.FoundCount = DiscoveredItemCount.load();
+ });
+
+ ParallelWork Work(AbortFlag,
+ PauseFlag,
+ ProgressFunc ? WorkerThreadPool::EMode::DisableBacklog : WorkerThreadPool::EMode::EnableBacklog);
+
+ struct AsyncVisitor : public GetDirectoryContentVisitor
+ {
+ AsyncVisitor(const std::filesystem::path& InPath,
+ std::atomic<bool>& InAbortFlag,
+ std::atomic<uint64_t>& InDiscoveredItemCount,
+ std::atomic<uint64_t>& InDeletedItemCount,
+ std::atomic<uint64_t>& InDeletedByteCount,
+ std::span<const std::string> InExcludeDirectories,
+ std::vector<std::filesystem::path>& OutDirectoriesToDelete,
+ CleanDirectoryResult& InResult,
+ RwLock& InResultLock)
+ : Path(InPath)
+ , AbortFlag(InAbortFlag)
+ , DiscoveredItemCount(InDiscoveredItemCount)
+ , DeletedItemCount(InDeletedItemCount)
+ , DeletedByteCount(InDeletedByteCount)
+ , ExcludeDirectories(InExcludeDirectories)
+ , DirectoriesToDelete(OutDirectoriesToDelete)
+ , Result(InResult)
+ , ResultLock(InResultLock)
+ {
+ }
+
+ virtual bool AsyncAllowDirectory(const std::filesystem::path& Parent, const std::filesystem::path& DirectoryName) const override
+ {
+ ZEN_UNUSED(Parent);
+
+ if (AbortFlag)
+ {
+ return false;
+ }
+ const std::string DirectoryString = DirectoryName.string();
+ for (const std::string_view ExcludeDirectory : ExcludeDirectories)
+ {
+ if (DirectoryString == ExcludeDirectory)
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& Content) override
+ {
+ ZEN_TRACE_CPU("CleanDirectory_AsyncVisitDirectory");
+ if (!AbortFlag)
+ {
+ DiscoveredItemCount += Content.FileNames.size();
+
+ ZEN_TRACE_CPU("DeleteFiles");
+ std::vector<std::pair<std::filesystem::path, std::error_code>> FailedRemovePaths;
+ for (size_t FileIndex = 0; FileIndex < Content.FileNames.size(); FileIndex++)
+ {
+ const std::filesystem::path& FileName = Content.FileNames[FileIndex];
+ const std::filesystem::path FilePath = (Path / RelativeRoot / FileName).make_preferred();
+
+ bool IsRemoved = false;
+ std::error_code Ec;
+ (void)SetFileReadOnly(FilePath, false, Ec);
+ for (size_t Retries = 0; Ec && Retries < 3; Retries++)
+ {
+ if (!IsFileWithRetry(FilePath))
+ {
+ IsRemoved = true;
+ Ec.clear();
+ break;
+ }
+ Sleep(100 + int(Retries * 50));
+ Ec.clear();
+ (void)SetFileReadOnly(FilePath, false, Ec);
+ }
+ if (!IsRemoved && !Ec)
+ {
+ (void)RemoveFile(FilePath, Ec);
+ for (size_t Retries = 0; Ec && Retries < 6; Retries++)
+ {
+ if (!IsFileWithRetry(FilePath))
+ {
+ IsRemoved = true;
+ Ec.clear();
+ break;
+ }
+ Sleep(100 + int(Retries * 50));
+ Ec.clear();
+ (void)RemoveFile(FilePath, Ec);
+ }
+ }
+ if (!IsRemoved && Ec)
+ {
+ FailedRemovePaths.push_back(std::make_pair(FilePath, Ec));
+ }
+ else
+ {
+ DeletedItemCount++;
+ DeletedByteCount += Content.FileSizes[FileIndex];
+ }
+ }
+
+ if (!FailedRemovePaths.empty())
+ {
+ RwLock::ExclusiveLockScope _(ResultLock);
+ Result.FailedRemovePaths.insert(Result.FailedRemovePaths.end(), FailedRemovePaths.begin(), FailedRemovePaths.end());
+ }
+ else if (!RelativeRoot.empty())
+ {
+ DiscoveredItemCount++;
+ RwLock::ExclusiveLockScope _(ResultLock);
+ DirectoriesToDelete.push_back(RelativeRoot);
+ }
+ }
+ }
+ const std::filesystem::path& Path;
+ std::atomic<bool>& AbortFlag;
+ std::atomic<uint64_t>& DiscoveredItemCount;
+ std::atomic<uint64_t>& DeletedItemCount;
+ std::atomic<uint64_t>& DeletedByteCount;
+ std::span<const std::string> ExcludeDirectories;
+ std::vector<std::filesystem::path>& DirectoriesToDelete;
+ CleanDirectoryResult& Result;
+ RwLock& ResultLock;
+ } Visitor(Path,
+ AbortFlag,
+ DiscoveredItemCount,
+ DeletedItemCount,
+ DeletedByteCount,
+ ExcludeDirectories,
+ DirectoriesToDelete,
+ Result,
+ ResultLock);
+
+ GetDirectoryContent(Path,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFileSizes,
+ Visitor,
+ IOWorkerPool,
+ Work.PendingWork());
+
+ uint64_t LastUpdateTimeMs = Timer.GetElapsedTimeMs();
+
+ if (ProgressFunc && ProgressUpdateDelayMS != 0)
+ {
+ Work.Wait(ProgressUpdateDelayMS, [&](bool IsAborted, bool IsPaused, ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ LastUpdateTimeMs = Timer.GetElapsedTimeMs();
+
+ uint64_t Deleted = DeletedItemCount.load();
+ uint64_t DeletedBytes = DeletedByteCount.load();
+ uint64_t Discovered = DiscoveredItemCount.load();
+ std::string Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes));
+ ProgressFunc(Details, Discovered, Discovered - Deleted, IsPaused, IsAborted);
+ });
+ }
+ else
+ {
+ Work.Wait();
+ }
+
+ {
+ ZEN_TRACE_CPU("DeleteDirs");
+
+ std::sort(DirectoriesToDelete.begin(),
+ DirectoriesToDelete.end(),
+ [](const std::filesystem::path& Lhs, const std::filesystem::path& Rhs) {
+ auto DistanceLhs = std::distance(Lhs.begin(), Lhs.end());
+ auto DistanceRhs = std::distance(Rhs.begin(), Rhs.end());
+ return DistanceLhs > DistanceRhs;
+ });
+
+ for (const std::filesystem::path& DirectoryToDelete : DirectoriesToDelete)
+ {
+ if (AbortFlag)
+ {
+ break;
+ }
+ else
+ {
+ while (PauseFlag && !AbortFlag)
+ {
+ Sleep(2000);
+ }
+ }
+
+ const std::filesystem::path FullPath = Path / DirectoryToDelete;
+
+ std::error_code Ec;
+ RemoveDir(FullPath, Ec);
+ if (Ec)
+ {
+ for (size_t Retries = 0; Ec && Retries < 3; Retries++)
+ {
+ if (!IsDir(FullPath))
+ {
+ Ec.clear();
+ break;
+ }
+ Sleep(100 + int(Retries * 50));
+ Ec.clear();
+ RemoveDir(FullPath, Ec);
+ }
+ }
+ if (Ec)
+ {
+ RwLock::ExclusiveLockScope __(ResultLock);
+ Result.FailedRemovePaths.push_back(std::make_pair(DirectoryToDelete, Ec));
+ }
+ else
+ {
+ DeletedItemCount++;
+ }
+
+ if (ProgressFunc)
+ {
+ uint64_t NowMs = Timer.GetElapsedTimeMs();
+
+ if ((NowMs - LastUpdateTimeMs) > 0)
+ {
+ LastUpdateTimeMs = NowMs;
+
+ uint64_t Deleted = DeletedItemCount.load();
+ uint64_t DeletedBytes = DeletedByteCount.load();
+ uint64_t Discovered = DiscoveredItemCount.load();
+ std::string Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes));
+ ProgressFunc(Details, Discovered, Discovered - Deleted, PauseFlag, AbortFlag);
+ }
+ }
+ }
+ }
+
+ return Result;
+}
+
+bool
+CleanAndRemoveDirectory(WorkerThreadPool& WorkerPool,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ const std::filesystem::path& Directory)
+{
+ if (!IsDir(Directory))
+ {
+ return true;
+ }
+ if (CleanDirectoryResult Res = CleanDirectory(
+ WorkerPool,
+ AbortFlag,
+ PauseFlag,
+ Directory,
+ {},
+ [](const std::string_view Details, uint64_t TotalCount, uint64_t RemainingCount, bool IsPaused, bool IsAborted) {
+ ZEN_UNUSED(Details, TotalCount, RemainingCount, IsPaused, IsAborted);
+ },
+ 1000);
+ Res.FailedRemovePaths.empty())
+ {
+ std::error_code Ec;
+ RemoveDir(Directory, Ec);
+ return !Ec;
+ }
+ return false;
+}
+
+#if ZEN_WITH_TESTS
+
+void
+filesystemutils_forcelink()
+{
+}
+
+namespace {
+ void GenerateFile(const std::filesystem::path& Path) { BasicFile _(Path, BasicFile::Mode::kTruncate); }
+} // namespace
+
+TEST_SUITE_BEGIN("zenutil.filesystemutils");
+
+TEST_CASE("filesystemutils.CleanDirectory")
+{
+ ScopedTemporaryDirectory TmpDir;
+
+ CreateDirectories(TmpDir.Path() / ".keepme");
+ GenerateFile(TmpDir.Path() / ".keepme" / "keep");
+ GenerateFile(TmpDir.Path() / "deleteme1");
+ GenerateFile(TmpDir.Path() / "deleteme2");
+ GenerateFile(TmpDir.Path() / "deleteme3");
+ CreateDirectories(TmpDir.Path() / ".keepmenot");
+ CreateDirectories(TmpDir.Path() / "no.keepme");
+
+ CreateDirectories(TmpDir.Path() / "DeleteMe");
+ GenerateFile(TmpDir.Path() / "DeleteMe" / "delete1");
+ CreateDirectories(TmpDir.Path() / "CantDeleteMe");
+ GenerateFile(TmpDir.Path() / "CantDeleteMe" / "delete1");
+ GenerateFile(TmpDir.Path() / "CantDeleteMe" / "delete2");
+ GenerateFile(TmpDir.Path() / "CantDeleteMe" / "delete3");
+ CreateDirectories(TmpDir.Path() / "CantDeleteMe" / ".keepme");
+ CreateDirectories(TmpDir.Path() / "CantDeleteMe" / "DeleteMe2");
+ GenerateFile(TmpDir.Path() / "CantDeleteMe" / "DeleteMe2" / "delete2");
+ GenerateFile(TmpDir.Path() / "CantDeleteMe" / "DeleteMe2" / "delete3");
+ CreateDirectories(TmpDir.Path() / "CantDeleteMe2" / ".keepme");
+ CreateDirectories(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept");
+ GenerateFile(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept" / "kept1");
+ GenerateFile(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept" / "kept2");
+ GenerateFile(TmpDir.Path() / "CantDeleteMe2" / "deleteme");
+
+ WorkerThreadPool Pool(4);
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+
+ CleanDirectory(Pool, AbortFlag, PauseFlag, TmpDir.Path(), std::vector<std::string>{".keepme"}, {}, 0);
+
+ CHECK(IsDir(TmpDir.Path() / ".keepme"));
+ CHECK(IsFile(TmpDir.Path() / ".keepme" / "keep"));
+ CHECK(!IsFile(TmpDir.Path() / "deleteme1"));
+ CHECK(!IsFile(TmpDir.Path() / "deleteme2"));
+ CHECK(!IsFile(TmpDir.Path() / "deleteme3"));
+ CHECK(!IsFile(TmpDir.Path() / ".keepmenot"));
+ CHECK(!IsFile(TmpDir.Path() / "no.keepme"));
+
+ CHECK(!IsDir(TmpDir.Path() / "DeleteMe"));
+ CHECK(!IsDir(TmpDir.Path() / "DeleteMe2"));
+
+ CHECK(IsDir(TmpDir.Path() / "CantDeleteMe"));
+ CHECK(IsDir(TmpDir.Path() / "CantDeleteMe" / ".keepme"));
+ CHECK(IsDir(TmpDir.Path() / "CantDeleteMe2"));
+ CHECK(IsDir(TmpDir.Path() / "CantDeleteMe2" / ".keepme"));
+ CHECK(IsDir(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept"));
+ CHECK(IsFile(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept" / "kept1"));
+ CHECK(IsFile(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept" / "kept2"));
+ CHECK(!IsFile(TmpDir.Path() / "CantDeleteMe2" / "deleteme"));
+}
+
+TEST_SUITE_END();
+
+#endif
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/cloud/s3client.h b/src/zenutil/include/zenutil/cloud/s3client.h
index f1f0df0e4..b0402d231 100644
--- a/src/zenutil/include/zenutil/cloud/s3client.h
+++ b/src/zenutil/include/zenutil/cloud/s3client.h
@@ -219,6 +219,7 @@ private:
SigV4Credentials m_Credentials;
Ref<ImdsCredentialProvider> m_CredentialProvider;
HttpClient m_HttpClient;
+ bool m_Verbose = false;
// Cached signing key (only changes once per day, protected by RwLock for thread safety)
mutable RwLock m_SigningKeyLock;
diff --git a/src/zenutil/include/zenutil/filesystemutils.h b/src/zenutil/include/zenutil/filesystemutils.h
new file mode 100644
index 000000000..05defd1a8
--- /dev/null
+++ b/src/zenutil/include/zenutil/filesystemutils.h
@@ -0,0 +1,98 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/basicfile.h>
+#include <zencore/filesystem.h>
+
+namespace zen {
+
+class CompositeBuffer;
+
+class BufferedOpenFile
+{
+public:
+ static constexpr uint64_t BlockSize = 256u * 1024u;
+
+ BufferedOpenFile(const std::filesystem::path Path,
+ std::atomic<uint64_t>& OpenReadCount,
+ std::atomic<uint64_t>& CurrentOpenFileCount,
+ std::atomic<uint64_t>& ReadCount,
+ std::atomic<uint64_t>& ReadByteCount);
+ ~BufferedOpenFile();
+ BufferedOpenFile() = delete;
+ BufferedOpenFile(const BufferedOpenFile&) = delete;
+ BufferedOpenFile(BufferedOpenFile&&) = delete;
+ BufferedOpenFile& operator=(BufferedOpenFile&&) = delete;
+ BufferedOpenFile& operator=(const BufferedOpenFile&) = delete;
+
+ CompositeBuffer GetRange(uint64_t Offset, uint64_t Size);
+
+public:
+ void* Handle() { return m_Source.Handle(); }
+
+private:
+ BasicFile m_Source;
+ const uint64_t m_SourceSize;
+ std::atomic<uint64_t>& m_OpenReadCount;
+ std::atomic<uint64_t>& m_CurrentOpenFileCount;
+ std::atomic<uint64_t>& m_ReadCount;
+ std::atomic<uint64_t>& m_ReadByteCount;
+ uint64_t m_CacheBlockIndex = (uint64_t)-1;
+ IoBuffer m_Cache;
+};
+
+bool IsFileWithRetry(const std::filesystem::path& Path);
+
+bool SetFileReadOnlyWithRetry(const std::filesystem::path& Path, bool ReadOnly);
+
+std::error_code RenameFileWithRetry(const std::filesystem::path& SourcePath, const std::filesystem::path& TargetPath);
+std::error_code RenameDirectoryWithRetry(const std::filesystem::path& SourcePath, const std::filesystem::path& TargetPath);
+
+std::error_code TryRemoveFile(const std::filesystem::path& Path);
+
+void RemoveFileWithRetry(const std::filesystem::path& Path);
+
+void FastCopyFile(bool AllowFileClone,
+ bool UseSparseFiles,
+ const std::filesystem::path& SourceFilePath,
+ const std::filesystem::path& TargetFilePath,
+ uint64_t RawSize,
+ std::atomic<uint64_t>& WriteCount,
+ std::atomic<uint64_t>& WriteByteCount,
+ std::atomic<uint64_t>& CloneCount,
+ std::atomic<uint64_t>& CloneByteCount);
+
+struct CleanDirectoryResult
+{
+ uint64_t FoundCount = 0;
+ uint64_t DeletedCount = 0;
+ uint64_t DeletedByteCount = 0;
+ std::vector<std::pair<std::filesystem::path, std::error_code>> FailedRemovePaths;
+};
+
+class WorkerThreadPool;
+
+void GetDirectoryContent(WorkerThreadPool& WorkerPool,
+ const std::filesystem::path& Path,
+ DirectoryContentFlags Flags,
+ DirectoryContent& OutContent);
+
+CleanDirectoryResult CleanDirectory(
+ WorkerThreadPool& IOWorkerPool,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ const std::filesystem::path& Path,
+ std::span<const std::string> ExcludeDirectories,
+ std::function<void(const std::string_view Details, uint64_t TotalCount, uint64_t RemainingCount, bool IsPaused, bool IsAborted)>&&
+ ProgressFunc,
+ uint32_t ProgressUpdateDelayMS);
+
+bool CleanAndRemoveDirectory(WorkerThreadPool& WorkerPool,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ const std::filesystem::path& Directory);
+
+void filesystemutils_forcelink(); // internal
+
+} // namespace zen
diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp
index 516eec3a9..b282adc03 100644
--- a/src/zenutil/zenutil.cpp
+++ b/src/zenutil/zenutil.cpp
@@ -9,6 +9,7 @@
# include <zenutil/cloud/s3client.h>
# include <zenutil/cloud/sigv4.h>
# include <zenutil/config/commandlineoptions.h>
+# include <zenutil/filesystemutils.h>
# include <zenutil/rpcrecording.h>
# include <zenutil/splitconsole/logstreamlistener.h>
# include <zenutil/process/subprocessmanager.h>
@@ -22,6 +23,7 @@ zenutil_forcelinktests()
cache::rpcrecord_forcelink();
commandlineoptions_forcelink();
consul::consul_forcelink();
+ filesystemutils_forcelink();
imdscredentials_forcelink();
logstreamlistener_forcelink();
subprocessmanager_forcelink();