diff options
| author | Dan Engelbrecht <[email protected]> | 2026-04-07 16:53:55 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-04-07 16:53:55 +0200 |
| commit | 4d8fae7636ad45900f22253621b9f7d51d0b646e (patch) | |
| tree | 37fdf97870f216d465b4cb66563c5c366262483d /src/zenutil | |
| parent | disable zencompute in bundle step (diff) | |
| download | zen-4d8fae7636ad45900f22253621b9f7d51d0b646e.tar.xz zen-4d8fae7636ad45900f22253621b9f7d51d0b646e.zip | |
incremental dehydrate (#921)
- Feature: Incremental CAS-based hydration/dehydration replacing the previous full-copy approach
- Feature: S3 hydration backend with multipart upload/download support
- Feature: Configurable thread pools for hub instance provisioning and hydration
`--hub-instance-provision-threads` defaults to `max(cpu_count / 4, 2)`. Set to 0 for synchronous operation.
`--hub-hydration-threads` defaults to `max(cpu_count / 4, 2)`. Set to 0 for synchronous operation.
- Improvement: Hub triggers GC on an instance before deprovisioning it, so that storage is compacted ahead of dehydration
- Improvement: GC status now reports pending triggers as running
- Improvement: S3 client debug logging gated behind verbose mode to reduce log noise at default verbosity
- Improvement: Hub dashboard Resources tile now shows total memory
- Improvement: `filesystemutils` moved from `zenremotestore` to `zenutil` for broader reuse
- Improvement: Hub uses separate provision and hydration worker pools to avoid deadlocks
- Improvement: Hibernate/wake/deprovision requests on non-existent or already-in-target-state modules are now idempotent
- Improvement: `ScopedTemporaryDirectory` with an empty path now creates a temporary directory instead of asserting
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/cloud/s3client.cpp | 74 | ||||
| -rw-r--r-- | src/zenutil/filesystemutils.cpp | 721 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cloud/s3client.h | 1 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/filesystemutils.h | 98 | ||||
| -rw-r--r-- | src/zenutil/zenutil.cpp | 2 |
5 files changed, 879 insertions, 17 deletions
diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp index d9fde05d9..ef4b344ce 100644 --- a/src/zenutil/cloud/s3client.cpp +++ b/src/zenutil/cloud/s3client.cpp @@ -148,6 +148,7 @@ S3Client::S3Client(const S3ClientOptions& Options) , m_Credentials(Options.Credentials) , m_CredentialProvider(Options.CredentialProvider) , m_HttpClient(BuildEndpoint(), Options.HttpSettings) +, m_Verbose(Options.HttpSettings.Verbose) { m_Host = BuildHostHeader(); ZEN_INFO("S3 client configured for bucket '{}' in region '{}' (endpoint: {}, {})", @@ -338,7 +339,10 @@ S3Client::PutObject(std::string_view Key, IoBuffer Content) return S3Result{std::move(Err)}; } - ZEN_DEBUG("S3 PUT '{}' succeeded ({} bytes)", Key, Content.GetSize()); + if (m_Verbose) + { + ZEN_INFO("S3 PUT '{}' succeeded ({} bytes)", Key, Content.GetSize()); + } return {}; } @@ -362,7 +366,10 @@ S3Client::GetObject(std::string_view Key, const std::filesystem::path& TempFileP return S3GetObjectResult{S3Result{std::move(Err)}, {}}; } - ZEN_DEBUG("S3 GET '{}' succeeded ({} bytes)", Key, Response.ResponsePayload.GetSize()); + if (m_Verbose) + { + ZEN_INFO("S3 GET '{}' succeeded ({} bytes)", Key, Response.ResponsePayload.GetSize()); + } return S3GetObjectResult{{}, std::move(Response.ResponsePayload)}; } @@ -403,11 +410,14 @@ S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t Ran return S3GetObjectResult{S3Result{std::move(Err)}, {}}; } - ZEN_DEBUG("S3 GET range '{}' [{}-{}] succeeded ({} bytes)", - Key, - RangeStart, - RangeStart + RangeSize - 1, - Response.ResponsePayload.GetSize()); + if (m_Verbose) + { + ZEN_INFO("S3 GET range '{}' [{}-{}] succeeded ({} bytes)", + Key, + RangeStart, + RangeStart + RangeSize - 1, + Response.ResponsePayload.GetSize()); + } return S3GetObjectResult{{}, std::move(Response.ResponsePayload)}; } @@ -426,7 +436,10 @@ S3Client::DeleteObject(std::string_view Key) return S3Result{std::move(Err)}; } - ZEN_DEBUG("S3 DELETE '{}' succeeded", Key); + if 
(m_Verbose) + { + ZEN_INFO("S3 DELETE '{}' succeeded", Key); + } return {}; } @@ -468,7 +481,10 @@ S3Client::HeadObject(std::string_view Key) Info.LastModified = *V; } - ZEN_DEBUG("S3 HEAD '{}' succeeded (size={})", Key, Info.Size); + if (m_Verbose) + { + ZEN_INFO("S3 HEAD '{}' succeeded (size={})", Key, Info.Size); + } return S3HeadObjectResult{{}, std::move(Info), HeadObjectResult::Found}; } @@ -565,10 +581,16 @@ S3Client::ListObjects(std::string_view Prefix, uint32_t MaxKeys) } ContinuationToken = std::string(NextToken); - ZEN_DEBUG("S3 ListObjectsV2 prefix='{}' fetching next page ({} objects so far)", Prefix, Result.Objects.size()); + if (m_Verbose) + { + ZEN_INFO("S3 ListObjectsV2 prefix='{}' fetching next page ({} objects so far)", Prefix, Result.Objects.size()); + } } - ZEN_DEBUG("S3 ListObjectsV2 prefix='{}' returned {} objects", Prefix, Result.Objects.size()); + if (m_Verbose) + { + ZEN_INFO("S3 ListObjectsV2 prefix='{}' returned {} objects", Prefix, Result.Objects.size()); + } return Result; } @@ -607,7 +629,10 @@ S3Client::CreateMultipartUpload(std::string_view Key) return S3CreateMultipartUploadResult{S3Result{std::move(Err)}, {}}; } - ZEN_DEBUG("S3 CreateMultipartUpload '{}' succeeded (uploadId={})", Key, UploadId); + if (m_Verbose) + { + ZEN_INFO("S3 CreateMultipartUpload '{}' succeeded (uploadId={})", Key, UploadId); + } return S3CreateMultipartUploadResult{{}, std::string(UploadId)}; } @@ -642,7 +667,10 @@ S3Client::UploadPart(std::string_view Key, std::string_view UploadId, uint32_t P return S3UploadPartResult{S3Result{std::move(Err)}, {}}; } - ZEN_DEBUG("S3 UploadPart '{}' part {} succeeded ({} bytes, etag={})", Key, PartNumber, Content.GetSize(), *ETag); + if (m_Verbose) + { + ZEN_INFO("S3 UploadPart '{}' part {} succeeded ({} bytes, etag={})", Key, PartNumber, Content.GetSize(), *ETag); + } return S3UploadPartResult{{}, *ETag}; } @@ -691,7 +719,10 @@ S3Client::CompleteMultipartUpload(std::string_view Key, return S3Result{std::move(Err)}; } - 
ZEN_DEBUG("S3 CompleteMultipartUpload '{}' succeeded ({} parts)", Key, PartETags.size()); + if (m_Verbose) + { + ZEN_INFO("S3 CompleteMultipartUpload '{}' succeeded ({} parts)", Key, PartETags.size()); + } return {}; } @@ -712,7 +743,10 @@ S3Client::AbortMultipartUpload(std::string_view Key, std::string_view UploadId) return S3Result{std::move(Err)}; } - ZEN_DEBUG("S3 AbortMultipartUpload '{}' succeeded (uploadId={})", Key, UploadId); + if (m_Verbose) + { + ZEN_INFO("S3 AbortMultipartUpload '{}' succeeded (uploadId={})", Key, UploadId); + } return {}; } @@ -755,7 +789,10 @@ S3Client::PutObjectMultipart(std::string_view Key, return PutObject(Key, TotalSize > 0 ? FetchRange(0, TotalSize) : IoBuffer{}); } - ZEN_DEBUG("S3 multipart upload '{}': {} bytes in ~{} parts", Key, TotalSize, (TotalSize + PartSize - 1) / PartSize); + if (m_Verbose) + { + ZEN_INFO("S3 multipart upload '{}': {} bytes in ~{} parts", Key, TotalSize, (TotalSize + PartSize - 1) / PartSize); + } S3CreateMultipartUploadResult InitResult = CreateMultipartUpload(Key); if (!InitResult) @@ -803,7 +840,10 @@ S3Client::PutObjectMultipart(std::string_view Key, throw; } - ZEN_DEBUG("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), TotalSize); + if (m_Verbose) + { + ZEN_INFO("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), TotalSize); + } return {}; } diff --git a/src/zenutil/filesystemutils.cpp b/src/zenutil/filesystemutils.cpp new file mode 100644 index 000000000..9b7953f95 --- /dev/null +++ b/src/zenutil/filesystemutils.cpp @@ -0,0 +1,721 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#include <zenutil/filesystemutils.h> + +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/parallelwork.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> +#include <zencore/trace.h> + +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +# include <zencore/testutils.h> +#endif // ZEN_WITH_TESTS + +namespace zen { + +BufferedOpenFile::BufferedOpenFile(const std::filesystem::path Path, + std::atomic<uint64_t>& OpenReadCount, + std::atomic<uint64_t>& CurrentOpenFileCount, + std::atomic<uint64_t>& ReadCount, + std::atomic<uint64_t>& ReadByteCount) +: m_Source(Path, BasicFile::Mode::kRead) +, m_SourceSize(m_Source.FileSize()) +, m_OpenReadCount(OpenReadCount) +, m_CurrentOpenFileCount(CurrentOpenFileCount) +, m_ReadCount(ReadCount) +, m_ReadByteCount(ReadByteCount) + +{ + m_OpenReadCount++; + m_CurrentOpenFileCount++; +} + +BufferedOpenFile::~BufferedOpenFile() +{ + m_CurrentOpenFileCount--; +} + +CompositeBuffer +BufferedOpenFile::GetRange(uint64_t Offset, uint64_t Size) +{ + ZEN_TRACE_CPU("BufferedOpenFile::GetRange"); + + ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache); + auto _ = MakeGuard([&]() { ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache); }); + + ZEN_ASSERT((Offset + Size) <= m_SourceSize); + const uint64_t BlockIndexStart = Offset / BlockSize; + const uint64_t BlockIndexEnd = (Offset + Size - 1) / BlockSize; + + std::vector<SharedBuffer> BufferRanges; + BufferRanges.reserve(BlockIndexEnd - BlockIndexStart + 1); + + uint64_t ReadOffset = Offset; + for (uint64_t BlockIndex = BlockIndexStart; BlockIndex <= BlockIndexEnd; BlockIndex++) + { + const uint64_t BlockStartOffset = BlockIndex * BlockSize; + if (m_CacheBlockIndex != BlockIndex) + { + uint64_t CacheSize = Min(BlockSize, m_SourceSize - BlockStartOffset); + ZEN_ASSERT(CacheSize > 0); + m_Cache = IoBuffer(CacheSize); + m_Source.Read(m_Cache.GetMutableView().GetData(), CacheSize, BlockStartOffset); + m_ReadCount++; + m_ReadByteCount += 
CacheSize; + m_CacheBlockIndex = BlockIndex; + } + + const uint64_t BytesRead = ReadOffset - Offset; + ZEN_ASSERT(BlockStartOffset <= ReadOffset); + const uint64_t OffsetIntoBlock = ReadOffset - BlockStartOffset; + ZEN_ASSERT(OffsetIntoBlock < m_Cache.GetSize()); + const uint64_t BlockBytes = Min(m_Cache.GetSize() - OffsetIntoBlock, Size - BytesRead); + BufferRanges.emplace_back(SharedBuffer(IoBuffer(m_Cache, OffsetIntoBlock, BlockBytes))); + ReadOffset += BlockBytes; + } + CompositeBuffer Result(std::move(BufferRanges)); + ZEN_ASSERT(Result.GetSize() == Size); + return Result; +} + +bool +IsFileWithRetry(const std::filesystem::path& Path) +{ + std::error_code Ec; + bool Result = IsFile(Path, Ec); + for (size_t Retries = 0; Ec && Retries < 3; Retries++) + { + Sleep(100 + int(Retries * 50)); + Ec.clear(); + Result = IsFile(Path, Ec); + } + if (Ec) + { + throw std::system_error(std::error_code(Ec.value(), std::system_category()), + fmt::format("Failed to check path '{}' is file, reason: ({}) {}", Path, Ec.value(), Ec.message())); + } + return Result; +} + +bool +SetFileReadOnlyWithRetry(const std::filesystem::path& Path, bool ReadOnly) +{ + std::error_code Ec; + bool Result = SetFileReadOnly(Path, ReadOnly, Ec); + for (size_t Retries = 0; Ec && Retries < 3; Retries++) + { + if (!IsFileWithRetry(Path)) + { + return false; + } + Sleep(100 + int(Retries * 50)); + Ec.clear(); + Result = SetFileReadOnly(Path, ReadOnly, Ec); + } + if (Ec) + { + throw std::system_error(std::error_code(Ec.value(), std::system_category()), + fmt::format("Failed {} read only flag for file '{}', reason: ({}) {}", + ReadOnly ? 
"setting" : "clearing", + Path, + Ec.value(), + Ec.message())); + } + return Result; +} + +std::error_code +RenameFileWithRetry(const std::filesystem::path& SourcePath, const std::filesystem::path& TargetPath) +{ + std::error_code Ec; + RenameFile(SourcePath, TargetPath, Ec); + for (size_t Retries = 0; Ec && Retries < 5; Retries++) + { + ZEN_ASSERT_SLOW(IsFile(SourcePath)); + Sleep(50 + int(Retries * 150)); + Ec.clear(); + RenameFile(SourcePath, TargetPath, Ec); + } + return Ec; +} + +std::error_code +RenameDirectoryWithRetry(const std::filesystem::path& SourcePath, const std::filesystem::path& TargetPath) +{ + std::error_code Ec; + RenameDirectory(SourcePath, TargetPath, Ec); + for (size_t Retries = 0; Ec && Retries < 5; Retries++) + { + ZEN_ASSERT_SLOW(IsDir(SourcePath)); + Sleep(50 + int(Retries * 150)); + Ec.clear(); + RenameDirectory(SourcePath, TargetPath, Ec); + } + return Ec; +} + +std::error_code +TryRemoveFile(const std::filesystem::path& Path) +{ + std::error_code Ec; + RemoveFile(Path, Ec); + if (Ec) + { + if (IsFile(Path, Ec)) + { + Ec.clear(); + RemoveFile(Path, Ec); + if (Ec) + { + return Ec; + } + } + } + return {}; +} + +void +RemoveFileWithRetry(const std::filesystem::path& Path) +{ + std::error_code Ec; + RemoveFile(Path, Ec); + for (size_t Retries = 0; Ec && Retries < 6; Retries++) + { + if (!IsFileWithRetry(Path)) + { + return; + } + Sleep(100 + int(Retries * 50)); + Ec.clear(); + RemoveFile(Path, Ec); + } + if (Ec) + { + throw std::system_error(std::error_code(Ec.value(), std::system_category()), + fmt::format("Failed removing file '{}', reason: ({}) {}", Path, Ec.value(), Ec.message())); + } +} + +void +FastCopyFile(bool AllowFileClone, + bool UseSparseFiles, + const std::filesystem::path& SourceFilePath, + const std::filesystem::path& TargetFilePath, + uint64_t RawSize, + std::atomic<uint64_t>& WriteCount, + std::atomic<uint64_t>& WriteByteCount, + std::atomic<uint64_t>& CloneCount, + std::atomic<uint64_t>& CloneByteCount) +{ + 
ZEN_TRACE_CPU("CopyFile"); + if (AllowFileClone && TryCloneFile(SourceFilePath, TargetFilePath)) + { + WriteCount += 1; + WriteByteCount += RawSize; + CloneCount += 1; + CloneByteCount += RawSize; + } + else + { + BasicFile TargetFile(TargetFilePath, BasicFile::Mode::kTruncate); + if (UseSparseFiles) + { + PrepareFileForScatteredWrite(TargetFile.Handle(), RawSize); + } + uint64_t Offset = 0; + if (!ScanFile(SourceFilePath, 512u * 1024u, [&](const void* Data, size_t Size) { + TargetFile.Write(Data, Size, Offset); + Offset += Size; + WriteCount++; + WriteByteCount += Size; + })) + { + throw std::runtime_error(fmt::format("Failed to copy file '{}' to '{}'", SourceFilePath, TargetFilePath)); + } + } +} + +void +GetDirectoryContent(WorkerThreadPool& WorkerPool, + const std::filesystem::path& Path, + DirectoryContentFlags Flags, + DirectoryContent& OutContent) +{ + struct Visitor : public GetDirectoryContentVisitor + { + Visitor(zen::DirectoryContent& OutContent, const std::filesystem::path& InRootPath) : Content(OutContent), RootPath(InRootPath) {} + virtual bool AsyncAllowDirectory(const std::filesystem::path& Parent, const std::filesystem::path& DirectoryName) const + { + ZEN_UNUSED(Parent, DirectoryName); + return true; + } + virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& InContent) + { + std::vector<std::filesystem::path> Files; + std::vector<std::filesystem::path> Directories; + + if (!InContent.FileNames.empty()) + { + Files.reserve(InContent.FileNames.size()); + for (const std::filesystem::path& FileName : InContent.FileNames) + { + if (RelativeRoot.empty()) + { + Files.push_back(RootPath / FileName); + } + else + { + Files.push_back(RootPath / RelativeRoot / FileName); + } + } + } + + if (!InContent.DirectoryNames.empty()) + { + Directories.reserve(InContent.DirectoryNames.size()); + for (const std::filesystem::path& DirName : InContent.DirectoryNames) + { + if (RelativeRoot.empty()) + { + 
Directories.push_back(RootPath / DirName); + } + else + { + Directories.push_back(RootPath / RelativeRoot / DirName); + } + } + } + + Lock.WithExclusiveLock([&]() { + if (!InContent.FileNames.empty()) + { + for (const std::filesystem::path& FileName : InContent.FileNames) + { + if (RelativeRoot.empty()) + { + Content.Files.push_back(RootPath / FileName); + } + else + { + Content.Files.push_back(RootPath / RelativeRoot / FileName); + } + } + } + if (!InContent.FileSizes.empty()) + { + Content.FileSizes.insert(Content.FileSizes.end(), InContent.FileSizes.begin(), InContent.FileSizes.end()); + } + if (!InContent.FileAttributes.empty()) + { + Content.FileAttributes.insert(Content.FileAttributes.end(), + InContent.FileAttributes.begin(), + InContent.FileAttributes.end()); + } + if (!InContent.FileModificationTicks.empty()) + { + Content.FileModificationTicks.insert(Content.FileModificationTicks.end(), + InContent.FileModificationTicks.begin(), + InContent.FileModificationTicks.end()); + } + + if (!InContent.DirectoryNames.empty()) + { + for (const std::filesystem::path& DirName : InContent.DirectoryNames) + { + if (RelativeRoot.empty()) + { + Content.Directories.push_back(RootPath / DirName); + } + else + { + Content.Directories.push_back(RootPath / RelativeRoot / DirName); + } + } + } + if (!InContent.DirectoryAttributes.empty()) + { + Content.DirectoryAttributes.insert(Content.DirectoryAttributes.end(), + InContent.DirectoryAttributes.begin(), + InContent.DirectoryAttributes.end()); + } + }); + } + RwLock Lock; + zen::DirectoryContent& Content; + const std::filesystem::path& RootPath; + }; + + Visitor RootVisitor(OutContent, Path); + + Latch PendingWork(1); + GetDirectoryContent(Path, Flags, RootVisitor, WorkerPool, PendingWork); + PendingWork.CountDown(); + PendingWork.Wait(); +} + +CleanDirectoryResult +CleanDirectory( + WorkerThreadPool& IOWorkerPool, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + const std::filesystem::path& Path, + 
std::span<const std::string> ExcludeDirectories, + std::function<void(const std::string_view Details, uint64_t TotalCount, uint64_t RemainingCount, bool IsPaused, bool IsAborted)>&& + ProgressFunc, + uint32_t ProgressUpdateDelayMS) +{ + ZEN_TRACE_CPU("CleanDirectory"); + Stopwatch Timer; + + std::atomic<uint64_t> DiscoveredItemCount = 0; + std::atomic<uint64_t> DeletedItemCount = 0; + std::atomic<uint64_t> DeletedByteCount = 0; + + std::vector<std::filesystem::path> DirectoriesToDelete; + CleanDirectoryResult Result; + RwLock ResultLock; + auto _ = MakeGuard([&]() { + Result.DeletedCount = DeletedItemCount.load(); + Result.DeletedByteCount = DeletedByteCount.load(); + Result.FoundCount = DiscoveredItemCount.load(); + }); + + ParallelWork Work(AbortFlag, + PauseFlag, + ProgressFunc ? WorkerThreadPool::EMode::DisableBacklog : WorkerThreadPool::EMode::EnableBacklog); + + struct AsyncVisitor : public GetDirectoryContentVisitor + { + AsyncVisitor(const std::filesystem::path& InPath, + std::atomic<bool>& InAbortFlag, + std::atomic<uint64_t>& InDiscoveredItemCount, + std::atomic<uint64_t>& InDeletedItemCount, + std::atomic<uint64_t>& InDeletedByteCount, + std::span<const std::string> InExcludeDirectories, + std::vector<std::filesystem::path>& OutDirectoriesToDelete, + CleanDirectoryResult& InResult, + RwLock& InResultLock) + : Path(InPath) + , AbortFlag(InAbortFlag) + , DiscoveredItemCount(InDiscoveredItemCount) + , DeletedItemCount(InDeletedItemCount) + , DeletedByteCount(InDeletedByteCount) + , ExcludeDirectories(InExcludeDirectories) + , DirectoriesToDelete(OutDirectoriesToDelete) + , Result(InResult) + , ResultLock(InResultLock) + { + } + + virtual bool AsyncAllowDirectory(const std::filesystem::path& Parent, const std::filesystem::path& DirectoryName) const override + { + ZEN_UNUSED(Parent); + + if (AbortFlag) + { + return false; + } + const std::string DirectoryString = DirectoryName.string(); + for (const std::string_view ExcludeDirectory : ExcludeDirectories) + { 
+ if (DirectoryString == ExcludeDirectory) + { + return false; + } + } + return true; + } + + virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& Content) override + { + ZEN_TRACE_CPU("CleanDirectory_AsyncVisitDirectory"); + if (!AbortFlag) + { + DiscoveredItemCount += Content.FileNames.size(); + + ZEN_TRACE_CPU("DeleteFiles"); + std::vector<std::pair<std::filesystem::path, std::error_code>> FailedRemovePaths; + for (size_t FileIndex = 0; FileIndex < Content.FileNames.size(); FileIndex++) + { + const std::filesystem::path& FileName = Content.FileNames[FileIndex]; + const std::filesystem::path FilePath = (Path / RelativeRoot / FileName).make_preferred(); + + bool IsRemoved = false; + std::error_code Ec; + (void)SetFileReadOnly(FilePath, false, Ec); + for (size_t Retries = 0; Ec && Retries < 3; Retries++) + { + if (!IsFileWithRetry(FilePath)) + { + IsRemoved = true; + Ec.clear(); + break; + } + Sleep(100 + int(Retries * 50)); + Ec.clear(); + (void)SetFileReadOnly(FilePath, false, Ec); + } + if (!IsRemoved && !Ec) + { + (void)RemoveFile(FilePath, Ec); + for (size_t Retries = 0; Ec && Retries < 6; Retries++) + { + if (!IsFileWithRetry(FilePath)) + { + IsRemoved = true; + Ec.clear(); + break; + } + Sleep(100 + int(Retries * 50)); + Ec.clear(); + (void)RemoveFile(FilePath, Ec); + } + } + if (!IsRemoved && Ec) + { + FailedRemovePaths.push_back(std::make_pair(FilePath, Ec)); + } + else + { + DeletedItemCount++; + DeletedByteCount += Content.FileSizes[FileIndex]; + } + } + + if (!FailedRemovePaths.empty()) + { + RwLock::ExclusiveLockScope _(ResultLock); + Result.FailedRemovePaths.insert(Result.FailedRemovePaths.end(), FailedRemovePaths.begin(), FailedRemovePaths.end()); + } + else if (!RelativeRoot.empty()) + { + DiscoveredItemCount++; + RwLock::ExclusiveLockScope _(ResultLock); + DirectoriesToDelete.push_back(RelativeRoot); + } + } + } + const std::filesystem::path& Path; + std::atomic<bool>& AbortFlag; + std::atomic<uint64_t>& 
DiscoveredItemCount; + std::atomic<uint64_t>& DeletedItemCount; + std::atomic<uint64_t>& DeletedByteCount; + std::span<const std::string> ExcludeDirectories; + std::vector<std::filesystem::path>& DirectoriesToDelete; + CleanDirectoryResult& Result; + RwLock& ResultLock; + } Visitor(Path, + AbortFlag, + DiscoveredItemCount, + DeletedItemCount, + DeletedByteCount, + ExcludeDirectories, + DirectoriesToDelete, + Result, + ResultLock); + + GetDirectoryContent(Path, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFileSizes, + Visitor, + IOWorkerPool, + Work.PendingWork()); + + uint64_t LastUpdateTimeMs = Timer.GetElapsedTimeMs(); + + if (ProgressFunc && ProgressUpdateDelayMS != 0) + { + Work.Wait(ProgressUpdateDelayMS, [&](bool IsAborted, bool IsPaused, ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + LastUpdateTimeMs = Timer.GetElapsedTimeMs(); + + uint64_t Deleted = DeletedItemCount.load(); + uint64_t DeletedBytes = DeletedByteCount.load(); + uint64_t Discovered = DiscoveredItemCount.load(); + std::string Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)); + ProgressFunc(Details, Discovered, Discovered - Deleted, IsPaused, IsAborted); + }); + } + else + { + Work.Wait(); + } + + { + ZEN_TRACE_CPU("DeleteDirs"); + + std::sort(DirectoriesToDelete.begin(), + DirectoriesToDelete.end(), + [](const std::filesystem::path& Lhs, const std::filesystem::path& Rhs) { + auto DistanceLhs = std::distance(Lhs.begin(), Lhs.end()); + auto DistanceRhs = std::distance(Rhs.begin(), Rhs.end()); + return DistanceLhs > DistanceRhs; + }); + + for (const std::filesystem::path& DirectoryToDelete : DirectoriesToDelete) + { + if (AbortFlag) + { + break; + } + else + { + while (PauseFlag && !AbortFlag) + { + Sleep(2000); + } + } + + const std::filesystem::path FullPath = Path / DirectoryToDelete; + + std::error_code Ec; + RemoveDir(FullPath, Ec); + if (Ec) + { + for (size_t Retries = 0; Ec && 
Retries < 3; Retries++) + { + if (!IsDir(FullPath)) + { + Ec.clear(); + break; + } + Sleep(100 + int(Retries * 50)); + Ec.clear(); + RemoveDir(FullPath, Ec); + } + } + if (Ec) + { + RwLock::ExclusiveLockScope __(ResultLock); + Result.FailedRemovePaths.push_back(std::make_pair(DirectoryToDelete, Ec)); + } + else + { + DeletedItemCount++; + } + + if (ProgressFunc) + { + uint64_t NowMs = Timer.GetElapsedTimeMs(); + + if ((NowMs - LastUpdateTimeMs) > 0) + { + LastUpdateTimeMs = NowMs; + + uint64_t Deleted = DeletedItemCount.load(); + uint64_t DeletedBytes = DeletedByteCount.load(); + uint64_t Discovered = DiscoveredItemCount.load(); + std::string Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)); + ProgressFunc(Details, Discovered, Discovered - Deleted, PauseFlag, AbortFlag); + } + } + } + } + + return Result; +} + +bool +CleanAndRemoveDirectory(WorkerThreadPool& WorkerPool, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + const std::filesystem::path& Directory) +{ + if (!IsDir(Directory)) + { + return true; + } + if (CleanDirectoryResult Res = CleanDirectory( + WorkerPool, + AbortFlag, + PauseFlag, + Directory, + {}, + [](const std::string_view Details, uint64_t TotalCount, uint64_t RemainingCount, bool IsPaused, bool IsAborted) { + ZEN_UNUSED(Details, TotalCount, RemainingCount, IsPaused, IsAborted); + }, + 1000); + Res.FailedRemovePaths.empty()) + { + std::error_code Ec; + RemoveDir(Directory, Ec); + return !Ec; + } + return false; +} + +#if ZEN_WITH_TESTS + +void +filesystemutils_forcelink() +{ +} + +namespace { + void GenerateFile(const std::filesystem::path& Path) { BasicFile _(Path, BasicFile::Mode::kTruncate); } +} // namespace + +TEST_SUITE_BEGIN("zenutil.filesystemutils"); + +TEST_CASE("filesystemutils.CleanDirectory") +{ + ScopedTemporaryDirectory TmpDir; + + CreateDirectories(TmpDir.Path() / ".keepme"); + GenerateFile(TmpDir.Path() / ".keepme" / "keep"); + GenerateFile(TmpDir.Path() / 
"deleteme1"); + GenerateFile(TmpDir.Path() / "deleteme2"); + GenerateFile(TmpDir.Path() / "deleteme3"); + CreateDirectories(TmpDir.Path() / ".keepmenot"); + CreateDirectories(TmpDir.Path() / "no.keepme"); + + CreateDirectories(TmpDir.Path() / "DeleteMe"); + GenerateFile(TmpDir.Path() / "DeleteMe" / "delete1"); + CreateDirectories(TmpDir.Path() / "CantDeleteMe"); + GenerateFile(TmpDir.Path() / "CantDeleteMe" / "delete1"); + GenerateFile(TmpDir.Path() / "CantDeleteMe" / "delete2"); + GenerateFile(TmpDir.Path() / "CantDeleteMe" / "delete3"); + CreateDirectories(TmpDir.Path() / "CantDeleteMe" / ".keepme"); + CreateDirectories(TmpDir.Path() / "CantDeleteMe" / "DeleteMe2"); + GenerateFile(TmpDir.Path() / "CantDeleteMe" / "DeleteMe2" / "delete2"); + GenerateFile(TmpDir.Path() / "CantDeleteMe" / "DeleteMe2" / "delete3"); + CreateDirectories(TmpDir.Path() / "CantDeleteMe2" / ".keepme"); + CreateDirectories(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept"); + GenerateFile(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept" / "kept1"); + GenerateFile(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept" / "kept2"); + GenerateFile(TmpDir.Path() / "CantDeleteMe2" / "deleteme"); + + WorkerThreadPool Pool(4); + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + + CleanDirectory(Pool, AbortFlag, PauseFlag, TmpDir.Path(), std::vector<std::string>{".keepme"}, {}, 0); + + CHECK(IsDir(TmpDir.Path() / ".keepme")); + CHECK(IsFile(TmpDir.Path() / ".keepme" / "keep")); + CHECK(!IsFile(TmpDir.Path() / "deleteme1")); + CHECK(!IsFile(TmpDir.Path() / "deleteme2")); + CHECK(!IsFile(TmpDir.Path() / "deleteme3")); + CHECK(!IsFile(TmpDir.Path() / ".keepmenot")); + CHECK(!IsFile(TmpDir.Path() / "no.keepme")); + + CHECK(!IsDir(TmpDir.Path() / "DeleteMe")); + CHECK(!IsDir(TmpDir.Path() / "DeleteMe2")); + + CHECK(IsDir(TmpDir.Path() / "CantDeleteMe")); + CHECK(IsDir(TmpDir.Path() / "CantDeleteMe" / ".keepme")); + CHECK(IsDir(TmpDir.Path() / "CantDeleteMe2")); + 
CHECK(IsDir(TmpDir.Path() / "CantDeleteMe2" / ".keepme")); + CHECK(IsDir(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept")); + CHECK(IsFile(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept" / "kept1")); + CHECK(IsFile(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept" / "kept2")); + CHECK(!IsFile(TmpDir.Path() / "CantDeleteMe2" / "deleteme")); +} + +TEST_SUITE_END(); + +#endif + +} // namespace zen diff --git a/src/zenutil/include/zenutil/cloud/s3client.h b/src/zenutil/include/zenutil/cloud/s3client.h index f1f0df0e4..b0402d231 100644 --- a/src/zenutil/include/zenutil/cloud/s3client.h +++ b/src/zenutil/include/zenutil/cloud/s3client.h @@ -219,6 +219,7 @@ private: SigV4Credentials m_Credentials; Ref<ImdsCredentialProvider> m_CredentialProvider; HttpClient m_HttpClient; + bool m_Verbose = false; // Cached signing key (only changes once per day, protected by RwLock for thread safety) mutable RwLock m_SigningKeyLock; diff --git a/src/zenutil/include/zenutil/filesystemutils.h b/src/zenutil/include/zenutil/filesystemutils.h new file mode 100644 index 000000000..05defd1a8 --- /dev/null +++ b/src/zenutil/include/zenutil/filesystemutils.h @@ -0,0 +1,98 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#pragma once + +#include <zencore/basicfile.h> +#include <zencore/filesystem.h> + +namespace zen { + +class CompositeBuffer; + +class BufferedOpenFile +{ +public: + static constexpr uint64_t BlockSize = 256u * 1024u; + + BufferedOpenFile(const std::filesystem::path Path, + std::atomic<uint64_t>& OpenReadCount, + std::atomic<uint64_t>& CurrentOpenFileCount, + std::atomic<uint64_t>& ReadCount, + std::atomic<uint64_t>& ReadByteCount); + ~BufferedOpenFile(); + BufferedOpenFile() = delete; + BufferedOpenFile(const BufferedOpenFile&) = delete; + BufferedOpenFile(BufferedOpenFile&&) = delete; + BufferedOpenFile& operator=(BufferedOpenFile&&) = delete; + BufferedOpenFile& operator=(const BufferedOpenFile&) = delete; + + CompositeBuffer GetRange(uint64_t Offset, uint64_t Size); + +public: + void* Handle() { return m_Source.Handle(); } + +private: + BasicFile m_Source; + const uint64_t m_SourceSize; + std::atomic<uint64_t>& m_OpenReadCount; + std::atomic<uint64_t>& m_CurrentOpenFileCount; + std::atomic<uint64_t>& m_ReadCount; + std::atomic<uint64_t>& m_ReadByteCount; + uint64_t m_CacheBlockIndex = (uint64_t)-1; + IoBuffer m_Cache; +}; + +bool IsFileWithRetry(const std::filesystem::path& Path); + +bool SetFileReadOnlyWithRetry(const std::filesystem::path& Path, bool ReadOnly); + +std::error_code RenameFileWithRetry(const std::filesystem::path& SourcePath, const std::filesystem::path& TargetPath); +std::error_code RenameDirectoryWithRetry(const std::filesystem::path& SourcePath, const std::filesystem::path& TargetPath); + +std::error_code TryRemoveFile(const std::filesystem::path& Path); + +void RemoveFileWithRetry(const std::filesystem::path& Path); + +void FastCopyFile(bool AllowFileClone, + bool UseSparseFiles, + const std::filesystem::path& SourceFilePath, + const std::filesystem::path& TargetFilePath, + uint64_t RawSize, + std::atomic<uint64_t>& WriteCount, + std::atomic<uint64_t>& WriteByteCount, + std::atomic<uint64_t>& CloneCount, + std::atomic<uint64_t>& 
CloneByteCount); + +struct CleanDirectoryResult +{ + uint64_t FoundCount = 0; + uint64_t DeletedCount = 0; + uint64_t DeletedByteCount = 0; + std::vector<std::pair<std::filesystem::path, std::error_code>> FailedRemovePaths; +}; + +class WorkerThreadPool; + +void GetDirectoryContent(WorkerThreadPool& WorkerPool, + const std::filesystem::path& Path, + DirectoryContentFlags Flags, + DirectoryContent& OutContent); + +CleanDirectoryResult CleanDirectory( + WorkerThreadPool& IOWorkerPool, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + const std::filesystem::path& Path, + std::span<const std::string> ExcludeDirectories, + std::function<void(const std::string_view Details, uint64_t TotalCount, uint64_t RemainingCount, bool IsPaused, bool IsAborted)>&& + ProgressFunc, + uint32_t ProgressUpdateDelayMS); + +bool CleanAndRemoveDirectory(WorkerThreadPool& WorkerPool, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + const std::filesystem::path& Directory); + +void filesystemutils_forcelink(); // internal + +} // namespace zen diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp index 516eec3a9..b282adc03 100644 --- a/src/zenutil/zenutil.cpp +++ b/src/zenutil/zenutil.cpp @@ -9,6 +9,7 @@ # include <zenutil/cloud/s3client.h> # include <zenutil/cloud/sigv4.h> # include <zenutil/config/commandlineoptions.h> +# include <zenutil/filesystemutils.h> # include <zenutil/rpcrecording.h> # include <zenutil/splitconsole/logstreamlistener.h> # include <zenutil/process/subprocessmanager.h> @@ -22,6 +23,7 @@ zenutil_forcelinktests() cache::rpcrecord_forcelink(); commandlineoptions_forcelink(); consul::consul_forcelink(); + filesystemutils_forcelink(); imdscredentials_forcelink(); logstreamlistener_forcelink(); subprocessmanager_forcelink(); |