aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
authorFlorent Devillechabrol <[email protected]>2025-04-02 10:38:02 -0700
committerGitHub Enterprise <[email protected]>2025-04-02 10:38:02 -0700
commit486a22ad2c61bc1616d8745e0077eb320089bfec (patch)
tree665d5c9002cd97c04ddffeaf211fcf8e55d01dce /src/zenutil
parentFixed missing trailing quote when converting binary data from compact binary ... (diff)
parentadded --find-max-block-count option to builds upload (#337) (diff)
downloadzen-486a22ad2c61bc1616d8745e0077eb320089bfec.tar.xz
zen-486a22ad2c61bc1616d8745e0077eb320089bfec.zip
Merge branch 'main' into fd-fix-binary-json
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/buildstoragecache.cpp407
-rw-r--r--src/zenutil/cache/rpcrecording.cpp8
-rw-r--r--src/zenutil/chunkblock.cpp2
-rw-r--r--src/zenutil/chunkedcontent.cpp26
-rw-r--r--src/zenutil/chunkingcontroller.cpp39
-rw-r--r--src/zenutil/filebuildstorage.cpp48
-rw-r--r--src/zenutil/include/zenutil/buildstorage.h6
-rw-r--r--src/zenutil/include/zenutil/buildstoragecache.h57
-rw-r--r--src/zenutil/include/zenutil/chunkingcontroller.h4
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupitersession.h2
-rw-r--r--src/zenutil/include/zenutil/logging/rotatingfilesink.h1
-rw-r--r--src/zenutil/include/zenutil/workerpools.h3
-rw-r--r--src/zenutil/jupiter/jupiterbuildstorage.cpp37
-rw-r--r--src/zenutil/jupiter/jupitersession.cpp8
-rw-r--r--src/zenutil/workerpools.cpp14
-rw-r--r--src/zenutil/zenserverprocess.cpp8
16 files changed, 603 insertions, 67 deletions
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp
new file mode 100644
index 000000000..f273ac699
--- /dev/null
+++ b/src/zenutil/buildstoragecache.cpp
@@ -0,0 +1,407 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/buildstoragecache.h>
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/fmtutils.h>
+#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
+#include <zenhttp/httpclient.h>
+#include <zenhttp/packageformat.h>
+#include <zenutil/workerpools.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_set.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+using namespace std::literals;
+
+class ZenBuildStorageCache : public BuildStorageCache
+{
+public:
+ explicit ZenBuildStorageCache(HttpClient& HttpClient,
+ BuildStorageCache::Statistics& Stats,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const std::filesystem::path& TempFolderPath,
+ bool BoostBackgroundThreadCount)
+ : m_HttpClient(HttpClient)
+ , m_Stats(Stats)
+ , m_Namespace(Namespace.empty() ? "none" : Namespace)
+ , m_Bucket(Bucket.empty() ? "none" : Bucket)
+ , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred())
+ , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount)
+ , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background)
+ : GetTinyWorkerPool(EWorkloadType::Background))
+ , m_PendingBackgroundWorkCount(1)
+ , m_CancelBackgroundWork(false)
+ {
+ }
+
+ virtual ~ZenBuildStorageCache()
+ {
+ try
+ {
+ m_CancelBackgroundWork.store(true);
+ if (!IsFlushed)
+ {
+ m_PendingBackgroundWorkCount.CountDown();
+ m_PendingBackgroundWorkCount.Wait();
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~ZenBuildStorageCache() failed with: {}", Ex.what());
+ }
+ }
+
+ void ScheduleBackgroundWork(std::function<void()>&& Work)
+ {
+ m_PendingBackgroundWorkCount.AddCount(1);
+ try
+ {
+ m_BackgroundWorkPool.ScheduleWork([this, Work = std::move(Work)]() {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork");
+ auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); });
+ if (!m_CancelBackgroundWork)
+ {
+ try
+ {
+ Work();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what());
+ }
+ }
+ });
+ }
+ catch (const std::exception& Ex)
+ {
+ m_PendingBackgroundWorkCount.CountDown();
+ ZEN_ERROR("Failed scheduling background upload to build cache. Reason: {}", Ex.what());
+ }
+ }
+
+ virtual void PutBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload) override
+ {
+ ZEN_ASSERT(!IsFlushed);
+ ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
+ ScheduleBackgroundWork(
+ [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob");
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ HttpClient::Response CacheResponse =
+ m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()),
+ Payload,
+ ContentType);
+ AddStatistic(CacheResponse);
+ if (!CacheResponse.IsSuccess())
+ {
+ ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv));
+ }
+ });
+ }
+
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override
+ {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::GetBuildBlob");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ HttpClient::KeyValueMap Headers;
+ if (RangeOffset != 0 || RangeBytes != (uint64_t)-1)
+ {
+ Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", RangeOffset, RangeOffset + RangeBytes - 1)});
+ }
+ CreateDirectories(m_TempFolderPath);
+ HttpClient::Response CacheResponse =
+ m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()),
+ m_TempFolderPath,
+ Headers);
+ AddStatistic(CacheResponse);
+ if (CacheResponse.IsSuccess())
+ {
+ return CacheResponse.ResponsePayload;
+ }
+ return {};
+ }
+
+ virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override
+ {
+ ZEN_ASSERT(!IsFlushed);
+ ScheduleBackgroundWork([this,
+ BuildId = Oid(BuildId),
+ BlobRawHashes = std::vector<IoHash>(BlobHashes.begin(), BlobHashes.end()),
+ MetaDatas = std::vector<CbObject>(MetaDatas.begin(), MetaDatas.end())]() {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::PutBlobMetadatas");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ const uint64_t BlobCount = BlobRawHashes.size();
+
+ CbPackage RequestPackage;
+ std::vector<CbAttachment> Attachments;
+ tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes;
+ Attachments.reserve(BlobCount);
+ AttachmentHashes.reserve(BlobCount);
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.BeginArray("blobHashes");
+ for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++)
+ {
+ RequestWriter.AddHash(BlobRawHashes[BlockHashIndex]);
+ }
+ RequestWriter.EndArray(); // blobHashes
+
+ RequestWriter.BeginArray("metadatas");
+ for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++)
+ {
+ const IoHash ObjectHash = MetaDatas[BlockHashIndex].GetHash();
+ RequestWriter.AddBinaryAttachment(ObjectHash);
+ if (!AttachmentHashes.contains(ObjectHash))
+ {
+ Attachments.push_back(CbAttachment(MetaDatas[BlockHashIndex], ObjectHash));
+ AttachmentHashes.insert(ObjectHash);
+ }
+ }
+
+ RequestWriter.EndArray(); // metadatas
+
+ RequestPackage.SetObject(RequestWriter.Save());
+ }
+ RequestPackage.AddAttachments(Attachments);
+
+ CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage);
+
+ HttpClient::Response CacheResponse =
+ m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/putBlobMetadata", m_Namespace, m_Bucket, BuildId),
+ RpcRequestBuffer,
+ ZenContentType::kCbPackage);
+ AddStatistic(CacheResponse);
+ if (!CacheResponse.IsSuccess())
+ {
+ ZEN_DEBUG("Failed posting blob metadata to cache: {}", CacheResponse.ErrorMessage(""sv));
+ }
+ });
+ }
+
+ virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) override
+ {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::GetBlobMetadatas");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ CbObjectWriter Request;
+
+ Request.BeginArray("blobHashes"sv);
+ for (const IoHash& BlobHash : BlobHashes)
+ {
+ Request.AddHash(BlobHash);
+ }
+ Request.EndArray();
+
+ IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+
+ HttpClient::Response Response =
+ m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/getBlobMetadata", m_Namespace, m_Bucket, BuildId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ AddStatistic(Response);
+ if (Response.IsSuccess())
+ {
+ std::vector<CbObject> Result;
+
+ CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload);
+ CbObject ResponseObject = ResponsePackage.GetObject();
+
+ CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView();
+ CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView();
+ Result.reserve(MetadatasArray.Num());
+ auto BlobHashesIt = BlobHashes.begin();
+ auto BlobHashArrayIt = begin(BlobHashArray);
+ auto MetadataArrayIt = begin(MetadatasArray);
+ while (MetadataArrayIt != end(MetadatasArray))
+ {
+ const IoHash BlobHash = (*BlobHashArrayIt).AsHash();
+ while (BlobHash != *BlobHashesIt)
+ {
+ ZEN_ASSERT(BlobHashesIt != BlobHashes.end());
+ BlobHashesIt++;
+ }
+
+ ZEN_ASSERT(BlobHash == *BlobHashesIt);
+
+ const IoHash MetaHash = (*MetadataArrayIt).AsAttachment();
+ const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash);
+ ZEN_ASSERT(MetaAttachment);
+
+ CbObject Metadata = MetaAttachment->AsObject();
+ Result.emplace_back(std::move(Metadata));
+
+ BlobHashArrayIt++;
+ MetadataArrayIt++;
+ BlobHashesIt++;
+ }
+ return Result;
+ }
+ return {};
+ }
+
+ virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) override
+ {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::BlobsExists");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ CbObjectWriter Request;
+
+ Request.BeginArray("blobHashes"sv);
+ for (const IoHash& BlobHash : BlobHashes)
+ {
+ Request.AddHash(BlobHash);
+ }
+ Request.EndArray();
+
+ IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+
+ HttpClient::Response Response = m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/exists", m_Namespace, m_Bucket, BuildId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ AddStatistic(Response);
+ if (Response.IsSuccess())
+ {
+ CbObject ResponseObject = LoadCompactBinaryObject(Response.ResponsePayload);
+ if (!ResponseObject)
+ {
+ throw std::runtime_error("BlobExists reponse is invalid, failed to load payload as compact binary object");
+ }
+ CbArrayView BlobsExistsArray = ResponseObject["blobExists"sv].AsArrayView();
+ if (!BlobsExistsArray)
+ {
+ throw std::runtime_error("BlobExists reponse is invalid, 'blobExists' array is missing");
+ }
+ if (BlobsExistsArray.Num() != BlobHashes.size())
+ {
+ throw std::runtime_error(fmt::format("BlobExists reponse is invalid, 'blobExists' array contains {} entries, expected {}",
+ BlobsExistsArray.Num(),
+ BlobHashes.size()));
+ }
+
+ CbArrayView MetadatasExistsArray = ResponseObject["metadataExists"sv].AsArrayView();
+ if (!MetadatasExistsArray)
+ {
+ throw std::runtime_error("BlobExists reponse is invalid, 'metadataExists' array is missing");
+ }
+ if (MetadatasExistsArray.Num() != BlobHashes.size())
+ {
+ throw std::runtime_error(
+ fmt::format("BlobExists reponse is invalid, 'metadataExists' array contains {} entries, expected {}",
+ MetadatasExistsArray.Num(),
+ BlobHashes.size()));
+ }
+
+ std::vector<BlobExistsResult> Result;
+ Result.reserve(BlobHashes.size());
+ auto BlobExistsIt = begin(BlobsExistsArray);
+ auto MetadataExistsIt = begin(MetadatasExistsArray);
+ while (BlobExistsIt != end(BlobsExistsArray))
+ {
+ ZEN_ASSERT(MetadataExistsIt != end(MetadatasExistsArray));
+
+ const bool HasBody = (*BlobExistsIt).AsBool();
+ const bool HasMetadata = (*MetadataExistsIt).AsBool();
+
+ Result.push_back({.HasBody = HasBody, .HasMetadata = HasMetadata});
+
+ BlobExistsIt++;
+ MetadataExistsIt++;
+ }
+ return Result;
+ }
+ return {};
+ }
+
+ virtual void Flush(int32_t UpdateInteralMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override
+ {
+ if (IsFlushed)
+ {
+ return;
+ }
+ if (!IsFlushed)
+ {
+ m_PendingBackgroundWorkCount.CountDown();
+ IsFlushed = true;
+ }
+ if (m_PendingBackgroundWorkCount.Wait(100))
+ {
+ return;
+ }
+ while (true)
+ {
+ intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining();
+ if (UpdateCallback(Remaining))
+ {
+ if (m_PendingBackgroundWorkCount.Wait(UpdateInteralMS))
+ {
+ UpdateCallback(0);
+ return;
+ }
+ }
+ else
+ {
+ m_CancelBackgroundWork.store(true);
+ }
+ }
+ }
+
+private:
+ void AddStatistic(const HttpClient::Response& Result)
+ {
+ m_Stats.TotalBytesWritten += Result.UploadedBytes;
+ m_Stats.TotalBytesRead += Result.DownloadedBytes;
+ m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0);
+ m_Stats.TotalRequestCount++;
+ }
+
+ HttpClient& m_HttpClient;
+ BuildStorageCache::Statistics& m_Stats;
+ const std::string m_Namespace;
+ const std::string m_Bucket;
+ const std::filesystem::path m_TempFolderPath;
+ const bool m_BoostBackgroundThreadCount;
+ bool IsFlushed = false;
+
+ WorkerThreadPool& m_BackgroundWorkPool;
+ Latch m_PendingBackgroundWorkCount;
+ std::atomic<bool> m_CancelBackgroundWork;
+};
+
+std::unique_ptr<BuildStorageCache>
+CreateZenBuildStorageCache(HttpClient& HttpClient,
+ BuildStorageCache::Statistics& Stats,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const std::filesystem::path& TempFolderPath,
+ bool BoostBackgroundThreadCount)
+{
+ return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount);
+}
+
+} // namespace zen
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
index 1f951167d..380c182b2 100644
--- a/src/zenutil/cache/rpcrecording.cpp
+++ b/src/zenutil/cache/rpcrecording.cpp
@@ -46,7 +46,7 @@ struct RecordedRequestsWriter
void BeginWrite(const std::filesystem::path& BasePath)
{
m_BasePath = BasePath;
- std::filesystem::create_directories(m_BasePath);
+ CreateDirectories(m_BasePath);
}
void EndWrite()
@@ -426,7 +426,7 @@ RecordedRequestsSegmentWriter::BeginWrite(const std::filesystem::path& BasePath,
m_BasePath = BasePath;
m_SegmentIndex = SegmentIndex;
m_RequestBaseIndex = RequestBaseIndex;
- std::filesystem::create_directories(m_BasePath);
+ CreateDirectories(m_BasePath);
}
void
@@ -1051,14 +1051,14 @@ public:
static bool IsCompatible(const std::filesystem::path& BasePath)
{
- if (std::filesystem::exists(BasePath / "rpc_recording_info.zcb"))
+ if (IsFile(BasePath / "rpc_recording_info.zcb"))
{
return true;
}
const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0);
- if (std::filesystem::exists(SegmentZero / "rpc_segment_info.zcb") && std::filesystem::exists(SegmentZero / "index.bin"))
+ if (IsFile(SegmentZero / "rpc_segment_info.zcb") && IsFile(SegmentZero / "index.bin"))
{
// top-level metadata is missing, possibly because of premature exit
// on the recording side
diff --git a/src/zenutil/chunkblock.cpp b/src/zenutil/chunkblock.cpp
index f3c14edc4..abfc0fb63 100644
--- a/src/zenutil/chunkblock.cpp
+++ b/src/zenutil/chunkblock.cpp
@@ -52,7 +52,7 @@ ParseChunkBlockDescriptionList(const CbObjectView& BlocksObject)
return {};
}
std::vector<ChunkBlockDescription> Result;
- CbArrayView Blocks = BlocksObject["blocks"].AsArrayView();
+ CbArrayView Blocks = BlocksObject["blocks"sv].AsArrayView();
Result.reserve(Blocks.Num());
for (CbFieldView BlockView : Blocks)
{
diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp
index bb1ee5183..32ae2d94a 100644
--- a/src/zenutil/chunkedcontent.cpp
+++ b/src/zenutil/chunkedcontent.cpp
@@ -140,8 +140,12 @@ namespace {
{
ZEN_TRACE_CPU("HashOnly");
- IoBuffer Buffer = IoBufferBuilder::MakeFromFile((FolderPath / Path).make_preferred());
- const IoHash Hash = IoHash::HashBuffer(Buffer, &Stats.BytesHashed);
+ IoBuffer Buffer = IoBufferBuilder::MakeFromFile((FolderPath / Path).make_preferred());
+ if (Buffer.GetSize() != RawSize)
+ {
+ throw std::runtime_error(fmt::format("Failed opening file '{}' for hashing", FolderPath / Path));
+ }
+ const IoHash Hash = IoHash::HashBuffer(Buffer, &Stats.BytesHashed);
Lock.WithExclusiveLock([&]() {
if (!RawHashToSequenceRawHashIndex.contains(Hash))
@@ -304,14 +308,22 @@ FolderContent
GetUpdatedContent(const FolderContent& Old, const FolderContent& New, std::vector<std::filesystem::path>& OutDeletedPathIndexes)
{
ZEN_TRACE_CPU("FolderContent::GetUpdatedContent");
- FolderContent Result = {.Platform = Old.Platform};
+
+ const uint32_t NewPathCount = gsl::narrow<uint32_t>(New.Paths.size());
+
+ FolderContent Result = {.Platform = Old.Platform};
+ Result.Paths.reserve(NewPathCount);
+ Result.RawSizes.reserve(NewPathCount);
+ Result.Attributes.reserve(NewPathCount);
+ Result.ModificationTicks.reserve(NewPathCount);
+
tsl::robin_map<std::string, uint32_t> NewPathToIndex;
- const uint32_t NewPathCount = gsl::narrow<uint32_t>(New.Paths.size());
NewPathToIndex.reserve(NewPathCount);
for (uint32_t NewPathIndex = 0; NewPathIndex < NewPathCount; NewPathIndex++)
{
NewPathToIndex.insert({New.Paths[NewPathIndex].generic_string(), NewPathIndex});
}
+
uint32_t OldPathCount = gsl::narrow<uint32_t>(Old.Paths.size());
for (uint32_t OldPathIndex = 0; OldPathIndex < OldPathCount; OldPathIndex++)
{
@@ -667,6 +679,12 @@ DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span
const ChunkedContentLookup BaseLookup = BuildChunkedContentLookup(BaseContent);
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
+ const size_t ExpectedCount = BaseContent.Paths.size() - DeletedPaths.size();
+ Result.Paths.reserve(ExpectedCount);
+ Result.RawSizes.reserve(ExpectedCount);
+ Result.Attributes.reserve(ExpectedCount);
+ Result.RawHashes.reserve(ExpectedCount);
+
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceRawHashIndex;
for (uint32_t PathIndex = 0; PathIndex < BaseContent.Paths.size(); PathIndex++)
{
diff --git a/src/zenutil/chunkingcontroller.cpp b/src/zenutil/chunkingcontroller.cpp
index 2a7057a46..a5ebce193 100644
--- a/src/zenutil/chunkingcontroller.cpp
+++ b/src/zenutil/chunkingcontroller.cpp
@@ -41,9 +41,13 @@ class BasicChunkingController : public ChunkingController
{
public:
BasicChunkingController(std::span<const std::string_view> ExcludeExtensions,
+ bool ExcludeElfFiles,
+ bool ExcludeMachOFiles,
uint64_t ChunkFileSizeLimit,
const ChunkedParams& ChunkingParams)
: m_ChunkExcludeExtensions(ExcludeExtensions.begin(), ExcludeExtensions.end())
+ , m_ExcludeElfFiles(ExcludeElfFiles)
+ , m_ExcludeMachOFiles(ExcludeMachOFiles)
, m_ChunkFileSizeLimit(ChunkFileSizeLimit)
, m_ChunkingParams(ChunkingParams)
{
@@ -51,6 +55,8 @@ public:
BasicChunkingController(CbObjectView Parameters)
: m_ChunkExcludeExtensions(ReadStringArray(Parameters["ChunkExcludeExtensions"sv].AsArrayView()))
+ , m_ExcludeElfFiles(Parameters["ExcludeElfFiles"sv].AsBool(DefaultChunkingExcludeElfFiles))
+ , m_ExcludeMachOFiles(Parameters["ExcludeMachOFiles"sv].AsBool(DefaultChunkingExcludeMachOFiles))
, m_ChunkFileSizeLimit(Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit))
, m_ChunkingParams(ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView()))
{
@@ -73,6 +79,25 @@ public:
}
BasicFile Buffer(InputPath, BasicFile::Mode::kRead);
+ if (m_ExcludeElfFiles && Buffer.FileSize() > 4)
+ {
+ uint32_t ElfCheck = 0;
+ Buffer.Read(&ElfCheck, 4, 0);
+ if (ElfCheck == 0x464c457f)
+ {
+ return false;
+ }
+ }
+ if (m_ExcludeMachOFiles && Buffer.FileSize() > 4)
+ {
+ uint32_t MachOCheck = 0;
+ Buffer.Read(&MachOCheck, 4, 0);
+ if ((MachOCheck == 0xfeedface) || (MachOCheck == 0xcefaedfe))
+ {
+ return false;
+ }
+ }
+
OutChunked = ChunkData(Buffer, 0, RawSize, m_ChunkingParams, &BytesProcessed, &AbortFlag);
return true;
}
@@ -90,6 +115,10 @@ public:
}
}
Writer.EndArray(); // ChunkExcludeExtensions
+
+ Writer.AddBool("ExcludeElfFiles"sv, m_ExcludeElfFiles);
+ Writer.AddBool("ExcludeMachOFiles"sv, m_ExcludeMachOFiles);
+
Writer.AddInteger("ChunkFileSizeLimit"sv, m_ChunkFileSizeLimit);
Writer.BeginObject("ChunkingParams"sv);
{
@@ -106,6 +135,8 @@ public:
protected:
const std::vector<std::string> m_ChunkExcludeExtensions;
+ const bool m_ExcludeElfFiles = false;
+ const bool m_ExcludeMachOFiles = false;
const uint64_t m_ChunkFileSizeLimit;
const ChunkedParams m_ChunkingParams;
};
@@ -230,10 +261,16 @@ protected:
std::unique_ptr<ChunkingController>
CreateBasicChunkingController(std::span<const std::string_view> ExcludeExtensions,
+ bool ExcludeElfFiles,
+ bool ExcludeMachOFiles,
uint64_t ChunkFileSizeLimit,
const ChunkedParams& ChunkingParams)
{
- return std::make_unique<BasicChunkingController>(ExcludeExtensions, ChunkFileSizeLimit, ChunkingParams);
+ return std::make_unique<BasicChunkingController>(ExcludeExtensions,
+ ExcludeElfFiles,
+ ExcludeMachOFiles,
+ ChunkFileSizeLimit,
+ ChunkingParams);
}
std::unique_ptr<ChunkingController>
CreateBasicChunkingController(CbObjectView Parameters)
diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp
index 130fec355..7aa252e44 100644
--- a/src/zenutil/filebuildstorage.cpp
+++ b/src/zenutil/filebuildstorage.cpp
@@ -235,7 +235,7 @@ public:
m_Stats.TotalRequestCount++;
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (!std::filesystem::is_regular_file(BlockPath))
+ if (!IsFile(BlockPath))
{
CreateDirectories(BlockPath.parent_path());
TemporaryFile::SafeWriteFile(BlockPath, Payload.Flatten().GetView());
@@ -260,7 +260,7 @@ public:
m_Stats.TotalRequestCount++;
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (!std::filesystem::is_regular_file(BlockPath))
+ if (!IsFile(BlockPath))
{
CreateDirectories(BlockPath.parent_path());
@@ -346,7 +346,7 @@ public:
m_Stats.TotalRequestCount++;
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (std::filesystem::is_regular_file(BlockPath))
+ if (IsFile(BlockPath))
{
BasicFile File(BlockPath, BasicFile::Mode::kRead);
IoBuffer Payload;
@@ -383,7 +383,7 @@ public:
m_Stats.TotalRequestCount++;
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (std::filesystem::is_regular_file(BlockPath))
+ if (IsFile(BlockPath))
{
struct WorkloadData
{
@@ -442,63 +442,77 @@ public:
SimulateLatency(0, 0);
}
- virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) override
+ virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override
{
ZEN_TRACE_CPU("FileBuildStorage::FindBlocks");
ZEN_UNUSED(BuildId);
- SimulateLatency(0, 0);
+ SimulateLatency(sizeof(BuildId), 0);
Stopwatch ExecutionTimer;
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
m_Stats.TotalRequestCount++;
+ uint64_t FoundCount = 0;
+
DirectoryContent Content;
GetDirectoryContent(GetBlobsMetadataFolder(), DirectoryContentFlags::IncludeFiles, Content);
- std::vector<ChunkBlockDescription> Result;
+ CbObjectWriter Writer;
+ Writer.BeginArray("blocks");
for (const std::filesystem::path& MetaDataFile : Content.Files)
{
IoHash ChunkHash;
if (IoHash::TryParse(MetaDataFile.stem().string(), ChunkHash))
{
std::filesystem::path BlockPath = GetBlobPayloadPath(ChunkHash);
- if (std::filesystem::is_regular_file(BlockPath))
+ if (IsFile(BlockPath))
{
IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten();
m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize();
CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
- Result.emplace_back(ParseChunkBlockDescription(BlockObject));
+ Writer.AddObject(BlockObject);
+ FoundCount++;
+ if (FoundCount == MaxBlockCount)
+ {
+ break;
+ }
}
}
}
- SimulateLatency(0, sizeof(IoHash) * Result.size());
+ Writer.EndArray(); // blocks
+ CbObject Result = Writer.Save();
+ SimulateLatency(0, Result.GetSize());
return Result;
}
- virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
+ virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
{
ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata");
ZEN_UNUSED(BuildId);
- SimulateLatency(0, 0);
+ SimulateLatency(sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(), 0);
Stopwatch ExecutionTimer;
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
m_Stats.TotalRequestCount++;
- std::vector<ChunkBlockDescription> Result;
+ CbObjectWriter Writer;
+ Writer.BeginArray("blocks");
+
for (const IoHash& BlockHash : BlockHashes)
{
std::filesystem::path MetaDataFile = GetBlobMetadataPath(BlockHash);
- if (std::filesystem::is_regular_file(MetaDataFile))
+ if (IsFile(MetaDataFile))
{
IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten();
m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize();
CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
- Result.emplace_back(ParseChunkBlockDescription(BlockObject));
+ Writer.AddObject(BlockObject);
}
}
- SimulateLatency(sizeof(BlockHashes) * BlockHashes.size(), sizeof(ChunkBlockDescription) * Result.size());
+ Writer.EndArray(); // blocks
+ CbObject Result = Writer.Save();
+ SimulateLatency(0, Result.GetSize());
return Result;
}
@@ -616,7 +630,7 @@ protected:
BuildPartObject.IterateAttachments([&](CbFieldView FieldView) {
const IoHash AttachmentHash = FieldView.AsBinaryAttachment();
const std::filesystem::path BlockPath = GetBlobPayloadPath(AttachmentHash);
- if (!std::filesystem::is_regular_file(BlockPath))
+ if (!IsFile(BlockPath))
{
NeededAttachments.push_back(AttachmentHash);
}
diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h
index 2ebd65a00..b0665dbf8 100644
--- a/src/zenutil/include/zenutil/buildstorage.h
+++ b/src/zenutil/include/zenutil/buildstorage.h
@@ -54,9 +54,9 @@ public:
uint64_t ChunkSize,
std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) = 0;
- virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0;
- virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) = 0;
- virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0;
+ virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0;
+ virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) = 0;
+ virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0;
virtual void PutBuildPartStats(const Oid& BuildId, const Oid& BuildPartId, const tsl::robin_map<std::string, double>& FloatStats) = 0;
};
diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h
new file mode 100644
index 000000000..cab35328d
--- /dev/null
+++ b/src/zenutil/include/zenutil/buildstoragecache.h
@@ -0,0 +1,57 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/logging.h>
+
+#include <zencore/compactbinary.h>
+#include <zencore/compositebuffer.h>
+#include <zenutil/chunkblock.h>
+
+namespace zen {
+
+class HttpClient;
+
+class BuildStorageCache
+{
+public:
+ struct Statistics
+ {
+ std::atomic<uint64_t> TotalBytesRead = 0;
+ std::atomic<uint64_t> TotalBytesWritten = 0;
+ std::atomic<uint64_t> TotalRequestCount = 0;
+ std::atomic<uint64_t> TotalRequestTimeUs = 0;
+ std::atomic<uint64_t> TotalExecutionTimeUs = 0;
+ };
+
+ virtual ~BuildStorageCache() {}
+
+ virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) = 0;
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t RangeOffset = 0,
+ uint64_t RangeBytes = (uint64_t)-1) = 0;
+
+ virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) = 0;
+ virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0;
+
+ struct BlobExistsResult
+ {
+ bool HasBody = 0;
+ bool HasMetadata = 0;
+ };
+
+ virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0;
+
+ virtual void Flush(
+ int32_t UpdateInteralMS,
+ std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0;
+};
+
+std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& HttpClient,
+ BuildStorageCache::Statistics& Stats,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const std::filesystem::path& TempFolderPath,
+ bool BoostBackgroundThreadCount);
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/chunkingcontroller.h b/src/zenutil/include/zenutil/chunkingcontroller.h
index 246f4498a..970917fb0 100644
--- a/src/zenutil/include/zenutil/chunkingcontroller.h
+++ b/src/zenutil/include/zenutil/chunkingcontroller.h
@@ -12,6 +12,8 @@
namespace zen {
const std::vector<std::string_view> DefaultChunkingExcludeExtensions = {".exe", ".dll", ".pdb", ".self", ".mp4"};
+const bool DefaultChunkingExcludeElfFiles = true;
+const bool DefaultChunkingExcludeMachOFiles = true;
const ChunkedParams DefaultChunkedParams = {.MinSize = ((8u * 1u) * 1024u) - 128u,
.MaxSize = 128u * 1024u,
@@ -40,6 +42,8 @@ public:
std::unique_ptr<ChunkingController> CreateBasicChunkingController(
std::span<const std::string_view> ExcludeExtensions = DefaultChunkingExcludeExtensions,
+ bool ExcludeElfFiles = DefaultChunkingExcludeElfFiles,
+ bool ExcludeMachOFiles = DefaultChunkingExcludeMachOFiles,
uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit,
const ChunkedParams& ChunkingParams = DefaultChunkedParams);
std::unique_ptr<ChunkingController> CreateBasicChunkingController(CbObjectView Parameters);
diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h
index fda4a7bfe..417ed7384 100644
--- a/src/zenutil/include/zenutil/jupiter/jupitersession.h
+++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h
@@ -152,7 +152,7 @@ public:
const Oid& BuildId,
const Oid& PartId,
const IoHash& RawHash);
- JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount);
JupiterResult GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload);
JupiterResult PutBuildPartStats(std::string_view Namespace,
diff --git a/src/zenutil/include/zenutil/logging/rotatingfilesink.h b/src/zenutil/include/zenutil/logging/rotatingfilesink.h
index 758722156..cd28bdcb2 100644
--- a/src/zenutil/include/zenutil/logging/rotatingfilesink.h
+++ b/src/zenutil/include/zenutil/logging/rotatingfilesink.h
@@ -27,7 +27,6 @@ public:
{
ZEN_MEMSCOPE(ELLMTag::Logging);
- ZEN_MEMSCOPE(ELLMTag::Logging);
std::error_code Ec;
if (RotateOnOpen)
{
diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h
index 9683ad720..df2033bca 100644
--- a/src/zenutil/include/zenutil/workerpools.h
+++ b/src/zenutil/include/zenutil/workerpools.h
@@ -21,6 +21,9 @@ WorkerThreadPool& GetMediumWorkerPool(EWorkloadType WorkloadType);
// Worker pool with std::thread::hardware_concurrency() / 8 worker threads, but at least one thread
WorkerThreadPool& GetSmallWorkerPool(EWorkloadType WorkloadType);
+// Worker pool with minimum number of worker threads, but at least one thread
+WorkerThreadPool& GetTinyWorkerPool(EWorkloadType WorkloadType);
+
// Special worker pool that does not use worker thread but issues all scheduled work on the calling thread
// This is useful for debugging when multiple async thread can make stepping in debugger complicated
WorkerThreadPool& GetSyncWorkerPool();
diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp
index d70fd8c00..f2d190408 100644
--- a/src/zenutil/jupiter/jupiterbuildstorage.cpp
+++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp
@@ -49,7 +49,7 @@ public:
{
throw std::runtime_error(fmt::format("Failed listing builds: {} ({})", ListResult.Reason, ListResult.ErrorCode));
}
- return PayloadToJson("Failed listing builds"sv, ListResult.Response);
+ return PayloadToCbObject("Failed listing builds"sv, ListResult.Response);
}
virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override
@@ -66,7 +66,7 @@ public:
{
throw std::runtime_error(fmt::format("Failed creating build: {} ({})", PutResult.Reason, PutResult.ErrorCode));
}
- return PayloadToJson(fmt::format("Failed creating build: {}", BuildId), PutResult.Response);
+ return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response);
}
virtual CbObject GetBuild(const Oid& BuildId) override
@@ -81,7 +81,7 @@ public:
{
throw std::runtime_error(fmt::format("Failed fetching build: {} ({})", GetBuildResult.Reason, GetBuildResult.ErrorCode));
}
- return PayloadToJson(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response);
+ return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response);
}
virtual void FinalizeBuild(const Oid& BuildId) override
@@ -134,7 +134,7 @@ public:
GetBuildPartResult.Reason,
GetBuildPartResult.ErrorCode));
}
- return PayloadToJson(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response);
+ return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response);
}
virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override
@@ -289,22 +289,22 @@ public:
}
}
- virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) override
+ virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override
{
ZEN_TRACE_CPU("Jupiter::FindBlocks");
Stopwatch ExecutionTimer;
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- JupiterResult FindResult = m_Session.FindBlocks(m_Namespace, m_Bucket, BuildId);
+ JupiterResult FindResult = m_Session.FindBlocks(m_Namespace, m_Bucket, BuildId, MaxBlockCount);
AddStatistic(FindResult);
if (!FindResult.Success)
{
throw std::runtime_error(fmt::format("Failed fetching known blocks: {} ({})", FindResult.Reason, FindResult.ErrorCode));
}
- return ParseChunkBlockDescriptionList(PayloadToJson("Failed fetching known blocks"sv, FindResult.Response));
+ return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response);
}
- virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
+ virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
{
ZEN_TRACE_CPU("Jupiter::GetBlockMetadata");
@@ -328,24 +328,7 @@ public:
throw std::runtime_error(
fmt::format("Failed fetching block metadatas: {} ({})", GetBlockMetadataResult.Reason, GetBlockMetadataResult.ErrorCode));
}
- std::vector<ChunkBlockDescription> UnorderedList =
- ParseChunkBlockDescriptionList(PayloadToJson("Failed fetching block metadatas", GetBlockMetadataResult.Response));
- tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup;
- for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++)
- {
- const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex];
- BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex);
- }
- std::vector<ChunkBlockDescription> SortedBlockDescriptions;
- SortedBlockDescriptions.reserve(BlockDescriptionLookup.size());
- for (const IoHash& BlockHash : BlockHashes)
- {
- if (auto It = BlockDescriptionLookup.find(BlockHash); It != BlockDescriptionLookup.end())
- {
- SortedBlockDescriptions.push_back(std::move(UnorderedList[It->second]));
- }
- }
- return SortedBlockDescriptions;
+ return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response);
}
virtual void PutBuildPartStats(const Oid& BuildId,
@@ -373,7 +356,7 @@ public:
}
private:
- static CbObject PayloadToJson(std::string_view Context, const IoBuffer& Payload)
+ static CbObject PayloadToCbObject(std::string_view Context, const IoBuffer& Payload)
{
if (Payload.GetContentType() == ZenContentType::kJSON)
{
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp
index 2e4fe5258..fde86a478 100644
--- a/src/zenutil/jupiter/jupitersession.cpp
+++ b/src/zenutil/jupiter/jupitersession.cpp
@@ -758,10 +758,12 @@ JupiterSession::FinalizeBuildPart(std::string_view Namespace,
}
JupiterResult
-JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
+JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount)
{
- HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks", Namespace, BucketId, BuildId),
- HttpClient::Accept(ZenContentType::kCbObject));
+ const std::string Parameters = MaxBlockCount == (uint64_t)-1 ? "" : fmt::format("?count={}", MaxBlockCount);
+ HttpClient::Response Response =
+ m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks{}", Namespace, BucketId, BuildId, Parameters),
+ HttpClient::Accept(ZenContentType::kCbObject));
return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv);
}
diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp
index e3165e838..797034978 100644
--- a/src/zenutil/workerpools.cpp
+++ b/src/zenutil/workerpools.cpp
@@ -11,9 +11,10 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
namespace {
- const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(std::thread::hardware_concurrency());
+ const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(Max(std::thread::hardware_concurrency() - 1u, 2u));
const int MediumWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 2u));
const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 8u), 1u));
+ const int TinyWorkerThreadPoolTreadCount = 1;
static bool IsShutDown = false;
@@ -35,6 +36,9 @@ namespace {
WorkerPool BurstSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(burst)"};
WorkerPool BackgroundSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(bkg)"};
+ WorkerPool BurstTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "TinyThreadPool(burst)"};
+ WorkerPool BackgroundTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "TinyThreadPool(bkg)"};
+
WorkerPool SyncWorkerPool = {.TreadCount = 0, .Name = "SyncThreadPool"};
WorkerThreadPool& EnsurePoolPtr(WorkerPool& Pool)
@@ -75,6 +79,12 @@ GetSmallWorkerPool(EWorkloadType WorkloadType)
}
WorkerThreadPool&
+GetTinyWorkerPool(EWorkloadType WorkloadType)
+{
+ return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstTinyWorkerPool : BackgroundTinyWorkerPool);
+}
+
+WorkerThreadPool&
GetSyncWorkerPool()
{
return EnsurePoolPtr(SyncWorkerPool);
@@ -91,6 +101,8 @@ ShutdownWorkerPools()
BackgroundMediumWorkerPool.Pool.reset();
BurstSmallWorkerPool.Pool.reset();
BackgroundSmallWorkerPool.Pool.reset();
+ BurstTinyWorkerPool.Pool.reset();
+ BackgroundTinyWorkerPool.Pool.reset();
SyncWorkerPool.Pool.reset();
}
} // namespace zen
diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp
index b36f11741..c0c2a754a 100644
--- a/src/zenutil/zenserverprocess.cpp
+++ b/src/zenutil/zenserverprocess.cpp
@@ -534,7 +534,7 @@ ZenServerEnvironment::CreateNewTestDir()
TestDir << "test"sv << int64_t(ZenServerTestCounter.fetch_add(1));
std::filesystem::path TestPath = m_TestBaseDir / TestDir.c_str();
- ZEN_ASSERT(!std::filesystem::exists(TestPath));
+ ZEN_ASSERT(!IsDir(TestPath));
ZEN_INFO("Creating new test dir @ '{}'", TestPath);
@@ -568,7 +568,7 @@ ZenServerInstance::~ZenServerInstance()
{
Shutdown();
std::error_code DummyEc;
- std::filesystem::remove(std::filesystem::temp_directory_path() / ("zenserver_" + m_Name + ".log"), DummyEc);
+ RemoveFile(std::filesystem::temp_directory_path() / ("zenserver_" + m_Name + ".log"), DummyEc);
}
catch (const std::exception& Err)
{
@@ -1033,7 +1033,7 @@ std::string
ZenServerInstance::GetLogOutput() const
{
std::filesystem::path OutputPath = std::filesystem::temp_directory_path() / ("zenserver_" + m_Name + ".log");
- if (std::filesystem::is_regular_file(OutputPath))
+ if (IsFile(OutputPath))
{
FileContents Contents = ReadFile(OutputPath);
if (!Contents.ErrorCode)
@@ -1123,7 +1123,7 @@ ValidateLockFileInfo(const LockFileInfo& Info, std::string& OutReason)
OutReason = fmt::format("listen port ({}) is not valid", Info.EffectiveListenPort);
return false;
}
- if (!std::filesystem::is_directory(Info.DataDir))
+ if (!IsDir(Info.DataDir))
{
OutReason = fmt::format("data directory ('{}') does not exist", Info.DataDir);
return false;