diff options
Diffstat (limited to 'src/zenutil')
28 files changed, 2263 insertions, 454 deletions
diff --git a/src/zenutil/bufferedwritefilecache.cpp b/src/zenutil/bufferedwritefilecache.cpp new file mode 100644 index 000000000..a52850314 --- /dev/null +++ b/src/zenutil/bufferedwritefilecache.cpp @@ -0,0 +1,177 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/bufferedwritefilecache.h> + +#include <zencore/logging.h> +#include <zencore/trace.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +BufferedWriteFileCache::BufferedWriteFileCache() : m_CacheHitCount(0), m_CacheMissCount(0), m_OpenHandleCount(0), m_DroppedHandleCount(0) +{ +} + +BufferedWriteFileCache::~BufferedWriteFileCache() +{ + ZEN_TRACE_CPU("~BufferedWriteFileCache()"); + + try + { + for (TOpenHandles& OpenHandles : m_OpenFiles) + { + while (BasicFile* File = OpenHandles.Pop()) + { + std::unique_ptr<BasicFile> FileToClose(File); + m_OpenHandleCount--; + } + } + m_OpenFiles.clear(); + m_ChunkWriters.clear(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~BufferedWriteFileCache() threw exeption: {}", Ex.what()); + } +} + +std::unique_ptr<BasicFile> +BufferedWriteFileCache::Get(uint32_t FileIndex) +{ + ZEN_TRACE_CPU("BufferedWriteFileCache::Get"); + + RwLock::ExclusiveLockScope _(m_WriterLock); + if (auto It = m_ChunkWriters.find(FileIndex); It != m_ChunkWriters.end()) + { + const uint32_t HandleIndex = It->second; + TOpenHandles& OpenHandles = m_OpenFiles[HandleIndex]; + if (BasicFile* File = OpenHandles.Pop(); File != nullptr) + { + m_OpenHandleCount--; + m_CacheHitCount++; + return std::unique_ptr<BasicFile>(File); + } + } + m_CacheMissCount++; + return nullptr; +} + +void +BufferedWriteFileCache::Put(uint32_t FileIndex, std::unique_ptr<BasicFile>&& Writer) +{ + ZEN_TRACE_CPU("BufferedWriteFileCache::Put"); + + if (m_OpenHandleCount.load() >= MaxBufferedCount) + { + m_DroppedHandleCount++; + return; + } + RwLock::ExclusiveLockScope _(m_WriterLock); + if (auto It = m_ChunkWriters.find(FileIndex); It != m_ChunkWriters.end()) + { + const uint32_t HandleIndex = It->second; + TOpenHandles& OpenHandles = m_OpenFiles[HandleIndex]; + if (OpenHandles.Push(Writer.get())) + { + Writer.release(); + m_OpenHandleCount++; + } + else + { + m_DroppedHandleCount++; + } + } + else + { + const uint32_t HandleIndex = gsl::narrow<uint32_t>(m_OpenFiles.size()); + m_OpenFiles.push_back(TOpenHandles{}); + m_OpenFiles.back().Push(Writer.release()); + m_ChunkWriters.insert_or_assign(FileIndex, HandleIndex); + m_OpenHandleCount++; + } +} + +void +BufferedWriteFileCache::Close(std::span<uint32_t> FileIndexes) +{ + ZEN_TRACE_CPU("BufferedWriteFileCache::Close"); + + std::vector<std::unique_ptr<BasicFile>> FilesToClose; + FilesToClose.reserve(FileIndexes.size()); + { + RwLock::ExclusiveLockScope _(m_WriterLock); + for (uint32_t FileIndex : FileIndexes) + { + if (auto It = m_ChunkWriters.find(FileIndex); It != m_ChunkWriters.end()) + { + const uint32_t HandleIndex = It->second; + TOpenHandles& OpenHandles = m_OpenFiles[HandleIndex]; + while (BasicFile* File = OpenHandles.Pop()) + { + FilesToClose.emplace_back(std::unique_ptr<BasicFile>(File)); + m_OpenHandleCount--; + } + m_ChunkWriters.erase(It); + } + } + } + FilesToClose.clear(); +} + +BufferedWriteFileCache::Local::Local(BufferedWriteFileCache& Cache) : m_Cache(Cache) +{ +} + +BufferedWriteFileCache::Local::Writer* +BufferedWriteFileCache::Local::GetWriter(uint32_t FileIndex) +{ + if (auto It = m_FileIndexToWriterIndex.find(FileIndex); It != m_FileIndexToWriterIndex.end()) + { + return m_ChunkWriters[It->second].get(); + } + std::unique_ptr<BasicFile> File = m_Cache.Get(FileIndex); + if (File) + { + const uint32_t WriterIndex = gsl::narrow<uint32_t>(m_ChunkWriters.size()); + m_FileIndexToWriterIndex.insert_or_assign(FileIndex, WriterIndex); + m_ChunkWriters.emplace_back(std::make_unique<Writer>(Writer{.File = std::move(File)})); + return m_ChunkWriters.back().get(); + } + return nullptr; +} + +BufferedWriteFileCache::Local::Writer* +BufferedWriteFileCache::Local::PutWriter(uint32_t FileIndex, std::unique_ptr<Writer> Writer) +{ + ZEN_ASSERT(!m_FileIndexToWriterIndex.contains(FileIndex)); + const uint32_t WriterIndex = gsl::narrow<uint32_t>(m_ChunkWriters.size()); + m_FileIndexToWriterIndex.insert_or_assign(FileIndex, WriterIndex); + m_ChunkWriters.emplace_back(std::move(Writer)); + return m_ChunkWriters.back().get(); +} + +BufferedWriteFileCache::Local::~Local() +{ + ZEN_TRACE_CPU("BufferedWriteFileCache::~Local()"); + try + { + for (auto& It : m_FileIndexToWriterIndex) + { + const uint32_t FileIndex = It.first; + const uint32_t WriterIndex = It.second; + m_ChunkWriters[WriterIndex]->Writer.reset(); + std::unique_ptr<BasicFile> File; + File.swap(m_ChunkWriters[WriterIndex]->File); + m_Cache.Put(FileIndex, std::move(File)); + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("BufferedWriteFileCache::~Local() threw exeption: {}", Ex.what()); + } +} + +} // namespace zen diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp new file mode 100644 index 000000000..88238effd --- /dev/null +++ b/src/zenutil/buildstoragecache.cpp @@ -0,0 +1,407 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/buildstoragecache.h> + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/fmtutils.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> +#include <zencore/trace.h> +#include <zencore/workthreadpool.h> +#include <zenhttp/httpclient.h> +#include <zenhttp/packageformat.h> +#include <zenutil/workerpools.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_set.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +using namespace std::literals; + +class ZenBuildStorageCache : public BuildStorageCache +{ +public: + explicit ZenBuildStorageCache(HttpClient& HttpClient, + BuildStorageCache::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount) + : m_HttpClient(HttpClient) + , m_Stats(Stats) + , m_Namespace(Namespace.empty() ? "none" : Namespace) + , m_Bucket(Bucket.empty() ? "none" : Bucket) + , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred()) + , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount) + , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background) + : GetTinyWorkerPool(EWorkloadType::Background)) + , m_PendingBackgroundWorkCount(1) + , m_CancelBackgroundWork(false) + { + } + + virtual ~ZenBuildStorageCache() + { + try + { + m_CancelBackgroundWork.store(true); + if (!IsFlushed) + { + m_PendingBackgroundWorkCount.CountDown(); + m_PendingBackgroundWorkCount.Wait(); + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~ZenBuildStorageCache() failed with: {}", Ex.what()); + } + } + + void ScheduleBackgroundWork(std::function<void()>&& Work) + { + m_PendingBackgroundWorkCount.AddCount(1); + try + { + m_BackgroundWorkPool.ScheduleWork([this, Work = std::move(Work)]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork"); + auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); }); + if (!m_CancelBackgroundWork) + { + try + { + Work(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what()); + } + } + }); + } + catch (const std::exception& Ex) + { + m_PendingBackgroundWorkCount.CountDown(); + ZEN_ERROR("Failed scheduling background upload to build cache. Reason: {}", Ex.what()); + } + } + + virtual void PutBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + ZenContentType ContentType, + const CompositeBuffer& Payload) override + { + ZEN_ASSERT(!IsFlushed); + ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); + ScheduleBackgroundWork( + [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob"); + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + HttpClient::Response CacheResponse = + m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), + Payload, + ContentType); + AddStatistic(CacheResponse); + if (!CacheResponse.IsSuccess()) + { + ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv)); + } + }); + } + + virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::GetBuildBlob"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + HttpClient::KeyValueMap Headers; + if (RangeOffset != 0 || RangeBytes != (uint64_t)-1) + { + Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", RangeOffset, RangeOffset + RangeBytes - 1)}); + } + CreateDirectories(m_TempFolderPath); + HttpClient::Response CacheResponse = + m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), + m_TempFolderPath, + Headers); + AddStatistic(CacheResponse); + if (CacheResponse.IsSuccess()) + { + return CacheResponse.ResponsePayload; + } + return {}; + } + + virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override + { + ZEN_ASSERT(!IsFlushed); + ScheduleBackgroundWork([this, + BuildId = Oid(BuildId), + BlobRawHashes = std::vector<IoHash>(BlobHashes.begin(), BlobHashes.end()), + MetaDatas = std::vector<CbObject>(MetaDatas.begin(), MetaDatas.end())]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::PutBlobMetadatas"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + const uint64_t BlobCount = BlobRawHashes.size(); + + CbPackage RequestPackage; + std::vector<CbAttachment> Attachments; + tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes; + Attachments.reserve(BlobCount); + AttachmentHashes.reserve(BlobCount); + { + CbObjectWriter RequestWriter; + RequestWriter.BeginArray("blobHashes"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) + { + RequestWriter.AddHash(BlobRawHashes[BlockHashIndex]); + } + RequestWriter.EndArray(); // blobHashes + + RequestWriter.BeginArray("metadatas"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) + { + const IoHash ObjectHash = MetaDatas[BlockHashIndex].GetHash(); + RequestWriter.AddBinaryAttachment(ObjectHash); + if (!AttachmentHashes.contains(ObjectHash)) + { + Attachments.push_back(CbAttachment(MetaDatas[BlockHashIndex], ObjectHash)); + AttachmentHashes.insert(ObjectHash); + } + } + + RequestWriter.EndArray(); // metadatas + + RequestPackage.SetObject(RequestWriter.Save()); + } + RequestPackage.AddAttachments(Attachments); + + CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage); + + HttpClient::Response CacheResponse = + m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/putBlobMetadata", m_Namespace, m_Bucket, BuildId), + RpcRequestBuffer, + ZenContentType::kCbPackage); + AddStatistic(CacheResponse); + if (!CacheResponse.IsSuccess()) + { + ZEN_DEBUG("Failed posting blob metadata to cache: {}", CacheResponse.ErrorMessage(""sv)); + } + }); + } + + virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::GetBlobMetadatas"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + CbObjectWriter Request; + + Request.BeginArray("blobHashes"sv); + for (const IoHash& BlobHash : BlobHashes) + { + Request.AddHash(BlobHash); + } + Request.EndArray(); + + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + + HttpClient::Response Response = + m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/getBlobMetadata", m_Namespace, m_Bucket, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + AddStatistic(Response); + if (Response.IsSuccess()) + { + std::vector<CbObject> Result; + + CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); + CbObject ResponseObject = ResponsePackage.GetObject(); + + CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView(); + CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView(); + Result.reserve(MetadatasArray.Num()); + auto BlobHashesIt = BlobHashes.begin(); + auto BlobHashArrayIt = begin(BlobHashArray); + auto MetadataArrayIt = begin(MetadatasArray); + while (MetadataArrayIt != end(MetadatasArray)) + { + const IoHash BlobHash = (*BlobHashArrayIt).AsHash(); + while (BlobHash != *BlobHashesIt) + { + ZEN_ASSERT(BlobHashesIt != BlobHashes.end()); + BlobHashesIt++; + } + + ZEN_ASSERT(BlobHash == *BlobHashesIt); + + const IoHash MetaHash = (*MetadataArrayIt).AsAttachment(); + const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash); + ZEN_ASSERT(MetaAttachment); + + CbObject Metadata = MetaAttachment->AsObject(); + Result.emplace_back(std::move(Metadata)); + + BlobHashArrayIt++; + MetadataArrayIt++; + BlobHashesIt++; + } + return Result; + } + return {}; + } + + virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::BlobsExists"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + CbObjectWriter Request; + + Request.BeginArray("blobHashes"sv); + for (const IoHash& BlobHash : BlobHashes) + { + Request.AddHash(BlobHash); + } + Request.EndArray(); + + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + + HttpClient::Response Response = m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/exists", m_Namespace, m_Bucket, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + AddStatistic(Response); + if (Response.IsSuccess()) + { + CbObject ResponseObject = LoadCompactBinaryObject(Response.ResponsePayload); + if (!ResponseObject) + { + throw std::runtime_error("BlobExists reponse is invalid, failed to load payload as compact binary object"); + } + CbArrayView BlobsExistsArray = ResponseObject["blobExists"sv].AsArrayView(); + if (!BlobsExistsArray) + { + throw std::runtime_error("BlobExists reponse is invalid, 'blobExists' array is missing"); + } + if (BlobsExistsArray.Num() != BlobHashes.size()) + { + throw std::runtime_error(fmt::format("BlobExists reponse is invalid, 'blobExists' array contains {} entries, expected {}", + BlobsExistsArray.Num(), + BlobHashes.size())); + } + + CbArrayView MetadatasExistsArray = ResponseObject["metadataExists"sv].AsArrayView(); + if (!MetadatasExistsArray) + { + throw std::runtime_error("BlobExists reponse is invalid, 'metadataExists' array is missing"); + } + if (MetadatasExistsArray.Num() != BlobHashes.size()) + { + throw std::runtime_error( + fmt::format("BlobExists reponse is invalid, 'metadataExists' array contains {} entries, expected {}", + MetadatasExistsArray.Num(), + BlobHashes.size())); + } + + std::vector<BlobExistsResult> Result; + Result.reserve(BlobHashes.size()); + auto BlobExistsIt = begin(BlobsExistsArray); + auto MetadataExistsIt = begin(MetadatasExistsArray); + while (BlobExistsIt != end(BlobsExistsArray)) + { + ZEN_ASSERT(MetadataExistsIt != end(MetadatasExistsArray)); + + const bool HasBody = (*BlobExistsIt).AsBool(); + const bool HasMetadata = (*MetadataExistsIt).AsBool(); + + Result.push_back({.HasBody = HasBody, .HasMetadata = HasMetadata}); + + BlobExistsIt++; + MetadataExistsIt++; + } + return Result; + } + return {}; + } + + virtual void Flush(int32_t UpdateIntervalMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override + { + if (IsFlushed) + { + return; + } + if (!IsFlushed) + { + m_PendingBackgroundWorkCount.CountDown(); + IsFlushed = true; + } + if (m_PendingBackgroundWorkCount.Wait(100)) + { + return; + } + while (true) + { + intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining(); + if (UpdateCallback(Remaining)) + { + if (m_PendingBackgroundWorkCount.Wait(UpdateIntervalMS)) + { + UpdateCallback(0); + return; + } + } + else + { + m_CancelBackgroundWork.store(true); + } + } + } + +private: + void AddStatistic(const HttpClient::Response& Result) + { + m_Stats.TotalBytesWritten += Result.UploadedBytes; + m_Stats.TotalBytesRead += Result.DownloadedBytes; + m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); + m_Stats.TotalRequestCount++; + } + + HttpClient& m_HttpClient; + BuildStorageCache::Statistics& m_Stats; + const std::string m_Namespace; + const std::string m_Bucket; + const std::filesystem::path m_TempFolderPath; + const bool m_BoostBackgroundThreadCount; + bool IsFlushed = false; + + WorkerThreadPool& m_BackgroundWorkPool; + Latch m_PendingBackgroundWorkCount; + std::atomic<bool> m_CancelBackgroundWork; +}; + +std::unique_ptr<BuildStorageCache> +CreateZenBuildStorageCache(HttpClient& HttpClient, + BuildStorageCache::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount) +{ + return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount); +} + +} // namespace zen diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index 1f951167d..46e80f6b7 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -46,7 +46,7 @@ struct RecordedRequestsWriter void BeginWrite(const std::filesystem::path& BasePath) { m_BasePath = BasePath; - std::filesystem::create_directories(m_BasePath); + CreateDirectories(m_BasePath); } void EndWrite() @@ -366,6 +366,7 @@ private: }; std::unique_ptr<std::thread> m_WriterThread; + std::atomic_bool m_IsWriterReady{false}; std::atomic_bool m_IsActive{false}; std::atomic_int64_t m_PendingRequests{0}; RwLock m_RequestQueueLock; @@ -426,7 +427,7 @@ RecordedRequestsSegmentWriter::BeginWrite(const std::filesystem::path& BasePath, m_BasePath = BasePath; m_SegmentIndex = SegmentIndex; m_RequestBaseIndex = RequestBaseIndex; - std::filesystem::create_directories(m_BasePath); + CreateDirectories(m_BasePath); } void @@ -658,6 +659,8 @@ RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath) m_IsActive = true; m_WriterThread.reset(new std::thread(&RecordedRequestsWriter::WriterThreadMain, this)); + + m_IsWriterReady.wait(false); } void @@ -707,6 +710,9 @@ RecordedRequestsWriter::WriterThreadMain() SetCurrentThreadName("rpc_writer"); EnsureCurrentSegment(); + m_IsWriterReady.store(true); + m_IsWriterReady.notify_all(); + while (m_IsActive) { m_PendingRequests.wait(0); @@ -1051,14 +1057,14 @@ public: static bool IsCompatible(const std::filesystem::path& BasePath) { - if (std::filesystem::exists(BasePath / "rpc_recording_info.zcb")) + if (IsFile(BasePath / "rpc_recording_info.zcb")) { return true; } const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0); - if (std::filesystem::exists(SegmentZero / "rpc_segment_info.zcb") && std::filesystem::exists(SegmentZero / "index.bin")) + if (IsFile(SegmentZero / "rpc_segment_info.zcb") && IsFile(SegmentZero / "index.bin")) { // top-level metadata is missing, possibly because of premature exit // on the recording side diff --git a/src/zenutil/chunkblock.cpp b/src/zenutil/chunkblock.cpp index f3c14edc4..abfc0fb63 100644 --- a/src/zenutil/chunkblock.cpp +++ b/src/zenutil/chunkblock.cpp @@ -52,7 +52,7 @@ ParseChunkBlockDescriptionList(const CbObjectView& BlocksObject) return {}; } std::vector<ChunkBlockDescription> Result; - CbArrayView Blocks = BlocksObject["blocks"].AsArrayView(); + CbArrayView Blocks = BlocksObject["blocks"sv].AsArrayView(); Result.reserve(Blocks.Num()); for (CbFieldView BlockView : Blocks) { diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp index bb1ee5183..cd1bf7dd7 100644 --- a/src/zenutil/chunkedcontent.cpp +++ b/src/zenutil/chunkedcontent.cpp @@ -11,7 +11,7 @@ #include <zenutil/chunkedfile.h> #include <zenutil/chunkingcontroller.h> -#include <zenutil/parallellwork.h> +#include <zenutil/parallelwork.h> #include <zenutil/workerpools.h> ZEN_THIRD_PARTY_INCLUDES_START @@ -140,8 +140,12 @@ namespace { { ZEN_TRACE_CPU("HashOnly"); - IoBuffer Buffer = IoBufferBuilder::MakeFromFile((FolderPath / Path).make_preferred()); - const IoHash Hash = IoHash::HashBuffer(Buffer, &Stats.BytesHashed); + IoBuffer Buffer = IoBufferBuilder::MakeFromFile((FolderPath / Path).make_preferred()); + if (Buffer.GetSize() != RawSize) + { + throw std::runtime_error(fmt::format("Failed opening file '{}' for hashing", FolderPath / Path)); + } + const IoHash Hash = IoHash::HashBuffer(Buffer, &Stats.BytesHashed); Lock.WithExclusiveLock([&]() { if (!RawHashToSequenceRawHashIndex.contains(Hash)) @@ -301,17 +305,25 @@ FolderContent::UpdateState(const FolderContent& Rhs, std::vector<uint32_t>& OutP } FolderContent -GetUpdatedContent(const FolderContent& Old, const FolderContent& New, std::vector<std::filesystem::path>& OutDeletedPathIndexes) +GetUpdatedContent(const FolderContent& Old, const FolderContent& New, std::vector<std::filesystem::path>& OutDeletedPaths) { ZEN_TRACE_CPU("FolderContent::GetUpdatedContent"); - FolderContent Result = {.Platform = Old.Platform}; + + const uint32_t NewPathCount = gsl::narrow<uint32_t>(New.Paths.size()); + + FolderContent Result = {.Platform = Old.Platform}; + Result.Paths.reserve(NewPathCount); + Result.RawSizes.reserve(NewPathCount); + Result.Attributes.reserve(NewPathCount); + Result.ModificationTicks.reserve(NewPathCount); + tsl::robin_map<std::string, uint32_t> NewPathToIndex; - const uint32_t NewPathCount = gsl::narrow<uint32_t>(New.Paths.size()); NewPathToIndex.reserve(NewPathCount); for (uint32_t NewPathIndex = 0; NewPathIndex < NewPathCount; NewPathIndex++) { NewPathToIndex.insert({New.Paths[NewPathIndex].generic_string(), NewPathIndex}); } + uint32_t OldPathCount = gsl::narrow<uint32_t>(Old.Paths.size()); for (uint32_t OldPathIndex = 0; OldPathIndex < OldPathCount; OldPathIndex++) { @@ -330,7 +342,7 @@ GetUpdatedContent(const FolderContent& Old, const FolderContent& New, std::vecto } else { - OutDeletedPathIndexes.push_back(Old.Paths[OldPathIndex]); + OutDeletedPaths.push_back(Old.Paths[OldPathIndex]); } } return Result; @@ -366,7 +378,7 @@ GetFolderContent(GetFolderContentStatistics& Stats, std::function<bool(const std::string_view& RelativePath)>&& AcceptDirectory, std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& AcceptFile, WorkerThreadPool& WorkerPool, - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, std::atomic<bool>& AbortFlag) { @@ -455,7 +467,7 @@ GetFolderContent(GetFolderContentStatistics& Stats, WorkerPool, PendingWork); PendingWork.CountDown(); - while (!PendingWork.Wait(UpdateInteralMS)) + while (!PendingWork.Wait(UpdateIntervalMS)) { UpdateCallback(AbortFlag.load(), PendingWork.Remaining()); } @@ -650,7 +662,9 @@ MergeChunkedFolderContents(const ChunkedFolderContent& Base, std::span<const Chu } ChunkedFolderContent -DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span<const std::filesystem::path> DeletedPaths) +DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, + const ChunkedContentLookup& BaseContentLookup, + std::span<const std::filesystem::path> DeletedPaths) { ZEN_TRACE_CPU("DeletePathsFromChunkedContent"); @@ -664,8 +678,18 @@ DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span { DeletedPathSet.insert(PathCompareString(DeletedPath)); } - const ChunkedContentLookup BaseLookup = BuildChunkedContentLookup(BaseContent); - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex; + + const size_t BaseChunkCount = BaseContent.ChunkedContent.ChunkHashes.size(); + std::vector<uint32_t> NewChunkIndexes(BaseChunkCount, (uint32_t)-1); + + const size_t ExpectedPathCount = BaseContent.Paths.size() - DeletedPaths.size(); + Result.Paths.reserve(ExpectedPathCount); + Result.RawSizes.reserve(ExpectedPathCount); + Result.Attributes.reserve(ExpectedPathCount); + Result.RawHashes.reserve(ExpectedPathCount); + + Result.ChunkedContent.ChunkHashes.reserve(BaseChunkCount); + Result.ChunkedContent.ChunkRawSizes.reserve(BaseChunkCount); tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceRawHashIndex; for (uint32_t PathIndex = 0; PathIndex < BaseContent.Paths.size(); PathIndex++) @@ -685,20 +709,33 @@ DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span { RawHashToSequenceRawHashIndex.insert( {RawHash, gsl::narrow<uint32_t>(Result.ChunkedContent.SequenceRawHashes.size())}); - const uint32_t SequenceRawHashIndex = BaseLookup.RawHashToSequenceIndex.at(RawHash); - const uint32_t OrderIndexOffset = BaseLookup.SequenceIndexChunkOrderOffset[SequenceRawHashIndex]; - const uint32_t ChunkCount = BaseContent.ChunkedContent.ChunkCounts[SequenceRawHashIndex]; - ChunkingStatistics Stats; + const uint32_t SequenceRawHashIndex = BaseContentLookup.RawHashToSequenceIndex.at(RawHash); + const uint32_t OrderIndexOffset = BaseContentLookup.SequenceIndexChunkOrderOffset[SequenceRawHashIndex]; + const uint32_t ChunkCount = BaseContent.ChunkedContent.ChunkCounts[SequenceRawHashIndex]; + std::span<const uint32_t> OriginalChunkOrder = std::span<const uint32_t>(BaseContent.ChunkedContent.ChunkOrders).subspan(OrderIndexOffset, ChunkCount); - AddChunkSequence(Stats, - Result.ChunkedContent, - ChunkHashToChunkIndex, - RawHash, - OriginalChunkOrder, - BaseContent.ChunkedContent.ChunkHashes, - BaseContent.ChunkedContent.ChunkRawSizes); - Stats.UniqueSequencesFound++; + + Result.ChunkedContent.ChunkCounts.push_back(gsl::narrow<uint32_t>(OriginalChunkOrder.size())); + + for (uint32_t OldChunkIndex : OriginalChunkOrder) + { + if (uint32_t FoundChunkIndex = NewChunkIndexes[OldChunkIndex]; FoundChunkIndex != (uint32_t)-1) + { + Result.ChunkedContent.ChunkOrders.push_back(FoundChunkIndex); + } + else + { + const uint32_t NewChunkIndex = gsl::narrow<uint32_t>(Result.ChunkedContent.ChunkHashes.size()); + NewChunkIndexes[OldChunkIndex] = NewChunkIndex; + const IoHash& ChunkHash = BaseContent.ChunkedContent.ChunkHashes[OldChunkIndex]; + const uint64_t OldChunkSize = BaseContent.ChunkedContent.ChunkRawSizes[OldChunkIndex]; + Result.ChunkedContent.ChunkHashes.push_back(ChunkHash); + Result.ChunkedContent.ChunkRawSizes.push_back(OldChunkSize); + Result.ChunkedContent.ChunkOrders.push_back(NewChunkIndex); + } + } + Result.ChunkedContent.SequenceRawHashes.push_back(RawHash); } } } @@ -708,14 +745,28 @@ DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span } ChunkedFolderContent -ChunkFolderContent(ChunkingStatistics& Stats, - WorkerThreadPool& WorkerPool, - const std::filesystem::path& RootPath, - const FolderContent& Content, - const ChunkingController& InChunkingController, - int32_t UpdateInteralMS, - std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, - std::atomic<bool>& AbortFlag) +DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span<const std::filesystem::path> DeletedPaths) +{ + ZEN_TRACE_CPU("DeletePathsFromChunkedContent"); + ZEN_ASSERT(DeletedPaths.size() <= BaseContent.Paths.size()); + if (DeletedPaths.size() == BaseContent.Paths.size()) + { + return {}; + } + const ChunkedContentLookup BaseLookup = BuildChunkedContentLookup(BaseContent); + return DeletePathsFromChunkedContent(BaseContent, BaseLookup, DeletedPaths); +} + +ChunkedFolderContent +ChunkFolderContent(ChunkingStatistics& Stats, + WorkerThreadPool& WorkerPool, + const std::filesystem::path& RootPath, + const FolderContent& Content, + const ChunkingController& InChunkingController, + int32_t UpdateIntervalMS, + std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)>&& UpdateCallback, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag) { ZEN_TRACE_CPU("ChunkFolderContent"); @@ -754,7 +805,7 @@ ChunkFolderContent(ChunkingStatistics& Stats, RwLock Lock; - ParallellWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t PathIndex : Order) { @@ -762,31 +813,28 @@ ChunkFolderContent(ChunkingStatistics& Stats, { break; } - Work.ScheduleWork( - WorkerPool, // GetSyncWorkerPool() - [&, PathIndex](std::atomic<bool>& AbortFlag) { - if (!AbortFlag) - { - IoHash RawHash = HashOneFile(Stats, - InChunkingController, - Result, - ChunkHashToChunkIndex, - RawHashToSequenceRawHashIndex, - Lock, - RootPath, - PathIndex, - AbortFlag); - Lock.WithExclusiveLock([&]() { Result.RawHashes[PathIndex] = RawHash; }); - Stats.FilesProcessed++; - } - }, - Work.DefaultErrorFunction()); - } - - Work.Wait(UpdateInteralMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted); + Work.ScheduleWork(WorkerPool, // GetSyncWorkerPool() + [&, PathIndex](std::atomic<bool>& AbortFlag) { + if (!AbortFlag) + { + IoHash RawHash = HashOneFile(Stats, + InChunkingController, + Result, + ChunkHashToChunkIndex, + RawHashToSequenceRawHashIndex, + Lock, + RootPath, + PathIndex, + AbortFlag); + Lock.WithExclusiveLock([&]() { Result.RawHashes[PathIndex] = RawHash; }); + Stats.FilesProcessed++; + } + }); + } + + Work.Wait(UpdateIntervalMS, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); - UpdateCallback(Work.IsAborted(), Work.PendingWork().Remaining()); + UpdateCallback(IsAborted, IsPaused, Work.PendingWork().Remaining()); }); } return Result; @@ -799,8 +847,9 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content) struct ChunkLocationReference { - uint32_t ChunkIndex = (uint32_t)-1; - ChunkedContentLookup::ChunkSequenceLocation Location; + uint32_t ChunkIndex = (uint32_t)-1; + uint32_t SequenceIndex = (uint32_t)-1; + uint64_t Offset = (uint64_t)-1; }; ChunkedContentLookup Result; @@ -829,8 +878,7 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content) { uint32_t ChunkIndex = Content.ChunkedContent.ChunkOrders[OrderIndex]; - Locations.push_back( - ChunkLocationReference{ChunkIndex, ChunkedContentLookup::ChunkSequenceLocation{SequenceIndex, LocationOffset}}); + Locations.push_back(ChunkLocationReference{.ChunkIndex = ChunkIndex, .SequenceIndex = SequenceIndex, .Offset = LocationOffset}); LocationOffset += Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; } @@ -845,15 +893,15 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content) { return false; } - if (Lhs.Location.SequenceIndex < Rhs.Location.SequenceIndex) + if (Lhs.SequenceIndex < Rhs.SequenceIndex) { return true; } - if (Lhs.Location.SequenceIndex > Rhs.Location.SequenceIndex) + if (Lhs.SequenceIndex > Rhs.SequenceIndex) { return false; } - return Lhs.Location.Offset < Rhs.Location.Offset; + return Lhs.Offset < Rhs.Offset; }); Result.ChunkSequenceLocations.reserve(Locations.size()); @@ -866,7 +914,10 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content) uint32_t Count = 0; while ((RangeOffset + Count < Locations.size()) && (Locations[RangeOffset + Count].ChunkIndex == ChunkIndex)) { - Result.ChunkSequenceLocations.push_back(Locations[RangeOffset + Count].Location); + const ChunkLocationReference& LocationReference = Locations[RangeOffset + Count]; + Result.ChunkSequenceLocations.push_back( + ChunkedContentLookup::ChunkSequenceLocation{.SequenceIndex = LocationReference.SequenceIndex, + .Offset = LocationReference.Offset}); Count++; } Result.ChunkSequenceLocationOffset.push_back(RangeOffset); @@ -875,8 +926,12 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content) } Result.SequenceIndexFirstPathIndex.resize(Content.ChunkedContent.SequenceRawHashes.size(), (uint32_t)-1); + Result.PathExtensionHash.resize(Content.Paths.size()); for (uint32_t PathIndex = 0; PathIndex < Content.Paths.size(); PathIndex++) { + std::string LowercaseExtension = Content.Paths[PathIndex].extension().string(); + std::transform(LowercaseExtension.begin(), LowercaseExtension.end(), LowercaseExtension.begin(), ::tolower); + Result.PathExtensionHash[PathIndex] = HashStringDjb2(LowercaseExtension); if (Content.RawSizes[PathIndex] > 0) { const IoHash& RawHash = Content.RawHashes[PathIndex]; diff --git a/src/zenutil/chunkingcontroller.cpp b/src/zenutil/chunkingcontroller.cpp index 2a7057a46..6fb4182c0 100644 --- a/src/zenutil/chunkingcontroller.cpp +++ b/src/zenutil/chunkingcontroller.cpp @@ -4,6 +4,7 @@ #include <zencore/basicfile.h> #include <zencore/compactbinarybuilder.h> +#include <zencore/filesystem.h> #include <zencore/trace.h> ZEN_THIRD_PARTY_INCLUDES_START @@ -35,26 +36,54 @@ namespace { return ChunkedParams{.UseThreshold = UseThreshold, .MinSize = MinSize, .MaxSize = MaxSize, .AvgSize = AvgSize}; } -} // namespace + void WriteChunkParams(CbObjectWriter& Writer, const ChunkedParams& Params) + { + Writer.BeginObject("ChunkingParams"sv); + { + Writer.AddBool("UseThreshold"sv, Params.UseThreshold); -class BasicChunkingController : public ChunkingController -{ -public: - BasicChunkingController(std::span<const std::string_view> ExcludeExtensions, - uint64_t ChunkFileSizeLimit, - const ChunkedParams& ChunkingParams) - : m_ChunkExcludeExtensions(ExcludeExtensions.begin(), ExcludeExtensions.end()) - , m_ChunkFileSizeLimit(ChunkFileSizeLimit) - , m_ChunkingParams(ChunkingParams) + Writer.AddInteger("MinSize"sv, (uint64_t)Params.MinSize); + Writer.AddInteger("MaxSize"sv, (uint64_t)Params.MaxSize); + Writer.AddInteger("AvgSize"sv, (uint64_t)Params.AvgSize); + } + Writer.EndObject(); // ChunkingParams + } + + bool IsElfFile(BasicFile& Buffer) { + if (Buffer.FileSize() > 4) + { + uint32_t ElfCheck = 0; + Buffer.Read(&ElfCheck, 4, 0); + if (ElfCheck == 0x464c457f) + { + return true; + } + } + return false; } - BasicChunkingController(CbObjectView Parameters) - : m_ChunkExcludeExtensions(ReadStringArray(Parameters["ChunkExcludeExtensions"sv].AsArrayView())) - , m_ChunkFileSizeLimit(Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit)) - , m_ChunkingParams(ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView())) + bool IsMachOFile(BasicFile& Buffer) { + if (Buffer.FileSize() > 4) + { + uint32_t MachOCheck = 0; + Buffer.Read(&MachOCheck, 4, 0); + if ((MachOCheck == 0xfeedface) || (MachOCheck == 0xcefaedfe)) + { + return true; + } + } + return false; } +} // namespace + +class BasicChunkingController : public ChunkingController +{ +public: + BasicChunkingController(const BasicChunkingControllerSettings& Settings) : m_Settings(Settings) {} + + BasicChunkingController(CbObjectView Parameters) : m_Settings(ReadSettings(Parameters)) {} virtual bool ProcessFile(const std::filesystem::path& InputPath, uint64_t RawSize, @@ -64,16 +93,25 @@ public: { ZEN_TRACE_CPU("BasicChunkingController::ProcessFile"); const bool ExcludeFromChunking = - std::find(m_ChunkExcludeExtensions.begin(), m_ChunkExcludeExtensions.end(), InputPath.extension()) != - m_ChunkExcludeExtensions.end(); + std::find(m_Settings.ExcludeExtensions.begin(), m_Settings.ExcludeExtensions.end(), InputPath.extension()) != + m_Settings.ExcludeExtensions.end(); - if (ExcludeFromChunking || (RawSize < m_ChunkFileSizeLimit)) + if (ExcludeFromChunking || (RawSize < m_Settings.ChunkFileSizeLimit)) { return false; } BasicFile Buffer(InputPath, BasicFile::Mode::kRead); - OutChunked = ChunkData(Buffer, 0, RawSize, m_ChunkingParams, &BytesProcessed, &AbortFlag); + if (m_Settings.ExcludeElfFiles && IsElfFile(Buffer)) + { + return false; + } + if (m_Settings.ExcludeMachOFiles && IsMachOFile(Buffer)) + { + return false; + } + + OutChunked = ChunkData(Buffer, 0, RawSize, m_Settings.ChunkingParams, &BytesProcessed, &AbortFlag); return true; } @@ -84,53 +122,43 @@ public: CbObjectWriter Writer; Writer.BeginArray("ChunkExcludeExtensions"sv); { - for (const std::string& Extension : m_ChunkExcludeExtensions) + for (const std::string& Extension : m_Settings.ExcludeExtensions) { Writer.AddString(Extension); } } Writer.EndArray(); // ChunkExcludeExtensions - Writer.AddInteger("ChunkFileSizeLimit"sv, m_ChunkFileSizeLimit); - Writer.BeginObject("ChunkingParams"sv); - { - Writer.AddBool("UseThreshold"sv, m_ChunkingParams.UseThreshold); - Writer.AddInteger("MinSize"sv, (uint64_t)m_ChunkingParams.MinSize); - Writer.AddInteger("MaxSize"sv, (uint64_t)m_ChunkingParams.MaxSize); - Writer.AddInteger("AvgSize"sv, (uint64_t)m_ChunkingParams.AvgSize); - } - Writer.EndObject(); // ChunkingParams + Writer.AddBool("ExcludeElfFiles"sv, m_Settings.ExcludeElfFiles); + Writer.AddBool("ExcludeMachOFiles"sv, m_Settings.ExcludeMachOFiles); + Writer.AddInteger("ChunkFileSizeLimit"sv, m_Settings.ChunkFileSizeLimit); + + WriteChunkParams(Writer, m_Settings.ChunkingParams); + return Writer.Save(); } static constexpr std::string_view Name = "BasicChunkingController"sv; -protected: - const std::vector<std::string> m_ChunkExcludeExtensions; - const uint64_t m_ChunkFileSizeLimit; - const ChunkedParams m_ChunkingParams; +private: + static BasicChunkingControllerSettings ReadSettings(CbObjectView Parameters) + { + return BasicChunkingControllerSettings{ + .ExcludeExtensions = ReadStringArray(Parameters["ChunkExcludeExtensions"sv].AsArrayView()), + .ExcludeElfFiles = Parameters["ExcludeElfFiles"sv].AsBool(DefaultChunkingExcludeElfFiles), + .ExcludeMachOFiles = Parameters["ExcludeMachOFiles"sv].AsBool(DefaultChunkingExcludeMachOFiles), + .ChunkFileSizeLimit = Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit), + .ChunkingParams = ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView())}; + } + + const BasicChunkingControllerSettings m_Settings; }; class ChunkingControllerWithFixedChunking : public ChunkingController { public: - ChunkingControllerWithFixedChunking(std::span<const std::string_view> FixedChunkingExtensions, - uint64_t ChunkFileSizeLimit, - const ChunkedParams& ChunkingParams, - uint32_t FixedChunkingChunkSize) - : m_FixedChunkingExtensions(FixedChunkingExtensions.begin(), FixedChunkingExtensions.end()) - , m_ChunkFileSizeLimit(ChunkFileSizeLimit) - , m_ChunkingParams(ChunkingParams) - , m_FixedChunkingChunkSize(FixedChunkingChunkSize) - { - } + ChunkingControllerWithFixedChunking(const ChunkingControllerWithFixedChunkingSettings& Settings) : m_Settings(Settings) {} - ChunkingControllerWithFixedChunking(CbObjectView Parameters) - : m_FixedChunkingExtensions(ReadStringArray(Parameters["FixedChunkingExtensions"sv].AsArrayView())) - , m_ChunkFileSizeLimit(Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit)) - , m_ChunkingParams(ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView())) - , m_FixedChunkingChunkSize(Parameters["FixedChunkingChunkSize"sv].AsUInt32(16u * 1024u * 1024u)) - { - } + ChunkingControllerWithFixedChunking(CbObjectView Parameters) : m_Settings(ReadSettings(Parameters)) {} virtual bool ProcessFile(const std::filesystem::path& InputPath, uint64_t RawSize, @@ -139,33 +167,71 @@ public: std::atomic<bool>& AbortFlag) const override { ZEN_TRACE_CPU("ChunkingControllerWithFixedChunking::ProcessFile"); - if (RawSize < m_ChunkFileSizeLimit) + const bool ExcludeFromChunking = + std::find(m_Settings.ExcludeExtensions.begin(), m_Settings.ExcludeExtensions.end(), InputPath.extension()) != + m_Settings.ExcludeExtensions.end(); + + if (ExcludeFromChunking || (RawSize < m_Settings.ChunkFileSizeLimit)) { return false; } - const bool FixedChunking = std::find(m_FixedChunkingExtensions.begin(), m_FixedChunkingExtensions.end(), InputPath.extension()) != - m_FixedChunkingExtensions.end(); - if (FixedChunking) + const bool FixedChunkingExtension = + std::find(m_Settings.FixedChunkingExtensions.begin(), m_Settings.FixedChunkingExtensions.end(), InputPath.extension()) != + m_Settings.FixedChunkingExtensions.end(); + + if (FixedChunkingExtension) { + if (RawSize < m_Settings.MinSizeForFixedChunking) + { + return false; + } ZEN_TRACE_CPU("FixedChunking"); - IoHashStream FullHash; - IoBuffer Source = IoBufferBuilder::MakeFromFile(InputPath); + IoHashStream FullHasher; + BasicFile Source(InputPath, BasicFile::Mode::kRead); uint64_t Offset = 0; tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex; - ChunkHashToChunkIndex.reserve(1 + (RawSize / m_FixedChunkingChunkSize)); + const uint64_t ExpectedChunkCount = 1 + (RawSize / m_Settings.FixedChunkingChunkSize); + ChunkHashToChunkIndex.reserve(ExpectedChunkCount); + OutChunked.Info.ChunkHashes.reserve(ExpectedChunkCount); + OutChunked.Info.ChunkSequence.reserve(ExpectedChunkCount); + OutChunked.ChunkSources.reserve(ExpectedChunkCount); + + static const uint64_t BufferingSize = 256u * 1024u; + + IoHashStream ChunkHasher; + while (Offset < RawSize) { if (AbortFlag) { return false; } - uint64_t ChunkSize = std::min<uint64_t>(RawSize - Offset, m_FixedChunkingChunkSize); - IoBuffer Chunk(Source, Offset, ChunkSize); - MemoryView ChunkData = Chunk.GetView(); - FullHash.Append(ChunkData); - IoHash ChunkHash = IoHash::HashBuffer(ChunkData); + ChunkHasher.Reset(); + + uint64_t ChunkSize = std::min<uint64_t>(RawSize - Offset, m_Settings.FixedChunkingChunkSize); + if (ChunkSize >= (BufferingSize + BufferingSize / 2)) + { + ScanFile(Source.Handle(), + Offset, + ChunkSize, + BufferingSize, + [&FullHasher, &ChunkHasher, &BytesProcessed](const void* Data, size_t Size) { + FullHasher.Append(Data, Size); + ChunkHasher.Append(Data, Size); + BytesProcessed.fetch_add(Size); + }); + } + else + { + IoBuffer ChunkData = Source.ReadRange(Offset, ChunkSize); + FullHasher.Append(ChunkData); + ChunkHasher.Append(ChunkData); + BytesProcessed.fetch_add(ChunkSize); + } + + const IoHash ChunkHash = ChunkHasher.GetHash(); if (auto It = ChunkHashToChunkIndex.find(ChunkHash); It != ChunkHashToChunkIndex.end()) { OutChunked.Info.ChunkSequence.push_back(It->second); @@ -178,16 +244,24 @@ public: OutChunked.ChunkSources.push_back({.Offset = Offset, .Size = gsl::narrow<uint32_t>(ChunkSize)}); } Offset += ChunkSize; - BytesProcessed.fetch_add(ChunkSize); } OutChunked.Info.RawSize = RawSize; - OutChunked.Info.RawHash = FullHash.GetHash(); + OutChunked.Info.RawHash = FullHasher.GetHash(); return true; } else { BasicFile Buffer(InputPath, BasicFile::Mode::kRead); - OutChunked = ChunkData(Buffer, 0, RawSize, m_ChunkingParams, &BytesProcessed); + if (m_Settings.ExcludeElfFiles && IsElfFile(Buffer)) + { + return false; + } + if (m_Settings.ExcludeMachOFiles && IsMachOFile(Buffer)) + { + return false; + } + + OutChunked = ChunkData(Buffer, 0, RawSize, m_Settings.ChunkingParams, &BytesProcessed, &AbortFlag); return true; } } @@ -199,41 +273,57 @@ public: CbObjectWriter Writer; Writer.BeginArray("FixedChunkingExtensions"); { - for (const std::string& Extension : m_FixedChunkingExtensions) + for (const std::string& Extension : m_Settings.FixedChunkingExtensions) { Writer.AddString(Extension); } } Writer.EndArray(); // ChunkExcludeExtensions - Writer.AddInteger("ChunkFileSizeLimit"sv, m_ChunkFileSizeLimit); - Writer.BeginObject("ChunkingParams"sv); - { - Writer.AddBool("UseThreshold"sv, m_ChunkingParams.UseThreshold); - Writer.AddInteger("MinSize"sv, (uint64_t)m_ChunkingParams.MinSize); - Writer.AddInteger("MaxSize"sv, (uint64_t)m_ChunkingParams.MaxSize); - Writer.AddInteger("AvgSize"sv, (uint64_t)m_ChunkingParams.AvgSize); + Writer.BeginArray("ChunkExcludeExtensions"sv); + { + for (const std::string& Extension : m_Settings.ExcludeExtensions) + { + Writer.AddString(Extension); + } } - Writer.EndObject(); // ChunkingParams - Writer.AddInteger("FixedChunkingChunkSize"sv, m_FixedChunkingChunkSize); + Writer.EndArray(); // ChunkExcludeExtensions + + Writer.AddBool("ExcludeElfFiles"sv, m_Settings.ExcludeElfFiles); + Writer.AddBool("ExcludeMachOFiles"sv, m_Settings.ExcludeMachOFiles); + + Writer.AddInteger("ChunkFileSizeLimit"sv, m_Settings.ChunkFileSizeLimit); + + WriteChunkParams(Writer, m_Settings.ChunkingParams); + + Writer.AddInteger("FixedChunkingChunkSize"sv, m_Settings.FixedChunkingChunkSize); + Writer.AddInteger("MinSizeForFixedChunking"sv, m_Settings.MinSizeForFixedChunking); return Writer.Save(); } static constexpr std::string_view Name = "ChunkingControllerWithFixedChunking"sv; -protected: - const std::vector<std::string> m_FixedChunkingExtensions; - const uint64_t m_ChunkFileSizeLimit; - const ChunkedParams m_ChunkingParams; - const uint32_t m_FixedChunkingChunkSize; +private: + static ChunkingControllerWithFixedChunkingSettings ReadSettings(CbObjectView Parameters) + { + return ChunkingControllerWithFixedChunkingSettings{ + .FixedChunkingExtensions = ReadStringArray(Parameters["FixedChunkingExtensions"sv].AsArrayView()), + .ExcludeExtensions = ReadStringArray(Parameters["ChunkExcludeExtensions"sv].AsArrayView()), + .ExcludeElfFiles = Parameters["ExcludeElfFiles"sv].AsBool(DefaultChunkingExcludeElfFiles), + .ExcludeMachOFiles = Parameters["ExcludeMachOFiles"sv].AsBool(DefaultChunkingExcludeMachOFiles), + .ChunkFileSizeLimit = Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit), + .ChunkingParams = ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView()), + .FixedChunkingChunkSize = Parameters["FixedChunkingChunkSize"sv].AsUInt64(DefaultFixedChunkingChunkSize), + .MinSizeForFixedChunking = Parameters["MinSizeForFixedChunking"sv].AsUInt64(DefaultFixedChunkingChunkSize)}; + } + + const ChunkingControllerWithFixedChunkingSettings m_Settings; }; std::unique_ptr<ChunkingController> -CreateBasicChunkingController(std::span<const std::string_view> ExcludeExtensions, - uint64_t ChunkFileSizeLimit, - const ChunkedParams& ChunkingParams) +CreateBasicChunkingController(const BasicChunkingControllerSettings& Settings) { - return std::make_unique<BasicChunkingController>(ExcludeExtensions, ChunkFileSizeLimit, ChunkingParams); + return std::make_unique<BasicChunkingController>(Settings); } std::unique_ptr<ChunkingController> CreateBasicChunkingController(CbObjectView Parameters) @@ -242,15 +332,9 @@ CreateBasicChunkingController(CbObjectView Parameters) } std::unique_ptr<ChunkingController> -CreateChunkingControllerWithFixedChunking(std::span<const std::string_view> FixedChunkingExtensions, - uint64_t ChunkFileSizeLimit, - const ChunkedParams& ChunkingParams, - uint32_t FixedChunkingChunkSize) +CreateChunkingControllerWithFixedChunking(const ChunkingControllerWithFixedChunkingSettings& Setting) { - return std::make_unique<ChunkingControllerWithFixedChunking>(FixedChunkingExtensions, - ChunkFileSizeLimit, - ChunkingParams, - FixedChunkingChunkSize); + return std::make_unique<ChunkingControllerWithFixedChunking>(Setting); } std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(CbObjectView Parameters) diff --git a/src/zenutil/commandlineoptions.cpp b/src/zenutil/commandlineoptions.cpp new file mode 100644 index 000000000..afef7f6f2 --- /dev/null +++ b/src/zenutil/commandlineoptions.cpp @@ -0,0 +1,221 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/commandlineoptions.h> + +#include <filesystem> +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +#endif // ZEN_WITH_TESTS + +void +cxxopts::values::parse_value(const std::string& text, std::filesystem::path& value) +{ + value = zen::StringToPath(text); +} + +namespace zen { + +std::vector<std::string> +ParseCommandLine(std::string_view CommandLine) +{ + auto IsWhitespaceOrEnd = [](std::string_view CommandLine, std::string::size_type Pos) { + if (Pos == CommandLine.length()) + { + return true; + } + if (CommandLine[Pos] == ' ') + { + return true; + } + return false; + }; + + bool IsParsingArg = false; + bool IsInQuote = false; + + std::string::size_type Pos = 0; + std::string::size_type ArgStart = 0; + std::vector<std::string> Args; + while (Pos < CommandLine.length()) + { + if (IsInQuote) + { + if (CommandLine[Pos] == '"' && IsWhitespaceOrEnd(CommandLine, Pos + 1)) + { + Args.push_back(std::string(CommandLine.substr(ArgStart, Pos - ArgStart + 1))); + Pos++; + IsInQuote = false; + IsParsingArg = false; + } + else + { + Pos++; + } + } + else if (IsParsingArg) + { + ZEN_ASSERT(Pos > ArgStart); + if (CommandLine[Pos] == ' ') + { + Args.push_back(std::string(CommandLine.substr(ArgStart, Pos - ArgStart))); + Pos++; + IsParsingArg = false; + } + else if (CommandLine[Pos] == '"') + { + IsInQuote = true; + Pos++; + } + else + { + Pos++; + } + } + else if (CommandLine[Pos] == '"') + { + IsInQuote = true; + IsParsingArg = true; + ArgStart = Pos; + Pos++; + } + else if (CommandLine[Pos] != ' ') + { + IsParsingArg = true; + ArgStart = Pos; + Pos++; + } + else + { + Pos++; + } + } + if (IsParsingArg) + { + ZEN_ASSERT(Pos > ArgStart); + Args.push_back(std::string(CommandLine.substr(ArgStart))); + } + + return Args; +} + +std::vector<char*> +StripCommandlineQuotes(std::vector<std::string>& InOutArgs) +{ + std::vector<char*> RawArgs; + RawArgs.reserve(InOutArgs.size()); + for (std::string& Arg : InOutArgs) + { + std::string::size_type EscapedQuotePos = Arg.find("\\\"", 1); + while (EscapedQuotePos != std::string::npos && Arg.rfind('\"', EscapedQuotePos - 1) != std::string::npos) + { + Arg.erase(EscapedQuotePos, 1); + EscapedQuotePos = Arg.find("\\\"", EscapedQuotePos); + } + + if (Arg.starts_with("\"")) + { + if (Arg.find('"', 1) == Arg.length() - 1) + { + if (Arg.find(' ', 1) == std::string::npos) + { + Arg = Arg.substr(1, Arg.length() - 2); + } + } + } + RawArgs.push_back(const_cast<char*>(Arg.c_str())); + } + return RawArgs; +} + +void +MakeSafeAbsolutePathÍnPlace(std::filesystem::path& Path) +{ + if (!Path.empty()) + { + std::filesystem::path AbsolutePath = std::filesystem::absolute(Path).make_preferred(); +#if ZEN_PLATFORM_WINDOWS + const std::string_view Prefix = "\\\\?\\"; + const std::u8string PrefixU8(Prefix.begin(), Prefix.end()); + std::u8string PathString = AbsolutePath.u8string(); + if (!PathString.empty() && !PathString.starts_with(PrefixU8)) + { + PathString.insert(0, PrefixU8); + Path = PathString; + } +#endif // ZEN_PLATFORM_WINDOWS + } +} + +std::filesystem::path +MakeSafeAbsolutePath(const std::filesystem::path& Path) +{ + std::filesystem::path Tmp(Path); + MakeSafeAbsolutePathÍnPlace(Tmp); + return Tmp; +} + +std::filesystem::path +StringToPath(const std::string_view& Path) +{ + std::string_view UnquotedPath = RemoveQuotes(Path); + + if (UnquotedPath.ends_with('/') || UnquotedPath.ends_with('\\') || UnquotedPath.ends_with(std::filesystem::path::preferred_separator)) + { + UnquotedPath = UnquotedPath.substr(0, UnquotedPath.length() - 1); + } + + return std::filesystem::path(UnquotedPath).make_preferred(); +} + +std::string_view +RemoveQuotes(const std::string_view& Arg) +{ + if (Arg.length() > 2) + { + if (Arg[0] == '"' && Arg[Arg.length() - 1] == '"') + { + return Arg.substr(1, Arg.length() - 2); + } + } + return Arg; +} + +#if ZEN_WITH_TESTS + +void +commandlineoptions_forcelink() +{ +} + +TEST_CASE("CommandLine") +{ + std::vector<std::string> v1 = ParseCommandLine("c:\\my\\exe.exe \"quoted arg\" \"one\",two,\"three\\\""); + CHECK_EQ(v1[0], "c:\\my\\exe.exe"); + CHECK_EQ(v1[1], "\"quoted arg\""); + CHECK_EQ(v1[2], "\"one\",two,\"three\\\""); + + std::vector<std::string> v2 = ParseCommandLine( + "--tracehost 127.0.0.1 builds download --url=https://jupiter.devtools.epicgames.com --namespace=ue.oplog " + "--bucket=citysample.packaged-build.fortnite-main.windows \"c:\\just\\a\\path\" " + "--access-token-path=\"C:\\Users\\dan.engelbrecht\\jupiter-token.json\" \"D:\\Dev\\Spaced Folder\\Target\\\" " + "--alt-path=\"D:\\Dev\\Spaced Folder2\\Target\\\" 07dn23ifiwesnvoasjncasab --build-part-name win64,linux,ps5"); + + std::vector<char*> v2Stripped = StripCommandlineQuotes(v2); + CHECK_EQ(v2Stripped[0], std::string("--tracehost")); + CHECK_EQ(v2Stripped[1], std::string("127.0.0.1")); + CHECK_EQ(v2Stripped[2], std::string("builds")); + CHECK_EQ(v2Stripped[3], std::string("download")); + CHECK_EQ(v2Stripped[4], std::string("--url=https://jupiter.devtools.epicgames.com")); + CHECK_EQ(v2Stripped[5], std::string("--namespace=ue.oplog")); + CHECK_EQ(v2Stripped[6], std::string("--bucket=citysample.packaged-build.fortnite-main.windows")); + CHECK_EQ(v2Stripped[7], std::string("c:\\just\\a\\path")); + CHECK_EQ(v2Stripped[8], std::string("--access-token-path=\"C:\\Users\\dan.engelbrecht\\jupiter-token.json\"")); + CHECK_EQ(v2Stripped[9], std::string("\"D:\\Dev\\Spaced Folder\\Target\"")); + CHECK_EQ(v2Stripped[10], std::string("--alt-path=\"D:\\Dev\\Spaced Folder2\\Target\"")); + CHECK_EQ(v2Stripped[11], std::string("07dn23ifiwesnvoasjncasab")); + CHECK_EQ(v2Stripped[12], std::string("--build-part-name")); + CHECK_EQ(v2Stripped[13], std::string("win64,linux,ps5")); +} + +#endif +} // namespace zen diff --git a/src/zenutil/environmentoptions.cpp b/src/zenutil/environmentoptions.cpp new file mode 100644 index 000000000..1b7ce8029 --- /dev/null +++ b/src/zenutil/environmentoptions.cpp @@ -0,0 +1,84 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/environmentoptions.h> + +#include <zencore/filesystem.h> + +namespace zen { + +EnvironmentOptions::StringOption::StringOption(std::string& Value) : RefValue(Value) +{ +} +void +EnvironmentOptions::StringOption::Parse(std::string_view Value) +{ + RefValue = std::string(Value); +} + +EnvironmentOptions::FilePathOption::FilePathOption(std::filesystem::path& Value) : RefValue(Value) +{ +} +void +EnvironmentOptions::FilePathOption::Parse(std::string_view Value) +{ + RefValue = MakeSafeAbsolutePath(Value); +} + +EnvironmentOptions::BoolOption::BoolOption(bool& Value) : RefValue(Value) +{ +} +void +EnvironmentOptions::BoolOption::Parse(std::string_view Value) +{ + const std::string Lower = ToLower(Value); + if (Lower == "true" || Lower == "y" || Lower == "yes") + { + RefValue = true; + } + else if (Lower == "false" || Lower == "n" || Lower == "no") + { + RefValue = false; + } +} + +std::shared_ptr<EnvironmentOptions::OptionValue> +EnvironmentOptions::MakeOption(std::string& Value) +{ + return std::make_shared<StringOption>(Value); +} + +std::shared_ptr<EnvironmentOptions::OptionValue> +EnvironmentOptions::MakeOption(std::filesystem::path& Value) +{ + return std::make_shared<FilePathOption>(Value); +} + +std::shared_ptr<EnvironmentOptions::OptionValue> +EnvironmentOptions::MakeOption(bool& Value) +{ + return std::make_shared<BoolOption>(Value); +} + +EnvironmentOptions::EnvironmentOptions() +{ +} + +void +EnvironmentOptions::Parse(const cxxopts::ParseResult& CmdLineResult) +{ + for (auto& It : OptionMap) + { + std::string_view EnvName = It.first; + const Option& Opt = It.second; + if (CmdLineResult.count(Opt.CommandLineOptionName) == 0) + { + std::string EnvValue = GetEnvVariable(It.first); + if (!EnvValue.empty()) + { + Opt.Value->Parse(EnvValue); + } + } + } +} + +} // namespace zen diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp index 47a4e1cc4..c2cc5ab3c 100644 --- a/src/zenutil/filebuildstorage.cpp +++ b/src/zenutil/filebuildstorage.cpp @@ -35,6 +35,27 @@ public: virtual ~FileBuildStorage() {} + virtual CbObject ListNamespaces(bool bRecursive) override + { + ZEN_TRACE_CPU("FileBuildStorage::ListNamespaces"); + ZEN_UNUSED(bRecursive); + + SimulateLatency(0, 0); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + m_Stats.TotalRequestCount++; + + CbObjectWriter Writer; + Writer.BeginArray("results"); + { + } + Writer.EndArray(); // results + Writer.Save(); + SimulateLatency(Writer.GetSaveSize(), 0); + return Writer.Save(); + } + virtual CbObject ListBuilds(CbObject Query) override { ZEN_TRACE_CPU("FileBuildStorage::ListBuilds"); @@ -66,7 +87,7 @@ public: } } } - Writer.EndArray(); // builds + Writer.EndArray(); // results Writer.Save(); SimulateLatency(Writer.GetSaveSize(), 0); return Writer.Save(); @@ -235,7 +256,7 @@ public: m_Stats.TotalRequestCount++; const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); - if (!std::filesystem::is_regular_file(BlockPath)) + if (!IsFile(BlockPath)) { CreateDirectories(BlockPath.parent_path()); TemporaryFile::SafeWriteFile(BlockPath, Payload.Flatten().GetView()); @@ -260,7 +281,7 @@ public: m_Stats.TotalRequestCount++; const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); - if (!std::filesystem::is_regular_file(BlockPath)) + if (!IsFile(BlockPath)) { CreateDirectories(BlockPath.parent_path()); @@ -346,7 +367,7 @@ public: m_Stats.TotalRequestCount++; const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); - if (std::filesystem::is_regular_file(BlockPath)) + if (IsFile(BlockPath)) { BasicFile File(BlockPath, BasicFile::Mode::kRead); IoBuffer Payload; @@ -369,11 +390,11 @@ public: return IoBuffer{}; } - virtual std::vector<std::function<void()>> GetLargeBuildBlob( - const Oid& BuildId, - const IoHash& RawHash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) override + virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete) override { ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob"); ZEN_UNUSED(BuildId); @@ -383,20 +404,22 @@ public: m_Stats.TotalRequestCount++; const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); - if (std::filesystem::is_regular_file(BlockPath)) + if (IsFile(BlockPath)) { struct WorkloadData { - std::atomic<uint64_t> BytesRemaining; - BasicFile BlobFile; - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver; + std::atomic<uint64_t> BytesRemaining; + BasicFile BlobFile; + std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive; + std::function<void()> OnComplete; }; std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); Workload->BlobFile.Open(BlockPath, BasicFile::Mode::kRead); const uint64_t BlobSize = Workload->BlobFile.FileSize(); - Workload->Receiver = std::move(Receiver); + Workload->OnReceive = std::move(OnReceive); + Workload->OnComplete = std::move(OnComplete); Workload->BytesRemaining = BlobSize; std::vector<std::function<void()>> WorkItems; @@ -410,8 +433,12 @@ public: IoBuffer PartPayload(Size); Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset); m_Stats.TotalBytesRead += PartPayload.GetSize(); + Workload->OnReceive(Offset, PartPayload); uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size); - Workload->Receiver(Offset, PartPayload, ByteRemaning); + if (ByteRemaning == Size) + { + Workload->OnComplete(); + } SimulateLatency(Size, PartPayload.GetSize()); }); @@ -423,7 +450,7 @@ public: return {}; } - virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override + virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override { ZEN_TRACE_CPU("FileBuildStorage::PutBlockMetadata"); ZEN_UNUSED(BuildId); @@ -440,68 +467,107 @@ public: m_Stats.TotalBytesWritten += MetaData.GetSize(); WriteAsJson(BlockMetaDataPath, MetaData); SimulateLatency(0, 0); + return true; } - virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) override + virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override { ZEN_TRACE_CPU("FileBuildStorage::FindBlocks"); ZEN_UNUSED(BuildId); - SimulateLatency(0, 0); + SimulateLatency(sizeof(BuildId), 0); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); m_Stats.TotalRequestCount++; + uint64_t FoundCount = 0; + DirectoryContent Content; GetDirectoryContent(GetBlobsMetadataFolder(), DirectoryContentFlags::IncludeFiles, Content); - std::vector<ChunkBlockDescription> Result; + CbObjectWriter Writer; + Writer.BeginArray("blocks"); for (const std::filesystem::path& MetaDataFile : Content.Files) { IoHash ChunkHash; if (IoHash::TryParse(MetaDataFile.stem().string(), ChunkHash)) { std::filesystem::path BlockPath = GetBlobPayloadPath(ChunkHash); - if (std::filesystem::is_regular_file(BlockPath)) + if (IsFile(BlockPath)) { IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten(); m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize(); CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); - Result.emplace_back(ParseChunkBlockDescription(BlockObject)); + Writer.AddObject(BlockObject); + FoundCount++; + if (FoundCount == MaxBlockCount) + { + break; + } } } } - SimulateLatency(0, sizeof(IoHash) * Result.size()); + Writer.EndArray(); // blocks + CbObject Result = Writer.Save(); + SimulateLatency(0, Result.GetSize()); return Result; } - virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) override + virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override { ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata"); ZEN_UNUSED(BuildId); - SimulateLatency(0, 0); + SimulateLatency(sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(), 0); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); m_Stats.TotalRequestCount++; - std::vector<ChunkBlockDescription> Result; + CbObjectWriter Writer; + Writer.BeginArray("blocks"); + for (const IoHash& BlockHash : BlockHashes) { std::filesystem::path MetaDataFile = GetBlobMetadataPath(BlockHash); - if (std::filesystem::is_regular_file(MetaDataFile)) + if (IsFile(MetaDataFile)) { IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten(); m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize(); CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); - Result.emplace_back(ParseChunkBlockDescription(BlockObject)); + Writer.AddObject(BlockObject); } } - SimulateLatency(sizeof(BlockHashes) * BlockHashes.size(), sizeof(ChunkBlockDescription) * Result.size()); + Writer.EndArray(); // blocks + CbObject Result = Writer.Save(); + SimulateLatency(0, Result.GetSize()); return Result; } + virtual void PutBuildPartStats(const Oid& BuildId, + const Oid& BuildPartId, + const tsl::robin_map<std::string, double>& FloatStats) override + { + CbObjectWriter Request; + Request.BeginObject("floatStats"sv); + for (auto It : FloatStats) + { + Request.AddFloat(It.first, It.second); + } + Request.EndObject(); + CbObject Payload = Request.Save(); + + SimulateLatency(Payload.GetSize(), 0); + + const std::filesystem::path BuildPartStatsDataPath = GetBuildPartStatsPath(BuildId, BuildPartId); + CreateDirectories(BuildPartStatsDataPath.parent_path()); + + TemporaryFile::SafeWriteFile(BuildPartStatsDataPath, Payload.GetView()); + WriteAsJson(BuildPartStatsDataPath, Payload); + + SimulateLatency(0, 0); + } + protected: std::filesystem::path GetBuildsFolder() const { return m_StoragePath / "builds"; } std::filesystem::path GetBlobsFolder() const { return m_StoragePath / "blobs"; } @@ -520,6 +586,11 @@ protected: return GetBuildPartFolder(BuildId, BuildPartId) / "metadata.cb"; } + std::filesystem::path GetBuildPartStatsPath(const Oid& BuildId, const Oid& BuildPartId) const + { + return GetBuildPartFolder(BuildId, BuildPartId) / fmt::format("stats_{}.cb", Oid::NewOid()); + } + std::filesystem::path GetBlobPayloadPath(const IoHash& RawHash) const { return GetBlobsFolder() / fmt::format("{}.cbz", RawHash); } std::filesystem::path GetBlobMetadataPath(const IoHash& RawHash) const @@ -587,7 +658,7 @@ protected: BuildPartObject.IterateAttachments([&](CbFieldView FieldView) { const IoHash AttachmentHash = FieldView.AsBinaryAttachment(); const std::filesystem::path BlockPath = GetBlobPayloadPath(AttachmentHash); - if (!std::filesystem::is_regular_file(BlockPath)) + if (!IsFile(BlockPath)) { NeededAttachments.push_back(AttachmentHash); } @@ -608,13 +679,24 @@ protected: { return false; } - CompositeBuffer Decompressed = ValidateBuffer.DecompressToComposite(); - if (!Decompressed) + + IoHashStream Hash; + bool CouldDecompress = ValidateBuffer.DecompressToStream( + 0, + (uint64_t)-1, + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset, SourceSize, Offset); + for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + { + Hash.Append(Segment.GetView()); + } + return true; + }); + if (!CouldDecompress) { return false; } - IoHash Hash = IoHash::HashBuffer(Decompressed); - if (Hash != RawHash) + if (Hash.GetHash() != VerifyHash) { return false; } diff --git a/src/zenutil/include/zenutil/bufferedwritefilecache.h b/src/zenutil/include/zenutil/bufferedwritefilecache.h new file mode 100644 index 000000000..68d6c375e --- /dev/null +++ b/src/zenutil/include/zenutil/bufferedwritefilecache.h @@ -0,0 +1,106 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/basicfile.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +class CompositeBuffer; + +class BufferedWriteFileCache +{ +public: + BufferedWriteFileCache(const BufferedWriteFileCache&) = delete; + BufferedWriteFileCache& operator=(const BufferedWriteFileCache&) = delete; + + BufferedWriteFileCache(); + + ~BufferedWriteFileCache(); + + std::unique_ptr<BasicFile> Get(uint32_t FileIndex); + + void Put(uint32_t FileIndex, std::unique_ptr<BasicFile>&& Writer); + + void Close(std::span<uint32_t> FileIndexes); + + class Local + { + public: + struct Writer + { + std::unique_ptr<BasicFile> File; + std::unique_ptr<BasicFileWriter> Writer; + + inline void Write(const CompositeBuffer& Chunk, uint64_t FileOffset) + { + if (Writer) + { + Writer->Write(Chunk, FileOffset); + } + else + { + File->Write(Chunk, FileOffset); + } + } + }; + + Local(const Local&) = delete; + Local& operator=(const Local&) = delete; + + explicit Local(BufferedWriteFileCache& Cache); + ~Local(); + + Writer* GetWriter(uint32_t FileIndex); + Writer* PutWriter(uint32_t FileIndex, std::unique_ptr<Writer> Writer); + + private: + tsl::robin_map<uint32_t, uint32_t> m_FileIndexToWriterIndex; + std::vector<std::unique_ptr<Writer>> m_ChunkWriters; + BufferedWriteFileCache& m_Cache; + }; + +private: + static constexpr size_t MaxHandlesPerPath = 7; + static constexpr size_t MaxBufferedCount = 1024; + struct TOpenHandles + { + BasicFile* Files[MaxHandlesPerPath]; + uint64_t Size = 0; + inline BasicFile* Pop() + { + if (Size > 0) + { + return Files[--Size]; + } + else + { + return nullptr; + } + } + inline bool Push(BasicFile* File) + { + if (Size < MaxHandlesPerPath) + { + Files[Size++] = File; + return true; + } + return false; + } + }; + static_assert(sizeof(TOpenHandles) == 64); + + RwLock m_WriterLock; + tsl::robin_map<uint32_t, uint32_t> m_ChunkWriters; + std::vector<TOpenHandles> m_OpenFiles; + std::atomic<uint32_t> m_CacheHitCount; + std::atomic<uint32_t> m_CacheMissCount; + std::atomic<uint32_t> m_OpenHandleCount; + std::atomic<uint32_t> m_DroppedHandleCount; +}; + +} // namespace zen diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h index 9d2bab170..f49d4b42a 100644 --- a/src/zenutil/include/zenutil/buildstorage.h +++ b/src/zenutil/include/zenutil/buildstorage.h @@ -5,6 +5,10 @@ #include <zencore/compactbinary.h> #include <zenutil/chunkblock.h> +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + namespace zen { class BuildStorage @@ -21,6 +25,7 @@ public: virtual ~BuildStorage() {} + virtual CbObject ListNamespaces(bool bRecursive = false) = 0; virtual CbObject ListBuilds(CbObject Query) = 0; virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) = 0; virtual CbObject GetBuild(const Oid& BuildId) = 0; @@ -43,16 +48,18 @@ public: virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset = 0, - uint64_t RangeBytes = (uint64_t)-1) = 0; - virtual std::vector<std::function<void()>> GetLargeBuildBlob( - const Oid& BuildId, - const IoHash& RawHash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) = 0; - - virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0; - virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) = 0; - virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0; + uint64_t RangeBytes = (uint64_t)-1) = 0; + virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete) = 0; + + [[nodiscard]] virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0; + virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) = 0; + virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0; + + virtual void PutBuildPartStats(const Oid& BuildId, const Oid& BuildPartId, const tsl::robin_map<std::string, double>& FloatStats) = 0; }; } // namespace zen diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h new file mode 100644 index 000000000..e1fb73fd4 --- /dev/null +++ b/src/zenutil/include/zenutil/buildstoragecache.h @@ -0,0 +1,57 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/logging.h> + +#include <zencore/compactbinary.h> +#include <zencore/compositebuffer.h> +#include <zenutil/chunkblock.h> + +namespace zen { + +class HttpClient; + +class BuildStorageCache +{ +public: + struct Statistics + { + std::atomic<uint64_t> TotalBytesRead = 0; + std::atomic<uint64_t> TotalBytesWritten = 0; + std::atomic<uint64_t> TotalRequestCount = 0; + std::atomic<uint64_t> TotalRequestTimeUs = 0; + std::atomic<uint64_t> TotalExecutionTimeUs = 0; + }; + + virtual ~BuildStorageCache() {} + + virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) = 0; + virtual IoBuffer GetBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + uint64_t RangeOffset = 0, + uint64_t RangeBytes = (uint64_t)-1) = 0; + + virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) = 0; + virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0; + + struct BlobExistsResult + { + bool HasBody = 0; + bool HasMetadata = 0; + }; + + virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0; + + virtual void Flush( + int32_t UpdateIntervalMS, + std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0; +}; + +std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& HttpClient, + BuildStorageCache::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount); +} // namespace zen diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h index 57b55cb8e..306a5d990 100644 --- a/src/zenutil/include/zenutil/chunkedcontent.h +++ b/src/zenutil/include/zenutil/chunkedcontent.h @@ -67,7 +67,7 @@ FolderContent GetFolderContent(GetFolderContentStatistics& Stats, std::function<bool(const std::string_view& RelativePath)>&& AcceptDirectory, std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& AcceptFile, WorkerThreadPool& WorkerPool, - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, std::atomic<bool>& AbortFlag); @@ -94,10 +94,31 @@ struct ChunkedFolderContent ChunkedContentData ChunkedContent; }; +struct ChunkedContentLookup +{ + struct ChunkSequenceLocation + { + uint32_t SequenceIndex = (uint32_t)-1; + uint64_t Offset = (uint64_t)-1; + }; + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex; + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceIndex; + std::vector<uint32_t> SequenceIndexChunkOrderOffset; + std::vector<ChunkSequenceLocation> ChunkSequenceLocations; + std::vector<size_t> + ChunkSequenceLocationOffset; // ChunkSequenceLocations[ChunkLocationOffset[ChunkIndex]] -> start of sources for ChunkIndex + std::vector<uint32_t> ChunkSequenceLocationCounts; // ChunkSequenceLocationCounts[ChunkIndex] count of chunk locations for ChunkIndex + std::vector<uint32_t> SequenceIndexFirstPathIndex; // SequenceIndexFirstPathIndex[SequenceIndex] -> first path index with that RawHash + std::vector<uint32_t> PathExtensionHash; +}; + void SaveChunkedFolderContentToCompactBinary(const ChunkedFolderContent& Content, CbWriter& Output); ChunkedFolderContent LoadChunkedFolderContentToCompactBinary(CbObjectView Input); ChunkedFolderContent MergeChunkedFolderContents(const ChunkedFolderContent& Base, std::span<const ChunkedFolderContent> Overlays); +ChunkedFolderContent DeletePathsFromChunkedContent(const ChunkedFolderContent& Base, + const ChunkedContentLookup& BaseContentLookup, + std::span<const std::filesystem::path> DeletedPaths); ChunkedFolderContent DeletePathsFromChunkedContent(const ChunkedFolderContent& Base, std::span<const std::filesystem::path> DeletedPaths); struct ChunkingStatistics @@ -111,31 +132,15 @@ struct ChunkingStatistics uint64_t ElapsedWallTimeUS = 0; }; -ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats, - WorkerThreadPool& WorkerPool, - const std::filesystem::path& RootPath, - const FolderContent& Content, - const ChunkingController& InChunkingController, - int32_t UpdateInteralMS, - std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, - std::atomic<bool>& AbortFlag); - -struct ChunkedContentLookup -{ - struct ChunkSequenceLocation - { - uint32_t SequenceIndex = (uint32_t)-1; - uint64_t Offset = (uint64_t)-1; - }; - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex; - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceIndex; - std::vector<uint32_t> SequenceIndexChunkOrderOffset; - std::vector<ChunkSequenceLocation> ChunkSequenceLocations; - std::vector<size_t> - ChunkSequenceLocationOffset; // ChunkSequenceLocations[ChunkLocationOffset[ChunkIndex]] -> start of sources for ChunkIndex - std::vector<uint32_t> ChunkSequenceLocationCounts; // ChunkSequenceLocationCounts[ChunkIndex] count of chunk locations for ChunkIndex - std::vector<uint32_t> SequenceIndexFirstPathIndex; // SequenceIndexFirstPathIndex[SequenceIndex] -> first path index with that RawHash -}; +ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats, + WorkerThreadPool& WorkerPool, + const std::filesystem::path& RootPath, + const FolderContent& Content, + const ChunkingController& InChunkingController, + int32_t UpdateIntervalMS, + std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)>&& UpdateCallback, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag); ChunkedContentLookup BuildChunkedContentLookup(const ChunkedFolderContent& Content); diff --git a/src/zenutil/include/zenutil/chunkingcontroller.h b/src/zenutil/include/zenutil/chunkingcontroller.h index 246f4498a..315502265 100644 --- a/src/zenutil/include/zenutil/chunkingcontroller.h +++ b/src/zenutil/include/zenutil/chunkingcontroller.h @@ -11,7 +11,11 @@ namespace zen { -const std::vector<std::string_view> DefaultChunkingExcludeExtensions = {".exe", ".dll", ".pdb", ".self", ".mp4"}; +const std::vector<std::string> DefaultChunkingExcludeExtensions = + {".exe", ".dll", ".pdb", ".self", ".mp4", ".zip", ".7z", ".bzip", ".rar", ".gzip"}; +const std::vector<std::string> DefaultFixedChunkingExtensions = {".apk", ".nsp", ".xvc", ".pkg", ".dmg", ".ipa"}; +const bool DefaultChunkingExcludeElfFiles = true; +const bool DefaultChunkingExcludeMachOFiles = true; const ChunkedParams DefaultChunkedParams = {.MinSize = ((8u * 1u) * 1024u) - 128u, .MaxSize = 128u * 1024u, @@ -19,7 +23,8 @@ const ChunkedParams DefaultChunkedParams = {.MinSize = ((8u * 1u) * 1024u) - 128 const size_t DefaultChunkingFileSizeLimit = DefaultChunkedParams.MaxSize; -const uint32_t DefaultFixedChunkingChunkSize = 16u * 1024u * 1024u; +const uint64_t DefaultFixedChunkingChunkSize = 32u * 1024u * 1024u; +const uint64_t DefaultMinSizeForFixedChunking = DefaultFixedChunkingChunkSize * 8u; struct ChunkedInfoWithSource; @@ -38,17 +43,31 @@ public: virtual CbObject GetParameters() const = 0; }; -std::unique_ptr<ChunkingController> CreateBasicChunkingController( - std::span<const std::string_view> ExcludeExtensions = DefaultChunkingExcludeExtensions, - uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit, - const ChunkedParams& ChunkingParams = DefaultChunkedParams); +struct BasicChunkingControllerSettings +{ + std::vector<std::string> ExcludeExtensions = DefaultChunkingExcludeExtensions; + bool ExcludeElfFiles = DefaultChunkingExcludeElfFiles; + bool ExcludeMachOFiles = DefaultChunkingExcludeMachOFiles; + uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit; + ChunkedParams ChunkingParams = DefaultChunkedParams; +}; + +std::unique_ptr<ChunkingController> CreateBasicChunkingController(const BasicChunkingControllerSettings& Settings); std::unique_ptr<ChunkingController> CreateBasicChunkingController(CbObjectView Parameters); -std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking( - std::span<const std::string_view> ExcludeExtensions = DefaultChunkingExcludeExtensions, - uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit, - const ChunkedParams& ChunkingParams = DefaultChunkedParams, - uint32_t FixedChunkingChunkSize = DefaultFixedChunkingChunkSize); +struct ChunkingControllerWithFixedChunkingSettings +{ + std::vector<std::string> FixedChunkingExtensions = DefaultFixedChunkingExtensions; + std::vector<std::string> ExcludeExtensions = DefaultChunkingExcludeExtensions; + bool ExcludeElfFiles = DefaultChunkingExcludeElfFiles; + bool ExcludeMachOFiles = DefaultChunkingExcludeMachOFiles; + uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit; + ChunkedParams ChunkingParams = DefaultChunkedParams; + uint64_t FixedChunkingChunkSize = DefaultFixedChunkingChunkSize; + uint64_t MinSizeForFixedChunking = DefaultMinSizeForFixedChunking; +}; + +std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(const ChunkingControllerWithFixedChunkingSettings& Setting); std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(CbObjectView Parameters); std::unique_ptr<ChunkingController> CreateChunkingController(std::string_view Name, CbObjectView Parameters); diff --git a/src/zenutil/include/zenutil/commandlineoptions.h b/src/zenutil/include/zenutil/commandlineoptions.h new file mode 100644 index 000000000..f927d41e5 --- /dev/null +++ b/src/zenutil/include/zenutil/commandlineoptions.h @@ -0,0 +1,29 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> +#include <filesystem> + +ZEN_THIRD_PARTY_INCLUDES_START + +namespace cxxopts::values { +// We declare this specialization before including cxxopts to make it stick +void parse_value(const std::string& text, std::filesystem::path& value); +} // namespace cxxopts::values + +#include <cxxopts.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +std::vector<std::string> ParseCommandLine(std::string_view CommandLine); +std::vector<char*> StripCommandlineQuotes(std::vector<std::string>& InOutArgs); +void MakeSafeAbsolutePathÍnPlace(std::filesystem::path& Path); +[[nodiscard]] std::filesystem::path MakeSafeAbsolutePath(const std::filesystem::path& Path); +std::filesystem::path StringToPath(const std::string_view& Path); +std::string_view RemoveQuotes(const std::string_view& Arg); + +void commandlineoptions_forcelink(); // internal + +} // namespace zen diff --git a/src/zenutil/include/zenutil/environmentoptions.h b/src/zenutil/include/zenutil/environmentoptions.h new file mode 100644 index 000000000..7418608e4 --- /dev/null +++ b/src/zenutil/include/zenutil/environmentoptions.h @@ -0,0 +1,92 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/string.h> +#include <zenutil/commandlineoptions.h> + +namespace zen { + +class EnvironmentOptions +{ +public: + class OptionValue + { + public: + virtual void Parse(std::string_view Value) = 0; + + virtual ~OptionValue() {} + }; + + class StringOption : public OptionValue + { + public: + explicit StringOption(std::string& Value); + virtual void Parse(std::string_view Value) override; + std::string& RefValue; + }; + + class FilePathOption : public OptionValue + { + public: + explicit FilePathOption(std::filesystem::path& Value); + virtual void Parse(std::string_view Value) override; + std::filesystem::path& RefValue; + }; + + class BoolOption : public OptionValue + { + public: + explicit BoolOption(bool& Value); + virtual void Parse(std::string_view Value); + bool& RefValue; + }; + + template<Integral T> + class NumberOption : public OptionValue + { + public: + explicit NumberOption(T& Value) : RefValue(Value) {} + virtual void Parse(std::string_view Value) override + { + if (std::optional<T> OptionalValue = ParseInt<T>(Value); OptionalValue.has_value()) + { + RefValue = OptionalValue.value(); + } + } + T& RefValue; + }; + + struct Option + { + std::string CommandLineOptionName; + std::shared_ptr<OptionValue> Value; + }; + + std::shared_ptr<OptionValue> MakeOption(std::string& Value); + std::shared_ptr<OptionValue> MakeOption(std::filesystem::path& Value); + + template<Integral T> + std::shared_ptr<OptionValue> MakeOption(T& Value) + { + return std::make_shared<NumberOption<T>>(Value); + }; + + std::shared_ptr<OptionValue> MakeOption(bool& Value); + + template<typename T> + void AddOption(std::string_view EnvName, T& Value, std::string_view CommandLineOptionName = "") + { + OptionMap.insert_or_assign(std::string(EnvName), + Option{.CommandLineOptionName = std::string(CommandLineOptionName), .Value = MakeOption(Value)}); + }; + + EnvironmentOptions(); + + void Parse(const cxxopts::ParseResult& CmdLineResult); + +private: + std::unordered_map<std::string, Option> OptionMap; +}; + +} // namespace zen diff --git a/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h b/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h index 89fc70140..bbf070993 100644 --- a/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h +++ b/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h @@ -13,5 +13,6 @@ std::unique_ptr<BuildStorage> CreateJupiterBuildStorage(LoggerRef InLog, BuildStorage::Statistics& Stats, std::string_view Namespace, std::string_view Bucket, + bool AllowRedirect, const std::filesystem::path& TempFolderPath); } // namespace zen diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h index 2c5fc73b8..b79790f25 100644 --- a/src/zenutil/include/zenutil/jupiter/jupitersession.h +++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h @@ -65,7 +65,7 @@ struct FinalizeBuildPartResult : JupiterResult class JupiterSession { public: - JupiterSession(LoggerRef InLog, HttpClient& InHttpClient); + JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect); ~JupiterSession(); JupiterResult Authenticate(); @@ -102,6 +102,8 @@ public: std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); + JupiterResult ListBuildNamespaces(); + JupiterResult ListBuildBuckets(std::string_view Namespace); JupiterResult ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload); JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload); JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); @@ -135,13 +137,14 @@ public: uint64_t PayloadSize, std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems); - JupiterResult GetMultipartBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver, - std::vector<std::function<JupiterResult()>>& OutWorkItems); + JupiterResult GetMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete, + std::vector<std::function<JupiterResult()>>& OutWorkItems); JupiterResult PutBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, @@ -152,9 +155,15 @@ public: const Oid& BuildId, const Oid& PartId, const IoHash& RawHash); - JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); + JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount); JupiterResult GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload); + JupiterResult PutBuildPartStats(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& BuildPartId, + IoBuffer Payload); + private: inline LoggerRef Log() { return m_Log; } @@ -164,6 +173,7 @@ private: LoggerRef m_Log; HttpClient& m_HttpClient; + const bool m_AllowRedirect = false; }; } // namespace zen diff --git a/src/zenutil/include/zenutil/logging/rotatingfilesink.h b/src/zenutil/include/zenutil/logging/rotatingfilesink.h index 758722156..cd28bdcb2 100644 --- a/src/zenutil/include/zenutil/logging/rotatingfilesink.h +++ b/src/zenutil/include/zenutil/logging/rotatingfilesink.h @@ -27,7 +27,6 @@ public: { ZEN_MEMSCOPE(ELLMTag::Logging); - ZEN_MEMSCOPE(ELLMTag::Logging); std::error_code Ec; if (RotateOnOpen) { diff --git a/src/zenutil/include/zenutil/parallellwork.h b/src/zenutil/include/zenutil/parallellwork.h deleted file mode 100644 index 79798fc8d..000000000 --- a/src/zenutil/include/zenutil/parallellwork.h +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/except.h> -#include <zencore/fmtutils.h> -#include <zencore/thread.h> -#include <zencore/workthreadpool.h> - -#include <atomic> - -namespace zen { - -class ParallellWork -{ -public: - ParallellWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) {} - - ~ParallellWork() - { - // Make sure to call Wait before destroying - ZEN_ASSERT(m_PendingWork.Remaining() == 0); - } - - std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)> DefaultErrorFunction() - { - return [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex.what()); }); - AbortFlag = true; - }; - } - - void ScheduleWork(WorkerThreadPool& WorkerPool, - std::function<void(std::atomic<bool>& AbortFlag)>&& Work, - std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)>&& OnError) - { - m_PendingWork.AddCount(1); - try - { - WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = std::move(OnError)] { - try - { - Work(m_AbortFlag); - } - catch (const AssertException& AssertEx) - { - OnError( - std::runtime_error(fmt::format("Caught assert exception while handling request: {}", AssertEx.FullDescription())), - m_AbortFlag); - } - catch (const std::system_error& SystemError) - { - if (IsOOM(SystemError.code())) - { - OnError(std::runtime_error(fmt::format("Out of memory. Reason: {}", SystemError.what())), m_AbortFlag); - } - else if (IsOOD(SystemError.code())) - { - OnError(std::runtime_error(fmt::format("Out of disk. Reason: {}", SystemError.what())), m_AbortFlag); - } - else - { - OnError(std::runtime_error(fmt::format("System error. Reason: {}", SystemError.what())), m_AbortFlag); - } - } - catch (const std::exception& Ex) - { - OnError(Ex, m_AbortFlag); - } - m_PendingWork.CountDown(); - }); - } - catch (const std::exception&) - { - m_PendingWork.CountDown(); - throw; - } - } - - void Abort() { m_AbortFlag = true; } - - bool IsAborted() const { return m_AbortFlag.load(); } - - void Wait(int32_t UpdateInteralMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback) - { - ZEN_ASSERT(m_PendingWork.Remaining() > 0); - m_PendingWork.CountDown(); - while (!m_PendingWork.Wait(UpdateInteralMS)) - { - UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining()); - } - if (m_Errors.size() == 1) - { - throw std::runtime_error(m_Errors.front()); - } - else if (m_Errors.size() > 1) - { - ExtendableStringBuilder<128> SB; - SB.Append("Multiple errors:"); - for (const std::string& Error : m_Errors) - { - SB.Append(fmt::format("\n {}", Error)); - } - throw std::runtime_error(SB.ToString()); - } - } - Latch& PendingWork() { return m_PendingWork; } - -private: - std::atomic<bool>& m_AbortFlag; - Latch m_PendingWork; - - RwLock m_ErrorLock; - std::vector<std::string> m_Errors; -}; - -} // namespace zen diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h new file mode 100644 index 000000000..639c6968c --- /dev/null +++ b/src/zenutil/include/zenutil/parallelwork.h @@ -0,0 +1,77 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/scopeguard.h> +#include <zencore/thread.h> +#include <zencore/workthreadpool.h> + +#include <atomic> + +namespace zen { + +class ParallelWork +{ +public: + ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag); + + ~ParallelWork(); + + typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback; + typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback; + typedef std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)> UpdateCallback; + + void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {}) + { + m_PendingWork.AddCount(1); + try + { + WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] { + auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); }); + try + { + while (m_PauseFlag && !m_AbortFlag) + { + Sleep(2000); + } + Work(m_AbortFlag); + } + catch (...) + { + OnError(std::current_exception(), m_AbortFlag); + } + }); + } + catch (const std::exception&) + { + m_PendingWork.CountDown(); + throw; + } + } + + void Abort() { m_AbortFlag = true; } + + bool IsAborted() const { return m_AbortFlag.load(); } + + void Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback); + + void Wait(); + + Latch& PendingWork() { return m_PendingWork; } + +private: + ExceptionCallback DefaultErrorFunction(); + void RethrowErrors(); + + std::atomic<bool>& m_AbortFlag; + std::atomic<bool>& m_PauseFlag; + bool m_DispatchComplete = false; + Latch m_PendingWork; + + RwLock m_ErrorLock; + std::vector<std::exception_ptr> m_Errors; +}; + +void parallellwork_forcelink(); + +} // namespace zen diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h index 9683ad720..df2033bca 100644 --- a/src/zenutil/include/zenutil/workerpools.h +++ b/src/zenutil/include/zenutil/workerpools.h @@ -21,6 +21,9 @@ WorkerThreadPool& GetMediumWorkerPool(EWorkloadType WorkloadType); // Worker pool with std::thread::hardware_concurrency() / 8 worker threads, but at least one thread WorkerThreadPool& GetSmallWorkerPool(EWorkloadType WorkloadType); +// Worker pool with minimum number of worker threads, but at least one thread +WorkerThreadPool& GetTinyWorkerPool(EWorkloadType WorkloadType); + // Special worker pool that does not use worker thread but issues all scheduled work on the calling thread // This is useful for debugging when multiple async thread can make stepping in debugger complicated WorkerThreadPool& GetSyncWorkerPool(); diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp index bf89ce785..9974725ff 100644 --- a/src/zenutil/jupiter/jupiterbuildstorage.cpp +++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp @@ -25,8 +25,9 @@ public: Statistics& Stats, std::string_view Namespace, std::string_view Bucket, + bool AllowRedirect, const std::filesystem::path& TempFolderPath) - : m_Session(InLog, InHttpClient) + : m_Session(InLog, InHttpClient, AllowRedirect) , m_Stats(Stats) , m_Namespace(Namespace) , m_Bucket(Bucket) @@ -35,6 +36,61 @@ public: } virtual ~JupiterBuildStorage() {} + virtual CbObject ListNamespaces(bool bRecursive) override + { + ZEN_TRACE_CPU("Jupiter::ListNamespaces"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + JupiterResult ListResult = m_Session.ListBuildNamespaces(); + AddStatistic(ListResult); + if (!ListResult.Success) + { + throw std::runtime_error(fmt::format("Failed listing namespaces: {} ({})", ListResult.Reason, ListResult.ErrorCode)); + } + CbObject NamespaceResponse = PayloadToCbObject("Failed listing namespaces"sv, ListResult.Response); + + CbObjectWriter Response; + Response.BeginArray("results"sv); + for (CbFieldView NamespaceField : NamespaceResponse["namespaces"]) + { + std::string_view Namespace = NamespaceField.AsString(); + if (!Namespace.empty()) + { + Response.BeginObject(); + Response.AddString("name", Namespace); + + if (bRecursive) + { + JupiterResult BucketsResult = m_Session.ListBuildBuckets(Namespace); + AddStatistic(BucketsResult); + if (!BucketsResult.Success) + { + throw std::runtime_error( + fmt::format("Failed listing namespaces: {} ({})", BucketsResult.Reason, BucketsResult.ErrorCode)); + } + CbObject BucketResponse = PayloadToCbObject("Failed listing namespaces"sv, BucketsResult.Response); + + Response.BeginArray("items"); + for (CbFieldView BucketField : BucketResponse["buckets"]) + { + std::string_view Bucket = BucketField.AsString(); + if (!Bucket.empty()) + { + Response.AddString(Bucket); + } + } + Response.EndArray(); + } + + Response.EndObject(); + } + } + Response.EndArray(); + + return Response.Save(); + } + virtual CbObject ListBuilds(CbObject Query) override { ZEN_TRACE_CPU("Jupiter::ListBuilds"); @@ -49,7 +105,7 @@ public: { throw std::runtime_error(fmt::format("Failed listing builds: {} ({})", ListResult.Reason, ListResult.ErrorCode)); } - return PayloadToJson("Failed listing builds"sv, ListResult.Response); + return PayloadToCbObject("Failed listing builds"sv, ListResult.Response); } virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override @@ -66,7 +122,7 @@ public: { throw std::runtime_error(fmt::format("Failed creating build: {} ({})", PutResult.Reason, PutResult.ErrorCode)); } - return PayloadToJson(fmt::format("Failed creating build: {}", BuildId), PutResult.Response); + return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response); } virtual CbObject GetBuild(const Oid& BuildId) override @@ -81,7 +137,7 @@ public: { throw std::runtime_error(fmt::format("Failed fetching build: {} ({})", GetBuildResult.Reason, GetBuildResult.ErrorCode)); } - return PayloadToJson(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response); + return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response); } virtual void FinalizeBuild(const Oid& BuildId) override @@ -134,7 +190,7 @@ public: GetBuildPartResult.Reason, GetBuildPartResult.ErrorCode)); } - return PayloadToJson(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response); + return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response); } virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override @@ -235,19 +291,25 @@ public: return std::move(GetBuildBlobResult.Response); } - virtual std::vector<std::function<void()>> GetLargeBuildBlob( - const Oid& BuildId, - const IoHash& RawHash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) override + virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete) override { ZEN_TRACE_CPU("Jupiter::GetLargeBuildBlob"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); std::vector<std::function<JupiterResult()>> WorkItems; - JupiterResult GetMultipartBlobResult = - m_Session.GetMultipartBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ChunkSize, std::move(Receiver), WorkItems); + JupiterResult GetMultipartBlobResult = m_Session.GetMultipartBuildBlob(m_Namespace, + m_Bucket, + BuildId, + RawHash, + ChunkSize, + std::move(OnReceive), + std::move(OnComplete), + WorkItems); AddStatistic(GetMultipartBlobResult); if (!GetMultipartBlobResult.Success) @@ -272,7 +334,7 @@ public: return WorkList; } - virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override + virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override { ZEN_TRACE_CPU("Jupiter::PutBlockMetadata"); @@ -284,27 +346,32 @@ public: AddStatistic(PutMetaResult); if (!PutMetaResult.Success) { + if (PutMetaResult.ErrorCode == int32_t(HttpResponseCode::NotFound)) + { + return false; + } throw std::runtime_error( fmt::format("Failed putting build block metadata: {} ({})", PutMetaResult.Reason, PutMetaResult.ErrorCode)); } + return true; } - virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) override + virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override { ZEN_TRACE_CPU("Jupiter::FindBlocks"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult FindResult = m_Session.FindBlocks(m_Namespace, m_Bucket, BuildId); + JupiterResult FindResult = m_Session.FindBlocks(m_Namespace, m_Bucket, BuildId, MaxBlockCount); AddStatistic(FindResult); if (!FindResult.Success) { throw std::runtime_error(fmt::format("Failed fetching known blocks: {} ({})", FindResult.Reason, FindResult.ErrorCode)); } - return ParseChunkBlockDescriptionList(PayloadToJson("Failed fetching known blocks"sv, FindResult.Response)); + return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response); } - virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) override + virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override { ZEN_TRACE_CPU("Jupiter::GetBlockMetadata"); @@ -328,28 +395,35 @@ public: throw std::runtime_error( fmt::format("Failed fetching block metadatas: {} ({})", GetBlockMetadataResult.Reason, GetBlockMetadataResult.ErrorCode)); } - std::vector<ChunkBlockDescription> UnorderedList = - ParseChunkBlockDescriptionList(PayloadToJson("Failed fetching block metadatas", GetBlockMetadataResult.Response)); - tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup; - for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++) + return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response); + } + + virtual void PutBuildPartStats(const Oid& BuildId, + const Oid& BuildPartId, + const tsl::robin_map<std::string, double>& FloatStats) override + { + ZEN_UNUSED(BuildId, BuildPartId, FloatStats); + CbObjectWriter Request; + Request.BeginObject("floatStats"sv); + for (auto It : FloatStats) { - const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex]; - BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex); + Request.AddFloat(It.first, It.second); } - std::vector<ChunkBlockDescription> SortedBlockDescriptions; - SortedBlockDescriptions.reserve(BlockDescriptionLookup.size()); - for (const IoHash& BlockHash : BlockHashes) + Request.EndObject(); + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + JupiterResult PutBuildPartStatsResult = m_Session.PutBuildPartStats(m_Namespace, m_Bucket, BuildId, BuildPartId, Payload); + AddStatistic(PutBuildPartStatsResult); + if (!PutBuildPartStatsResult.Success) { - if (auto It = BlockDescriptionLookup.find(BlockHash); It != BlockDescriptionLookup.end()) - { - SortedBlockDescriptions.push_back(std::move(UnorderedList[It->second])); - } + throw std::runtime_error(fmt::format("Failed posting build part statistics: {} ({})", + PutBuildPartStatsResult.Reason, + PutBuildPartStatsResult.ErrorCode)); } - return SortedBlockDescriptions; } private: - static CbObject PayloadToJson(std::string_view Context, const IoBuffer& Payload) + static CbObject PayloadToCbObject(std::string_view Context, const IoBuffer& Payload) { if (Payload.GetContentType() == ZenContentType::kJSON) { @@ -394,11 +468,12 @@ CreateJupiterBuildStorage(LoggerRef InLog, BuildStorage::Statistics& Stats, std::string_view Namespace, std::string_view Bucket, + bool AllowRedirect, const std::filesystem::path& TempFolderPath) { ZEN_TRACE_CPU("CreateJupiterBuildStorage"); - return std::make_unique<JupiterBuildStorage>(InLog, InHttpClient, Stats, Namespace, Bucket, TempFolderPath); + return std::make_unique<JupiterBuildStorage>(InLog, InHttpClient, Stats, Namespace, Bucket, AllowRedirect, TempFolderPath); } } // namespace zen diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp index 68f214c06..1fd59acdf 100644 --- a/src/zenutil/jupiter/jupitersession.cpp +++ b/src/zenutil/jupiter/jupitersession.cpp @@ -5,6 +5,7 @@ #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compositebuffer.h> +#include <zencore/compress.h> #include <zencore/fmtutils.h> #include <zencore/trace.h> @@ -48,7 +49,10 @@ namespace detail { } } // namespace detail -JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient) : m_Log(InLog), m_HttpClient(InHttpClient) +JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect) +: m_Log(InLog) +, m_HttpClient(InHttpClient) +, m_AllowRedirect(AllowRedirect) { } @@ -357,12 +361,28 @@ JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view Typ } JupiterResult +JupiterSession::ListBuildNamespaces() +{ + HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds"), {HttpClient::Accept(ZenContentType::kJSON)}); + return detail::ConvertResponse(Response, "JupiterSession::ListBuildNamespaces"sv); +} + +JupiterResult +JupiterSession::ListBuildBuckets(std::string_view Namespace) +{ + HttpClient::Response Response = + m_HttpClient.Get(fmt::format("/api/v2/builds/{}", Namespace), {HttpClient::Accept(ZenContentType::kJSON)}); + return detail::ConvertResponse(Response, "JupiterSession::ListBuildBuckets"sv); +} + +JupiterResult JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload) { ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/search", Namespace, BucketId), - Payload, - {HttpClient::Accept(ZenContentType::kCbObject)}); + std::string OptionalBucketPath = BucketId.empty() ? "" : fmt::format("/{}", BucketId); + HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}{}/search", Namespace, OptionalBucketPath), + Payload, + {HttpClient::Accept(ZenContentType::kCbObject)}); return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv); } @@ -527,12 +547,14 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, bool& OutIsComplete) -> JupiterResult { const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex]; IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte); - std::string MultipartUploadResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}", - Namespace, - BucketId, - BuildId, - Hash.ToHexString(), - Part.QueryString); + std::string MultipartUploadResponseRequestString = + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + Part.QueryString, + m_AllowRedirect ? "true"sv : "false"sv); // ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString); HttpClient::Response MultipartUploadResponse = m_HttpClient.Put(MultipartUploadResponseRequestString, PartPayload); if (!MultipartUploadResponse.IsSuccess()) @@ -590,12 +612,13 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, IoBuffer RetryPartPayload = Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte - 1); std::string RetryMultipartUploadResponseRequestString = - fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}", + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}", Namespace, BucketId, BuildId, Hash.ToHexString(), - RetryPart.QueryString); + RetryPart.QueryString, + m_AllowRedirect ? "true"sv : "false"sv); MultipartUploadResponse = m_HttpClient.Put(RetryMultipartUploadResponseRequestString, RetryPartPayload); TotalUploadedBytes = MultipartUploadResponse.UploadedBytes; @@ -626,15 +649,21 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, } JupiterResult -JupiterSession::GetMultipartBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver, - std::vector<std::function<JupiterResult()>>& OutWorkItems) -{ - std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()); +JupiterSession::GetMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete, + std::vector<std::function<JupiterResult()>>& OutWorkItems) +{ + std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + m_AllowRedirect ? "true"sv : "false"sv); HttpClient::Response Response = m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}})); if (Response.IsSuccess()) @@ -649,46 +678,68 @@ JupiterSession::GetMultipartBuildBlob(std::string_view Namespa uint64_t TotalSize = TotalSizeMaybe.value(); uint64_t PayloadSize = Response.ResponsePayload.GetSize(); - Receiver(0, Response.ResponsePayload, TotalSize); + OnReceive(0, Response.ResponsePayload); if (TotalSize > PayloadSize) { struct WorkloadData { - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver; - std::atomic<uint64_t> BytesRemaining; + std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive; + std::function<void()> OnComplete; + std::atomic<uint64_t> BytesRemaining; }; std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - Workload->Receiver = std::move(Receiver); + Workload->OnReceive = std::move(OnReceive); + Workload->OnComplete = std::move(OnComplete); Workload->BytesRemaining = TotalSize - PayloadSize; uint64_t Offset = PayloadSize; while (Offset < TotalSize) { uint64_t PartSize = Min(ChunkSize, TotalSize - Offset); - OutWorkItems.emplace_back( - [this, Namespace, BucketId, BuildId, Hash, TotalSize, Workload, Offset, PartSize]() -> JupiterResult { - std::string RequestUrl = - fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()); - HttpClient::Response Response = m_HttpClient.Get( - RequestUrl, - HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}})); - if (Response.IsSuccess()) + OutWorkItems.emplace_back([this, + Namespace = std::string(Namespace), + BucketId = std::string(BucketId), + BuildId = Oid(BuildId), + Hash = IoHash(Hash), + TotalSize, + Workload, + Offset, + PartSize]() -> JupiterResult { + std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + m_AllowRedirect ? "true"sv : "false"sv); + HttpClient::Response Response = m_HttpClient.Get( + RequestUrl, + HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}})); + if (Response.IsSuccess()) + { + Workload->OnReceive(Offset, Response.ResponsePayload); + uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize()); + if (ByteRemaning == Response.ResponsePayload.GetSize()) { - uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize()); - Workload->Receiver(Offset, Response.ResponsePayload, ByteRemaning); + Workload->OnComplete(); } - return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); - }); + } + return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); + }); Offset += PartSize; } } + else + { + OnComplete(); + } return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); } } } - Receiver(0, Response.ResponsePayload, Response.ResponsePayload.GetSize()); + OnReceive(0, Response.ResponsePayload); + OnComplete(); } return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); } @@ -707,11 +758,28 @@ JupiterSession::GetBuildBlob(std::string_view Namespace, { Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", Offset, Offset + Size - 1)}); } - HttpClient::Response Response = - m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()), - TempFolderPath, - Headers); - + HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + m_AllowRedirect ? "true"sv : "false"sv), + TempFolderPath, + Headers); + if (Response.IsSuccess()) + { + // If we get a redirect to S3 or a non-Jupiter endpoint the content type will not be correct, validate it and set it + if (m_AllowRedirect && (Response.ResponsePayload.GetContentType() == HttpContentType::kBinary)) + { + IoHash ValidateRawHash; + uint64_t ValidateRawSize = 0; + ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload, ValidateRawHash, ValidateRawSize)); + ZEN_ASSERT_SLOW(ValidateRawHash == Hash); + ZEN_ASSERT_SLOW(ValidateRawSize > 0); + ZEN_UNUSED(ValidateRawHash, ValidateRawSize); + Response.ResponsePayload.SetContentType(ZenContentType::kCompressedBinary); + } + } return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv); } @@ -758,10 +826,12 @@ JupiterSession::FinalizeBuildPart(std::string_view Namespace, } JupiterResult -JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) +JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount) { - HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks", Namespace, BucketId, BuildId), - HttpClient::Accept(ZenContentType::kCbObject)); + const std::string Parameters = MaxBlockCount == (uint64_t)-1 ? "" : fmt::format("?count={}", MaxBlockCount); + HttpClient::Response Response = + m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks{}", Namespace, BucketId, BuildId, Parameters), + HttpClient::Accept(ZenContentType::kCbObject)); return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv); } @@ -776,4 +846,19 @@ JupiterSession::GetBlockMetadata(std::string_view Namespace, std::string_view Bu return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv); } +JupiterResult +JupiterSession::PutBuildPartStats(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& BuildPartId, + IoBuffer Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = + m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/stats", Namespace, BucketId, BuildId, BuildPartId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "JupiterSession::PutBuildPartStats"sv); +} + } // namespace zen diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp new file mode 100644 index 000000000..aa806438b --- /dev/null +++ b/src/zenutil/parallelwork.cpp @@ -0,0 +1,225 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/parallelwork.h> + +#include <zencore/callstack.h> +#include <zencore/except.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> + +#include <typeinfo> + +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +#endif // ZEN_WITH_TESTS + +namespace zen { + +ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag) +: m_AbortFlag(AbortFlag) +, m_PauseFlag(PauseFlag) +, m_PendingWork(1) +{ +} + +ParallelWork::~ParallelWork() +{ + try + { + if (!m_DispatchComplete) + { + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + ZEN_WARN( + "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads " + "to complete"); + m_PendingWork.CountDown(); + } + m_PendingWork.Wait(); + ptrdiff_t RemainingWork = m_PendingWork.Remaining(); + if (RemainingWork != 0) + { + void* Frames[8]; + uint32_t FrameCount = GetCallstack(2, 8, Frames); + CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames); + ZEN_ERROR("ParallelWork destructor waited for outstanding work but pending work count is {} instead of 0\n{}", + RemainingWork, + CallstackToString(Callstack, " ")); + FreeCallstack(Callstack); + + uint32_t WaitedMs = 0; + while (m_PendingWork.Remaining() > 0 && WaitedMs < 2000) + { + Sleep(50); + WaitedMs += 50; + } + RemainingWork = m_PendingWork.Remaining(); + if (RemainingWork != 0) + { + ZEN_WARN("ParallelWork destructor safety wait failed, pending work count at {}", RemainingWork) + } + else + { + ZEN_INFO("ParallelWork destructor safety wait succeeded"); + } + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Exception in ~ParallelWork: {}", Ex.what()); + } +} + +ParallelWork::ExceptionCallback +ParallelWork::DefaultErrorFunction() +{ + return [&](std::exception_ptr Ex, std::atomic<bool>& AbortFlag) { + m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); }); + AbortFlag = true; + }; +} + +void +ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback) +{ + ZEN_ASSERT(!m_DispatchComplete); + m_DispatchComplete = true; + + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + m_PendingWork.CountDown(); + + while (!m_PendingWork.Wait(UpdateIntervalMS)) + { + UpdateCallback(m_AbortFlag.load(), m_PauseFlag.load(), m_PendingWork.Remaining()); + } + + RethrowErrors(); +} + +void +ParallelWork::Wait() +{ + ZEN_ASSERT(!m_DispatchComplete); + m_DispatchComplete = true; + + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + m_PendingWork.CountDown(); + m_PendingWork.Wait(); + + RethrowErrors(); +} + +void +ParallelWork::RethrowErrors() +{ + if (!m_Errors.empty()) + { + if (m_Errors.size() > 1) + { + ZEN_INFO("Multiple exceptions thrown during ParallelWork execution, dropping the following exceptions:"); + auto It = m_Errors.begin() + 1; + while (It != m_Errors.end()) + { + try + { + std::rethrow_exception(*It); + } + catch (const std::exception& Ex) + { + ZEN_INFO(" {}", Ex.what()); + } + It++; + } + } + std::exception_ptr Ex = m_Errors.front(); + m_Errors.clear(); + std::rethrow_exception(Ex); + } +} + +#if ZEN_WITH_TESTS + +TEST_CASE("parallellwork.nowork") +{ + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + Work.Wait(); +} + +TEST_CASE("parallellwork.basic") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + for (uint32_t I = 0; I < 5; I++) + { + Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); }); + } + Work.Wait(); +} + +TEST_CASE("parallellwork.throws_in_work") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + for (uint32_t I = 0; I < 10; I++) + { + Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) { + ZEN_UNUSED(AbortFlag); + if (I > 3) + { + throw std::runtime_error("We throw in async thread"); + } + else + { + Sleep(10); + } + }); + } + CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread"); +} + +TEST_CASE("parallellwork.throws_in_dispatch") +{ + WorkerThreadPool WorkerPool(2); + std::atomic<uint32_t> ExecutedCount; + try + { + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + for (uint32_t I = 0; I < 5; I++) + { + Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + ExecutedCount++; + }); + if (I == 3) + { + throw std::runtime_error("We throw in dispatcher thread"); + } + } + CHECK(false); + } + catch (const std::runtime_error& Ex) + { + CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what())); + CHECK_LE(ExecutedCount.load(), 4); + } +} + +void +parallellwork_forcelink() +{ +} +#endif // ZEN_WITH_TESTS + +} // namespace zen diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp index e3165e838..797034978 100644 --- a/src/zenutil/workerpools.cpp +++ b/src/zenutil/workerpools.cpp @@ -11,9 +11,10 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { namespace { - const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(std::thread::hardware_concurrency()); + const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(Max(std::thread::hardware_concurrency() - 1u, 2u)); const int MediumWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 2u)); const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 8u), 1u)); + const int TinyWorkerThreadPoolTreadCount = 1; static bool IsShutDown = false; @@ -35,6 +36,9 @@ namespace { WorkerPool BurstSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(burst)"}; WorkerPool BackgroundSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(bkg)"}; + WorkerPool BurstTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "TinyThreadPool(burst)"}; + WorkerPool BackgroundTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "TinyThreadPool(bkg)"}; + WorkerPool SyncWorkerPool = {.TreadCount = 0, .Name = "SyncThreadPool"}; WorkerThreadPool& EnsurePoolPtr(WorkerPool& Pool) @@ -75,6 +79,12 @@ GetSmallWorkerPool(EWorkloadType WorkloadType) } WorkerThreadPool& +GetTinyWorkerPool(EWorkloadType WorkloadType) +{ + return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstTinyWorkerPool : BackgroundTinyWorkerPool); +} + +WorkerThreadPool& GetSyncWorkerPool() { return EnsurePoolPtr(SyncWorkerPool); @@ -91,6 +101,8 @@ ShutdownWorkerPools() BackgroundMediumWorkerPool.Pool.reset(); BurstSmallWorkerPool.Pool.reset(); BackgroundSmallWorkerPool.Pool.reset(); + BurstTinyWorkerPool.Pool.reset(); + BackgroundTinyWorkerPool.Pool.reset(); SyncWorkerPool.Pool.reset(); } } // namespace zen diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp index 0409cb976..4fd1cef9a 100644 --- a/src/zenutil/zenserverprocess.cpp +++ b/src/zenutil/zenserverprocess.cpp @@ -547,7 +547,7 @@ ZenServerEnvironment::CreateNewTestDir() TestDir << "test"sv << int64_t(ZenServerTestCounter.fetch_add(1)); std::filesystem::path TestPath = m_TestBaseDir / TestDir.c_str(); - ZEN_ASSERT(!std::filesystem::exists(TestPath)); + ZEN_ASSERT(!IsDir(TestPath)); ZEN_INFO("Creating new test dir @ '{}'", TestPath); @@ -581,7 +581,7 @@ ZenServerInstance::~ZenServerInstance() { Shutdown(); std::error_code DummyEc; - std::filesystem::remove(std::filesystem::temp_directory_path() / ("zenserver_" + m_Name + ".log"), DummyEc); + RemoveFile(std::filesystem::temp_directory_path() / ("zenserver_" + m_Name + ".log"), DummyEc); } catch (const std::exception& Err) { @@ -632,6 +632,7 @@ ZenServerInstance::Shutdown() std::error_code Ec; if (SignalShutdown(Ec)) { + Stopwatch Timer; ZEN_DEBUG("Waiting for zenserver process {} ({}) to shut down", m_Name, m_Process.Pid()); while (!m_Process.Wait(1000)) { @@ -645,7 +646,10 @@ ZenServerInstance::Shutdown() ZEN_WARN("Wait abandoned by exited process"); return 0; } - ZEN_WARN("Waiting for zenserver process {} ({}) timed out", m_Name, m_Process.Pid()); + ZEN_WARN("Waited for zenserver process {} ({}) to exit for {}", + m_Name, + m_Process.Pid(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } ZEN_DEBUG("zenserver process {} ({}) exited", m_Name, m_Process.Pid()); int ExitCode = m_Process.GetExitCode(); @@ -1046,7 +1050,7 @@ std::string ZenServerInstance::GetLogOutput() const { std::filesystem::path OutputPath = std::filesystem::temp_directory_path() / ("zenserver_" + m_Name + ".log"); - if (std::filesystem::is_regular_file(OutputPath)) + if (IsFile(OutputPath)) { FileContents Contents = ReadFile(OutputPath); if (!Contents.ErrorCode) @@ -1068,7 +1072,7 @@ ZenServerInstance::Terminate() const std::filesystem::path BaseDir = m_Env.ProgramBaseDir(); const std::filesystem::path Executable = BaseDir / "zenserver" ZEN_EXE_SUFFIX_LITERAL; ProcessHandle RunningProcess; - std::error_code Ec = FindProcess(Executable, RunningProcess); + std::error_code Ec = FindProcess(Executable, RunningProcess, /*IncludeSelf*/ false); if (Ec) { throw std::system_error(Ec, fmt::format("failed to look up running server executable '{}'", Executable)); @@ -1136,7 +1140,7 @@ ValidateLockFileInfo(const LockFileInfo& Info, std::string& OutReason) OutReason = fmt::format("listen port ({}) is not valid", Info.EffectiveListenPort); return false; } - if (!std::filesystem::is_directory(Info.DataDir)) + if (!IsDir(Info.DataDir)) { OutReason = fmt::format("data directory ('{}') does not exist", Info.DataDir); return false; diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp index 19eb63ce9..fe23b00c1 100644 --- a/src/zenutil/zenutil.cpp +++ b/src/zenutil/zenutil.cpp @@ -7,6 +7,8 @@ # include <zenutil/cache/cacherequests.h> # include <zenutil/cache/rpcrecording.h> # include <zenutil/chunkedfile.h> +# include <zenutil/commandlineoptions.h> +# include <zenutil/parallelwork.h> namespace zen { @@ -17,6 +19,8 @@ zenutil_forcelinktests() cache::rpcrecord_forcelink(); cacherequests_forcelink(); chunkedfile_forcelink(); + commandlineoptions_forcelink(); + parallellwork_forcelink(); } } // namespace zen |