aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/bufferedwritefilecache.cpp177
-rw-r--r--src/zenutil/buildstoragecache.cpp407
-rw-r--r--src/zenutil/cache/rpcrecording.cpp14
-rw-r--r--src/zenutil/chunkblock.cpp2
-rw-r--r--src/zenutil/chunkedcontent.cpp185
-rw-r--r--src/zenutil/chunkingcontroller.cpp266
-rw-r--r--src/zenutil/commandlineoptions.cpp221
-rw-r--r--src/zenutil/environmentoptions.cpp84
-rw-r--r--src/zenutil/filebuildstorage.cpp148
-rw-r--r--src/zenutil/include/zenutil/bufferedwritefilecache.h106
-rw-r--r--src/zenutil/include/zenutil/buildstorage.h27
-rw-r--r--src/zenutil/include/zenutil/buildstoragecache.h57
-rw-r--r--src/zenutil/include/zenutil/chunkedcontent.h57
-rw-r--r--src/zenutil/include/zenutil/chunkingcontroller.h41
-rw-r--r--src/zenutil/include/zenutil/commandlineoptions.h29
-rw-r--r--src/zenutil/include/zenutil/environmentoptions.h92
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h1
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupitersession.h28
-rw-r--r--src/zenutil/include/zenutil/logging/rotatingfilesink.h1
-rw-r--r--src/zenutil/include/zenutil/parallellwork.h117
-rw-r--r--src/zenutil/include/zenutil/parallelwork.h77
-rw-r--r--src/zenutil/include/zenutil/workerpools.h3
-rw-r--r--src/zenutil/jupiter/jupiterbuildstorage.cpp141
-rw-r--r--src/zenutil/jupiter/jupitersession.cpp177
-rw-r--r--src/zenutil/parallelwork.cpp225
-rw-r--r--src/zenutil/workerpools.cpp14
-rw-r--r--src/zenutil/zenserverprocess.cpp16
-rw-r--r--src/zenutil/zenutil.cpp4
28 files changed, 2263 insertions, 454 deletions
diff --git a/src/zenutil/bufferedwritefilecache.cpp b/src/zenutil/bufferedwritefilecache.cpp
new file mode 100644
index 000000000..a52850314
--- /dev/null
+++ b/src/zenutil/bufferedwritefilecache.cpp
@@ -0,0 +1,177 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/bufferedwritefilecache.h>
+
+#include <zencore/logging.h>
+#include <zencore/trace.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+BufferedWriteFileCache::BufferedWriteFileCache() : m_CacheHitCount(0), m_CacheMissCount(0), m_OpenHandleCount(0), m_DroppedHandleCount(0)
+{
+}
+
+BufferedWriteFileCache::~BufferedWriteFileCache()
+{
+ ZEN_TRACE_CPU("~BufferedWriteFileCache()");
+
+ try
+ {
+ for (TOpenHandles& OpenHandles : m_OpenFiles)
+ {
+ while (BasicFile* File = OpenHandles.Pop())
+ {
+ std::unique_ptr<BasicFile> FileToClose(File);
+ m_OpenHandleCount--;
+ }
+ }
+ m_OpenFiles.clear();
+ m_ChunkWriters.clear();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~BufferedWriteFileCache() threw exeption: {}", Ex.what());
+ }
+}
+
+std::unique_ptr<BasicFile>
+BufferedWriteFileCache::Get(uint32_t FileIndex)
+{
+ ZEN_TRACE_CPU("BufferedWriteFileCache::Get");
+
+ RwLock::ExclusiveLockScope _(m_WriterLock);
+ if (auto It = m_ChunkWriters.find(FileIndex); It != m_ChunkWriters.end())
+ {
+ const uint32_t HandleIndex = It->second;
+ TOpenHandles& OpenHandles = m_OpenFiles[HandleIndex];
+ if (BasicFile* File = OpenHandles.Pop(); File != nullptr)
+ {
+ m_OpenHandleCount--;
+ m_CacheHitCount++;
+ return std::unique_ptr<BasicFile>(File);
+ }
+ }
+ m_CacheMissCount++;
+ return nullptr;
+}
+
+void
+BufferedWriteFileCache::Put(uint32_t FileIndex, std::unique_ptr<BasicFile>&& Writer)
+{
+ ZEN_TRACE_CPU("BufferedWriteFileCache::Put");
+
+ if (m_OpenHandleCount.load() >= MaxBufferedCount)
+ {
+ m_DroppedHandleCount++;
+ return;
+ }
+ RwLock::ExclusiveLockScope _(m_WriterLock);
+ if (auto It = m_ChunkWriters.find(FileIndex); It != m_ChunkWriters.end())
+ {
+ const uint32_t HandleIndex = It->second;
+ TOpenHandles& OpenHandles = m_OpenFiles[HandleIndex];
+ if (OpenHandles.Push(Writer.get()))
+ {
+ Writer.release();
+ m_OpenHandleCount++;
+ }
+ else
+ {
+ m_DroppedHandleCount++;
+ }
+ }
+ else
+ {
+ const uint32_t HandleIndex = gsl::narrow<uint32_t>(m_OpenFiles.size());
+ m_OpenFiles.push_back(TOpenHandles{});
+ m_OpenFiles.back().Push(Writer.release());
+ m_ChunkWriters.insert_or_assign(FileIndex, HandleIndex);
+ m_OpenHandleCount++;
+ }
+}
+
+void
+BufferedWriteFileCache::Close(std::span<uint32_t> FileIndexes)
+{
+ ZEN_TRACE_CPU("BufferedWriteFileCache::Close");
+
+ std::vector<std::unique_ptr<BasicFile>> FilesToClose;
+ FilesToClose.reserve(FileIndexes.size());
+ {
+ RwLock::ExclusiveLockScope _(m_WriterLock);
+ for (uint32_t FileIndex : FileIndexes)
+ {
+ if (auto It = m_ChunkWriters.find(FileIndex); It != m_ChunkWriters.end())
+ {
+ const uint32_t HandleIndex = It->second;
+ TOpenHandles& OpenHandles = m_OpenFiles[HandleIndex];
+ while (BasicFile* File = OpenHandles.Pop())
+ {
+ FilesToClose.emplace_back(std::unique_ptr<BasicFile>(File));
+ m_OpenHandleCount--;
+ }
+ m_ChunkWriters.erase(It);
+ }
+ }
+ }
+ FilesToClose.clear();
+}
+
+BufferedWriteFileCache::Local::Local(BufferedWriteFileCache& Cache) : m_Cache(Cache)
+{
+}
+
+BufferedWriteFileCache::Local::Writer*
+BufferedWriteFileCache::Local::GetWriter(uint32_t FileIndex)
+{
+ if (auto It = m_FileIndexToWriterIndex.find(FileIndex); It != m_FileIndexToWriterIndex.end())
+ {
+ return m_ChunkWriters[It->second].get();
+ }
+ std::unique_ptr<BasicFile> File = m_Cache.Get(FileIndex);
+ if (File)
+ {
+ const uint32_t WriterIndex = gsl::narrow<uint32_t>(m_ChunkWriters.size());
+ m_FileIndexToWriterIndex.insert_or_assign(FileIndex, WriterIndex);
+ m_ChunkWriters.emplace_back(std::make_unique<Writer>(Writer{.File = std::move(File)}));
+ return m_ChunkWriters.back().get();
+ }
+ return nullptr;
+}
+
+BufferedWriteFileCache::Local::Writer*
+BufferedWriteFileCache::Local::PutWriter(uint32_t FileIndex, std::unique_ptr<Writer> Writer)
+{
+ ZEN_ASSERT(!m_FileIndexToWriterIndex.contains(FileIndex));
+ const uint32_t WriterIndex = gsl::narrow<uint32_t>(m_ChunkWriters.size());
+ m_FileIndexToWriterIndex.insert_or_assign(FileIndex, WriterIndex);
+ m_ChunkWriters.emplace_back(std::move(Writer));
+ return m_ChunkWriters.back().get();
+}
+
+BufferedWriteFileCache::Local::~Local()
+{
+ ZEN_TRACE_CPU("BufferedWriteFileCache::~Local()");
+ try
+ {
+ for (auto& It : m_FileIndexToWriterIndex)
+ {
+ const uint32_t FileIndex = It.first;
+ const uint32_t WriterIndex = It.second;
+ m_ChunkWriters[WriterIndex]->Writer.reset();
+ std::unique_ptr<BasicFile> File;
+ File.swap(m_ChunkWriters[WriterIndex]->File);
+ m_Cache.Put(FileIndex, std::move(File));
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("BufferedWriteFileCache::~Local() threw exeption: {}", Ex.what());
+ }
+}
+
+} // namespace zen
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp
new file mode 100644
index 000000000..88238effd
--- /dev/null
+++ b/src/zenutil/buildstoragecache.cpp
@@ -0,0 +1,407 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/buildstoragecache.h>
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/fmtutils.h>
+#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
+#include <zenhttp/httpclient.h>
+#include <zenhttp/packageformat.h>
+#include <zenutil/workerpools.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_set.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+using namespace std::literals;
+
+class ZenBuildStorageCache : public BuildStorageCache
+{
+public:
+ explicit ZenBuildStorageCache(HttpClient& HttpClient,
+ BuildStorageCache::Statistics& Stats,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const std::filesystem::path& TempFolderPath,
+ bool BoostBackgroundThreadCount)
+ : m_HttpClient(HttpClient)
+ , m_Stats(Stats)
+ , m_Namespace(Namespace.empty() ? "none" : Namespace)
+ , m_Bucket(Bucket.empty() ? "none" : Bucket)
+ , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred())
+ , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount)
+ , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background)
+ : GetTinyWorkerPool(EWorkloadType::Background))
+ , m_PendingBackgroundWorkCount(1)
+ , m_CancelBackgroundWork(false)
+ {
+ }
+
+ virtual ~ZenBuildStorageCache()
+ {
+ try
+ {
+ m_CancelBackgroundWork.store(true);
+ if (!IsFlushed)
+ {
+ m_PendingBackgroundWorkCount.CountDown();
+ m_PendingBackgroundWorkCount.Wait();
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~ZenBuildStorageCache() failed with: {}", Ex.what());
+ }
+ }
+
+ void ScheduleBackgroundWork(std::function<void()>&& Work)
+ {
+ m_PendingBackgroundWorkCount.AddCount(1);
+ try
+ {
+ m_BackgroundWorkPool.ScheduleWork([this, Work = std::move(Work)]() {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork");
+ auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); });
+ if (!m_CancelBackgroundWork)
+ {
+ try
+ {
+ Work();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what());
+ }
+ }
+ });
+ }
+ catch (const std::exception& Ex)
+ {
+ m_PendingBackgroundWorkCount.CountDown();
+ ZEN_ERROR("Failed scheduling background upload to build cache. Reason: {}", Ex.what());
+ }
+ }
+
+ virtual void PutBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload) override
+ {
+ ZEN_ASSERT(!IsFlushed);
+ ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
+ ScheduleBackgroundWork(
+ [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob");
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ HttpClient::Response CacheResponse =
+ m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()),
+ Payload,
+ ContentType);
+ AddStatistic(CacheResponse);
+ if (!CacheResponse.IsSuccess())
+ {
+ ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv));
+ }
+ });
+ }
+
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override
+ {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::GetBuildBlob");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ HttpClient::KeyValueMap Headers;
+ if (RangeOffset != 0 || RangeBytes != (uint64_t)-1)
+ {
+ Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", RangeOffset, RangeOffset + RangeBytes - 1)});
+ }
+ CreateDirectories(m_TempFolderPath);
+ HttpClient::Response CacheResponse =
+ m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()),
+ m_TempFolderPath,
+ Headers);
+ AddStatistic(CacheResponse);
+ if (CacheResponse.IsSuccess())
+ {
+ return CacheResponse.ResponsePayload;
+ }
+ return {};
+ }
+
+ virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override
+ {
+ ZEN_ASSERT(!IsFlushed);
+ ScheduleBackgroundWork([this,
+ BuildId = Oid(BuildId),
+ BlobRawHashes = std::vector<IoHash>(BlobHashes.begin(), BlobHashes.end()),
+ MetaDatas = std::vector<CbObject>(MetaDatas.begin(), MetaDatas.end())]() {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::PutBlobMetadatas");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ const uint64_t BlobCount = BlobRawHashes.size();
+
+ CbPackage RequestPackage;
+ std::vector<CbAttachment> Attachments;
+ tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes;
+ Attachments.reserve(BlobCount);
+ AttachmentHashes.reserve(BlobCount);
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.BeginArray("blobHashes");
+ for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++)
+ {
+ RequestWriter.AddHash(BlobRawHashes[BlockHashIndex]);
+ }
+ RequestWriter.EndArray(); // blobHashes
+
+ RequestWriter.BeginArray("metadatas");
+ for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++)
+ {
+ const IoHash ObjectHash = MetaDatas[BlockHashIndex].GetHash();
+ RequestWriter.AddBinaryAttachment(ObjectHash);
+ if (!AttachmentHashes.contains(ObjectHash))
+ {
+ Attachments.push_back(CbAttachment(MetaDatas[BlockHashIndex], ObjectHash));
+ AttachmentHashes.insert(ObjectHash);
+ }
+ }
+
+ RequestWriter.EndArray(); // metadatas
+
+ RequestPackage.SetObject(RequestWriter.Save());
+ }
+ RequestPackage.AddAttachments(Attachments);
+
+ CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage);
+
+ HttpClient::Response CacheResponse =
+ m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/putBlobMetadata", m_Namespace, m_Bucket, BuildId),
+ RpcRequestBuffer,
+ ZenContentType::kCbPackage);
+ AddStatistic(CacheResponse);
+ if (!CacheResponse.IsSuccess())
+ {
+ ZEN_DEBUG("Failed posting blob metadata to cache: {}", CacheResponse.ErrorMessage(""sv));
+ }
+ });
+ }
+
+ virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) override
+ {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::GetBlobMetadatas");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ CbObjectWriter Request;
+
+ Request.BeginArray("blobHashes"sv);
+ for (const IoHash& BlobHash : BlobHashes)
+ {
+ Request.AddHash(BlobHash);
+ }
+ Request.EndArray();
+
+ IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+
+ HttpClient::Response Response =
+ m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/getBlobMetadata", m_Namespace, m_Bucket, BuildId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ AddStatistic(Response);
+ if (Response.IsSuccess())
+ {
+ std::vector<CbObject> Result;
+
+ CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload);
+ CbObject ResponseObject = ResponsePackage.GetObject();
+
+ CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView();
+ CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView();
+ Result.reserve(MetadatasArray.Num());
+ auto BlobHashesIt = BlobHashes.begin();
+ auto BlobHashArrayIt = begin(BlobHashArray);
+ auto MetadataArrayIt = begin(MetadatasArray);
+ while (MetadataArrayIt != end(MetadatasArray))
+ {
+ const IoHash BlobHash = (*BlobHashArrayIt).AsHash();
+ while (BlobHash != *BlobHashesIt)
+ {
+ ZEN_ASSERT(BlobHashesIt != BlobHashes.end());
+ BlobHashesIt++;
+ }
+
+ ZEN_ASSERT(BlobHash == *BlobHashesIt);
+
+ const IoHash MetaHash = (*MetadataArrayIt).AsAttachment();
+ const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash);
+ ZEN_ASSERT(MetaAttachment);
+
+ CbObject Metadata = MetaAttachment->AsObject();
+ Result.emplace_back(std::move(Metadata));
+
+ BlobHashArrayIt++;
+ MetadataArrayIt++;
+ BlobHashesIt++;
+ }
+ return Result;
+ }
+ return {};
+ }
+
+ virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) override
+ {
+ ZEN_TRACE_CPU("ZenBuildStorageCache::BlobsExists");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+
+ CbObjectWriter Request;
+
+ Request.BeginArray("blobHashes"sv);
+ for (const IoHash& BlobHash : BlobHashes)
+ {
+ Request.AddHash(BlobHash);
+ }
+ Request.EndArray();
+
+ IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+
+ HttpClient::Response Response = m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/exists", m_Namespace, m_Bucket, BuildId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ AddStatistic(Response);
+ if (Response.IsSuccess())
+ {
+ CbObject ResponseObject = LoadCompactBinaryObject(Response.ResponsePayload);
+ if (!ResponseObject)
+ {
+ throw std::runtime_error("BlobExists reponse is invalid, failed to load payload as compact binary object");
+ }
+ CbArrayView BlobsExistsArray = ResponseObject["blobExists"sv].AsArrayView();
+ if (!BlobsExistsArray)
+ {
+ throw std::runtime_error("BlobExists reponse is invalid, 'blobExists' array is missing");
+ }
+ if (BlobsExistsArray.Num() != BlobHashes.size())
+ {
+ throw std::runtime_error(fmt::format("BlobExists reponse is invalid, 'blobExists' array contains {} entries, expected {}",
+ BlobsExistsArray.Num(),
+ BlobHashes.size()));
+ }
+
+ CbArrayView MetadatasExistsArray = ResponseObject["metadataExists"sv].AsArrayView();
+ if (!MetadatasExistsArray)
+ {
+ throw std::runtime_error("BlobExists reponse is invalid, 'metadataExists' array is missing");
+ }
+ if (MetadatasExistsArray.Num() != BlobHashes.size())
+ {
+ throw std::runtime_error(
+ fmt::format("BlobExists reponse is invalid, 'metadataExists' array contains {} entries, expected {}",
+ MetadatasExistsArray.Num(),
+ BlobHashes.size()));
+ }
+
+ std::vector<BlobExistsResult> Result;
+ Result.reserve(BlobHashes.size());
+ auto BlobExistsIt = begin(BlobsExistsArray);
+ auto MetadataExistsIt = begin(MetadatasExistsArray);
+ while (BlobExistsIt != end(BlobsExistsArray))
+ {
+ ZEN_ASSERT(MetadataExistsIt != end(MetadatasExistsArray));
+
+ const bool HasBody = (*BlobExistsIt).AsBool();
+ const bool HasMetadata = (*MetadataExistsIt).AsBool();
+
+ Result.push_back({.HasBody = HasBody, .HasMetadata = HasMetadata});
+
+ BlobExistsIt++;
+ MetadataExistsIt++;
+ }
+ return Result;
+ }
+ return {};
+ }
+
+ virtual void Flush(int32_t UpdateIntervalMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override
+ {
+ if (IsFlushed)
+ {
+ return;
+ }
+ if (!IsFlushed)
+ {
+ m_PendingBackgroundWorkCount.CountDown();
+ IsFlushed = true;
+ }
+ if (m_PendingBackgroundWorkCount.Wait(100))
+ {
+ return;
+ }
+ while (true)
+ {
+ intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining();
+ if (UpdateCallback(Remaining))
+ {
+ if (m_PendingBackgroundWorkCount.Wait(UpdateIntervalMS))
+ {
+ UpdateCallback(0);
+ return;
+ }
+ }
+ else
+ {
+ m_CancelBackgroundWork.store(true);
+ }
+ }
+ }
+
+private:
+ void AddStatistic(const HttpClient::Response& Result)
+ {
+ m_Stats.TotalBytesWritten += Result.UploadedBytes;
+ m_Stats.TotalBytesRead += Result.DownloadedBytes;
+ m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0);
+ m_Stats.TotalRequestCount++;
+ }
+
+ HttpClient& m_HttpClient;
+ BuildStorageCache::Statistics& m_Stats;
+ const std::string m_Namespace;
+ const std::string m_Bucket;
+ const std::filesystem::path m_TempFolderPath;
+ const bool m_BoostBackgroundThreadCount;
+ bool IsFlushed = false;
+
+ WorkerThreadPool& m_BackgroundWorkPool;
+ Latch m_PendingBackgroundWorkCount;
+ std::atomic<bool> m_CancelBackgroundWork;
+};
+
+std::unique_ptr<BuildStorageCache>
+CreateZenBuildStorageCache(HttpClient& HttpClient,
+ BuildStorageCache::Statistics& Stats,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const std::filesystem::path& TempFolderPath,
+ bool BoostBackgroundThreadCount)
+{
+ return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount);
+}
+
+} // namespace zen
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
index 1f951167d..46e80f6b7 100644
--- a/src/zenutil/cache/rpcrecording.cpp
+++ b/src/zenutil/cache/rpcrecording.cpp
@@ -46,7 +46,7 @@ struct RecordedRequestsWriter
void BeginWrite(const std::filesystem::path& BasePath)
{
m_BasePath = BasePath;
- std::filesystem::create_directories(m_BasePath);
+ CreateDirectories(m_BasePath);
}
void EndWrite()
@@ -366,6 +366,7 @@ private:
};
std::unique_ptr<std::thread> m_WriterThread;
+ std::atomic_bool m_IsWriterReady{false};
std::atomic_bool m_IsActive{false};
std::atomic_int64_t m_PendingRequests{0};
RwLock m_RequestQueueLock;
@@ -426,7 +427,7 @@ RecordedRequestsSegmentWriter::BeginWrite(const std::filesystem::path& BasePath,
m_BasePath = BasePath;
m_SegmentIndex = SegmentIndex;
m_RequestBaseIndex = RequestBaseIndex;
- std::filesystem::create_directories(m_BasePath);
+ CreateDirectories(m_BasePath);
}
void
@@ -658,6 +659,8 @@ RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath)
m_IsActive = true;
m_WriterThread.reset(new std::thread(&RecordedRequestsWriter::WriterThreadMain, this));
+
+ m_IsWriterReady.wait(false);
}
void
@@ -707,6 +710,9 @@ RecordedRequestsWriter::WriterThreadMain()
SetCurrentThreadName("rpc_writer");
EnsureCurrentSegment();
+ m_IsWriterReady.store(true);
+ m_IsWriterReady.notify_all();
+
while (m_IsActive)
{
m_PendingRequests.wait(0);
@@ -1051,14 +1057,14 @@ public:
static bool IsCompatible(const std::filesystem::path& BasePath)
{
- if (std::filesystem::exists(BasePath / "rpc_recording_info.zcb"))
+ if (IsFile(BasePath / "rpc_recording_info.zcb"))
{
return true;
}
const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0);
- if (std::filesystem::exists(SegmentZero / "rpc_segment_info.zcb") && std::filesystem::exists(SegmentZero / "index.bin"))
+ if (IsFile(SegmentZero / "rpc_segment_info.zcb") && IsFile(SegmentZero / "index.bin"))
{
// top-level metadata is missing, possibly because of premature exit
// on the recording side
diff --git a/src/zenutil/chunkblock.cpp b/src/zenutil/chunkblock.cpp
index f3c14edc4..abfc0fb63 100644
--- a/src/zenutil/chunkblock.cpp
+++ b/src/zenutil/chunkblock.cpp
@@ -52,7 +52,7 @@ ParseChunkBlockDescriptionList(const CbObjectView& BlocksObject)
return {};
}
std::vector<ChunkBlockDescription> Result;
- CbArrayView Blocks = BlocksObject["blocks"].AsArrayView();
+ CbArrayView Blocks = BlocksObject["blocks"sv].AsArrayView();
Result.reserve(Blocks.Num());
for (CbFieldView BlockView : Blocks)
{
diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp
index bb1ee5183..cd1bf7dd7 100644
--- a/src/zenutil/chunkedcontent.cpp
+++ b/src/zenutil/chunkedcontent.cpp
@@ -11,7 +11,7 @@
#include <zenutil/chunkedfile.h>
#include <zenutil/chunkingcontroller.h>
-#include <zenutil/parallellwork.h>
+#include <zenutil/parallelwork.h>
#include <zenutil/workerpools.h>
ZEN_THIRD_PARTY_INCLUDES_START
@@ -140,8 +140,12 @@ namespace {
{
ZEN_TRACE_CPU("HashOnly");
- IoBuffer Buffer = IoBufferBuilder::MakeFromFile((FolderPath / Path).make_preferred());
- const IoHash Hash = IoHash::HashBuffer(Buffer, &Stats.BytesHashed);
+ IoBuffer Buffer = IoBufferBuilder::MakeFromFile((FolderPath / Path).make_preferred());
+ if (Buffer.GetSize() != RawSize)
+ {
+ throw std::runtime_error(fmt::format("Failed opening file '{}' for hashing", FolderPath / Path));
+ }
+ const IoHash Hash = IoHash::HashBuffer(Buffer, &Stats.BytesHashed);
Lock.WithExclusiveLock([&]() {
if (!RawHashToSequenceRawHashIndex.contains(Hash))
@@ -301,17 +305,25 @@ FolderContent::UpdateState(const FolderContent& Rhs, std::vector<uint32_t>& OutP
}
FolderContent
-GetUpdatedContent(const FolderContent& Old, const FolderContent& New, std::vector<std::filesystem::path>& OutDeletedPathIndexes)
+GetUpdatedContent(const FolderContent& Old, const FolderContent& New, std::vector<std::filesystem::path>& OutDeletedPaths)
{
ZEN_TRACE_CPU("FolderContent::GetUpdatedContent");
- FolderContent Result = {.Platform = Old.Platform};
+
+ const uint32_t NewPathCount = gsl::narrow<uint32_t>(New.Paths.size());
+
+ FolderContent Result = {.Platform = Old.Platform};
+ Result.Paths.reserve(NewPathCount);
+ Result.RawSizes.reserve(NewPathCount);
+ Result.Attributes.reserve(NewPathCount);
+ Result.ModificationTicks.reserve(NewPathCount);
+
tsl::robin_map<std::string, uint32_t> NewPathToIndex;
- const uint32_t NewPathCount = gsl::narrow<uint32_t>(New.Paths.size());
NewPathToIndex.reserve(NewPathCount);
for (uint32_t NewPathIndex = 0; NewPathIndex < NewPathCount; NewPathIndex++)
{
NewPathToIndex.insert({New.Paths[NewPathIndex].generic_string(), NewPathIndex});
}
+
uint32_t OldPathCount = gsl::narrow<uint32_t>(Old.Paths.size());
for (uint32_t OldPathIndex = 0; OldPathIndex < OldPathCount; OldPathIndex++)
{
@@ -330,7 +342,7 @@ GetUpdatedContent(const FolderContent& Old, const FolderContent& New, std::vecto
}
else
{
- OutDeletedPathIndexes.push_back(Old.Paths[OldPathIndex]);
+ OutDeletedPaths.push_back(Old.Paths[OldPathIndex]);
}
}
return Result;
@@ -366,7 +378,7 @@ GetFolderContent(GetFolderContentStatistics& Stats,
std::function<bool(const std::string_view& RelativePath)>&& AcceptDirectory,
std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& AcceptFile,
WorkerThreadPool& WorkerPool,
- int32_t UpdateInteralMS,
+ int32_t UpdateIntervalMS,
std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
std::atomic<bool>& AbortFlag)
{
@@ -455,7 +467,7 @@ GetFolderContent(GetFolderContentStatistics& Stats,
WorkerPool,
PendingWork);
PendingWork.CountDown();
- while (!PendingWork.Wait(UpdateInteralMS))
+ while (!PendingWork.Wait(UpdateIntervalMS))
{
UpdateCallback(AbortFlag.load(), PendingWork.Remaining());
}
@@ -650,7 +662,9 @@ MergeChunkedFolderContents(const ChunkedFolderContent& Base, std::span<const Chu
}
ChunkedFolderContent
-DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span<const std::filesystem::path> DeletedPaths)
+DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent,
+ const ChunkedContentLookup& BaseContentLookup,
+ std::span<const std::filesystem::path> DeletedPaths)
{
ZEN_TRACE_CPU("DeletePathsFromChunkedContent");
@@ -664,8 +678,18 @@ DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span
{
DeletedPathSet.insert(PathCompareString(DeletedPath));
}
- const ChunkedContentLookup BaseLookup = BuildChunkedContentLookup(BaseContent);
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
+
+ const size_t BaseChunkCount = BaseContent.ChunkedContent.ChunkHashes.size();
+ std::vector<uint32_t> NewChunkIndexes(BaseChunkCount, (uint32_t)-1);
+
+ const size_t ExpectedPathCount = BaseContent.Paths.size() - DeletedPaths.size();
+ Result.Paths.reserve(ExpectedPathCount);
+ Result.RawSizes.reserve(ExpectedPathCount);
+ Result.Attributes.reserve(ExpectedPathCount);
+ Result.RawHashes.reserve(ExpectedPathCount);
+
+ Result.ChunkedContent.ChunkHashes.reserve(BaseChunkCount);
+ Result.ChunkedContent.ChunkRawSizes.reserve(BaseChunkCount);
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceRawHashIndex;
for (uint32_t PathIndex = 0; PathIndex < BaseContent.Paths.size(); PathIndex++)
@@ -685,20 +709,33 @@ DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span
{
RawHashToSequenceRawHashIndex.insert(
{RawHash, gsl::narrow<uint32_t>(Result.ChunkedContent.SequenceRawHashes.size())});
- const uint32_t SequenceRawHashIndex = BaseLookup.RawHashToSequenceIndex.at(RawHash);
- const uint32_t OrderIndexOffset = BaseLookup.SequenceIndexChunkOrderOffset[SequenceRawHashIndex];
- const uint32_t ChunkCount = BaseContent.ChunkedContent.ChunkCounts[SequenceRawHashIndex];
- ChunkingStatistics Stats;
+ const uint32_t SequenceRawHashIndex = BaseContentLookup.RawHashToSequenceIndex.at(RawHash);
+ const uint32_t OrderIndexOffset = BaseContentLookup.SequenceIndexChunkOrderOffset[SequenceRawHashIndex];
+ const uint32_t ChunkCount = BaseContent.ChunkedContent.ChunkCounts[SequenceRawHashIndex];
+
std::span<const uint32_t> OriginalChunkOrder =
std::span<const uint32_t>(BaseContent.ChunkedContent.ChunkOrders).subspan(OrderIndexOffset, ChunkCount);
- AddChunkSequence(Stats,
- Result.ChunkedContent,
- ChunkHashToChunkIndex,
- RawHash,
- OriginalChunkOrder,
- BaseContent.ChunkedContent.ChunkHashes,
- BaseContent.ChunkedContent.ChunkRawSizes);
- Stats.UniqueSequencesFound++;
+
+ Result.ChunkedContent.ChunkCounts.push_back(gsl::narrow<uint32_t>(OriginalChunkOrder.size()));
+
+ for (uint32_t OldChunkIndex : OriginalChunkOrder)
+ {
+ if (uint32_t FoundChunkIndex = NewChunkIndexes[OldChunkIndex]; FoundChunkIndex != (uint32_t)-1)
+ {
+ Result.ChunkedContent.ChunkOrders.push_back(FoundChunkIndex);
+ }
+ else
+ {
+ const uint32_t NewChunkIndex = gsl::narrow<uint32_t>(Result.ChunkedContent.ChunkHashes.size());
+ NewChunkIndexes[OldChunkIndex] = NewChunkIndex;
+ const IoHash& ChunkHash = BaseContent.ChunkedContent.ChunkHashes[OldChunkIndex];
+ const uint64_t OldChunkSize = BaseContent.ChunkedContent.ChunkRawSizes[OldChunkIndex];
+ Result.ChunkedContent.ChunkHashes.push_back(ChunkHash);
+ Result.ChunkedContent.ChunkRawSizes.push_back(OldChunkSize);
+ Result.ChunkedContent.ChunkOrders.push_back(NewChunkIndex);
+ }
+ }
+ Result.ChunkedContent.SequenceRawHashes.push_back(RawHash);
}
}
}
@@ -708,14 +745,28 @@ DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span
}
ChunkedFolderContent
-ChunkFolderContent(ChunkingStatistics& Stats,
- WorkerThreadPool& WorkerPool,
- const std::filesystem::path& RootPath,
- const FolderContent& Content,
- const ChunkingController& InChunkingController,
- int32_t UpdateInteralMS,
- std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
- std::atomic<bool>& AbortFlag)
+DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span<const std::filesystem::path> DeletedPaths)
+{
+ ZEN_TRACE_CPU("DeletePathsFromChunkedContent");
+ ZEN_ASSERT(DeletedPaths.size() <= BaseContent.Paths.size());
+ if (DeletedPaths.size() == BaseContent.Paths.size())
+ {
+ return {};
+ }
+ const ChunkedContentLookup BaseLookup = BuildChunkedContentLookup(BaseContent);
+ return DeletePathsFromChunkedContent(BaseContent, BaseLookup, DeletedPaths);
+}
+
+ChunkedFolderContent
+ChunkFolderContent(ChunkingStatistics& Stats,
+ WorkerThreadPool& WorkerPool,
+ const std::filesystem::path& RootPath,
+ const FolderContent& Content,
+ const ChunkingController& InChunkingController,
+ int32_t UpdateIntervalMS,
+ std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)>&& UpdateCallback,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag)
{
ZEN_TRACE_CPU("ChunkFolderContent");
@@ -754,7 +805,7 @@ ChunkFolderContent(ChunkingStatistics& Stats,
RwLock Lock;
- ParallellWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
for (uint32_t PathIndex : Order)
{
@@ -762,31 +813,28 @@ ChunkFolderContent(ChunkingStatistics& Stats,
{
break;
}
- Work.ScheduleWork(
- WorkerPool, // GetSyncWorkerPool()
- [&, PathIndex](std::atomic<bool>& AbortFlag) {
- if (!AbortFlag)
- {
- IoHash RawHash = HashOneFile(Stats,
- InChunkingController,
- Result,
- ChunkHashToChunkIndex,
- RawHashToSequenceRawHashIndex,
- Lock,
- RootPath,
- PathIndex,
- AbortFlag);
- Lock.WithExclusiveLock([&]() { Result.RawHashes[PathIndex] = RawHash; });
- Stats.FilesProcessed++;
- }
- },
- Work.DefaultErrorFunction());
- }
-
- Work.Wait(UpdateInteralMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted);
+ Work.ScheduleWork(WorkerPool, // GetSyncWorkerPool()
+ [&, PathIndex](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
+ {
+ IoHash RawHash = HashOneFile(Stats,
+ InChunkingController,
+ Result,
+ ChunkHashToChunkIndex,
+ RawHashToSequenceRawHashIndex,
+ Lock,
+ RootPath,
+ PathIndex,
+ AbortFlag);
+ Lock.WithExclusiveLock([&]() { Result.RawHashes[PathIndex] = RawHash; });
+ Stats.FilesProcessed++;
+ }
+ });
+ }
+
+ Work.Wait(UpdateIntervalMS, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(PendingWork);
- UpdateCallback(Work.IsAborted(), Work.PendingWork().Remaining());
+ UpdateCallback(IsAborted, IsPaused, Work.PendingWork().Remaining());
});
}
return Result;
@@ -799,8 +847,9 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content)
struct ChunkLocationReference
{
- uint32_t ChunkIndex = (uint32_t)-1;
- ChunkedContentLookup::ChunkSequenceLocation Location;
+ uint32_t ChunkIndex = (uint32_t)-1;
+ uint32_t SequenceIndex = (uint32_t)-1;
+ uint64_t Offset = (uint64_t)-1;
};
ChunkedContentLookup Result;
@@ -829,8 +878,7 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content)
{
uint32_t ChunkIndex = Content.ChunkedContent.ChunkOrders[OrderIndex];
- Locations.push_back(
- ChunkLocationReference{ChunkIndex, ChunkedContentLookup::ChunkSequenceLocation{SequenceIndex, LocationOffset}});
+ Locations.push_back(ChunkLocationReference{.ChunkIndex = ChunkIndex, .SequenceIndex = SequenceIndex, .Offset = LocationOffset});
LocationOffset += Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
}
@@ -845,15 +893,15 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content)
{
return false;
}
- if (Lhs.Location.SequenceIndex < Rhs.Location.SequenceIndex)
+ if (Lhs.SequenceIndex < Rhs.SequenceIndex)
{
return true;
}
- if (Lhs.Location.SequenceIndex > Rhs.Location.SequenceIndex)
+ if (Lhs.SequenceIndex > Rhs.SequenceIndex)
{
return false;
}
- return Lhs.Location.Offset < Rhs.Location.Offset;
+ return Lhs.Offset < Rhs.Offset;
});
Result.ChunkSequenceLocations.reserve(Locations.size());
@@ -866,7 +914,10 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content)
uint32_t Count = 0;
while ((RangeOffset + Count < Locations.size()) && (Locations[RangeOffset + Count].ChunkIndex == ChunkIndex))
{
- Result.ChunkSequenceLocations.push_back(Locations[RangeOffset + Count].Location);
+ const ChunkLocationReference& LocationReference = Locations[RangeOffset + Count];
+ Result.ChunkSequenceLocations.push_back(
+ ChunkedContentLookup::ChunkSequenceLocation{.SequenceIndex = LocationReference.SequenceIndex,
+ .Offset = LocationReference.Offset});
Count++;
}
Result.ChunkSequenceLocationOffset.push_back(RangeOffset);
@@ -875,8 +926,12 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content)
}
Result.SequenceIndexFirstPathIndex.resize(Content.ChunkedContent.SequenceRawHashes.size(), (uint32_t)-1);
+ Result.PathExtensionHash.resize(Content.Paths.size());
for (uint32_t PathIndex = 0; PathIndex < Content.Paths.size(); PathIndex++)
{
+ std::string LowercaseExtension = Content.Paths[PathIndex].extension().string();
+ std::transform(LowercaseExtension.begin(), LowercaseExtension.end(), LowercaseExtension.begin(), ::tolower);
+ Result.PathExtensionHash[PathIndex] = HashStringDjb2(LowercaseExtension);
if (Content.RawSizes[PathIndex] > 0)
{
const IoHash& RawHash = Content.RawHashes[PathIndex];
diff --git a/src/zenutil/chunkingcontroller.cpp b/src/zenutil/chunkingcontroller.cpp
index 2a7057a46..6fb4182c0 100644
--- a/src/zenutil/chunkingcontroller.cpp
+++ b/src/zenutil/chunkingcontroller.cpp
@@ -4,6 +4,7 @@
#include <zencore/basicfile.h>
#include <zencore/compactbinarybuilder.h>
+#include <zencore/filesystem.h>
#include <zencore/trace.h>
ZEN_THIRD_PARTY_INCLUDES_START
@@ -35,26 +36,54 @@ namespace {
return ChunkedParams{.UseThreshold = UseThreshold, .MinSize = MinSize, .MaxSize = MaxSize, .AvgSize = AvgSize};
}
-} // namespace
+ void WriteChunkParams(CbObjectWriter& Writer, const ChunkedParams& Params)
+ {
+ Writer.BeginObject("ChunkingParams"sv);
+ {
+ Writer.AddBool("UseThreshold"sv, Params.UseThreshold);
-class BasicChunkingController : public ChunkingController
-{
-public:
- BasicChunkingController(std::span<const std::string_view> ExcludeExtensions,
- uint64_t ChunkFileSizeLimit,
- const ChunkedParams& ChunkingParams)
- : m_ChunkExcludeExtensions(ExcludeExtensions.begin(), ExcludeExtensions.end())
- , m_ChunkFileSizeLimit(ChunkFileSizeLimit)
- , m_ChunkingParams(ChunkingParams)
+ Writer.AddInteger("MinSize"sv, (uint64_t)Params.MinSize);
+ Writer.AddInteger("MaxSize"sv, (uint64_t)Params.MaxSize);
+ Writer.AddInteger("AvgSize"sv, (uint64_t)Params.AvgSize);
+ }
+ Writer.EndObject(); // ChunkingParams
+ }
+
+ bool IsElfFile(BasicFile& Buffer)
{
+ if (Buffer.FileSize() > 4)
+ {
+ uint32_t ElfCheck = 0;
+ Buffer.Read(&ElfCheck, 4, 0);
+ if (ElfCheck == 0x464c457f)
+ {
+ return true;
+ }
+ }
+ return false;
}
- BasicChunkingController(CbObjectView Parameters)
- : m_ChunkExcludeExtensions(ReadStringArray(Parameters["ChunkExcludeExtensions"sv].AsArrayView()))
- , m_ChunkFileSizeLimit(Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit))
- , m_ChunkingParams(ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView()))
+ bool IsMachOFile(BasicFile& Buffer)
{
+ if (Buffer.FileSize() > 4)
+ {
+ uint32_t MachOCheck = 0;
+ Buffer.Read(&MachOCheck, 4, 0);
+ if ((MachOCheck == 0xfeedface) || (MachOCheck == 0xcefaedfe))
+ {
+ return true;
+ }
+ }
+ return false;
}
+} // namespace
+
+class BasicChunkingController : public ChunkingController
+{
+public:
+ BasicChunkingController(const BasicChunkingControllerSettings& Settings) : m_Settings(Settings) {}
+
+ BasicChunkingController(CbObjectView Parameters) : m_Settings(ReadSettings(Parameters)) {}
virtual bool ProcessFile(const std::filesystem::path& InputPath,
uint64_t RawSize,
@@ -64,16 +93,25 @@ public:
{
ZEN_TRACE_CPU("BasicChunkingController::ProcessFile");
const bool ExcludeFromChunking =
- std::find(m_ChunkExcludeExtensions.begin(), m_ChunkExcludeExtensions.end(), InputPath.extension()) !=
- m_ChunkExcludeExtensions.end();
+ std::find(m_Settings.ExcludeExtensions.begin(), m_Settings.ExcludeExtensions.end(), InputPath.extension()) !=
+ m_Settings.ExcludeExtensions.end();
- if (ExcludeFromChunking || (RawSize < m_ChunkFileSizeLimit))
+ if (ExcludeFromChunking || (RawSize < m_Settings.ChunkFileSizeLimit))
{
return false;
}
BasicFile Buffer(InputPath, BasicFile::Mode::kRead);
- OutChunked = ChunkData(Buffer, 0, RawSize, m_ChunkingParams, &BytesProcessed, &AbortFlag);
+ if (m_Settings.ExcludeElfFiles && IsElfFile(Buffer))
+ {
+ return false;
+ }
+ if (m_Settings.ExcludeMachOFiles && IsMachOFile(Buffer))
+ {
+ return false;
+ }
+
+ OutChunked = ChunkData(Buffer, 0, RawSize, m_Settings.ChunkingParams, &BytesProcessed, &AbortFlag);
return true;
}
@@ -84,53 +122,43 @@ public:
CbObjectWriter Writer;
Writer.BeginArray("ChunkExcludeExtensions"sv);
{
- for (const std::string& Extension : m_ChunkExcludeExtensions)
+ for (const std::string& Extension : m_Settings.ExcludeExtensions)
{
Writer.AddString(Extension);
}
}
Writer.EndArray(); // ChunkExcludeExtensions
- Writer.AddInteger("ChunkFileSizeLimit"sv, m_ChunkFileSizeLimit);
- Writer.BeginObject("ChunkingParams"sv);
- {
- Writer.AddBool("UseThreshold"sv, m_ChunkingParams.UseThreshold);
- Writer.AddInteger("MinSize"sv, (uint64_t)m_ChunkingParams.MinSize);
- Writer.AddInteger("MaxSize"sv, (uint64_t)m_ChunkingParams.MaxSize);
- Writer.AddInteger("AvgSize"sv, (uint64_t)m_ChunkingParams.AvgSize);
- }
- Writer.EndObject(); // ChunkingParams
+ Writer.AddBool("ExcludeElfFiles"sv, m_Settings.ExcludeElfFiles);
+ Writer.AddBool("ExcludeMachOFiles"sv, m_Settings.ExcludeMachOFiles);
+ Writer.AddInteger("ChunkFileSizeLimit"sv, m_Settings.ChunkFileSizeLimit);
+
+ WriteChunkParams(Writer, m_Settings.ChunkingParams);
+
return Writer.Save();
}
static constexpr std::string_view Name = "BasicChunkingController"sv;
-protected:
- const std::vector<std::string> m_ChunkExcludeExtensions;
- const uint64_t m_ChunkFileSizeLimit;
- const ChunkedParams m_ChunkingParams;
+private:
+ static BasicChunkingControllerSettings ReadSettings(CbObjectView Parameters)
+ {
+ return BasicChunkingControllerSettings{
+ .ExcludeExtensions = ReadStringArray(Parameters["ChunkExcludeExtensions"sv].AsArrayView()),
+ .ExcludeElfFiles = Parameters["ExcludeElfFiles"sv].AsBool(DefaultChunkingExcludeElfFiles),
+ .ExcludeMachOFiles = Parameters["ExcludeMachOFiles"sv].AsBool(DefaultChunkingExcludeMachOFiles),
+ .ChunkFileSizeLimit = Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit),
+ .ChunkingParams = ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView())};
+ }
+
+ const BasicChunkingControllerSettings m_Settings;
};
class ChunkingControllerWithFixedChunking : public ChunkingController
{
public:
- ChunkingControllerWithFixedChunking(std::span<const std::string_view> FixedChunkingExtensions,
- uint64_t ChunkFileSizeLimit,
- const ChunkedParams& ChunkingParams,
- uint32_t FixedChunkingChunkSize)
- : m_FixedChunkingExtensions(FixedChunkingExtensions.begin(), FixedChunkingExtensions.end())
- , m_ChunkFileSizeLimit(ChunkFileSizeLimit)
- , m_ChunkingParams(ChunkingParams)
- , m_FixedChunkingChunkSize(FixedChunkingChunkSize)
- {
- }
+ ChunkingControllerWithFixedChunking(const ChunkingControllerWithFixedChunkingSettings& Settings) : m_Settings(Settings) {}
- ChunkingControllerWithFixedChunking(CbObjectView Parameters)
- : m_FixedChunkingExtensions(ReadStringArray(Parameters["FixedChunkingExtensions"sv].AsArrayView()))
- , m_ChunkFileSizeLimit(Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit))
- , m_ChunkingParams(ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView()))
- , m_FixedChunkingChunkSize(Parameters["FixedChunkingChunkSize"sv].AsUInt32(16u * 1024u * 1024u))
- {
- }
+ ChunkingControllerWithFixedChunking(CbObjectView Parameters) : m_Settings(ReadSettings(Parameters)) {}
virtual bool ProcessFile(const std::filesystem::path& InputPath,
uint64_t RawSize,
@@ -139,33 +167,71 @@ public:
std::atomic<bool>& AbortFlag) const override
{
ZEN_TRACE_CPU("ChunkingControllerWithFixedChunking::ProcessFile");
- if (RawSize < m_ChunkFileSizeLimit)
+ const bool ExcludeFromChunking =
+ std::find(m_Settings.ExcludeExtensions.begin(), m_Settings.ExcludeExtensions.end(), InputPath.extension()) !=
+ m_Settings.ExcludeExtensions.end();
+
+ if (ExcludeFromChunking || (RawSize < m_Settings.ChunkFileSizeLimit))
{
return false;
}
- const bool FixedChunking = std::find(m_FixedChunkingExtensions.begin(), m_FixedChunkingExtensions.end(), InputPath.extension()) !=
- m_FixedChunkingExtensions.end();
- if (FixedChunking)
+ const bool FixedChunkingExtension =
+ std::find(m_Settings.FixedChunkingExtensions.begin(), m_Settings.FixedChunkingExtensions.end(), InputPath.extension()) !=
+ m_Settings.FixedChunkingExtensions.end();
+
+ if (FixedChunkingExtension)
{
+ if (RawSize < m_Settings.MinSizeForFixedChunking)
+ {
+ return false;
+ }
ZEN_TRACE_CPU("FixedChunking");
- IoHashStream FullHash;
- IoBuffer Source = IoBufferBuilder::MakeFromFile(InputPath);
+ IoHashStream FullHasher;
+ BasicFile Source(InputPath, BasicFile::Mode::kRead);
uint64_t Offset = 0;
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
- ChunkHashToChunkIndex.reserve(1 + (RawSize / m_FixedChunkingChunkSize));
+ const uint64_t ExpectedChunkCount = 1 + (RawSize / m_Settings.FixedChunkingChunkSize);
+ ChunkHashToChunkIndex.reserve(ExpectedChunkCount);
+ OutChunked.Info.ChunkHashes.reserve(ExpectedChunkCount);
+ OutChunked.Info.ChunkSequence.reserve(ExpectedChunkCount);
+ OutChunked.ChunkSources.reserve(ExpectedChunkCount);
+
+ static const uint64_t BufferingSize = 256u * 1024u;
+
+ IoHashStream ChunkHasher;
+
while (Offset < RawSize)
{
if (AbortFlag)
{
return false;
}
- uint64_t ChunkSize = std::min<uint64_t>(RawSize - Offset, m_FixedChunkingChunkSize);
- IoBuffer Chunk(Source, Offset, ChunkSize);
- MemoryView ChunkData = Chunk.GetView();
- FullHash.Append(ChunkData);
- IoHash ChunkHash = IoHash::HashBuffer(ChunkData);
+ ChunkHasher.Reset();
+
+ uint64_t ChunkSize = std::min<uint64_t>(RawSize - Offset, m_Settings.FixedChunkingChunkSize);
+ if (ChunkSize >= (BufferingSize + BufferingSize / 2))
+ {
+ ScanFile(Source.Handle(),
+ Offset,
+ ChunkSize,
+ BufferingSize,
+ [&FullHasher, &ChunkHasher, &BytesProcessed](const void* Data, size_t Size) {
+ FullHasher.Append(Data, Size);
+ ChunkHasher.Append(Data, Size);
+ BytesProcessed.fetch_add(Size);
+ });
+ }
+ else
+ {
+ IoBuffer ChunkData = Source.ReadRange(Offset, ChunkSize);
+ FullHasher.Append(ChunkData);
+ ChunkHasher.Append(ChunkData);
+ BytesProcessed.fetch_add(ChunkSize);
+ }
+
+ const IoHash ChunkHash = ChunkHasher.GetHash();
if (auto It = ChunkHashToChunkIndex.find(ChunkHash); It != ChunkHashToChunkIndex.end())
{
OutChunked.Info.ChunkSequence.push_back(It->second);
@@ -178,16 +244,24 @@ public:
OutChunked.ChunkSources.push_back({.Offset = Offset, .Size = gsl::narrow<uint32_t>(ChunkSize)});
}
Offset += ChunkSize;
- BytesProcessed.fetch_add(ChunkSize);
}
OutChunked.Info.RawSize = RawSize;
- OutChunked.Info.RawHash = FullHash.GetHash();
+ OutChunked.Info.RawHash = FullHasher.GetHash();
return true;
}
else
{
BasicFile Buffer(InputPath, BasicFile::Mode::kRead);
- OutChunked = ChunkData(Buffer, 0, RawSize, m_ChunkingParams, &BytesProcessed);
+ if (m_Settings.ExcludeElfFiles && IsElfFile(Buffer))
+ {
+ return false;
+ }
+ if (m_Settings.ExcludeMachOFiles && IsMachOFile(Buffer))
+ {
+ return false;
+ }
+
+ OutChunked = ChunkData(Buffer, 0, RawSize, m_Settings.ChunkingParams, &BytesProcessed, &AbortFlag);
return true;
}
}
@@ -199,41 +273,57 @@ public:
CbObjectWriter Writer;
Writer.BeginArray("FixedChunkingExtensions");
{
- for (const std::string& Extension : m_FixedChunkingExtensions)
+ for (const std::string& Extension : m_Settings.FixedChunkingExtensions)
{
Writer.AddString(Extension);
}
}
Writer.EndArray(); // ChunkExcludeExtensions
- Writer.AddInteger("ChunkFileSizeLimit"sv, m_ChunkFileSizeLimit);
- Writer.BeginObject("ChunkingParams"sv);
- {
- Writer.AddBool("UseThreshold"sv, m_ChunkingParams.UseThreshold);
- Writer.AddInteger("MinSize"sv, (uint64_t)m_ChunkingParams.MinSize);
- Writer.AddInteger("MaxSize"sv, (uint64_t)m_ChunkingParams.MaxSize);
- Writer.AddInteger("AvgSize"sv, (uint64_t)m_ChunkingParams.AvgSize);
+ Writer.BeginArray("ChunkExcludeExtensions"sv);
+ {
+ for (const std::string& Extension : m_Settings.ExcludeExtensions)
+ {
+ Writer.AddString(Extension);
+ }
}
- Writer.EndObject(); // ChunkingParams
- Writer.AddInteger("FixedChunkingChunkSize"sv, m_FixedChunkingChunkSize);
+ Writer.EndArray(); // ChunkExcludeExtensions
+
+ Writer.AddBool("ExcludeElfFiles"sv, m_Settings.ExcludeElfFiles);
+ Writer.AddBool("ExcludeMachOFiles"sv, m_Settings.ExcludeMachOFiles);
+
+ Writer.AddInteger("ChunkFileSizeLimit"sv, m_Settings.ChunkFileSizeLimit);
+
+ WriteChunkParams(Writer, m_Settings.ChunkingParams);
+
+ Writer.AddInteger("FixedChunkingChunkSize"sv, m_Settings.FixedChunkingChunkSize);
+ Writer.AddInteger("MinSizeForFixedChunking"sv, m_Settings.MinSizeForFixedChunking);
return Writer.Save();
}
static constexpr std::string_view Name = "ChunkingControllerWithFixedChunking"sv;
-protected:
- const std::vector<std::string> m_FixedChunkingExtensions;
- const uint64_t m_ChunkFileSizeLimit;
- const ChunkedParams m_ChunkingParams;
- const uint32_t m_FixedChunkingChunkSize;
+private:
+ static ChunkingControllerWithFixedChunkingSettings ReadSettings(CbObjectView Parameters)
+ {
+ return ChunkingControllerWithFixedChunkingSettings{
+ .FixedChunkingExtensions = ReadStringArray(Parameters["FixedChunkingExtensions"sv].AsArrayView()),
+ .ExcludeExtensions = ReadStringArray(Parameters["ChunkExcludeExtensions"sv].AsArrayView()),
+ .ExcludeElfFiles = Parameters["ExcludeElfFiles"sv].AsBool(DefaultChunkingExcludeElfFiles),
+ .ExcludeMachOFiles = Parameters["ExcludeMachOFiles"sv].AsBool(DefaultChunkingExcludeMachOFiles),
+ .ChunkFileSizeLimit = Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit),
+ .ChunkingParams = ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView()),
+ .FixedChunkingChunkSize = Parameters["FixedChunkingChunkSize"sv].AsUInt64(DefaultFixedChunkingChunkSize),
+ .MinSizeForFixedChunking = Parameters["MinSizeForFixedChunking"sv].AsUInt64(DefaultFixedChunkingChunkSize)};
+ }
+
+ const ChunkingControllerWithFixedChunkingSettings m_Settings;
};
std::unique_ptr<ChunkingController>
-CreateBasicChunkingController(std::span<const std::string_view> ExcludeExtensions,
- uint64_t ChunkFileSizeLimit,
- const ChunkedParams& ChunkingParams)
+CreateBasicChunkingController(const BasicChunkingControllerSettings& Settings)
{
- return std::make_unique<BasicChunkingController>(ExcludeExtensions, ChunkFileSizeLimit, ChunkingParams);
+ return std::make_unique<BasicChunkingController>(Settings);
}
std::unique_ptr<ChunkingController>
CreateBasicChunkingController(CbObjectView Parameters)
@@ -242,15 +332,9 @@ CreateBasicChunkingController(CbObjectView Parameters)
}
std::unique_ptr<ChunkingController>
-CreateChunkingControllerWithFixedChunking(std::span<const std::string_view> FixedChunkingExtensions,
- uint64_t ChunkFileSizeLimit,
- const ChunkedParams& ChunkingParams,
- uint32_t FixedChunkingChunkSize)
+CreateChunkingControllerWithFixedChunking(const ChunkingControllerWithFixedChunkingSettings& Setting)
{
- return std::make_unique<ChunkingControllerWithFixedChunking>(FixedChunkingExtensions,
- ChunkFileSizeLimit,
- ChunkingParams,
- FixedChunkingChunkSize);
+ return std::make_unique<ChunkingControllerWithFixedChunking>(Setting);
}
std::unique_ptr<ChunkingController>
CreateChunkingControllerWithFixedChunking(CbObjectView Parameters)
diff --git a/src/zenutil/commandlineoptions.cpp b/src/zenutil/commandlineoptions.cpp
new file mode 100644
index 000000000..afef7f6f2
--- /dev/null
+++ b/src/zenutil/commandlineoptions.cpp
@@ -0,0 +1,221 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/commandlineoptions.h>
+
+#include <filesystem>
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+#endif // ZEN_WITH_TESTS
+
+void
+cxxopts::values::parse_value(const std::string& text, std::filesystem::path& value)
+{
+ value = zen::StringToPath(text);
+}
+
+namespace zen {
+
+std::vector<std::string>
+ParseCommandLine(std::string_view CommandLine)
+{
+ auto IsWhitespaceOrEnd = [](std::string_view CommandLine, std::string::size_type Pos) {
+ if (Pos == CommandLine.length())
+ {
+ return true;
+ }
+ if (CommandLine[Pos] == ' ')
+ {
+ return true;
+ }
+ return false;
+ };
+
+ bool IsParsingArg = false;
+ bool IsInQuote = false;
+
+ std::string::size_type Pos = 0;
+ std::string::size_type ArgStart = 0;
+ std::vector<std::string> Args;
+ while (Pos < CommandLine.length())
+ {
+ if (IsInQuote)
+ {
+ if (CommandLine[Pos] == '"' && IsWhitespaceOrEnd(CommandLine, Pos + 1))
+ {
+ Args.push_back(std::string(CommandLine.substr(ArgStart, Pos - ArgStart + 1)));
+ Pos++;
+ IsInQuote = false;
+ IsParsingArg = false;
+ }
+ else
+ {
+ Pos++;
+ }
+ }
+ else if (IsParsingArg)
+ {
+ ZEN_ASSERT(Pos > ArgStart);
+ if (CommandLine[Pos] == ' ')
+ {
+ Args.push_back(std::string(CommandLine.substr(ArgStart, Pos - ArgStart)));
+ Pos++;
+ IsParsingArg = false;
+ }
+ else if (CommandLine[Pos] == '"')
+ {
+ IsInQuote = true;
+ Pos++;
+ }
+ else
+ {
+ Pos++;
+ }
+ }
+ else if (CommandLine[Pos] == '"')
+ {
+ IsInQuote = true;
+ IsParsingArg = true;
+ ArgStart = Pos;
+ Pos++;
+ }
+ else if (CommandLine[Pos] != ' ')
+ {
+ IsParsingArg = true;
+ ArgStart = Pos;
+ Pos++;
+ }
+ else
+ {
+ Pos++;
+ }
+ }
+ if (IsParsingArg)
+ {
+ ZEN_ASSERT(Pos > ArgStart);
+ Args.push_back(std::string(CommandLine.substr(ArgStart)));
+ }
+
+ return Args;
+}
+
+std::vector<char*>
+StripCommandlineQuotes(std::vector<std::string>& InOutArgs)
+{
+ std::vector<char*> RawArgs;
+ RawArgs.reserve(InOutArgs.size());
+ for (std::string& Arg : InOutArgs)
+ {
+ std::string::size_type EscapedQuotePos = Arg.find("\\\"", 1);
+ while (EscapedQuotePos != std::string::npos && Arg.rfind('\"', EscapedQuotePos - 1) != std::string::npos)
+ {
+ Arg.erase(EscapedQuotePos, 1);
+ EscapedQuotePos = Arg.find("\\\"", EscapedQuotePos);
+ }
+
+ if (Arg.starts_with("\""))
+ {
+ if (Arg.find('"', 1) == Arg.length() - 1)
+ {
+ if (Arg.find(' ', 1) == std::string::npos)
+ {
+ Arg = Arg.substr(1, Arg.length() - 2);
+ }
+ }
+ }
+ RawArgs.push_back(const_cast<char*>(Arg.c_str()));
+ }
+ return RawArgs;
+}
+
+void
+MakeSafeAbsolutePathÍnPlace(std::filesystem::path& Path)
+{
+ if (!Path.empty())
+ {
+ std::filesystem::path AbsolutePath = std::filesystem::absolute(Path).make_preferred();
+#if ZEN_PLATFORM_WINDOWS
+ const std::string_view Prefix = "\\\\?\\";
+ const std::u8string PrefixU8(Prefix.begin(), Prefix.end());
+ std::u8string PathString = AbsolutePath.u8string();
+ if (!PathString.empty() && !PathString.starts_with(PrefixU8))
+ {
+ PathString.insert(0, PrefixU8);
+ Path = PathString;
+ }
+#endif // ZEN_PLATFORM_WINDOWS
+ }
+}
+
+std::filesystem::path
+MakeSafeAbsolutePath(const std::filesystem::path& Path)
+{
+ std::filesystem::path Tmp(Path);
+ MakeSafeAbsolutePathÍnPlace(Tmp);
+ return Tmp;
+}
+
+std::filesystem::path
+StringToPath(const std::string_view& Path)
+{
+ std::string_view UnquotedPath = RemoveQuotes(Path);
+
+ if (UnquotedPath.ends_with('/') || UnquotedPath.ends_with('\\') || UnquotedPath.ends_with(std::filesystem::path::preferred_separator))
+ {
+ UnquotedPath = UnquotedPath.substr(0, UnquotedPath.length() - 1);
+ }
+
+ return std::filesystem::path(UnquotedPath).make_preferred();
+}
+
+std::string_view
+RemoveQuotes(const std::string_view& Arg)
+{
+ if (Arg.length() > 2)
+ {
+ if (Arg[0] == '"' && Arg[Arg.length() - 1] == '"')
+ {
+ return Arg.substr(1, Arg.length() - 2);
+ }
+ }
+ return Arg;
+}
+
+#if ZEN_WITH_TESTS
+
+void
+commandlineoptions_forcelink()
+{
+}
+
+TEST_CASE("CommandLine")
+{
+ std::vector<std::string> v1 = ParseCommandLine("c:\\my\\exe.exe \"quoted arg\" \"one\",two,\"three\\\"");
+ CHECK_EQ(v1[0], "c:\\my\\exe.exe");
+ CHECK_EQ(v1[1], "\"quoted arg\"");
+ CHECK_EQ(v1[2], "\"one\",two,\"three\\\"");
+
+ std::vector<std::string> v2 = ParseCommandLine(
+ "--tracehost 127.0.0.1 builds download --url=https://jupiter.devtools.epicgames.com --namespace=ue.oplog "
+ "--bucket=citysample.packaged-build.fortnite-main.windows \"c:\\just\\a\\path\" "
+ "--access-token-path=\"C:\\Users\\dan.engelbrecht\\jupiter-token.json\" \"D:\\Dev\\Spaced Folder\\Target\\\" "
+ "--alt-path=\"D:\\Dev\\Spaced Folder2\\Target\\\" 07dn23ifiwesnvoasjncasab --build-part-name win64,linux,ps5");
+
+ std::vector<char*> v2Stripped = StripCommandlineQuotes(v2);
+ CHECK_EQ(v2Stripped[0], std::string("--tracehost"));
+ CHECK_EQ(v2Stripped[1], std::string("127.0.0.1"));
+ CHECK_EQ(v2Stripped[2], std::string("builds"));
+ CHECK_EQ(v2Stripped[3], std::string("download"));
+ CHECK_EQ(v2Stripped[4], std::string("--url=https://jupiter.devtools.epicgames.com"));
+ CHECK_EQ(v2Stripped[5], std::string("--namespace=ue.oplog"));
+ CHECK_EQ(v2Stripped[6], std::string("--bucket=citysample.packaged-build.fortnite-main.windows"));
+ CHECK_EQ(v2Stripped[7], std::string("c:\\just\\a\\path"));
+ CHECK_EQ(v2Stripped[8], std::string("--access-token-path=\"C:\\Users\\dan.engelbrecht\\jupiter-token.json\""));
+ CHECK_EQ(v2Stripped[9], std::string("\"D:\\Dev\\Spaced Folder\\Target\""));
+ CHECK_EQ(v2Stripped[10], std::string("--alt-path=\"D:\\Dev\\Spaced Folder2\\Target\""));
+ CHECK_EQ(v2Stripped[11], std::string("07dn23ifiwesnvoasjncasab"));
+ CHECK_EQ(v2Stripped[12], std::string("--build-part-name"));
+ CHECK_EQ(v2Stripped[13], std::string("win64,linux,ps5"));
+}
+
+#endif
+} // namespace zen
diff --git a/src/zenutil/environmentoptions.cpp b/src/zenutil/environmentoptions.cpp
new file mode 100644
index 000000000..1b7ce8029
--- /dev/null
+++ b/src/zenutil/environmentoptions.cpp
@@ -0,0 +1,84 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/environmentoptions.h>
+
+#include <zencore/filesystem.h>
+
+namespace zen {
+
+EnvironmentOptions::StringOption::StringOption(std::string& Value) : RefValue(Value)
+{
+}
+void
+EnvironmentOptions::StringOption::Parse(std::string_view Value)
+{
+ RefValue = std::string(Value);
+}
+
+EnvironmentOptions::FilePathOption::FilePathOption(std::filesystem::path& Value) : RefValue(Value)
+{
+}
+void
+EnvironmentOptions::FilePathOption::Parse(std::string_view Value)
+{
+ RefValue = MakeSafeAbsolutePath(Value);
+}
+
+EnvironmentOptions::BoolOption::BoolOption(bool& Value) : RefValue(Value)
+{
+}
+void
+EnvironmentOptions::BoolOption::Parse(std::string_view Value)
+{
+ const std::string Lower = ToLower(Value);
+ if (Lower == "true" || Lower == "y" || Lower == "yes")
+ {
+ RefValue = true;
+ }
+ else if (Lower == "false" || Lower == "n" || Lower == "no")
+ {
+ RefValue = false;
+ }
+}
+
+std::shared_ptr<EnvironmentOptions::OptionValue>
+EnvironmentOptions::MakeOption(std::string& Value)
+{
+ return std::make_shared<StringOption>(Value);
+}
+
+std::shared_ptr<EnvironmentOptions::OptionValue>
+EnvironmentOptions::MakeOption(std::filesystem::path& Value)
+{
+ return std::make_shared<FilePathOption>(Value);
+}
+
+std::shared_ptr<EnvironmentOptions::OptionValue>
+EnvironmentOptions::MakeOption(bool& Value)
+{
+ return std::make_shared<BoolOption>(Value);
+}
+
+EnvironmentOptions::EnvironmentOptions()
+{
+}
+
+void
+EnvironmentOptions::Parse(const cxxopts::ParseResult& CmdLineResult)
+{
+ for (auto& It : OptionMap)
+ {
+ std::string_view EnvName = It.first;
+ const Option& Opt = It.second;
+ if (CmdLineResult.count(Opt.CommandLineOptionName) == 0)
+ {
+ std::string EnvValue = GetEnvVariable(It.first);
+ if (!EnvValue.empty())
+ {
+ Opt.Value->Parse(EnvValue);
+ }
+ }
+ }
+}
+
+} // namespace zen
diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp
index 47a4e1cc4..c2cc5ab3c 100644
--- a/src/zenutil/filebuildstorage.cpp
+++ b/src/zenutil/filebuildstorage.cpp
@@ -35,6 +35,27 @@ public:
virtual ~FileBuildStorage() {}
+ virtual CbObject ListNamespaces(bool bRecursive) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::ListNamespaces");
+ ZEN_UNUSED(bRecursive);
+
+ SimulateLatency(0, 0);
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ CbObjectWriter Writer;
+ Writer.BeginArray("results");
+ {
+ }
+ Writer.EndArray(); // results
+ Writer.Save();
+ SimulateLatency(Writer.GetSaveSize(), 0);
+ return Writer.Save();
+ }
+
virtual CbObject ListBuilds(CbObject Query) override
{
ZEN_TRACE_CPU("FileBuildStorage::ListBuilds");
@@ -66,7 +87,7 @@ public:
}
}
}
- Writer.EndArray(); // builds
+ Writer.EndArray(); // results
Writer.Save();
SimulateLatency(Writer.GetSaveSize(), 0);
return Writer.Save();
@@ -235,7 +256,7 @@ public:
m_Stats.TotalRequestCount++;
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (!std::filesystem::is_regular_file(BlockPath))
+ if (!IsFile(BlockPath))
{
CreateDirectories(BlockPath.parent_path());
TemporaryFile::SafeWriteFile(BlockPath, Payload.Flatten().GetView());
@@ -260,7 +281,7 @@ public:
m_Stats.TotalRequestCount++;
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (!std::filesystem::is_regular_file(BlockPath))
+ if (!IsFile(BlockPath))
{
CreateDirectories(BlockPath.parent_path());
@@ -346,7 +367,7 @@ public:
m_Stats.TotalRequestCount++;
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (std::filesystem::is_regular_file(BlockPath))
+ if (IsFile(BlockPath))
{
BasicFile File(BlockPath, BasicFile::Mode::kRead);
IoBuffer Payload;
@@ -369,11 +390,11 @@ public:
return IoBuffer{};
}
- virtual std::vector<std::function<void()>> GetLargeBuildBlob(
- const Oid& BuildId,
- const IoHash& RawHash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) override
+ virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete) override
{
ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob");
ZEN_UNUSED(BuildId);
@@ -383,20 +404,22 @@ public:
m_Stats.TotalRequestCount++;
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (std::filesystem::is_regular_file(BlockPath))
+ if (IsFile(BlockPath))
{
struct WorkloadData
{
- std::atomic<uint64_t> BytesRemaining;
- BasicFile BlobFile;
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver;
+ std::atomic<uint64_t> BytesRemaining;
+ BasicFile BlobFile;
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive;
+ std::function<void()> OnComplete;
};
std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
Workload->BlobFile.Open(BlockPath, BasicFile::Mode::kRead);
const uint64_t BlobSize = Workload->BlobFile.FileSize();
- Workload->Receiver = std::move(Receiver);
+ Workload->OnReceive = std::move(OnReceive);
+ Workload->OnComplete = std::move(OnComplete);
Workload->BytesRemaining = BlobSize;
std::vector<std::function<void()>> WorkItems;
@@ -410,8 +433,12 @@ public:
IoBuffer PartPayload(Size);
Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset);
m_Stats.TotalBytesRead += PartPayload.GetSize();
+ Workload->OnReceive(Offset, PartPayload);
uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size);
- Workload->Receiver(Offset, PartPayload, ByteRemaning);
+ if (ByteRemaning == Size)
+ {
+ Workload->OnComplete();
+ }
SimulateLatency(Size, PartPayload.GetSize());
});
@@ -423,7 +450,7 @@ public:
return {};
}
- virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override
+ virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override
{
ZEN_TRACE_CPU("FileBuildStorage::PutBlockMetadata");
ZEN_UNUSED(BuildId);
@@ -440,68 +467,107 @@ public:
m_Stats.TotalBytesWritten += MetaData.GetSize();
WriteAsJson(BlockMetaDataPath, MetaData);
SimulateLatency(0, 0);
+ return true;
}
- virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) override
+ virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override
{
ZEN_TRACE_CPU("FileBuildStorage::FindBlocks");
ZEN_UNUSED(BuildId);
- SimulateLatency(0, 0);
+ SimulateLatency(sizeof(BuildId), 0);
Stopwatch ExecutionTimer;
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
m_Stats.TotalRequestCount++;
+ uint64_t FoundCount = 0;
+
DirectoryContent Content;
GetDirectoryContent(GetBlobsMetadataFolder(), DirectoryContentFlags::IncludeFiles, Content);
- std::vector<ChunkBlockDescription> Result;
+ CbObjectWriter Writer;
+ Writer.BeginArray("blocks");
for (const std::filesystem::path& MetaDataFile : Content.Files)
{
IoHash ChunkHash;
if (IoHash::TryParse(MetaDataFile.stem().string(), ChunkHash))
{
std::filesystem::path BlockPath = GetBlobPayloadPath(ChunkHash);
- if (std::filesystem::is_regular_file(BlockPath))
+ if (IsFile(BlockPath))
{
IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten();
m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize();
CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
- Result.emplace_back(ParseChunkBlockDescription(BlockObject));
+ Writer.AddObject(BlockObject);
+ FoundCount++;
+ if (FoundCount == MaxBlockCount)
+ {
+ break;
+ }
}
}
}
- SimulateLatency(0, sizeof(IoHash) * Result.size());
+ Writer.EndArray(); // blocks
+ CbObject Result = Writer.Save();
+ SimulateLatency(0, Result.GetSize());
return Result;
}
- virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
+ virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
{
ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata");
ZEN_UNUSED(BuildId);
- SimulateLatency(0, 0);
+ SimulateLatency(sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(), 0);
Stopwatch ExecutionTimer;
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
m_Stats.TotalRequestCount++;
- std::vector<ChunkBlockDescription> Result;
+ CbObjectWriter Writer;
+ Writer.BeginArray("blocks");
+
for (const IoHash& BlockHash : BlockHashes)
{
std::filesystem::path MetaDataFile = GetBlobMetadataPath(BlockHash);
- if (std::filesystem::is_regular_file(MetaDataFile))
+ if (IsFile(MetaDataFile))
{
IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten();
m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize();
CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
- Result.emplace_back(ParseChunkBlockDescription(BlockObject));
+ Writer.AddObject(BlockObject);
}
}
- SimulateLatency(sizeof(BlockHashes) * BlockHashes.size(), sizeof(ChunkBlockDescription) * Result.size());
+ Writer.EndArray(); // blocks
+ CbObject Result = Writer.Save();
+ SimulateLatency(0, Result.GetSize());
return Result;
}
+ virtual void PutBuildPartStats(const Oid& BuildId,
+ const Oid& BuildPartId,
+ const tsl::robin_map<std::string, double>& FloatStats) override
+ {
+ CbObjectWriter Request;
+ Request.BeginObject("floatStats"sv);
+ for (auto It : FloatStats)
+ {
+ Request.AddFloat(It.first, It.second);
+ }
+ Request.EndObject();
+ CbObject Payload = Request.Save();
+
+ SimulateLatency(Payload.GetSize(), 0);
+
+ const std::filesystem::path BuildPartStatsDataPath = GetBuildPartStatsPath(BuildId, BuildPartId);
+ CreateDirectories(BuildPartStatsDataPath.parent_path());
+
+ TemporaryFile::SafeWriteFile(BuildPartStatsDataPath, Payload.GetView());
+ WriteAsJson(BuildPartStatsDataPath, Payload);
+
+ SimulateLatency(0, 0);
+ }
+
protected:
std::filesystem::path GetBuildsFolder() const { return m_StoragePath / "builds"; }
std::filesystem::path GetBlobsFolder() const { return m_StoragePath / "blobs"; }
@@ -520,6 +586,11 @@ protected:
return GetBuildPartFolder(BuildId, BuildPartId) / "metadata.cb";
}
+ std::filesystem::path GetBuildPartStatsPath(const Oid& BuildId, const Oid& BuildPartId) const
+ {
+ return GetBuildPartFolder(BuildId, BuildPartId) / fmt::format("stats_{}.cb", Oid::NewOid());
+ }
+
std::filesystem::path GetBlobPayloadPath(const IoHash& RawHash) const { return GetBlobsFolder() / fmt::format("{}.cbz", RawHash); }
std::filesystem::path GetBlobMetadataPath(const IoHash& RawHash) const
@@ -587,7 +658,7 @@ protected:
BuildPartObject.IterateAttachments([&](CbFieldView FieldView) {
const IoHash AttachmentHash = FieldView.AsBinaryAttachment();
const std::filesystem::path BlockPath = GetBlobPayloadPath(AttachmentHash);
- if (!std::filesystem::is_regular_file(BlockPath))
+ if (!IsFile(BlockPath))
{
NeededAttachments.push_back(AttachmentHash);
}
@@ -608,13 +679,24 @@ protected:
{
return false;
}
- CompositeBuffer Decompressed = ValidateBuffer.DecompressToComposite();
- if (!Decompressed)
+
+ IoHashStream Hash;
+ bool CouldDecompress = ValidateBuffer.DecompressToStream(
+ 0,
+ (uint64_t)-1,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset, SourceSize, Offset);
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ Hash.Append(Segment.GetView());
+ }
+ return true;
+ });
+ if (!CouldDecompress)
{
return false;
}
- IoHash Hash = IoHash::HashBuffer(Decompressed);
- if (Hash != RawHash)
+ if (Hash.GetHash() != VerifyHash)
{
return false;
}
diff --git a/src/zenutil/include/zenutil/bufferedwritefilecache.h b/src/zenutil/include/zenutil/bufferedwritefilecache.h
new file mode 100644
index 000000000..68d6c375e
--- /dev/null
+++ b/src/zenutil/include/zenutil/bufferedwritefilecache.h
@@ -0,0 +1,106 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/basicfile.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+class CompositeBuffer;
+
+class BufferedWriteFileCache
+{
+public:
+ BufferedWriteFileCache(const BufferedWriteFileCache&) = delete;
+ BufferedWriteFileCache& operator=(const BufferedWriteFileCache&) = delete;
+
+ BufferedWriteFileCache();
+
+ ~BufferedWriteFileCache();
+
+ std::unique_ptr<BasicFile> Get(uint32_t FileIndex);
+
+ void Put(uint32_t FileIndex, std::unique_ptr<BasicFile>&& Writer);
+
+ void Close(std::span<uint32_t> FileIndexes);
+
+ class Local
+ {
+ public:
+ struct Writer
+ {
+ std::unique_ptr<BasicFile> File;
+ std::unique_ptr<BasicFileWriter> Writer;
+
+ inline void Write(const CompositeBuffer& Chunk, uint64_t FileOffset)
+ {
+ if (Writer)
+ {
+ Writer->Write(Chunk, FileOffset);
+ }
+ else
+ {
+ File->Write(Chunk, FileOffset);
+ }
+ }
+ };
+
+ Local(const Local&) = delete;
+ Local& operator=(const Local&) = delete;
+
+ explicit Local(BufferedWriteFileCache& Cache);
+ ~Local();
+
+ Writer* GetWriter(uint32_t FileIndex);
+ Writer* PutWriter(uint32_t FileIndex, std::unique_ptr<Writer> Writer);
+
+ private:
+ tsl::robin_map<uint32_t, uint32_t> m_FileIndexToWriterIndex;
+ std::vector<std::unique_ptr<Writer>> m_ChunkWriters;
+ BufferedWriteFileCache& m_Cache;
+ };
+
+private:
+ static constexpr size_t MaxHandlesPerPath = 7;
+ static constexpr size_t MaxBufferedCount = 1024;
+ struct TOpenHandles
+ {
+ BasicFile* Files[MaxHandlesPerPath];
+ uint64_t Size = 0;
+ inline BasicFile* Pop()
+ {
+ if (Size > 0)
+ {
+ return Files[--Size];
+ }
+ else
+ {
+ return nullptr;
+ }
+ }
+ inline bool Push(BasicFile* File)
+ {
+ if (Size < MaxHandlesPerPath)
+ {
+ Files[Size++] = File;
+ return true;
+ }
+ return false;
+ }
+ };
+ static_assert(sizeof(TOpenHandles) == 64);
+
+ RwLock m_WriterLock;
+ tsl::robin_map<uint32_t, uint32_t> m_ChunkWriters;
+ std::vector<TOpenHandles> m_OpenFiles;
+ std::atomic<uint32_t> m_CacheHitCount;
+ std::atomic<uint32_t> m_CacheMissCount;
+ std::atomic<uint32_t> m_OpenHandleCount;
+ std::atomic<uint32_t> m_DroppedHandleCount;
+};
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h
index 9d2bab170..f49d4b42a 100644
--- a/src/zenutil/include/zenutil/buildstorage.h
+++ b/src/zenutil/include/zenutil/buildstorage.h
@@ -5,6 +5,10 @@
#include <zencore/compactbinary.h>
#include <zenutil/chunkblock.h>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
namespace zen {
class BuildStorage
@@ -21,6 +25,7 @@ public:
virtual ~BuildStorage() {}
+ virtual CbObject ListNamespaces(bool bRecursive = false) = 0;
virtual CbObject ListBuilds(CbObject Query) = 0;
virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) = 0;
virtual CbObject GetBuild(const Oid& BuildId) = 0;
@@ -43,16 +48,18 @@ public:
virtual IoBuffer GetBuildBlob(const Oid& BuildId,
const IoHash& RawHash,
uint64_t RangeOffset = 0,
- uint64_t RangeBytes = (uint64_t)-1) = 0;
- virtual std::vector<std::function<void()>> GetLargeBuildBlob(
- const Oid& BuildId,
- const IoHash& RawHash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) = 0;
-
- virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0;
- virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) = 0;
- virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0;
+ uint64_t RangeBytes = (uint64_t)-1) = 0;
+ virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete) = 0;
+
+ [[nodiscard]] virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0;
+ virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) = 0;
+ virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0;
+
+ virtual void PutBuildPartStats(const Oid& BuildId, const Oid& BuildPartId, const tsl::robin_map<std::string, double>& FloatStats) = 0;
};
} // namespace zen
diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h
new file mode 100644
index 000000000..e1fb73fd4
--- /dev/null
+++ b/src/zenutil/include/zenutil/buildstoragecache.h
@@ -0,0 +1,57 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/logging.h>
+
+#include <zencore/compactbinary.h>
+#include <zencore/compositebuffer.h>
+#include <zenutil/chunkblock.h>
+
+namespace zen {
+
+class HttpClient;
+
+class BuildStorageCache
+{
+public:
+ struct Statistics
+ {
+ std::atomic<uint64_t> TotalBytesRead = 0;
+ std::atomic<uint64_t> TotalBytesWritten = 0;
+ std::atomic<uint64_t> TotalRequestCount = 0;
+ std::atomic<uint64_t> TotalRequestTimeUs = 0;
+ std::atomic<uint64_t> TotalExecutionTimeUs = 0;
+ };
+
+ virtual ~BuildStorageCache() {}
+
+ virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) = 0;
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t RangeOffset = 0,
+ uint64_t RangeBytes = (uint64_t)-1) = 0;
+
+ virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) = 0;
+ virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0;
+
+ struct BlobExistsResult
+ {
+ bool HasBody = 0;
+ bool HasMetadata = 0;
+ };
+
+ virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0;
+
+ virtual void Flush(
+ int32_t UpdateIntervalMS,
+ std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0;
+};
+
+std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& HttpClient,
+ BuildStorageCache::Statistics& Stats,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const std::filesystem::path& TempFolderPath,
+ bool BoostBackgroundThreadCount);
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h
index 57b55cb8e..306a5d990 100644
--- a/src/zenutil/include/zenutil/chunkedcontent.h
+++ b/src/zenutil/include/zenutil/chunkedcontent.h
@@ -67,7 +67,7 @@ FolderContent GetFolderContent(GetFolderContentStatistics& Stats,
std::function<bool(const std::string_view& RelativePath)>&& AcceptDirectory,
std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& AcceptFile,
WorkerThreadPool& WorkerPool,
- int32_t UpdateInteralMS,
+ int32_t UpdateIntervalMS,
std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
std::atomic<bool>& AbortFlag);
@@ -94,10 +94,31 @@ struct ChunkedFolderContent
ChunkedContentData ChunkedContent;
};
+struct ChunkedContentLookup
+{
+ struct ChunkSequenceLocation
+ {
+ uint32_t SequenceIndex = (uint32_t)-1;
+ uint64_t Offset = (uint64_t)-1;
+ };
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceIndex;
+ std::vector<uint32_t> SequenceIndexChunkOrderOffset;
+ std::vector<ChunkSequenceLocation> ChunkSequenceLocations;
+ std::vector<size_t>
+ ChunkSequenceLocationOffset; // ChunkSequenceLocations[ChunkLocationOffset[ChunkIndex]] -> start of sources for ChunkIndex
+ std::vector<uint32_t> ChunkSequenceLocationCounts; // ChunkSequenceLocationCounts[ChunkIndex] count of chunk locations for ChunkIndex
+ std::vector<uint32_t> SequenceIndexFirstPathIndex; // SequenceIndexFirstPathIndex[SequenceIndex] -> first path index with that RawHash
+ std::vector<uint32_t> PathExtensionHash;
+};
+
void SaveChunkedFolderContentToCompactBinary(const ChunkedFolderContent& Content, CbWriter& Output);
ChunkedFolderContent LoadChunkedFolderContentToCompactBinary(CbObjectView Input);
ChunkedFolderContent MergeChunkedFolderContents(const ChunkedFolderContent& Base, std::span<const ChunkedFolderContent> Overlays);
+ChunkedFolderContent DeletePathsFromChunkedContent(const ChunkedFolderContent& Base,
+ const ChunkedContentLookup& BaseContentLookup,
+ std::span<const std::filesystem::path> DeletedPaths);
ChunkedFolderContent DeletePathsFromChunkedContent(const ChunkedFolderContent& Base, std::span<const std::filesystem::path> DeletedPaths);
struct ChunkingStatistics
@@ -111,31 +132,15 @@ struct ChunkingStatistics
uint64_t ElapsedWallTimeUS = 0;
};
-ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats,
- WorkerThreadPool& WorkerPool,
- const std::filesystem::path& RootPath,
- const FolderContent& Content,
- const ChunkingController& InChunkingController,
- int32_t UpdateInteralMS,
- std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
- std::atomic<bool>& AbortFlag);
-
-struct ChunkedContentLookup
-{
- struct ChunkSequenceLocation
- {
- uint32_t SequenceIndex = (uint32_t)-1;
- uint64_t Offset = (uint64_t)-1;
- };
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceIndex;
- std::vector<uint32_t> SequenceIndexChunkOrderOffset;
- std::vector<ChunkSequenceLocation> ChunkSequenceLocations;
- std::vector<size_t>
- ChunkSequenceLocationOffset; // ChunkSequenceLocations[ChunkLocationOffset[ChunkIndex]] -> start of sources for ChunkIndex
- std::vector<uint32_t> ChunkSequenceLocationCounts; // ChunkSequenceLocationCounts[ChunkIndex] count of chunk locations for ChunkIndex
- std::vector<uint32_t> SequenceIndexFirstPathIndex; // SequenceIndexFirstPathIndex[SequenceIndex] -> first path index with that RawHash
-};
+ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats,
+ WorkerThreadPool& WorkerPool,
+ const std::filesystem::path& RootPath,
+ const FolderContent& Content,
+ const ChunkingController& InChunkingController,
+ int32_t UpdateIntervalMS,
+ std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)>&& UpdateCallback,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag);
ChunkedContentLookup BuildChunkedContentLookup(const ChunkedFolderContent& Content);
diff --git a/src/zenutil/include/zenutil/chunkingcontroller.h b/src/zenutil/include/zenutil/chunkingcontroller.h
index 246f4498a..315502265 100644
--- a/src/zenutil/include/zenutil/chunkingcontroller.h
+++ b/src/zenutil/include/zenutil/chunkingcontroller.h
@@ -11,7 +11,11 @@
namespace zen {
-const std::vector<std::string_view> DefaultChunkingExcludeExtensions = {".exe", ".dll", ".pdb", ".self", ".mp4"};
+const std::vector<std::string> DefaultChunkingExcludeExtensions =
+ {".exe", ".dll", ".pdb", ".self", ".mp4", ".zip", ".7z", ".bzip", ".rar", ".gzip"};
+const std::vector<std::string> DefaultFixedChunkingExtensions = {".apk", ".nsp", ".xvc", ".pkg", ".dmg", ".ipa"};
+const bool DefaultChunkingExcludeElfFiles = true;
+const bool DefaultChunkingExcludeMachOFiles = true;
const ChunkedParams DefaultChunkedParams = {.MinSize = ((8u * 1u) * 1024u) - 128u,
.MaxSize = 128u * 1024u,
@@ -19,7 +23,8 @@ const ChunkedParams DefaultChunkedParams = {.MinSize = ((8u * 1u) * 1024u) - 128
const size_t DefaultChunkingFileSizeLimit = DefaultChunkedParams.MaxSize;
-const uint32_t DefaultFixedChunkingChunkSize = 16u * 1024u * 1024u;
+const uint64_t DefaultFixedChunkingChunkSize = 32u * 1024u * 1024u;
+const uint64_t DefaultMinSizeForFixedChunking = DefaultFixedChunkingChunkSize * 8u;
struct ChunkedInfoWithSource;
@@ -38,17 +43,31 @@ public:
virtual CbObject GetParameters() const = 0;
};
-std::unique_ptr<ChunkingController> CreateBasicChunkingController(
- std::span<const std::string_view> ExcludeExtensions = DefaultChunkingExcludeExtensions,
- uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit,
- const ChunkedParams& ChunkingParams = DefaultChunkedParams);
+struct BasicChunkingControllerSettings
+{
+ std::vector<std::string> ExcludeExtensions = DefaultChunkingExcludeExtensions;
+ bool ExcludeElfFiles = DefaultChunkingExcludeElfFiles;
+ bool ExcludeMachOFiles = DefaultChunkingExcludeMachOFiles;
+ uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit;
+ ChunkedParams ChunkingParams = DefaultChunkedParams;
+};
+
+std::unique_ptr<ChunkingController> CreateBasicChunkingController(const BasicChunkingControllerSettings& Settings);
std::unique_ptr<ChunkingController> CreateBasicChunkingController(CbObjectView Parameters);
-std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(
- std::span<const std::string_view> ExcludeExtensions = DefaultChunkingExcludeExtensions,
- uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit,
- const ChunkedParams& ChunkingParams = DefaultChunkedParams,
- uint32_t FixedChunkingChunkSize = DefaultFixedChunkingChunkSize);
+struct ChunkingControllerWithFixedChunkingSettings
+{
+ std::vector<std::string> FixedChunkingExtensions = DefaultFixedChunkingExtensions;
+ std::vector<std::string> ExcludeExtensions = DefaultChunkingExcludeExtensions;
+ bool ExcludeElfFiles = DefaultChunkingExcludeElfFiles;
+ bool ExcludeMachOFiles = DefaultChunkingExcludeMachOFiles;
+ uint64_t ChunkFileSizeLimit = DefaultChunkingFileSizeLimit;
+ ChunkedParams ChunkingParams = DefaultChunkedParams;
+ uint64_t FixedChunkingChunkSize = DefaultFixedChunkingChunkSize;
+ uint64_t MinSizeForFixedChunking = DefaultMinSizeForFixedChunking;
+};
+
+std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(const ChunkingControllerWithFixedChunkingSettings& Setting);
std::unique_ptr<ChunkingController> CreateChunkingControllerWithFixedChunking(CbObjectView Parameters);
std::unique_ptr<ChunkingController> CreateChunkingController(std::string_view Name, CbObjectView Parameters);
diff --git a/src/zenutil/include/zenutil/commandlineoptions.h b/src/zenutil/include/zenutil/commandlineoptions.h
new file mode 100644
index 000000000..f927d41e5
--- /dev/null
+++ b/src/zenutil/include/zenutil/commandlineoptions.h
@@ -0,0 +1,29 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+#include <filesystem>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+
+namespace cxxopts::values {
+// We declare this specialization before including cxxopts to make it stick
+void parse_value(const std::string& text, std::filesystem::path& value);
+} // namespace cxxopts::values
+
+#include <cxxopts.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+std::vector<std::string> ParseCommandLine(std::string_view CommandLine);
+std::vector<char*> StripCommandlineQuotes(std::vector<std::string>& InOutArgs);
+void MakeSafeAbsolutePathÍnPlace(std::filesystem::path& Path);
+[[nodiscard]] std::filesystem::path MakeSafeAbsolutePath(const std::filesystem::path& Path);
+std::filesystem::path StringToPath(const std::string_view& Path);
+std::string_view RemoveQuotes(const std::string_view& Arg);
+
+void commandlineoptions_forcelink(); // internal
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/environmentoptions.h b/src/zenutil/include/zenutil/environmentoptions.h
new file mode 100644
index 000000000..7418608e4
--- /dev/null
+++ b/src/zenutil/include/zenutil/environmentoptions.h
@@ -0,0 +1,92 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/string.h>
+#include <zenutil/commandlineoptions.h>
+
+namespace zen {
+
+class EnvironmentOptions
+{
+public:
+ class OptionValue
+ {
+ public:
+ virtual void Parse(std::string_view Value) = 0;
+
+ virtual ~OptionValue() {}
+ };
+
+ class StringOption : public OptionValue
+ {
+ public:
+ explicit StringOption(std::string& Value);
+ virtual void Parse(std::string_view Value) override;
+ std::string& RefValue;
+ };
+
+ class FilePathOption : public OptionValue
+ {
+ public:
+ explicit FilePathOption(std::filesystem::path& Value);
+ virtual void Parse(std::string_view Value) override;
+ std::filesystem::path& RefValue;
+ };
+
+ class BoolOption : public OptionValue
+ {
+ public:
+ explicit BoolOption(bool& Value);
+ virtual void Parse(std::string_view Value);
+ bool& RefValue;
+ };
+
+ template<Integral T>
+ class NumberOption : public OptionValue
+ {
+ public:
+ explicit NumberOption(T& Value) : RefValue(Value) {}
+ virtual void Parse(std::string_view Value) override
+ {
+ if (std::optional<T> OptionalValue = ParseInt<T>(Value); OptionalValue.has_value())
+ {
+ RefValue = OptionalValue.value();
+ }
+ }
+ T& RefValue;
+ };
+
+ struct Option
+ {
+ std::string CommandLineOptionName;
+ std::shared_ptr<OptionValue> Value;
+ };
+
+ std::shared_ptr<OptionValue> MakeOption(std::string& Value);
+ std::shared_ptr<OptionValue> MakeOption(std::filesystem::path& Value);
+
+ template<Integral T>
+ std::shared_ptr<OptionValue> MakeOption(T& Value)
+ {
+ return std::make_shared<NumberOption<T>>(Value);
+ };
+
+ std::shared_ptr<OptionValue> MakeOption(bool& Value);
+
+ template<typename T>
+ void AddOption(std::string_view EnvName, T& Value, std::string_view CommandLineOptionName = "")
+ {
+ OptionMap.insert_or_assign(std::string(EnvName),
+ Option{.CommandLineOptionName = std::string(CommandLineOptionName), .Value = MakeOption(Value)});
+ };
+
+ EnvironmentOptions();
+
+ void Parse(const cxxopts::ParseResult& CmdLineResult);
+
+private:
+ std::unordered_map<std::string, Option> OptionMap;
+};
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h b/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h
index 89fc70140..bbf070993 100644
--- a/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h
+++ b/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h
@@ -13,5 +13,6 @@ std::unique_ptr<BuildStorage> CreateJupiterBuildStorage(LoggerRef InLog,
BuildStorage::Statistics& Stats,
std::string_view Namespace,
std::string_view Bucket,
+ bool AllowRedirect,
const std::filesystem::path& TempFolderPath);
} // namespace zen
diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h
index 2c5fc73b8..b79790f25 100644
--- a/src/zenutil/include/zenutil/jupiter/jupitersession.h
+++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h
@@ -65,7 +65,7 @@ struct FinalizeBuildPartResult : JupiterResult
class JupiterSession
{
public:
- JupiterSession(LoggerRef InLog, HttpClient& InHttpClient);
+ JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect);
~JupiterSession();
JupiterResult Authenticate();
@@ -102,6 +102,8 @@ public:
std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
+ JupiterResult ListBuildNamespaces();
+ JupiterResult ListBuildBuckets(std::string_view Namespace);
JupiterResult ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload);
JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload);
JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
@@ -135,13 +137,14 @@ public:
uint64_t PayloadSize,
std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems);
- JupiterResult GetMultipartBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver,
- std::vector<std::function<JupiterResult()>>& OutWorkItems);
+ JupiterResult GetMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete,
+ std::vector<std::function<JupiterResult()>>& OutWorkItems);
JupiterResult PutBlockMetadata(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
@@ -152,9 +155,15 @@ public:
const Oid& BuildId,
const Oid& PartId,
const IoHash& RawHash);
- JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount);
JupiterResult GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload);
+ JupiterResult PutBuildPartStats(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& BuildPartId,
+ IoBuffer Payload);
+
private:
inline LoggerRef Log() { return m_Log; }
@@ -164,6 +173,7 @@ private:
LoggerRef m_Log;
HttpClient& m_HttpClient;
+ const bool m_AllowRedirect = false;
};
} // namespace zen
diff --git a/src/zenutil/include/zenutil/logging/rotatingfilesink.h b/src/zenutil/include/zenutil/logging/rotatingfilesink.h
index 758722156..cd28bdcb2 100644
--- a/src/zenutil/include/zenutil/logging/rotatingfilesink.h
+++ b/src/zenutil/include/zenutil/logging/rotatingfilesink.h
@@ -27,7 +27,6 @@ public:
{
ZEN_MEMSCOPE(ELLMTag::Logging);
- ZEN_MEMSCOPE(ELLMTag::Logging);
std::error_code Ec;
if (RotateOnOpen)
{
diff --git a/src/zenutil/include/zenutil/parallellwork.h b/src/zenutil/include/zenutil/parallellwork.h
deleted file mode 100644
index 79798fc8d..000000000
--- a/src/zenutil/include/zenutil/parallellwork.h
+++ /dev/null
@@ -1,117 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/except.h>
-#include <zencore/fmtutils.h>
-#include <zencore/thread.h>
-#include <zencore/workthreadpool.h>
-
-#include <atomic>
-
-namespace zen {
-
-class ParallellWork
-{
-public:
- ParallellWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) {}
-
- ~ParallellWork()
- {
- // Make sure to call Wait before destroying
- ZEN_ASSERT(m_PendingWork.Remaining() == 0);
- }
-
- std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)> DefaultErrorFunction()
- {
- return [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) {
- m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex.what()); });
- AbortFlag = true;
- };
- }
-
- void ScheduleWork(WorkerThreadPool& WorkerPool,
- std::function<void(std::atomic<bool>& AbortFlag)>&& Work,
- std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)>&& OnError)
- {
- m_PendingWork.AddCount(1);
- try
- {
- WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = std::move(OnError)] {
- try
- {
- Work(m_AbortFlag);
- }
- catch (const AssertException& AssertEx)
- {
- OnError(
- std::runtime_error(fmt::format("Caught assert exception while handling request: {}", AssertEx.FullDescription())),
- m_AbortFlag);
- }
- catch (const std::system_error& SystemError)
- {
- if (IsOOM(SystemError.code()))
- {
- OnError(std::runtime_error(fmt::format("Out of memory. Reason: {}", SystemError.what())), m_AbortFlag);
- }
- else if (IsOOD(SystemError.code()))
- {
- OnError(std::runtime_error(fmt::format("Out of disk. Reason: {}", SystemError.what())), m_AbortFlag);
- }
- else
- {
- OnError(std::runtime_error(fmt::format("System error. Reason: {}", SystemError.what())), m_AbortFlag);
- }
- }
- catch (const std::exception& Ex)
- {
- OnError(Ex, m_AbortFlag);
- }
- m_PendingWork.CountDown();
- });
- }
- catch (const std::exception&)
- {
- m_PendingWork.CountDown();
- throw;
- }
- }
-
- void Abort() { m_AbortFlag = true; }
-
- bool IsAborted() const { return m_AbortFlag.load(); }
-
- void Wait(int32_t UpdateInteralMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback)
- {
- ZEN_ASSERT(m_PendingWork.Remaining() > 0);
- m_PendingWork.CountDown();
- while (!m_PendingWork.Wait(UpdateInteralMS))
- {
- UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining());
- }
- if (m_Errors.size() == 1)
- {
- throw std::runtime_error(m_Errors.front());
- }
- else if (m_Errors.size() > 1)
- {
- ExtendableStringBuilder<128> SB;
- SB.Append("Multiple errors:");
- for (const std::string& Error : m_Errors)
- {
- SB.Append(fmt::format("\n {}", Error));
- }
- throw std::runtime_error(SB.ToString());
- }
- }
- Latch& PendingWork() { return m_PendingWork; }
-
-private:
- std::atomic<bool>& m_AbortFlag;
- Latch m_PendingWork;
-
- RwLock m_ErrorLock;
- std::vector<std::string> m_Errors;
-};
-
-} // namespace zen
diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h
new file mode 100644
index 000000000..639c6968c
--- /dev/null
+++ b/src/zenutil/include/zenutil/parallelwork.h
@@ -0,0 +1,77 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/scopeguard.h>
+#include <zencore/thread.h>
+#include <zencore/workthreadpool.h>
+
+#include <atomic>
+
+namespace zen {
+
+class ParallelWork
+{
+public:
+ ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag);
+
+ ~ParallelWork();
+
+ typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback;
+ typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback;
+ typedef std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)> UpdateCallback;
+
+ void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {})
+ {
+ m_PendingWork.AddCount(1);
+ try
+ {
+ WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] {
+ auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); });
+ try
+ {
+ while (m_PauseFlag && !m_AbortFlag)
+ {
+ Sleep(2000);
+ }
+ Work(m_AbortFlag);
+ }
+ catch (...)
+ {
+ OnError(std::current_exception(), m_AbortFlag);
+ }
+ });
+ }
+ catch (const std::exception&)
+ {
+ m_PendingWork.CountDown();
+ throw;
+ }
+ }
+
+ void Abort() { m_AbortFlag = true; }
+
+ bool IsAborted() const { return m_AbortFlag.load(); }
+
+ void Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback);
+
+ void Wait();
+
+ Latch& PendingWork() { return m_PendingWork; }
+
+private:
+ ExceptionCallback DefaultErrorFunction();
+ void RethrowErrors();
+
+ std::atomic<bool>& m_AbortFlag;
+ std::atomic<bool>& m_PauseFlag;
+ bool m_DispatchComplete = false;
+ Latch m_PendingWork;
+
+ RwLock m_ErrorLock;
+ std::vector<std::exception_ptr> m_Errors;
+};
+
+void parallellwork_forcelink();
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h
index 9683ad720..df2033bca 100644
--- a/src/zenutil/include/zenutil/workerpools.h
+++ b/src/zenutil/include/zenutil/workerpools.h
@@ -21,6 +21,9 @@ WorkerThreadPool& GetMediumWorkerPool(EWorkloadType WorkloadType);
// Worker pool with std::thread::hardware_concurrency() / 8 worker threads, but at least one thread
WorkerThreadPool& GetSmallWorkerPool(EWorkloadType WorkloadType);
+// Worker pool with minimum number of worker threads, but at least one thread
+WorkerThreadPool& GetTinyWorkerPool(EWorkloadType WorkloadType);
+
// Special worker pool that does not use worker thread but issues all scheduled work on the calling thread
// This is useful for debugging when multiple async thread can make stepping in debugger complicated
WorkerThreadPool& GetSyncWorkerPool();
diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp
index bf89ce785..9974725ff 100644
--- a/src/zenutil/jupiter/jupiterbuildstorage.cpp
+++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp
@@ -25,8 +25,9 @@ public:
Statistics& Stats,
std::string_view Namespace,
std::string_view Bucket,
+ bool AllowRedirect,
const std::filesystem::path& TempFolderPath)
- : m_Session(InLog, InHttpClient)
+ : m_Session(InLog, InHttpClient, AllowRedirect)
, m_Stats(Stats)
, m_Namespace(Namespace)
, m_Bucket(Bucket)
@@ -35,6 +36,61 @@ public:
}
virtual ~JupiterBuildStorage() {}
+ virtual CbObject ListNamespaces(bool bRecursive) override
+ {
+ ZEN_TRACE_CPU("Jupiter::ListNamespaces");
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ JupiterResult ListResult = m_Session.ListBuildNamespaces();
+ AddStatistic(ListResult);
+ if (!ListResult.Success)
+ {
+ throw std::runtime_error(fmt::format("Failed listing namespaces: {} ({})", ListResult.Reason, ListResult.ErrorCode));
+ }
+ CbObject NamespaceResponse = PayloadToCbObject("Failed listing namespaces"sv, ListResult.Response);
+
+ CbObjectWriter Response;
+ Response.BeginArray("results"sv);
+ for (CbFieldView NamespaceField : NamespaceResponse["namespaces"])
+ {
+ std::string_view Namespace = NamespaceField.AsString();
+ if (!Namespace.empty())
+ {
+ Response.BeginObject();
+ Response.AddString("name", Namespace);
+
+ if (bRecursive)
+ {
+ JupiterResult BucketsResult = m_Session.ListBuildBuckets(Namespace);
+ AddStatistic(BucketsResult);
+ if (!BucketsResult.Success)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed listing namespaces: {} ({})", BucketsResult.Reason, BucketsResult.ErrorCode));
+ }
+ CbObject BucketResponse = PayloadToCbObject("Failed listing namespaces"sv, BucketsResult.Response);
+
+ Response.BeginArray("items");
+ for (CbFieldView BucketField : BucketResponse["buckets"])
+ {
+ std::string_view Bucket = BucketField.AsString();
+ if (!Bucket.empty())
+ {
+ Response.AddString(Bucket);
+ }
+ }
+ Response.EndArray();
+ }
+
+ Response.EndObject();
+ }
+ }
+ Response.EndArray();
+
+ return Response.Save();
+ }
+
virtual CbObject ListBuilds(CbObject Query) override
{
ZEN_TRACE_CPU("Jupiter::ListBuilds");
@@ -49,7 +105,7 @@ public:
{
throw std::runtime_error(fmt::format("Failed listing builds: {} ({})", ListResult.Reason, ListResult.ErrorCode));
}
- return PayloadToJson("Failed listing builds"sv, ListResult.Response);
+ return PayloadToCbObject("Failed listing builds"sv, ListResult.Response);
}
virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override
@@ -66,7 +122,7 @@ public:
{
throw std::runtime_error(fmt::format("Failed creating build: {} ({})", PutResult.Reason, PutResult.ErrorCode));
}
- return PayloadToJson(fmt::format("Failed creating build: {}", BuildId), PutResult.Response);
+ return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response);
}
virtual CbObject GetBuild(const Oid& BuildId) override
@@ -81,7 +137,7 @@ public:
{
throw std::runtime_error(fmt::format("Failed fetching build: {} ({})", GetBuildResult.Reason, GetBuildResult.ErrorCode));
}
- return PayloadToJson(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response);
+ return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response);
}
virtual void FinalizeBuild(const Oid& BuildId) override
@@ -134,7 +190,7 @@ public:
GetBuildPartResult.Reason,
GetBuildPartResult.ErrorCode));
}
- return PayloadToJson(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response);
+ return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response);
}
virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override
@@ -235,19 +291,25 @@ public:
return std::move(GetBuildBlobResult.Response);
}
- virtual std::vector<std::function<void()>> GetLargeBuildBlob(
- const Oid& BuildId,
- const IoHash& RawHash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) override
+ virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete) override
{
ZEN_TRACE_CPU("Jupiter::GetLargeBuildBlob");
Stopwatch ExecutionTimer;
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
std::vector<std::function<JupiterResult()>> WorkItems;
- JupiterResult GetMultipartBlobResult =
- m_Session.GetMultipartBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ChunkSize, std::move(Receiver), WorkItems);
+ JupiterResult GetMultipartBlobResult = m_Session.GetMultipartBuildBlob(m_Namespace,
+ m_Bucket,
+ BuildId,
+ RawHash,
+ ChunkSize,
+ std::move(OnReceive),
+ std::move(OnComplete),
+ WorkItems);
AddStatistic(GetMultipartBlobResult);
if (!GetMultipartBlobResult.Success)
@@ -272,7 +334,7 @@ public:
return WorkList;
}
- virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override
+ virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override
{
ZEN_TRACE_CPU("Jupiter::PutBlockMetadata");
@@ -284,27 +346,32 @@ public:
AddStatistic(PutMetaResult);
if (!PutMetaResult.Success)
{
+ if (PutMetaResult.ErrorCode == int32_t(HttpResponseCode::NotFound))
+ {
+ return false;
+ }
throw std::runtime_error(
fmt::format("Failed putting build block metadata: {} ({})", PutMetaResult.Reason, PutMetaResult.ErrorCode));
}
+ return true;
}
- virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) override
+ virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override
{
ZEN_TRACE_CPU("Jupiter::FindBlocks");
Stopwatch ExecutionTimer;
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- JupiterResult FindResult = m_Session.FindBlocks(m_Namespace, m_Bucket, BuildId);
+ JupiterResult FindResult = m_Session.FindBlocks(m_Namespace, m_Bucket, BuildId, MaxBlockCount);
AddStatistic(FindResult);
if (!FindResult.Success)
{
throw std::runtime_error(fmt::format("Failed fetching known blocks: {} ({})", FindResult.Reason, FindResult.ErrorCode));
}
- return ParseChunkBlockDescriptionList(PayloadToJson("Failed fetching known blocks"sv, FindResult.Response));
+ return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response);
}
- virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
+ virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
{
ZEN_TRACE_CPU("Jupiter::GetBlockMetadata");
@@ -328,28 +395,35 @@ public:
throw std::runtime_error(
fmt::format("Failed fetching block metadatas: {} ({})", GetBlockMetadataResult.Reason, GetBlockMetadataResult.ErrorCode));
}
- std::vector<ChunkBlockDescription> UnorderedList =
- ParseChunkBlockDescriptionList(PayloadToJson("Failed fetching block metadatas", GetBlockMetadataResult.Response));
- tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup;
- for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++)
+ return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response);
+ }
+
+ virtual void PutBuildPartStats(const Oid& BuildId,
+ const Oid& BuildPartId,
+ const tsl::robin_map<std::string, double>& FloatStats) override
+ {
+ ZEN_UNUSED(BuildId, BuildPartId, FloatStats);
+ CbObjectWriter Request;
+ Request.BeginObject("floatStats"sv);
+ for (auto It : FloatStats)
{
- const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex];
- BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex);
+ Request.AddFloat(It.first, It.second);
}
- std::vector<ChunkBlockDescription> SortedBlockDescriptions;
- SortedBlockDescriptions.reserve(BlockDescriptionLookup.size());
- for (const IoHash& BlockHash : BlockHashes)
+ Request.EndObject();
+ IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+ JupiterResult PutBuildPartStatsResult = m_Session.PutBuildPartStats(m_Namespace, m_Bucket, BuildId, BuildPartId, Payload);
+ AddStatistic(PutBuildPartStatsResult);
+ if (!PutBuildPartStatsResult.Success)
{
- if (auto It = BlockDescriptionLookup.find(BlockHash); It != BlockDescriptionLookup.end())
- {
- SortedBlockDescriptions.push_back(std::move(UnorderedList[It->second]));
- }
+ throw std::runtime_error(fmt::format("Failed posting build part statistics: {} ({})",
+ PutBuildPartStatsResult.Reason,
+ PutBuildPartStatsResult.ErrorCode));
}
- return SortedBlockDescriptions;
}
private:
- static CbObject PayloadToJson(std::string_view Context, const IoBuffer& Payload)
+ static CbObject PayloadToCbObject(std::string_view Context, const IoBuffer& Payload)
{
if (Payload.GetContentType() == ZenContentType::kJSON)
{
@@ -394,11 +468,12 @@ CreateJupiterBuildStorage(LoggerRef InLog,
BuildStorage::Statistics& Stats,
std::string_view Namespace,
std::string_view Bucket,
+ bool AllowRedirect,
const std::filesystem::path& TempFolderPath)
{
ZEN_TRACE_CPU("CreateJupiterBuildStorage");
- return std::make_unique<JupiterBuildStorage>(InLog, InHttpClient, Stats, Namespace, Bucket, TempFolderPath);
+ return std::make_unique<JupiterBuildStorage>(InLog, InHttpClient, Stats, Namespace, Bucket, AllowRedirect, TempFolderPath);
}
} // namespace zen
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp
index 68f214c06..1fd59acdf 100644
--- a/src/zenutil/jupiter/jupitersession.cpp
+++ b/src/zenutil/jupiter/jupitersession.cpp
@@ -5,6 +5,7 @@
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compositebuffer.h>
+#include <zencore/compress.h>
#include <zencore/fmtutils.h>
#include <zencore/trace.h>
@@ -48,7 +49,10 @@ namespace detail {
}
} // namespace detail
-JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient) : m_Log(InLog), m_HttpClient(InHttpClient)
+JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect)
+: m_Log(InLog)
+, m_HttpClient(InHttpClient)
+, m_AllowRedirect(AllowRedirect)
{
}
@@ -357,12 +361,28 @@ JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view Typ
}
JupiterResult
+JupiterSession::ListBuildNamespaces()
+{
+ HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds"), {HttpClient::Accept(ZenContentType::kJSON)});
+ return detail::ConvertResponse(Response, "JupiterSession::ListBuildNamespaces"sv);
+}
+
+JupiterResult
+JupiterSession::ListBuildBuckets(std::string_view Namespace)
+{
+ HttpClient::Response Response =
+ m_HttpClient.Get(fmt::format("/api/v2/builds/{}", Namespace), {HttpClient::Accept(ZenContentType::kJSON)});
+ return detail::ConvertResponse(Response, "JupiterSession::ListBuildBuckets"sv);
+}
+
+JupiterResult
JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload)
{
ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
- HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/search", Namespace, BucketId),
- Payload,
- {HttpClient::Accept(ZenContentType::kCbObject)});
+ std::string OptionalBucketPath = BucketId.empty() ? "" : fmt::format("/{}", BucketId);
+ HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}{}/search", Namespace, OptionalBucketPath),
+ Payload,
+ {HttpClient::Accept(ZenContentType::kCbObject)});
return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv);
}
@@ -527,12 +547,14 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
bool& OutIsComplete) -> JupiterResult {
const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex];
IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte);
- std::string MultipartUploadResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}",
- Namespace,
- BucketId,
- BuildId,
- Hash.ToHexString(),
- Part.QueryString);
+ std::string MultipartUploadResponseRequestString =
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ Part.QueryString,
+ m_AllowRedirect ? "true"sv : "false"sv);
// ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString);
HttpClient::Response MultipartUploadResponse = m_HttpClient.Put(MultipartUploadResponseRequestString, PartPayload);
if (!MultipartUploadResponse.IsSuccess())
@@ -590,12 +612,13 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
IoBuffer RetryPartPayload =
Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte - 1);
std::string RetryMultipartUploadResponseRequestString =
- fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}",
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}",
Namespace,
BucketId,
BuildId,
Hash.ToHexString(),
- RetryPart.QueryString);
+ RetryPart.QueryString,
+ m_AllowRedirect ? "true"sv : "false"sv);
MultipartUploadResponse = m_HttpClient.Put(RetryMultipartUploadResponseRequestString, RetryPartPayload);
TotalUploadedBytes = MultipartUploadResponse.UploadedBytes;
@@ -626,15 +649,21 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
}
JupiterResult
-JupiterSession::GetMultipartBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver,
- std::vector<std::function<JupiterResult()>>& OutWorkItems)
-{
- std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString());
+JupiterSession::GetMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete,
+ std::vector<std::function<JupiterResult()>>& OutWorkItems)
+{
+ std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ m_AllowRedirect ? "true"sv : "false"sv);
HttpClient::Response Response =
m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}}));
if (Response.IsSuccess())
@@ -649,46 +678,68 @@ JupiterSession::GetMultipartBuildBlob(std::string_view Namespa
uint64_t TotalSize = TotalSizeMaybe.value();
uint64_t PayloadSize = Response.ResponsePayload.GetSize();
- Receiver(0, Response.ResponsePayload, TotalSize);
+ OnReceive(0, Response.ResponsePayload);
if (TotalSize > PayloadSize)
{
struct WorkloadData
{
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver;
- std::atomic<uint64_t> BytesRemaining;
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive;
+ std::function<void()> OnComplete;
+ std::atomic<uint64_t> BytesRemaining;
};
std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
- Workload->Receiver = std::move(Receiver);
+ Workload->OnReceive = std::move(OnReceive);
+ Workload->OnComplete = std::move(OnComplete);
Workload->BytesRemaining = TotalSize - PayloadSize;
uint64_t Offset = PayloadSize;
while (Offset < TotalSize)
{
uint64_t PartSize = Min(ChunkSize, TotalSize - Offset);
- OutWorkItems.emplace_back(
- [this, Namespace, BucketId, BuildId, Hash, TotalSize, Workload, Offset, PartSize]() -> JupiterResult {
- std::string RequestUrl =
- fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString());
- HttpClient::Response Response = m_HttpClient.Get(
- RequestUrl,
- HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}}));
- if (Response.IsSuccess())
+ OutWorkItems.emplace_back([this,
+ Namespace = std::string(Namespace),
+ BucketId = std::string(BucketId),
+ BuildId = Oid(BuildId),
+ Hash = IoHash(Hash),
+ TotalSize,
+ Workload,
+ Offset,
+ PartSize]() -> JupiterResult {
+ std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ m_AllowRedirect ? "true"sv : "false"sv);
+ HttpClient::Response Response = m_HttpClient.Get(
+ RequestUrl,
+ HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}}));
+ if (Response.IsSuccess())
+ {
+ Workload->OnReceive(Offset, Response.ResponsePayload);
+ uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize());
+ if (ByteRemaning == Response.ResponsePayload.GetSize())
{
- uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize());
- Workload->Receiver(Offset, Response.ResponsePayload, ByteRemaning);
+ Workload->OnComplete();
}
- return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
- });
+ }
+ return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
+ });
Offset += PartSize;
}
}
+ else
+ {
+ OnComplete();
+ }
return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
}
}
}
- Receiver(0, Response.ResponsePayload, Response.ResponsePayload.GetSize());
+ OnReceive(0, Response.ResponsePayload);
+ OnComplete();
}
return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
}
@@ -707,11 +758,28 @@ JupiterSession::GetBuildBlob(std::string_view Namespace,
{
Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", Offset, Offset + Size - 1)});
}
- HttpClient::Response Response =
- m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()),
- TempFolderPath,
- Headers);
-
+ HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ m_AllowRedirect ? "true"sv : "false"sv),
+ TempFolderPath,
+ Headers);
+ if (Response.IsSuccess())
+ {
+ // If we get a redirect to S3 or a non-Jupiter endpoint the content type will not be correct, validate it and set it
+ if (m_AllowRedirect && (Response.ResponsePayload.GetContentType() == HttpContentType::kBinary))
+ {
+ IoHash ValidateRawHash;
+ uint64_t ValidateRawSize = 0;
+ ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload, ValidateRawHash, ValidateRawSize));
+ ZEN_ASSERT_SLOW(ValidateRawHash == Hash);
+ ZEN_ASSERT_SLOW(ValidateRawSize > 0);
+ ZEN_UNUSED(ValidateRawHash, ValidateRawSize);
+ Response.ResponsePayload.SetContentType(ZenContentType::kCompressedBinary);
+ }
+ }
return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv);
}
@@ -758,10 +826,12 @@ JupiterSession::FinalizeBuildPart(std::string_view Namespace,
}
JupiterResult
-JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
+JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount)
{
- HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks", Namespace, BucketId, BuildId),
- HttpClient::Accept(ZenContentType::kCbObject));
+ const std::string Parameters = MaxBlockCount == (uint64_t)-1 ? "" : fmt::format("?count={}", MaxBlockCount);
+ HttpClient::Response Response =
+ m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks{}", Namespace, BucketId, BuildId, Parameters),
+ HttpClient::Accept(ZenContentType::kCbObject));
return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv);
}
@@ -776,4 +846,19 @@ JupiterSession::GetBlockMetadata(std::string_view Namespace, std::string_view Bu
return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv);
}
+JupiterResult
+JupiterSession::PutBuildPartStats(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& BuildPartId,
+ IoBuffer Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response =
+ m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/stats", Namespace, BucketId, BuildId, BuildPartId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::PutBuildPartStats"sv);
+}
+
} // namespace zen
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp
new file mode 100644
index 000000000..aa806438b
--- /dev/null
+++ b/src/zenutil/parallelwork.cpp
@@ -0,0 +1,225 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/parallelwork.h>
+
+#include <zencore/callstack.h>
+#include <zencore/except.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+
+#include <typeinfo>
+
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+#endif // ZEN_WITH_TESTS
+
+namespace zen {
+
+ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag)
+: m_AbortFlag(AbortFlag)
+, m_PauseFlag(PauseFlag)
+, m_PendingWork(1)
+{
+}
+
+ParallelWork::~ParallelWork()
+{
+ try
+ {
+ if (!m_DispatchComplete)
+ {
+ ZEN_ASSERT(m_PendingWork.Remaining() > 0);
+ ZEN_WARN(
+ "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads "
+ "to complete");
+ m_PendingWork.CountDown();
+ }
+ m_PendingWork.Wait();
+ ptrdiff_t RemainingWork = m_PendingWork.Remaining();
+ if (RemainingWork != 0)
+ {
+ void* Frames[8];
+ uint32_t FrameCount = GetCallstack(2, 8, Frames);
+ CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames);
+ ZEN_ERROR("ParallelWork destructor waited for outstanding work but pending work count is {} instead of 0\n{}",
+ RemainingWork,
+ CallstackToString(Callstack, " "));
+ FreeCallstack(Callstack);
+
+ uint32_t WaitedMs = 0;
+ while (m_PendingWork.Remaining() > 0 && WaitedMs < 2000)
+ {
+ Sleep(50);
+ WaitedMs += 50;
+ }
+ RemainingWork = m_PendingWork.Remaining();
+ if (RemainingWork != 0)
+ {
+ ZEN_WARN("ParallelWork destructor safety wait failed, pending work count at {}", RemainingWork)
+ }
+ else
+ {
+ ZEN_INFO("ParallelWork destructor safety wait succeeded");
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Exception in ~ParallelWork: {}", Ex.what());
+ }
+}
+
+ParallelWork::ExceptionCallback
+ParallelWork::DefaultErrorFunction()
+{
+ return [&](std::exception_ptr Ex, std::atomic<bool>& AbortFlag) {
+ m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); });
+ AbortFlag = true;
+ };
+}
+
+void
+ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback)
+{
+ ZEN_ASSERT(!m_DispatchComplete);
+ m_DispatchComplete = true;
+
+ ZEN_ASSERT(m_PendingWork.Remaining() > 0);
+ m_PendingWork.CountDown();
+
+ while (!m_PendingWork.Wait(UpdateIntervalMS))
+ {
+ UpdateCallback(m_AbortFlag.load(), m_PauseFlag.load(), m_PendingWork.Remaining());
+ }
+
+ RethrowErrors();
+}
+
+void
+ParallelWork::Wait()
+{
+ ZEN_ASSERT(!m_DispatchComplete);
+ m_DispatchComplete = true;
+
+ ZEN_ASSERT(m_PendingWork.Remaining() > 0);
+ m_PendingWork.CountDown();
+ m_PendingWork.Wait();
+
+ RethrowErrors();
+}
+
+void
+ParallelWork::RethrowErrors()
+{
+ if (!m_Errors.empty())
+ {
+ if (m_Errors.size() > 1)
+ {
+ ZEN_INFO("Multiple exceptions thrown during ParallelWork execution, dropping the following exceptions:");
+ auto It = m_Errors.begin() + 1;
+ while (It != m_Errors.end())
+ {
+ try
+ {
+ std::rethrow_exception(*It);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_INFO(" {}", Ex.what());
+ }
+ It++;
+ }
+ }
+ std::exception_ptr Ex = m_Errors.front();
+ m_Errors.clear();
+ std::rethrow_exception(Ex);
+ }
+}
+
+#if ZEN_WITH_TESTS
+
+TEST_CASE("parallellwork.nowork")
+{
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
+ Work.Wait();
+}
+
+TEST_CASE("parallellwork.basic")
+{
+ WorkerThreadPool WorkerPool(2);
+
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
+ for (uint32_t I = 0; I < 5; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); });
+ }
+ Work.Wait();
+}
+
+TEST_CASE("parallellwork.throws_in_work")
+{
+ WorkerThreadPool WorkerPool(2);
+
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
+ for (uint32_t I = 0; I < 10; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) {
+ ZEN_UNUSED(AbortFlag);
+ if (I > 3)
+ {
+ throw std::runtime_error("We throw in async thread");
+ }
+ else
+ {
+ Sleep(10);
+ }
+ });
+ }
+ CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread");
+}
+
+TEST_CASE("parallellwork.throws_in_dispatch")
+{
+ WorkerThreadPool WorkerPool(2);
+ std::atomic<uint32_t> ExecutedCount;
+ try
+ {
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
+ for (uint32_t I = 0; I < 5; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ ExecutedCount++;
+ });
+ if (I == 3)
+ {
+ throw std::runtime_error("We throw in dispatcher thread");
+ }
+ }
+ CHECK(false);
+ }
+ catch (const std::runtime_error& Ex)
+ {
+ CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what()));
+ CHECK_LE(ExecutedCount.load(), 4);
+ }
+}
+
+void
+parallellwork_forcelink()
+{
+}
+#endif // ZEN_WITH_TESTS
+
+} // namespace zen
diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp
index e3165e838..797034978 100644
--- a/src/zenutil/workerpools.cpp
+++ b/src/zenutil/workerpools.cpp
@@ -11,9 +11,10 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
namespace {
- const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(std::thread::hardware_concurrency());
+ const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(Max(std::thread::hardware_concurrency() - 1u, 2u));
const int MediumWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 2u));
const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 8u), 1u));
+ const int TinyWorkerThreadPoolTreadCount = 1;
static bool IsShutDown = false;
@@ -35,6 +36,9 @@ namespace {
WorkerPool BurstSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(burst)"};
WorkerPool BackgroundSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(bkg)"};
+ WorkerPool BurstTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "TinyThreadPool(burst)"};
+ WorkerPool BackgroundTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "TinyThreadPool(bkg)"};
+
WorkerPool SyncWorkerPool = {.TreadCount = 0, .Name = "SyncThreadPool"};
WorkerThreadPool& EnsurePoolPtr(WorkerPool& Pool)
@@ -75,6 +79,12 @@ GetSmallWorkerPool(EWorkloadType WorkloadType)
}
WorkerThreadPool&
+GetTinyWorkerPool(EWorkloadType WorkloadType)
+{
+ return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstTinyWorkerPool : BackgroundTinyWorkerPool);
+}
+
+WorkerThreadPool&
GetSyncWorkerPool()
{
return EnsurePoolPtr(SyncWorkerPool);
@@ -91,6 +101,8 @@ ShutdownWorkerPools()
BackgroundMediumWorkerPool.Pool.reset();
BurstSmallWorkerPool.Pool.reset();
BackgroundSmallWorkerPool.Pool.reset();
+ BurstTinyWorkerPool.Pool.reset();
+ BackgroundTinyWorkerPool.Pool.reset();
SyncWorkerPool.Pool.reset();
}
} // namespace zen
diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp
index 0409cb976..4fd1cef9a 100644
--- a/src/zenutil/zenserverprocess.cpp
+++ b/src/zenutil/zenserverprocess.cpp
@@ -547,7 +547,7 @@ ZenServerEnvironment::CreateNewTestDir()
TestDir << "test"sv << int64_t(ZenServerTestCounter.fetch_add(1));
std::filesystem::path TestPath = m_TestBaseDir / TestDir.c_str();
- ZEN_ASSERT(!std::filesystem::exists(TestPath));
+ ZEN_ASSERT(!IsDir(TestPath));
ZEN_INFO("Creating new test dir @ '{}'", TestPath);
@@ -581,7 +581,7 @@ ZenServerInstance::~ZenServerInstance()
{
Shutdown();
std::error_code DummyEc;
- std::filesystem::remove(std::filesystem::temp_directory_path() / ("zenserver_" + m_Name + ".log"), DummyEc);
+ RemoveFile(std::filesystem::temp_directory_path() / ("zenserver_" + m_Name + ".log"), DummyEc);
}
catch (const std::exception& Err)
{
@@ -632,6 +632,7 @@ ZenServerInstance::Shutdown()
std::error_code Ec;
if (SignalShutdown(Ec))
{
+ Stopwatch Timer;
ZEN_DEBUG("Waiting for zenserver process {} ({}) to shut down", m_Name, m_Process.Pid());
while (!m_Process.Wait(1000))
{
@@ -645,7 +646,10 @@ ZenServerInstance::Shutdown()
ZEN_WARN("Wait abandoned by exited process");
return 0;
}
- ZEN_WARN("Waiting for zenserver process {} ({}) timed out", m_Name, m_Process.Pid());
+ ZEN_WARN("Waited for zenserver process {} ({}) to exit for {}",
+ m_Name,
+ m_Process.Pid(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
ZEN_DEBUG("zenserver process {} ({}) exited", m_Name, m_Process.Pid());
int ExitCode = m_Process.GetExitCode();
@@ -1046,7 +1050,7 @@ std::string
ZenServerInstance::GetLogOutput() const
{
std::filesystem::path OutputPath = std::filesystem::temp_directory_path() / ("zenserver_" + m_Name + ".log");
- if (std::filesystem::is_regular_file(OutputPath))
+ if (IsFile(OutputPath))
{
FileContents Contents = ReadFile(OutputPath);
if (!Contents.ErrorCode)
@@ -1068,7 +1072,7 @@ ZenServerInstance::Terminate()
const std::filesystem::path BaseDir = m_Env.ProgramBaseDir();
const std::filesystem::path Executable = BaseDir / "zenserver" ZEN_EXE_SUFFIX_LITERAL;
ProcessHandle RunningProcess;
- std::error_code Ec = FindProcess(Executable, RunningProcess);
+ std::error_code Ec = FindProcess(Executable, RunningProcess, /*IncludeSelf*/ false);
if (Ec)
{
throw std::system_error(Ec, fmt::format("failed to look up running server executable '{}'", Executable));
@@ -1136,7 +1140,7 @@ ValidateLockFileInfo(const LockFileInfo& Info, std::string& OutReason)
OutReason = fmt::format("listen port ({}) is not valid", Info.EffectiveListenPort);
return false;
}
- if (!std::filesystem::is_directory(Info.DataDir))
+ if (!IsDir(Info.DataDir))
{
OutReason = fmt::format("data directory ('{}') does not exist", Info.DataDir);
return false;
diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp
index 19eb63ce9..fe23b00c1 100644
--- a/src/zenutil/zenutil.cpp
+++ b/src/zenutil/zenutil.cpp
@@ -7,6 +7,8 @@
# include <zenutil/cache/cacherequests.h>
# include <zenutil/cache/rpcrecording.h>
# include <zenutil/chunkedfile.h>
+# include <zenutil/commandlineoptions.h>
+# include <zenutil/parallelwork.h>
namespace zen {
@@ -17,6 +19,8 @@ zenutil_forcelinktests()
cache::rpcrecord_forcelink();
cacherequests_forcelink();
chunkedfile_forcelink();
+ commandlineoptions_forcelink();
+ parallellwork_forcelink();
}
} // namespace zen