diff options
| author | Florent Devillechabrol <[email protected]> | 2025-04-02 10:38:02 -0700 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-04-02 10:38:02 -0700 |
| commit | 486a22ad2c61bc1616d8745e0077eb320089bfec (patch) | |
| tree | 665d5c9002cd97c04ddffeaf211fcf8e55d01dce /src/zenutil | |
| parent | Fixed missing trailing quote when converting binary data from compact binary ... (diff) | |
| parent | added --find-max-block-count option to builds upload (#337) (diff) | |
| download | zen-486a22ad2c61bc1616d8745e0077eb320089bfec.tar.xz zen-486a22ad2c61bc1616d8745e0077eb320089bfec.zip | |
Merge branch 'main' into fd-fix-binary-json
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/buildstoragecache.cpp | 407 | ||||
| -rw-r--r-- | src/zenutil/cache/rpcrecording.cpp | 8 | ||||
| -rw-r--r-- | src/zenutil/chunkblock.cpp | 2 | ||||
| -rw-r--r-- | src/zenutil/chunkedcontent.cpp | 26 | ||||
| -rw-r--r-- | src/zenutil/chunkingcontroller.cpp | 39 | ||||
| -rw-r--r-- | src/zenutil/filebuildstorage.cpp | 48 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/buildstorage.h | 6 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/buildstoragecache.h | 57 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkingcontroller.h | 4 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/jupiter/jupitersession.h | 2 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/logging/rotatingfilesink.h | 1 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/workerpools.h | 3 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupiterbuildstorage.cpp | 37 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupitersession.cpp | 8 | ||||
| -rw-r--r-- | src/zenutil/workerpools.cpp | 14 | ||||
| -rw-r--r-- | src/zenutil/zenserverprocess.cpp | 8 |
16 files changed, 603 insertions, 67 deletions
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp new file mode 100644 index 000000000..f273ac699 --- /dev/null +++ b/src/zenutil/buildstoragecache.cpp @@ -0,0 +1,407 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/buildstoragecache.h> + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/fmtutils.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> +#include <zencore/trace.h> +#include <zencore/workthreadpool.h> +#include <zenhttp/httpclient.h> +#include <zenhttp/packageformat.h> +#include <zenutil/workerpools.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_set.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +using namespace std::literals; + +class ZenBuildStorageCache : public BuildStorageCache +{ +public: + explicit ZenBuildStorageCache(HttpClient& HttpClient, + BuildStorageCache::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount) + : m_HttpClient(HttpClient) + , m_Stats(Stats) + , m_Namespace(Namespace.empty() ? "none" : Namespace) + , m_Bucket(Bucket.empty() ? "none" : Bucket) + , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred()) + , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount) + , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background) + : GetTinyWorkerPool(EWorkloadType::Background)) + , m_PendingBackgroundWorkCount(1) + , m_CancelBackgroundWork(false) + { + } + + virtual ~ZenBuildStorageCache() + { + try + { + m_CancelBackgroundWork.store(true); + if (!IsFlushed) + { + m_PendingBackgroundWorkCount.CountDown(); + m_PendingBackgroundWorkCount.Wait(); + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~ZenBuildStorageCache() failed with: {}", Ex.what()); + } + } + + void ScheduleBackgroundWork(std::function<void()>&& Work) + { + m_PendingBackgroundWorkCount.AddCount(1); + try + { + m_BackgroundWorkPool.ScheduleWork([this, Work = std::move(Work)]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork"); + auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); }); + if (!m_CancelBackgroundWork) + { + try + { + Work(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what()); + } + } + }); + } + catch (const std::exception& Ex) + { + m_PendingBackgroundWorkCount.CountDown(); + ZEN_ERROR("Failed scheduling background upload to build cache. Reason: {}", Ex.what()); + } + } + + virtual void PutBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + ZenContentType ContentType, + const CompositeBuffer& Payload) override + { + ZEN_ASSERT(!IsFlushed); + ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); + ScheduleBackgroundWork( + [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob"); + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + HttpClient::Response CacheResponse = + m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), + Payload, + ContentType); + AddStatistic(CacheResponse); + if (!CacheResponse.IsSuccess()) + { + ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv)); + } + }); + } + + virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::GetBuildBlob"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + HttpClient::KeyValueMap Headers; + if (RangeOffset != 0 || RangeBytes != (uint64_t)-1) + { + Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", RangeOffset, RangeOffset + RangeBytes - 1)}); + } + CreateDirectories(m_TempFolderPath); + HttpClient::Response CacheResponse = + m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), + m_TempFolderPath, + Headers); + AddStatistic(CacheResponse); + if (CacheResponse.IsSuccess()) + { + return CacheResponse.ResponsePayload; + } + return {}; + } + + virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override + { + ZEN_ASSERT(!IsFlushed); + ScheduleBackgroundWork([this, + BuildId = Oid(BuildId), + BlobRawHashes = std::vector<IoHash>(BlobHashes.begin(), BlobHashes.end()), + MetaDatas = std::vector<CbObject>(MetaDatas.begin(), MetaDatas.end())]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::PutBlobMetadatas"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + const uint64_t BlobCount = BlobRawHashes.size(); + + CbPackage RequestPackage; + std::vector<CbAttachment> Attachments; + tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes; + Attachments.reserve(BlobCount); + AttachmentHashes.reserve(BlobCount); + { + CbObjectWriter RequestWriter; + RequestWriter.BeginArray("blobHashes"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) + { + RequestWriter.AddHash(BlobRawHashes[BlockHashIndex]); + } + RequestWriter.EndArray(); // blobHashes + + RequestWriter.BeginArray("metadatas"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) + { + const IoHash ObjectHash = MetaDatas[BlockHashIndex].GetHash(); + RequestWriter.AddBinaryAttachment(ObjectHash); + if (!AttachmentHashes.contains(ObjectHash)) + { + Attachments.push_back(CbAttachment(MetaDatas[BlockHashIndex], ObjectHash)); + AttachmentHashes.insert(ObjectHash); + } + } + + RequestWriter.EndArray(); // metadatas + + RequestPackage.SetObject(RequestWriter.Save()); + } + RequestPackage.AddAttachments(Attachments); + + CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage); + + HttpClient::Response CacheResponse = + m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/putBlobMetadata", m_Namespace, m_Bucket, BuildId), + RpcRequestBuffer, + ZenContentType::kCbPackage); + AddStatistic(CacheResponse); + if (!CacheResponse.IsSuccess()) + { + ZEN_DEBUG("Failed posting blob metadata to cache: {}", CacheResponse.ErrorMessage(""sv)); + } + }); + } + + virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::GetBlobMetadatas"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + CbObjectWriter Request; + + Request.BeginArray("blobHashes"sv); + for (const IoHash& BlobHash : BlobHashes) + { + Request.AddHash(BlobHash); + } + Request.EndArray(); + + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + + HttpClient::Response Response = + m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/getBlobMetadata", m_Namespace, m_Bucket, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + AddStatistic(Response); + if (Response.IsSuccess()) + { + std::vector<CbObject> Result; + + CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); + CbObject ResponseObject = ResponsePackage.GetObject(); + + CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView(); + CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView(); + Result.reserve(MetadatasArray.Num()); + auto BlobHashesIt = BlobHashes.begin(); + auto BlobHashArrayIt = begin(BlobHashArray); + auto MetadataArrayIt = begin(MetadatasArray); + while (MetadataArrayIt != end(MetadatasArray)) + { + const IoHash BlobHash = (*BlobHashArrayIt).AsHash(); + while (BlobHash != *BlobHashesIt) + { + ZEN_ASSERT(BlobHashesIt != BlobHashes.end()); + BlobHashesIt++; + } + + ZEN_ASSERT(BlobHash == *BlobHashesIt); + + const IoHash MetaHash = (*MetadataArrayIt).AsAttachment(); + const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash); + ZEN_ASSERT(MetaAttachment); + + CbObject Metadata = MetaAttachment->AsObject(); + Result.emplace_back(std::move(Metadata)); + + BlobHashArrayIt++; + MetadataArrayIt++; + BlobHashesIt++; + } + return Result; + } + return {}; + } + + virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::BlobsExists"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + CbObjectWriter Request; + + Request.BeginArray("blobHashes"sv); + for (const IoHash& BlobHash : BlobHashes) + { + Request.AddHash(BlobHash); + } + Request.EndArray(); + + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + + HttpClient::Response Response = m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/exists", m_Namespace, m_Bucket, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + AddStatistic(Response); + if (Response.IsSuccess()) + { + CbObject ResponseObject = LoadCompactBinaryObject(Response.ResponsePayload); + if (!ResponseObject) + { + throw std::runtime_error("BlobExists reponse is invalid, failed to load payload as compact binary object"); + } + CbArrayView BlobsExistsArray = ResponseObject["blobExists"sv].AsArrayView(); + if (!BlobsExistsArray) + { + throw std::runtime_error("BlobExists reponse is invalid, 'blobExists' array is missing"); + } + if (BlobsExistsArray.Num() != BlobHashes.size()) + { + throw std::runtime_error(fmt::format("BlobExists reponse is invalid, 'blobExists' array contains {} entries, expected {}", + BlobsExistsArray.Num(), + BlobHashes.size())); + } + + CbArrayView MetadatasExistsArray = ResponseObject["metadataExists"sv].AsArrayView(); + if (!MetadatasExistsArray) + { + throw std::runtime_error("BlobExists reponse is invalid, 'metadataExists' array is missing"); + } + if (MetadatasExistsArray.Num() != BlobHashes.size()) + { + throw std::runtime_error( + fmt::format("BlobExists reponse is invalid, 'metadataExists' array contains {} entries, expected {}", + MetadatasExistsArray.Num(), + BlobHashes.size())); + } + + std::vector<BlobExistsResult> Result; + Result.reserve(BlobHashes.size()); + auto BlobExistsIt = begin(BlobsExistsArray); + auto MetadataExistsIt = begin(MetadatasExistsArray); + while (BlobExistsIt != end(BlobsExistsArray)) + { + ZEN_ASSERT(MetadataExistsIt != end(MetadatasExistsArray)); + + const bool HasBody = (*BlobExistsIt).AsBool(); + const bool HasMetadata = (*MetadataExistsIt).AsBool(); + + Result.push_back({.HasBody = HasBody, .HasMetadata = HasMetadata}); + + BlobExistsIt++; + MetadataExistsIt++; + } + return Result; + } + return {}; + } + + virtual void Flush(int32_t UpdateInteralMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override + { + if (IsFlushed) + { + return; + } + if (!IsFlushed) + { + m_PendingBackgroundWorkCount.CountDown(); + IsFlushed = true; + } + if (m_PendingBackgroundWorkCount.Wait(100)) + { + return; + } + while (true) + { + intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining(); + if (UpdateCallback(Remaining)) + { + if (m_PendingBackgroundWorkCount.Wait(UpdateInteralMS)) + { + UpdateCallback(0); + return; + } + } + else + { + m_CancelBackgroundWork.store(true); + } + } + } + +private: + void AddStatistic(const HttpClient::Response& Result) + { + m_Stats.TotalBytesWritten += Result.UploadedBytes; + m_Stats.TotalBytesRead += Result.DownloadedBytes; + m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); + m_Stats.TotalRequestCount++; + } + + HttpClient& m_HttpClient; + BuildStorageCache::Statistics& m_Stats; + const std::string m_Namespace; + const std::string m_Bucket; + const std::filesystem::path m_TempFolderPath; + const bool m_BoostBackgroundThreadCount; + bool IsFlushed = false; + + WorkerThreadPool& m_BackgroundWorkPool; + Latch m_PendingBackgroundWorkCount; + std::atomic<bool> m_CancelBackgroundWork; +}; + +std::unique_ptr<BuildStorageCache> +CreateZenBuildStorageCache(HttpClient& HttpClient, + BuildStorageCache::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount) +{ + return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount); +} + +} // namespace zen diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index 1f951167d..380c182b2 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -46,7 +46,7 @@ struct RecordedRequestsWriter void BeginWrite(const std::filesystem::path& BasePath) { m_BasePath = BasePath; - std::filesystem::create_directories(m_BasePath); + CreateDirectories(m_BasePath); } void EndWrite() @@ -426,7 +426,7 @@ RecordedRequestsSegmentWriter::BeginWrite(const std::filesystem::path& BasePath, m_BasePath = BasePath; m_SegmentIndex = SegmentIndex; m_RequestBaseIndex = RequestBaseIndex; - std::filesystem::create_directories(m_BasePath); + CreateDirectories(m_BasePath); } void @@ -1051,14 +1051,14 @@ public: static bool IsCompatible(const std::filesystem::path& BasePath) { - if (std::filesystem::exists(BasePath / "rpc_recording_info.zcb")) + if (IsFile(BasePath / "rpc_recording_info.zcb")) { return true; } const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0); - if (std::filesystem::exists(SegmentZero / "rpc_segment_info.zcb") && std::filesystem::exists(SegmentZero / "index.bin")) + if (IsFile(SegmentZero / "rpc_segment_info.zcb") && IsFile(SegmentZero / "index.bin")) { // top-level metadata is missing, possibly because of premature exit // on the recording side diff --git a/src/zenutil/chunkblock.cpp b/src/zenutil/chunkblock.cpp index f3c14edc4..abfc0fb63 100644 --- a/src/zenutil/chunkblock.cpp +++ b/src/zenutil/chunkblock.cpp @@ -52,7 +52,7 @@ ParseChunkBlockDescriptionList(const CbObjectView& BlocksObject) return {}; } std::vector<ChunkBlockDescription> Result; - CbArrayView Blocks = BlocksObject["blocks"].AsArrayView(); + CbArrayView Blocks = BlocksObject["blocks"sv].AsArrayView(); Result.reserve(Blocks.Num()); for (CbFieldView BlockView : Blocks) { diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp index bb1ee5183..32ae2d94a 100644 --- a/src/zenutil/chunkedcontent.cpp +++ b/src/zenutil/chunkedcontent.cpp @@ -140,8 +140,12 @@ namespace { { ZEN_TRACE_CPU("HashOnly"); - IoBuffer Buffer = IoBufferBuilder::MakeFromFile((FolderPath / Path).make_preferred()); - const IoHash Hash = IoHash::HashBuffer(Buffer, &Stats.BytesHashed); + IoBuffer Buffer = IoBufferBuilder::MakeFromFile((FolderPath / Path).make_preferred()); + if (Buffer.GetSize() != RawSize) + { + throw std::runtime_error(fmt::format("Failed opening file '{}' for hashing", FolderPath / Path)); + } + const IoHash Hash = IoHash::HashBuffer(Buffer, &Stats.BytesHashed); Lock.WithExclusiveLock([&]() { if (!RawHashToSequenceRawHashIndex.contains(Hash)) @@ -304,14 +308,22 @@ FolderContent GetUpdatedContent(const FolderContent& Old, const FolderContent& New, std::vector<std::filesystem::path>& OutDeletedPathIndexes) { ZEN_TRACE_CPU("FolderContent::GetUpdatedContent"); - FolderContent Result = {.Platform = Old.Platform}; + + const uint32_t NewPathCount = gsl::narrow<uint32_t>(New.Paths.size()); + + FolderContent Result = {.Platform = Old.Platform}; + Result.Paths.reserve(NewPathCount); + Result.RawSizes.reserve(NewPathCount); + Result.Attributes.reserve(NewPathCount); + Result.ModificationTicks.reserve(NewPathCount); + tsl::robin_map<std::string, uint32_t> NewPathToIndex; - const uint32_t NewPathCount = gsl::narrow<uint32_t>(New.Paths.size()); NewPathToIndex.reserve(NewPathCount); for (uint32_t NewPathIndex = 0; NewPathIndex < NewPathCount; NewPathIndex++) { NewPathToIndex.insert({New.Paths[NewPathIndex].generic_string(), NewPathIndex}); } + uint32_t OldPathCount = gsl::narrow<uint32_t>(Old.Paths.size()); for (uint32_t OldPathIndex = 0; OldPathIndex < OldPathCount; OldPathIndex++) { @@ -667,6 +679,12 @@ DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span const ChunkedContentLookup BaseLookup = BuildChunkedContentLookup(BaseContent); tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex; + const size_t ExpectedCount = BaseContent.Paths.size() - DeletedPaths.size(); + Result.Paths.reserve(ExpectedCount); + Result.RawSizes.reserve(ExpectedCount); + Result.Attributes.reserve(ExpectedCount); + Result.RawHashes.reserve(ExpectedCount); + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceRawHashIndex; for (uint32_t PathIndex = 0; PathIndex < BaseContent.Paths.size(); PathIndex++) { diff --git a/src/zenutil/chunkingcontroller.cpp b/src/zenutil/chunkingcontroller.cpp index 2a7057a46..a5ebce193 100644 --- a/src/zenutil/chunkingcontroller.cpp +++ b/src/zenutil/chunkingcontroller.cpp @@ -41,9 +41,13 @@ class BasicChunkingController : public ChunkingController { public: BasicChunkingController(std::span<const std::string_view> ExcludeExtensions, + bool ExcludeElfFiles, + bool ExcludeMachOFiles, uint64_t ChunkFileSizeLimit, const ChunkedParams& ChunkingParams) : m_ChunkExcludeExtensions(ExcludeExtensions.begin(), ExcludeExtensions.end()) + , m_ExcludeElfFiles(ExcludeElfFiles) + , m_ExcludeMachOFiles(ExcludeMachOFiles) , m_ChunkFileSizeLimit(ChunkFileSizeLimit) , m_ChunkingParams(ChunkingParams) { @@ -51,6 +55,8 @@ public: BasicChunkingController(CbObjectView Parameters) : m_ChunkExcludeExtensions(ReadStringArray(Parameters["ChunkExcludeExtensions"sv].AsArrayView())) + , m_ExcludeElfFiles(Parameters["ExcludeElfFiles"sv].AsBool(DefaultChunkingExcludeElfFiles)) + , m_ExcludeMachOFiles(Parameters["ExcludeMachOFiles"sv].AsBool(DefaultChunkingExcludeMachOFiles)) , m_ChunkFileSizeLimit(Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit)) , m_ChunkingParams(ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView())) { @@ -73,6 +79,25 @@ public: } BasicFile Buffer(InputPath, BasicFile::Mode::kRead); + if (m_ExcludeElfFiles && Buffer.FileSize() > 4) + { + uint32_t ElfCheck = 0; + Buffer.Read(&ElfCheck, 4, 0); + if (ElfCheck == 0x464c457f) + { + return false; + } + } + if (m_ExcludeMachOFiles && Buffer.FileSize() > 4) + { + uint32_t MachOCheck = 0; + Buffer.Read(&MachOCheck, 4, 0); + if ((MachOCheck == 0xfeedface) || (MachOCheck == 0xcefaedfe)) + { + return false; + } + } + OutChunked = ChunkData(Buffer, 0, RawSize, m_ChunkingParams, &BytesProcessed, &AbortFlag); return true; } @@ -90,6 +115,10 @@ public: } } Writer.EndArray(); // ChunkExcludeExtensions + + Writer.AddBool("ExcludeElfFiles"sv, m_ExcludeElfFiles); + Writer.AddBool("ExcludeMachOFiles"sv, m_ExcludeMachOFiles); + Writer.AddInteger("ChunkFileSizeLimit"sv, m_ChunkFileSizeLimit); Writer.BeginObject("ChunkingParams"sv); { @@ -106,6 +135,8 @@ public: protected: const std::vector<std::string> m_ChunkExcludeExtensions; + const bool m_ExcludeElfFiles = false; + const bool m_ExcludeMachOFiles = false; const uint64_t m_ChunkFileSizeLimit; const ChunkedParams m_ChunkingParams; }; @@ -230,10 +261,16 @@ protected: std::unique_ptr<ChunkingController> CreateBasicChunkingController(std::span<const std::string_view> ExcludeExtensions, + bool ExcludeElfFiles, + bool ExcludeMachOFiles, uint64_t ChunkFileSizeLimit, const ChunkedParams& ChunkingParams) { - return std::make_unique<BasicChunkingController>(ExcludeExtensions, ChunkFileSizeLimit, ChunkingParams); + return std::make_unique<BasicChunkingController>(ExcludeExtensions, + ExcludeElfFiles, + ExcludeMachOFiles, + ChunkFileSizeLimit, + ChunkingParams); } std::unique_ptr<ChunkingController> CreateBasicChunkingController(CbObjectView Parameters) diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp index 130fec355..7aa252e44 100644 --- a/src/zenutil/filebuildstorage.cpp +++ b/src/zenutil/filebuildstorage.cpp @@ -235,7 +235,7 @@ public: m_Stats.TotalRequestCount++; const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); - if (!std::filesystem::is_regular_file(BlockPath)) + if (!IsFile(BlockPath)) { CreateDirectories(BlockPath.parent_path()); TemporaryFile::SafeWriteFile(BlockPath, Payload.Flatten().GetView()); @@ -260,7 +260,7 @@ public: m_Stats.TotalRequestCount++; const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); - if (!std::filesystem::is_regular_file(BlockPath)) + if (!IsFile(BlockPath)) { CreateDirectories(BlockPath.parent_path()); @@ -346,7 +346,7 @@ public: m_Stats.TotalRequestCount++; const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); - if (std::filesystem::is_regular_file(BlockPath)) + if (IsFile(BlockPath)) { BasicFile File(BlockPath, BasicFile::Mode::kRead); IoBuffer Payload; @@ -383,7 +383,7 @@ public: m_Stats.TotalRequestCount++; const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); - if (std::filesystem::is_regular_file(BlockPath)) + if (IsFile(BlockPath)) { struct WorkloadData { @@ -442,63 +442,77 @@ public: SimulateLatency(0, 0); } - virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) override + virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override { ZEN_TRACE_CPU("FileBuildStorage::FindBlocks"); ZEN_UNUSED(BuildId); - SimulateLatency(0, 0); + SimulateLatency(sizeof(BuildId), 0); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); m_Stats.TotalRequestCount++; + uint64_t FoundCount = 0; + DirectoryContent Content; GetDirectoryContent(GetBlobsMetadataFolder(), DirectoryContentFlags::IncludeFiles, Content); - std::vector<ChunkBlockDescription> Result; + CbObjectWriter Writer; + Writer.BeginArray("blocks"); for (const std::filesystem::path& MetaDataFile : Content.Files) { IoHash ChunkHash; if (IoHash::TryParse(MetaDataFile.stem().string(), ChunkHash)) { std::filesystem::path BlockPath = GetBlobPayloadPath(ChunkHash); - if (std::filesystem::is_regular_file(BlockPath)) + if (IsFile(BlockPath)) { IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten(); m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize(); CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); - Result.emplace_back(ParseChunkBlockDescription(BlockObject)); + Writer.AddObject(BlockObject); + FoundCount++; + if (FoundCount == MaxBlockCount) + { + break; + } } } } - SimulateLatency(0, sizeof(IoHash) * Result.size()); + Writer.EndArray(); // blocks + CbObject Result = Writer.Save(); + SimulateLatency(0, Result.GetSize()); return Result; } - virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) override + virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override { ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata"); ZEN_UNUSED(BuildId); - SimulateLatency(0, 0); + SimulateLatency(sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(), 0); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); m_Stats.TotalRequestCount++; - std::vector<ChunkBlockDescription> Result; + CbObjectWriter Writer; + Writer.BeginArray("blocks"); + for (const IoHash& BlockHash : BlockHashes) { std::filesystem::path MetaDataFile = GetBlobMetadataPath(BlockHash); - if (std::filesystem::is_regular_file(MetaDataFile)) + if (IsFile(MetaDataFile)) { IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten(); m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize(); CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); - Result.emplace_back(ParseChunkBlockDescription(BlockObject)); + Writer.AddObject(BlockObject); } } - SimulateLatency(sizeof(BlockHashes) * BlockHashes.size(), sizeof(ChunkBlockDescription) * Result.size()); + Writer.EndArray(); // blocks + CbObject Result = Writer.Save(); + SimulateLatency(0, Result.GetSize()); return Result; } @@ -616,7 +630,7 @@ protected: BuildPartObject.IterateAttachments([&](CbFieldView FieldView) { const IoHash AttachmentHash = FieldView.AsBinaryAttachment(); const std::filesystem::path BlockPath = GetBlobPayloadPath(AttachmentHash); - if (!std::filesystem::is_regular_file(BlockPath)) + if (!IsFile(BlockPath)) { NeededAttachments.push_back(AttachmentHash); } diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h index 2ebd65a00..b0665dbf8 100644 --- a/src/zenutil/include/zenutil/buildstorage.h +++ b/src/zenutil/include/zenutil/buildstorage.h @@ -54,9 +54,9 @@ public: uint64_t ChunkSize, std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) = 0; - virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0; - virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) = 0; - virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0; + virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0; + virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) = 0; + virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0; virtual void PutBuildPartStats(const Oid& BuildId, const Oid& BuildPartId, const tsl::robin_map<std::string, double>& FloatStats) = 0; }; diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h new file mode 100644 index 000000000..cab35328d --- /dev/null +++ b/src/zenutil/include/zenutil/buildstoragecache.h @@ -0,0 +1,57 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/logging.h> + +#include <zencore/compactbinary.h> +#include <zencore/compositebuffer.h> +#include <zenutil/chunkblock.h> + +namespace zen { + +class HttpClient; + +class BuildStorageCache +{ +public: + struct Statistics + { + std::atomic<uint64_t> TotalBytesRead = 0; + std::atomic<uint64_t> TotalBytesWritten = 0; + std::atomic<uint64_t> TotalRequestCount = 0; + std::atomic<uint64_t> TotalRequestTimeUs = 0; + std::atomic<uint64_t> TotalExecutionTimeUs = 0; + }; + + virtual ~BuildStorageCache() {} + + virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) = 0; + virtual IoBuffer GetBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + uint64_t RangeOffset = 0, + uint64_t RangeBytes = (uint64_t)-1) = 0; + + virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) = 0; + virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0; + + struct BlobExistsResult + { + bool HasBody = 0; + bool HasMetadata = 0; + }; + + virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0; + + virtual void Flush( + int32_t UpdateInteralMS, + std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0; +}; + +std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& HttpClient, + BuildStorageCache::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount); +} // namespace zen diff --git a/src/zenutil/include/zenutil/chunkingcontroller.h b/src/zenutil/include/zenutil/chunkingcontroller.h index 246f4498a..970917fb0 100644 --- a/src/zenutil/include/zenutil/chunkingcontroller.h +++ b/src/zenutil/include/zenutil/chunkingcontroller.h @@ -12,6 +12,8 @@ namespace zen { const std::vector<std::string_view> DefaultChunkingExcludeExtensions = {".exe", ".dll", ".pdb", ".self", ".mp4"}; +const bool DefaultChunkingExcludeElfFiles = true; +const bool DefaultChunkingExcludeMachOFiles = true; const ChunkedParams DefaultChunkedParams = {.MinSize = ((8u * 1u) * 1024u) - 128u, .MaxSize = 128u * 1024u, @@ -40,6 +42,8 @@ public: std::unique_ptr<ChunkingController> CreateBasicChunkingController( std::span<const std::string_view> ExcludeExtensions = DefaultChunkingExcludeExtensions, + bool ExcludeElfFiles = DefaultChunkingExcludeElfFiles, + bool ExcludeMachOFiles = DefaultChunkingExcludeMachOFiles, uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit, const ChunkedParams& ChunkingParams = DefaultChunkedParams); std::unique_ptr<ChunkingController> CreateBasicChunkingController(CbObjectView Parameters); diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h index fda4a7bfe..417ed7384 100644 --- a/src/zenutil/include/zenutil/jupiter/jupitersession.h +++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h @@ -152,7 +152,7 @@ public: const Oid& BuildId, const Oid& PartId, const IoHash& RawHash); - JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); + JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount); JupiterResult GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload); JupiterResult PutBuildPartStats(std::string_view Namespace, diff --git a/src/zenutil/include/zenutil/logging/rotatingfilesink.h b/src/zenutil/include/zenutil/logging/rotatingfilesink.h index 758722156..cd28bdcb2 100644 --- a/src/zenutil/include/zenutil/logging/rotatingfilesink.h +++ b/src/zenutil/include/zenutil/logging/rotatingfilesink.h @@ -27,7 +27,6 @@ public: { ZEN_MEMSCOPE(ELLMTag::Logging); - ZEN_MEMSCOPE(ELLMTag::Logging); std::error_code Ec; if (RotateOnOpen) { diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h index 9683ad720..df2033bca 100644 --- a/src/zenutil/include/zenutil/workerpools.h +++ b/src/zenutil/include/zenutil/workerpools.h @@ -21,6 +21,9 @@ WorkerThreadPool& GetMediumWorkerPool(EWorkloadType WorkloadType); // Worker pool with std::thread::hardware_concurrency() / 8 worker threads, but at least one thread WorkerThreadPool& GetSmallWorkerPool(EWorkloadType WorkloadType); +// Worker pool with minimum number of worker threads, but at least one thread +WorkerThreadPool& GetTinyWorkerPool(EWorkloadType WorkloadType); + // Special worker pool that does not use worker thread but issues all scheduled work on the calling thread // This is useful for debugging when multiple async thread can make stepping in debugger complicated WorkerThreadPool& GetSyncWorkerPool(); diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp index d70fd8c00..f2d190408 100644 --- a/src/zenutil/jupiter/jupiterbuildstorage.cpp +++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp @@ -49,7 +49,7 @@ public: { throw std::runtime_error(fmt::format("Failed listing builds: {} ({})", ListResult.Reason, ListResult.ErrorCode)); } - return PayloadToJson("Failed listing builds"sv, ListResult.Response); + return PayloadToCbObject("Failed listing builds"sv, ListResult.Response); } virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override @@ -66,7 +66,7 @@ public: { throw std::runtime_error(fmt::format("Failed creating build: {} ({})", PutResult.Reason, PutResult.ErrorCode)); } - return PayloadToJson(fmt::format("Failed creating build: {}", BuildId), PutResult.Response); + return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response); } virtual CbObject GetBuild(const Oid& BuildId) override @@ -81,7 +81,7 @@ public: { throw std::runtime_error(fmt::format("Failed fetching build: {} ({})", GetBuildResult.Reason, GetBuildResult.ErrorCode)); } - return PayloadToJson(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response); + return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response); } virtual void FinalizeBuild(const Oid& BuildId) override @@ -134,7 +134,7 @@ public: GetBuildPartResult.Reason, GetBuildPartResult.ErrorCode)); } - return PayloadToJson(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response); + return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response); } virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override @@ -289,22 +289,22 @@ public: } } - virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) override + virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override { ZEN_TRACE_CPU("Jupiter::FindBlocks"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult FindResult = m_Session.FindBlocks(m_Namespace, m_Bucket, BuildId); + JupiterResult FindResult = m_Session.FindBlocks(m_Namespace, m_Bucket, BuildId, MaxBlockCount); AddStatistic(FindResult); if (!FindResult.Success) { throw std::runtime_error(fmt::format("Failed fetching known blocks: {} ({})", FindResult.Reason, FindResult.ErrorCode)); } - return ParseChunkBlockDescriptionList(PayloadToJson("Failed fetching known blocks"sv, FindResult.Response)); + return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response); } - virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) override + virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override { ZEN_TRACE_CPU("Jupiter::GetBlockMetadata"); @@ -328,24 +328,7 @@ public: throw std::runtime_error( fmt::format("Failed fetching block metadatas: {} ({})", GetBlockMetadataResult.Reason, GetBlockMetadataResult.ErrorCode)); } - std::vector<ChunkBlockDescription> UnorderedList = - ParseChunkBlockDescriptionList(PayloadToJson("Failed fetching block metadatas", GetBlockMetadataResult.Response)); - tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup; - for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++) - { - const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex]; - BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex); - } - std::vector<ChunkBlockDescription> SortedBlockDescriptions; - SortedBlockDescriptions.reserve(BlockDescriptionLookup.size()); - for (const IoHash& BlockHash : BlockHashes) - { - if (auto It = BlockDescriptionLookup.find(BlockHash); It != BlockDescriptionLookup.end()) - { - SortedBlockDescriptions.push_back(std::move(UnorderedList[It->second])); - } - } - return SortedBlockDescriptions; + return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response); } virtual void PutBuildPartStats(const Oid& BuildId, @@ -373,7 +356,7 @@ public: } private: - static CbObject PayloadToJson(std::string_view Context, const IoBuffer& Payload) + static CbObject PayloadToCbObject(std::string_view Context, const IoBuffer& Payload) { if (Payload.GetContentType() == ZenContentType::kJSON) { diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp index 2e4fe5258..fde86a478 100644 --- a/src/zenutil/jupiter/jupitersession.cpp +++ b/src/zenutil/jupiter/jupitersession.cpp @@ -758,10 +758,12 @@ JupiterSession::FinalizeBuildPart(std::string_view Namespace, } JupiterResult -JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) +JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount) { - HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks", Namespace, BucketId, BuildId), - HttpClient::Accept(ZenContentType::kCbObject)); + const std::string Parameters = MaxBlockCount == (uint64_t)-1 ? "" : fmt::format("?count={}", MaxBlockCount); + HttpClient::Response Response = + m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks{}", Namespace, BucketId, BuildId, Parameters), + HttpClient::Accept(ZenContentType::kCbObject)); return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv); } diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp index e3165e838..797034978 100644 --- a/src/zenutil/workerpools.cpp +++ b/src/zenutil/workerpools.cpp @@ -11,9 +11,10 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { namespace { - const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(std::thread::hardware_concurrency()); + const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(Max(std::thread::hardware_concurrency() - 1u, 2u)); const int MediumWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 2u)); const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 8u), 1u)); + const int TinyWorkerThreadPoolTreadCount = 1; static bool IsShutDown = false; @@ -35,6 +36,9 @@ namespace { WorkerPool BurstSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(burst)"}; WorkerPool BackgroundSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(bkg)"}; + WorkerPool BurstTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "TinyThreadPool(burst)"}; + WorkerPool BackgroundTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "TinyThreadPool(bkg)"}; + WorkerPool SyncWorkerPool = {.TreadCount = 0, .Name = "SyncThreadPool"}; WorkerThreadPool& EnsurePoolPtr(WorkerPool& Pool) @@ -75,6 +79,12 @@ GetSmallWorkerPool(EWorkloadType WorkloadType) } WorkerThreadPool& +GetTinyWorkerPool(EWorkloadType WorkloadType) +{ + return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstTinyWorkerPool : BackgroundTinyWorkerPool); +} + +WorkerThreadPool& GetSyncWorkerPool() { return EnsurePoolPtr(SyncWorkerPool); @@ -91,6 +101,8 @@ ShutdownWorkerPools() BackgroundMediumWorkerPool.Pool.reset(); BurstSmallWorkerPool.Pool.reset(); BackgroundSmallWorkerPool.Pool.reset(); + BurstTinyWorkerPool.Pool.reset(); + BackgroundTinyWorkerPool.Pool.reset(); SyncWorkerPool.Pool.reset(); } } // namespace zen diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp index b36f11741..c0c2a754a 100644 --- a/src/zenutil/zenserverprocess.cpp +++ b/src/zenutil/zenserverprocess.cpp @@ -534,7 +534,7 @@ ZenServerEnvironment::CreateNewTestDir() TestDir << "test"sv << int64_t(ZenServerTestCounter.fetch_add(1)); std::filesystem::path TestPath = m_TestBaseDir / TestDir.c_str(); - ZEN_ASSERT(!std::filesystem::exists(TestPath)); + ZEN_ASSERT(!IsDir(TestPath)); ZEN_INFO("Creating new test dir @ '{}'", TestPath); @@ -568,7 +568,7 @@ ZenServerInstance::~ZenServerInstance() { Shutdown(); std::error_code DummyEc; - std::filesystem::remove(std::filesystem::temp_directory_path() / ("zenserver_" + m_Name + ".log"), DummyEc); + RemoveFile(std::filesystem::temp_directory_path() / ("zenserver_" + m_Name + ".log"), DummyEc); } catch (const std::exception& Err) { @@ -1033,7 +1033,7 @@ std::string ZenServerInstance::GetLogOutput() const { std::filesystem::path OutputPath = std::filesystem::temp_directory_path() / ("zenserver_" + m_Name + ".log"); - if (std::filesystem::is_regular_file(OutputPath)) + if (IsFile(OutputPath)) { FileContents Contents = ReadFile(OutputPath); if (!Contents.ErrorCode) @@ -1123,7 +1123,7 @@ ValidateLockFileInfo(const LockFileInfo& Info, std::string& OutReason) OutReason = fmt::format("listen port ({}) is not valid", Info.EffectiveListenPort); return false; } - if (!std::filesystem::is_directory(Info.DataDir)) + if (!IsDir(Info.DataDir)) { OutReason = fmt::format("data directory ('{}') does not exist", Info.DataDir); return false; |