From b55fdf7c1dfe6d3e52b08a160a77472ec1480cf7 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 18 Feb 2026 08:54:05 +0100 Subject: convert ZEN_ASSERTs to exception to handle corrupt data gracefully (#760) * convert ZEN_ASSERTs to exception to handle corrupt data gracefully --- CHANGELOG.md | 1 + .../builds/buildstorageoperations.cpp | 68 ++++++++++++++++++---- .../zenremotestore/builds/buildstorageoperations.h | 3 +- 3 files changed, 59 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c2fe710a9..16a6b7fb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ - Improvement: Reduced time project and project oplogs are locked during GC and Validation - Improvement: `zen` now supports additional configuration of logging options, such as `--log-warn=...` for configuring log levels, etc (see `zen --help`) +- Bugfix: If a corrupted block (or partial block) is downloaded, handle it gracefully and end the download instead of causing an assert ## 5.7.20 - Improvement: When validating cache records read from disk we now do a limited validation of the payload to reduce overhead diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index 2319ad66d..ade431393 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -4083,7 +4083,8 @@ BuildsOperationUpdateFolder::WriteSequenceChunkToCache(BufferedWriteFileCache::L } bool -BuildsOperationUpdateFolder::GetBlockWriteOps(std::span ChunkRawHashes, +BuildsOperationUpdateFolder::GetBlockWriteOps(const IoHash& BlockRawHash, + std::span ChunkRawHashes, std::span ChunkCompressedLengths, std::span> SequenceIndexChunksLeftToWriteCounters, std::span> RemoteChunkIndexNeedsCopyFromSourceFlags, @@ -4115,9 +4116,34 @@ BuildsOperationUpdateFolder::GetBlockWriteOps(std::span ChunkR uint64_t VerifyChunkSize; CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer::MakeView(ChunkMemoryView), VerifyChunkHash, VerifyChunkSize); - ZEN_ASSERT(CompressedChunk); - ZEN_ASSERT(VerifyChunkHash == ChunkHash); - ZEN_ASSERT(VerifyChunkSize == m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + if (!CompressedChunk) + { + throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} is not a valid compressed buffer", + ChunkHash, + OffsetInBlock, + ChunkCompressedSize, + BlockRawHash)); + } + if (VerifyChunkHash != ChunkHash) + { + throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} has a mismatching content hash {}", + ChunkHash, + OffsetInBlock, + ChunkCompressedSize, + BlockRawHash, + VerifyChunkHash)); + } + if (VerifyChunkSize != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]) + { + throw std::runtime_error( + fmt::format("Chunk {} at {}, size {} in block {} has a mismatching raw size {}, expected {}", + ChunkHash, + OffsetInBlock, + ChunkCompressedSize, + BlockRawHash, + VerifyChunkSize, + m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])); + } OodleCompressor ChunkCompressor; OodleCompressionLevel ChunkCompressionLevel; @@ -4138,7 +4164,18 @@ BuildsOperationUpdateFolder::GetBlockWriteOps(std::span ChunkR { Decompressed = CompressedChunk.Decompress().AsIoBuffer(); } - ZEN_ASSERT(Decompressed.GetSize() == m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + + if (Decompressed.GetSize() != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]) + { + throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} decompressed to size {}, expected {}", + ChunkHash, + OffsetInBlock, + ChunkCompressedSize, + BlockRawHash, + Decompressed.GetSize(), + m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])); + } + ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed)); for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs) { @@ -4237,7 +4274,8 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription const std::vector ChunkCompressedLengths = ReadChunkBlockHeader(BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()), HeaderSize); - if (GetBlockWriteOps(BlockDescription.ChunkRawHashes, + if (GetBlockWriteOps(BlockDescription.BlockHash, + BlockDescription.ChunkRawHashes, ChunkCompressedLengths, SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndexNeedsCopyFromSourceFlags, @@ -4252,7 +4290,8 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription return false; } - if (GetBlockWriteOps(BlockDescription.ChunkRawHashes, + if (GetBlockWriteOps(BlockDescription.BlockHash, + BlockDescription.ChunkRawHashes, BlockDescription.ChunkCompressedLengths, SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndexNeedsCopyFromSourceFlags, @@ -4283,7 +4322,8 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc const MemoryView BlockView = BlockMemoryBuffer.GetView(); BlockWriteOps Ops; - if (GetBlockWriteOps(BlockDescription.ChunkRawHashes, + if (GetBlockWriteOps(BlockDescription.BlockHash, + BlockDescription.ChunkRawHashes, BlockDescription.ChunkCompressedLengths, SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndexNeedsCopyFromSourceFlags, @@ -5334,6 +5374,13 @@ BuildsOperationUploadFolder::FetchChunk(const ChunkedFolderContent& Content, ZEN_ASSERT(!ChunkLocations.empty()); CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); + if (!Chunk) + { + throw std::runtime_error(fmt::format("Unable to read chunk at {}, size {} from '{}'", + ChunkLocations[0].Offset, + Content.ChunkedContent.ChunkRawSizes[ChunkIndex], + Content.Paths[Lookup.SequenceIndexFirstPathIndex[ChunkLocations[0].SequenceIndex]])); + } ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash); return Chunk; }; @@ -5362,10 +5409,7 @@ BuildsOperationUploadFolder::GenerateBlock(const ChunkedFolderContent& Content, Content.ChunkedContent.ChunkHashes[ChunkIndex], [this, &Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair { CompositeBuffer Chunk = FetchChunk(Content, Lookup, ChunkHash, OpenFileCache); - if (!Chunk) - { - ZEN_ASSERT(false); - } + ZEN_ASSERT(Chunk); uint64_t RawSize = Chunk.GetSize(); const bool ShouldCompressChunk = RawSize >= m_Options.MinimumSizeForCompressInBlock && diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index 6304159ae..9e5bf8d91 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -339,7 +339,8 @@ private: const uint64_t FileOffset, const uint32_t PathIndex); - bool GetBlockWriteOps(std::span ChunkRawHashes, + bool GetBlockWriteOps(const IoHash& BlockRawHash, + std::span ChunkRawHashes, std::span ChunkCompressedLengths, std::span> SequenceIndexChunksLeftToWriteCounters, std::span> RemoteChunkIndexNeedsCopyFromSourceFlags, -- cgit v1.2.3 From ae9c30841074da9226a76c1eb2fb3a3e29086bf6 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Wed, 18 Feb 2026 09:40:35 +0100 Subject: add selective request logging support to http.sys (#762) * implemented selective request logging for http.sys for consistency with asio * fixed traversal of GetLogicalProcessorInformationEx to account for variable-sized records * also adds CPU usage metrics --- src/zencore/include/zencore/system.h | 1 + src/zencore/system.cpp | 169 +++++++++++++++++++++++++++-------- src/zenhttp/servers/httpsys.cpp | 25 +++++- 3 files changed, 156 insertions(+), 39 deletions(-) diff --git a/src/zencore/include/zencore/system.h b/src/zencore/include/zencore/system.h index aec2e0ce4..bf3c15d3d 100644 --- a/src/zencore/include/zencore/system.h +++ b/src/zencore/include/zencore/system.h @@ -25,6 +25,7 @@ struct SystemMetrics uint64_t AvailVirtualMemoryMiB = 0; uint64_t PageFileMiB = 0; uint64_t AvailPageFileMiB = 0; + float CpuUsagePercent = 0.0f; }; SystemMetrics GetSystemMetrics(); diff --git a/src/zencore/system.cpp b/src/zencore/system.cpp index e92691781..267c87e12 100644 --- a/src/zencore/system.cpp +++ b/src/zencore/system.cpp @@ -13,6 +13,8 @@ ZEN_THIRD_PARTY_INCLUDES_START # include # include +# include +# pragma comment(lib, "pdh.lib") ZEN_THIRD_PARTY_INCLUDES_END #elif ZEN_PLATFORM_LINUX # include @@ -65,55 +67,98 @@ GetSystemMetrics() // Determine physical core count - DWORD BufferSize = 0; - BOOL Result = GetLogicalProcessorInformationEx(RelationAll, nullptr, &BufferSize); - if (int32_t Error = GetLastError(); Error != ERROR_INSUFFICIENT_BUFFER) { - ThrowSystemError(Error, "Failed to get buffer size for logical processor information"); - } - - PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX Buffer = (PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX)Memory::Alloc(BufferSize); + DWORD BufferSize = 0; + BOOL Result = GetLogicalProcessorInformationEx(RelationAll, nullptr, &BufferSize); + if (int32_t Error = GetLastError(); Error != ERROR_INSUFFICIENT_BUFFER) + { + ThrowSystemError(Error, "Failed to get buffer size for logical processor information"); + } - Result = GetLogicalProcessorInformationEx(RelationAll, Buffer, &BufferSize); - if (!Result) - { - Memory::Free(Buffer); - throw std::runtime_error("Failed to get logical processor information"); - } + PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX Buffer = (PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX)Memory::Alloc(BufferSize); - DWORD ProcessorPkgCount = 0; - DWORD ProcessorCoreCount = 0; - DWORD ByteOffset = 0; - while (ByteOffset + sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX) <= BufferSize) - { - const SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX& Slpi = Buffer[ByteOffset / sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX)]; - if (Slpi.Relationship == RelationProcessorCore) + Result = GetLogicalProcessorInformationEx(RelationAll, Buffer, &BufferSize); + if (!Result) { - ProcessorCoreCount++; + Memory::Free(Buffer); + throw std::runtime_error("Failed to get logical processor information"); } - else if (Slpi.Relationship == RelationProcessorPackage) + + DWORD ProcessorPkgCount = 0; + DWORD ProcessorCoreCount = 0; + DWORD LogicalProcessorCount = 0; + + BYTE* Ptr = reinterpret_cast(Buffer); + BYTE* const End = Ptr + BufferSize; + while (Ptr < End) { - ProcessorPkgCount++; + const SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX& Slpi = *reinterpret_cast(Ptr); + if (Slpi.Relationship == RelationProcessorCore) + { + ++ProcessorCoreCount; + + // Count logical processors (threads) across all processor groups for this core. + // Each core entry lists one GROUP_AFFINITY per group it spans; each set bit + // in the Mask represents one logical processor (HyperThreading sibling). + for (WORD g = 0; g < Slpi.Processor.GroupCount; ++g) + { + LogicalProcessorCount += static_cast(__popcnt64(Slpi.Processor.GroupMask[g].Mask)); + } + } + else if (Slpi.Relationship == RelationProcessorPackage) + { + ++ProcessorPkgCount; + } + Ptr += Slpi.Size; } - ByteOffset += sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX); - } - Metrics.CoreCount = ProcessorCoreCount; - Metrics.CpuCount = ProcessorPkgCount; + Metrics.CoreCount = ProcessorCoreCount; + Metrics.CpuCount = ProcessorPkgCount; + Metrics.LogicalProcessorCount = LogicalProcessorCount; - Memory::Free(Buffer); + Memory::Free(Buffer); + } // Query memory status - MEMORYSTATUSEX MemStatus{.dwLength = sizeof(MEMORYSTATUSEX)}; - GlobalMemoryStatusEx(&MemStatus); + { + MEMORYSTATUSEX MemStatus{.dwLength = sizeof(MEMORYSTATUSEX)}; + GlobalMemoryStatusEx(&MemStatus); + + Metrics.SystemMemoryMiB = MemStatus.ullTotalPhys / 1024 / 1024; + Metrics.AvailSystemMemoryMiB = MemStatus.ullAvailPhys / 1024 / 1024; + Metrics.VirtualMemoryMiB = MemStatus.ullTotalVirtual / 1024 / 1024; + Metrics.AvailVirtualMemoryMiB = MemStatus.ullAvailVirtual / 1024 / 1024; + Metrics.PageFileMiB = MemStatus.ullTotalPageFile / 1024 / 1024; + Metrics.AvailPageFileMiB = MemStatus.ullAvailPageFile / 1024 / 1024; + } + + // Query CPU usage using PDH + // + // TODO: This should be changed to not require a Sleep, perhaps by using some + // background metrics gathering mechanism. + + { + PDH_HQUERY QueryHandle = nullptr; + PDH_HCOUNTER CounterHandle = nullptr; - Metrics.SystemMemoryMiB = MemStatus.ullTotalPhys / 1024 / 1024; - Metrics.AvailSystemMemoryMiB = MemStatus.ullAvailPhys / 1024 / 1024; - Metrics.VirtualMemoryMiB = MemStatus.ullTotalVirtual / 1024 / 1024; - Metrics.AvailVirtualMemoryMiB = MemStatus.ullAvailVirtual / 1024 / 1024; - Metrics.PageFileMiB = MemStatus.ullTotalPageFile / 1024 / 1024; - Metrics.AvailPageFileMiB = MemStatus.ullAvailPageFile / 1024 / 1024; + if (PdhOpenQueryW(nullptr, 0, &QueryHandle) == ERROR_SUCCESS) + { + if (PdhAddEnglishCounterW(QueryHandle, L"\\Processor(_Total)\\% Processor Time", 0, &CounterHandle) == ERROR_SUCCESS) + { + PdhCollectQueryData(QueryHandle); + Sleep(100); + PdhCollectQueryData(QueryHandle); + + PDH_FMT_COUNTERVALUE CounterValue; + if (PdhGetFormattedCounterValue(CounterHandle, PDH_FMT_DOUBLE, nullptr, &CounterValue) == ERROR_SUCCESS) + { + Metrics.CpuUsagePercent = static_cast(CounterValue.doubleValue); + } + } + PdhCloseQuery(QueryHandle); + } + } return Metrics; } @@ -190,6 +235,39 @@ GetSystemMetrics() } } + // Query CPU usage + Metrics.CpuUsagePercent = 0.0f; + if (FILE* Stat = fopen("/proc/stat", "r")) + { + char Line[256]; + unsigned long User, Nice, System, Idle, IoWait, Irq, SoftIrq; + static unsigned long PrevUser = 0, PrevNice = 0, PrevSystem = 0, PrevIdle = 0, PrevIoWait = 0, PrevIrq = 0, PrevSoftIrq = 0; + + if (fgets(Line, sizeof(Line), Stat)) + { + if (sscanf(Line, "cpu %lu %lu %lu %lu %lu %lu %lu", &User, &Nice, &System, &Idle, &IoWait, &Irq, &SoftIrq) == 7) + { + unsigned long TotalDelta = (User + Nice + System + Idle + IoWait + Irq + SoftIrq) - + (PrevUser + PrevNice + PrevSystem + PrevIdle + PrevIoWait + PrevIrq + PrevSoftIrq); + unsigned long IdleDelta = Idle - PrevIdle; + + if (TotalDelta > 0) + { + Metrics.CpuUsagePercent = 100.0f * (TotalDelta - IdleDelta) / TotalDelta; + } + + PrevUser = User; + PrevNice = Nice; + PrevSystem = System; + PrevIdle = Idle; + PrevIoWait = IoWait; + PrevIrq = Irq; + PrevSoftIrq = SoftIrq; + } + } + fclose(Stat); + } + // Get memory information long Pages = sysconf(_SC_PHYS_PAGES); long PageSize = sysconf(_SC_PAGE_SIZE); @@ -270,6 +348,25 @@ GetSystemMetrics() sysctlbyname("hw.packages", &Packages, &Size, nullptr, 0); Metrics.CpuCount = Packages > 0 ? Packages : 1; + // Query CPU usage using host_statistics64 + Metrics.CpuUsagePercent = 0.0f; + host_cpu_load_info_data_t CpuLoad; + mach_msg_type_number_t CpuCount = sizeof(CpuLoad) / sizeof(natural_t); + if (host_statistics(mach_host_self(), HOST_CPU_LOAD_INFO, (host_info_t)&CpuLoad, &CpuCount) == KERN_SUCCESS) + { + unsigned long TotalTicks = 0; + for (int i = 0; i < CPU_STATE_MAX; ++i) + { + TotalTicks += CpuLoad.cpu_ticks[i]; + } + + if (TotalTicks > 0) + { + unsigned long IdleTicks = CpuLoad.cpu_ticks[CPU_STATE_IDLE]; + Metrics.CpuUsagePercent = 100.0f * (TotalTicks - IdleTicks) / TotalTicks; + } + } + // Get memory information uint64_t MemSize = 0; Size = sizeof(MemSize); diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index 14896c803..c640ba90b 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -331,6 +331,8 @@ public: virtual void WriteResponseAsync(std::function&& ContinuationHandler) override; virtual bool TryGetRanges(HttpRanges& Ranges) override; + void LogRequest(HttpMessageResponseRequest* Response); + using HttpServerRequest::WriteResponse; HttpSysServerRequest(const HttpSysServerRequest&) = delete; @@ -429,7 +431,8 @@ public: virtual HttpSysRequestHandler* HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) override; void SuppressResponseBody(); // typically used for HEAD requests - inline int64_t GetResponseBodySize() const { return m_TotalDataSize; } + inline uint16_t GetResponseCode() const { return m_ResponseCode; } + inline int64_t GetResponseBodySize() const { return m_TotalDataSize; } private: eastl::fixed_vector m_HttpDataChunks; @@ -1886,7 +1889,7 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode) ZEN_ASSERT(IsHandled() == false); - auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode); + HttpMessageResponseRequest* Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode); if (SuppressBody()) { @@ -1904,6 +1907,7 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode) # endif SetIsHandled(); + LogRequest(Response); } void @@ -1913,7 +1917,7 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy ZEN_ASSERT(IsHandled() == false); - auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs); + HttpMessageResponseRequest* Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs); if (SuppressBody()) { @@ -1931,6 +1935,20 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy # endif SetIsHandled(); + LogRequest(Response); +} + +void +HttpSysServerRequest::LogRequest(HttpMessageResponseRequest* Response) +{ + if (ShouldLogRequest()) + { + ZEN_INFO("{} {} {} -> {}", + ToString(RequestVerb()), + m_UriUtf8.c_str(), + Response->GetResponseCode(), + NiceBytes(Response->GetResponseBodySize())); + } } void @@ -1959,6 +1977,7 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy # endif SetIsHandled(); + LogRequest(Response); } void -- cgit v1.2.3 From 149a5c2faa8d59290b8b44717e504532e906aae2 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Wed, 18 Feb 2026 11:28:03 +0100 Subject: structured compute basics (#714) this change adds the `zencompute` component, which can be used to distribute work dispatched from UE using the DDB (Derived Data Build) APIs via zenserver this change also adds a distinct zenserver compute mode (`zenserver compute`) which is intended to be used for leaf compute nodes to exercise the compute functionality without directly involving UE, a `zen exec` subcommand is also added, which can be used to feed replays through the system all new functionality is considered *experimental* and disabled by default at this time, behind the `zencompute` option in xmake config --- docs/compute.md | 152 ++++ src/zen/cmds/exec_cmd.cpp | 654 ++++++++++++++ src/zen/cmds/exec_cmd.h | 97 ++ src/zen/xmake.lua | 5 +- src/zen/zen.cpp | 39 +- src/zencompute-test/xmake.lua | 9 + src/zencompute-test/zencompute-test.cpp | 32 + src/zencompute/actionrecorder.cpp | 258 ++++++ src/zencompute/actionrecorder.h | 91 ++ src/zencompute/functionrunner.cpp | 112 +++ src/zencompute/functionrunner.h | 207 +++++ src/zencompute/functionservice.cpp | 957 ++++++++++++++++++++ src/zencompute/httpfunctionservice.cpp | 709 +++++++++++++++ src/zencompute/httporchestrator.cpp | 81 ++ .../include/zencompute/functionservice.h | 132 +++ .../include/zencompute/httpfunctionservice.h | 73 ++ .../include/zencompute/httporchestrator.h | 44 + .../include/zencompute/recordingreader.h | 127 +++ src/zencompute/include/zencompute/zencompute.h | 11 + src/zencompute/localrunner.cpp | 722 +++++++++++++++ src/zencompute/localrunner.h | 100 +++ src/zencompute/recordingreader.cpp | 335 +++++++ src/zencompute/remotehttprunner.cpp | 457 ++++++++++ src/zencompute/remotehttprunner.h | 80 ++ src/zencompute/xmake.lua | 11 + src/zencompute/zencompute.cpp | 12 + src/zennet/beacon.cpp | 170 ++++ src/zennet/include/zennet/beacon.h | 38 + src/zennet/include/zennet/statsdclient.h | 2 + src/zennet/statsdclient.cpp | 1 + src/zenserver-test/function-tests.cpp | 34 + src/zenserver/compute/computeserver.cpp | 330 +++++++ src/zenserver/compute/computeserver.h | 106 +++ src/zenserver/compute/computeservice.cpp | 100 +++ src/zenserver/compute/computeservice.h | 36 + src/zenserver/frontend/html/compute.html | 991 +++++++++++++++++++++ src/zenserver/main.cpp | 55 +- src/zenserver/storage/storageconfig.cpp | 1 + src/zenserver/storage/storageconfig.h | 1 + src/zenserver/storage/zenstorageserver.cpp | 21 + src/zenserver/storage/zenstorageserver.h | 26 +- src/zenserver/xmake.lua | 4 + src/zenserver/zenserver.cpp | 8 + src/zenserver/zenserver.h | 13 +- src/zentest-appstub/xmake.lua | 3 + src/zentest-appstub/zentest-appstub.cpp | 391 +++++++- thirdparty/xmake.lua | 2 +- xmake.lua | 12 + 48 files changed, 7804 insertions(+), 48 deletions(-) create mode 100644 docs/compute.md create mode 100644 src/zen/cmds/exec_cmd.cpp create mode 100644 src/zen/cmds/exec_cmd.h create mode 100644 src/zencompute-test/xmake.lua create mode 100644 src/zencompute-test/zencompute-test.cpp create mode 100644 src/zencompute/actionrecorder.cpp create mode 100644 src/zencompute/actionrecorder.h create mode 100644 src/zencompute/functionrunner.cpp create mode 100644 src/zencompute/functionrunner.h create mode 100644 src/zencompute/functionservice.cpp create mode 100644 src/zencompute/httpfunctionservice.cpp create mode 100644 src/zencompute/httporchestrator.cpp create mode 100644 src/zencompute/include/zencompute/functionservice.h create mode 100644 src/zencompute/include/zencompute/httpfunctionservice.h create mode 100644 src/zencompute/include/zencompute/httporchestrator.h create mode 100644 src/zencompute/include/zencompute/recordingreader.h create mode 100644 src/zencompute/include/zencompute/zencompute.h create mode 100644 src/zencompute/localrunner.cpp create mode 100644 src/zencompute/localrunner.h create mode 100644 src/zencompute/recordingreader.cpp create mode 100644 src/zencompute/remotehttprunner.cpp create mode 100644 src/zencompute/remotehttprunner.h create mode 100644 src/zencompute/xmake.lua create mode 100644 src/zencompute/zencompute.cpp create mode 100644 src/zennet/beacon.cpp create mode 100644 src/zennet/include/zennet/beacon.h create mode 100644 src/zenserver-test/function-tests.cpp create mode 100644 src/zenserver/compute/computeserver.cpp create mode 100644 src/zenserver/compute/computeserver.h create mode 100644 src/zenserver/compute/computeservice.cpp create mode 100644 src/zenserver/compute/computeservice.h create mode 100644 src/zenserver/frontend/html/compute.html diff --git a/docs/compute.md b/docs/compute.md new file mode 100644 index 000000000..417622f94 --- /dev/null +++ b/docs/compute.md @@ -0,0 +1,152 @@ +# DDC compute interface design documentation + +This is a work in progress + +## General architecture + +The Zen server compute interfaces implement a basic model for distributing compute processes. +Clients can implement [Functions](#functions) in [worker executables](#workers) and dispatch +[actions](#actions) to them via a message based interface. + +The API requires users to describe the actions and the workers explicitly fully up front and the +work is described and submitted as singular objects to the compute service. The model somewhat +resembles Lambda and other stateless compute services but is more tightly constrained to allow +for optimizations and to integrate tightly with the storage components in Zen server. + +This is in contrast with Unreal Build Accelerator in where the worker (remote process) +and the inputs are discovered on-the-fly as the worker progresses and inputs and results +are communicated via relatively high-frequency RPCs. + +### Actions + +An action is described by an action descriptor, which is a compact binary object which +contains a self-contained description of the inputs and the function which should be applied +to generate an output. + +#### Sample Action Descriptor + +``` +work item 4857714dee2383b50b2e7d72afd79848ab5d13f8 (2 attachments): +Function: CompileShaderJobs +FunctionVersion: '83027356-2cf7-41ca-aba5-c81ab0ff2129' +BuildSystemVersion: '17fe280d-ccd8-4be8-a9d1-89c944a70969' +Inputs: + Input: + RawHash: 0c01d9f19033256ca974fced523d1e15b27c1b0a + RawSize: 4482 + Virtual0: + RawHash: dd9bbcb8763badd2f015f94f8f6e360362e2bce0 + RawSize: 3334 +``` + +### Functions + +Functions are identified by a name, and a version specification. For +matching purposes there's also a build system version specification. +When workers are registered with the compute service, they are entered +into a table and as actions stream in the compute subsystem will try to +find a worker which implements the required function using the +`[Function,FunctionVersion,BuildSystemVersion]` tuple. In practice there +may be more than one matching worker and it's up to the compute service +to pick one. + +``` +=== Known functions =========================== +function version build system worker id +CompileShaderJobs 83027356-2cf7-41ca-aba5-c81ab0ff2129 17fe280d-ccd8-4be8-a9d1-89c944a70969 69cb9bb50e9600b5bd5e5ca4ba0f9187b118069a +``` + +### Workers + +A worker is an executable which accepts some command line options which are used to pass the +information required to execute an action. There are two modes, one legacy mode which is +file-based and a streaming mode. + +In the file-based mode the option is simply `-Build=` which points to an action +descriptor in compact binary format (see above). By convention, the referenced inputs are in a folder +named `Inputs` where any input blobs are stored as `CompressedBuffer`-format files named +after the `IoHash` of the uncompressed contents. + +In the streaming mode, the data is provided through a streaming socket interface instead +of using the file system. This eliminates process spawning overheads and enables intra-process +pipelining for greater efficiency. The streaming mode is not yet implemented fully. + +### Worker Descriptors + +Workers are declared by passing a worker descriptor to the compute service. The descriptor +contains information about which executable files are required to execute the worker and how +they need to be laid out. You can optionally also provide additional non-executable files to +go along with the executables. + +The descriptor also lists the functions implemented by the worker. Each function defines +a version which is used when matching actions (the function version is passed in as the +`FunctionVersion` in the action descriptor). + +Each worker links in a small set of common support code which is used to handle the +communication with the invoking program (the 'build system'). To be able to evolve this +interface, each worker also indicates the version of the build system using the +`BuildSystemVersion` attribute. + +#### Sample Worker Descriptor + +``` +worker 69cb9bb50e9600b5bd5e5ca4ba0f9187b118069a: +name: ShaderBuildWorker +path: Engine/Binaries/Win64/ShaderBuildWorker.exe +host: Win64 +buildsystem_version: '17fe280d-ccd8-4be8-a9d1-89c944a70969' +timeout: 300 +cores: 1 +environment: [] +executables: + - name: 'Engine/Binaries/Win64/ShaderBuildWorker-DerivedDataBuildWorker.dll' + hash: f4dbec80e549bae2916288f1b9428c2878d9ae7a + size: 166912 + - name: 'Engine/Binaries/Win64/ShaderBuildWorker-DerivedDataCache.dll' + hash: 8025d561ede05db19b235fc2ef290e2b029c1b8c + size: 4339200 + - name: Engine/Binaries/Win64/ShaderBuildWorker.exe + hash: b85862fca2ce04990470f27bae9ead7f31d9b27e + size: 60928 + - name: Engine/Binaries/Win64/ShaderBuildWorker.modules + hash: 7b05741a69a2ea607c5578668a8de50b04259668 + size: 3739 + - name: Engine/Binaries/Win64/ShaderBuildWorker.version + hash: 8fdfd9f825febf2191b555393e69b32a1d78c24f + size: 259 +files: [] +dirs: + - Engine/Binaries/Win64 +functions: + - name: CompileShaderJobs + version: '83027356-2cf7-41ca-aba5-c81ab0ff2129' +``` + +## API (WIP not final) + +The compute interfaces are currently exposed on the `/apply` endpoint but this +will be subject to change as we adapt the interfaces during development. The LSN +APIs below are intended to replace the action ID oriented APIs. + +The POST APIs typically involve a two-step dance where a descriptor is POSTed and +the service responds with a list of `needs` chunks (identified via `IoHash`) which +it does not have yet. The client can then follow up with a POST of a Compact Binary +Package containing the descriptor along with the needed chunks. + +`/apply/ready` - health check endpoint returns HTTP 200 OK or HTTP 503 + +`/apply/sysinfo` - system information endpoint + +`/apply/record/start`, `/apply/record/stop` - start/stop action recording + +`/apply/workers/{worker}` - GET/POST worker descriptors and payloads + +`/apply/jobs/completed` - GET list of completed actions + +`/apply/jobs/{lsn}` - GET completed action results from LSN, POST action cancellation by LSN, priority changes by LSN + +`/apply/jobs/{worker}/{action}` - GET completed action (job) results by action ID + +`/apply/jobs/{worker}` - GET pending/running jobs for worker, POST requests to schedule action as a job + +`/apply/jobs` - POST request to schedule action as a job diff --git a/src/zen/cmds/exec_cmd.cpp b/src/zen/cmds/exec_cmd.cpp new file mode 100644 index 000000000..2d9d0d12e --- /dev/null +++ b/src/zen/cmds/exec_cmd.cpp @@ -0,0 +1,654 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "exec_cmd.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +using namespace std::literals; + +namespace eastl { + +template<> +struct hash : public zen::IoHash::Hasher +{ +}; + +} // namespace eastl + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen { + +ExecCommand::ExecCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName), ""); + m_Options.add_option("", "", "log", "Action log directory", cxxopts::value(m_RecordingLogPath), ""); + m_Options.add_option("", "p", "path", "Recording path (directory or .actionlog file)", cxxopts::value(m_RecordingPath), ""); + m_Options.add_option("", "", "offset", "Recording replay start offset", cxxopts::value(m_Offset), ""); + m_Options.add_option("", "", "stride", "Recording replay stride", cxxopts::value(m_Stride), ""); + m_Options.add_option("", "", "limit", "Recording replay limit", cxxopts::value(m_Limit), ""); + m_Options.add_option("", "", "beacon", "Beacon path", cxxopts::value(m_BeaconPath), ""); + m_Options.add_option("", + "", + "mode", + "Select execution mode (http,inproc,dump,direct,beacon,buildlog)", + cxxopts::value(m_Mode)->default_value("http"), + ""); + m_Options.add_option("", "", "quiet", "Quiet mode (less logging)", cxxopts::value(m_Quiet), ""); + m_Options.parse_positional("mode"); +} + +ExecCommand::~ExecCommand() +{ +} + +void +ExecCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + // Configure + + if (!ParseOptions(argc, argv)) + { + return; + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_RecordingPath.empty()) + { + throw OptionParseException("replay path is required!", m_Options.help()); + } + + m_VerboseLogging = GlobalOptions.IsVerbose; + m_QuietLogging = m_Quiet && !m_VerboseLogging; + + enum ExecMode + { + kHttp, + kDirect, + kInproc, + kDump, + kBeacon, + kBuildLog + } Mode; + + if (m_Mode == "http"sv) + { + Mode = kHttp; + } + else if (m_Mode == "direct"sv) + { + Mode = kDirect; + } + else if (m_Mode == "inproc"sv) + { + Mode = kInproc; + } + else if (m_Mode == "dump"sv) + { + Mode = kDump; + } + else if (m_Mode == "beacon"sv) + { + Mode = kBeacon; + } + else if (m_Mode == "buildlog"sv) + { + Mode = kBuildLog; + } + else + { + throw OptionParseException("invalid mode specified!", m_Options.help()); + } + + // Gather information from recording path + + std::unique_ptr Reader; + std::unique_ptr UeReader; + + std::filesystem::path RecordingPath{m_RecordingPath}; + + if (!std::filesystem::is_directory(RecordingPath)) + { + throw OptionParseException("replay path should be a directory path!", m_Options.help()); + } + else + { + if (std::filesystem::is_directory(RecordingPath / "cid")) + { + Reader = std::make_unique(RecordingPath); + m_WorkerMap = Reader->ReadWorkers(); + m_ChunkResolver = Reader.get(); + m_RecordingReader = Reader.get(); + } + else + { + UeReader = std::make_unique(RecordingPath); + m_WorkerMap = UeReader->ReadWorkers(); + m_ChunkResolver = UeReader.get(); + m_RecordingReader = UeReader.get(); + } + } + + ZEN_CONSOLE("found {} workers, {} action items", m_WorkerMap.size(), m_RecordingReader->GetActionCount()); + + for (auto& Kv : m_WorkerMap) + { + CbObject WorkerDesc = Kv.second.GetObject(); + const IoHash& WorkerId = Kv.first; + + RegisterWorkerFunctionsFromDescription(WorkerDesc, WorkerId); + + if (m_VerboseLogging) + { + zen::ExtendableStringBuilder<1024> ObjStr; +# if 0 + zen::CompactBinaryToJson(WorkerDesc, ObjStr); + ZEN_CONSOLE("worker {}: {}", WorkerId, ObjStr); +# else + zen::CompactBinaryToYaml(WorkerDesc, ObjStr); + ZEN_CONSOLE("worker {}:\n{}", WorkerId, ObjStr); +# endif + } + } + + if (m_VerboseLogging) + { + EmitFunctionList(m_FunctionList); + } + + // Iterate over work items and dispatch or log them + + int ReturnValue = 0; + + Stopwatch ExecTimer; + + switch (Mode) + { + case kHttp: + // Forward requests to HTTP function service + ReturnValue = HttpExecute(); + break; + + case kDirect: + // Not currently supported + ReturnValue = LocalMessagingExecute(); + break; + + case kInproc: + // Handle execution in-core (by spawning child processes) + ReturnValue = InProcessExecute(); + break; + + case kDump: + // Dump high level information about actions to console + ReturnValue = DumpWorkItems(); + break; + + case kBeacon: + ReturnValue = BeaconExecute(); + break; + + case kBuildLog: + ReturnValue = BuildActionsLog(); + break; + + default: + ZEN_ERROR("Unknown operating mode! No work submitted"); + + ReturnValue = 1; + } + + ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs())); + + if (!ReturnValue) + { + ZEN_CONSOLE("all work items completed successfully"); + } + else + { + ZEN_CONSOLE("some work items failed (code {})", ReturnValue); + } +} + +int +ExecCommand::InProcessExecute() +{ + ZEN_ASSERT(m_ChunkResolver); + ChunkResolver& Resolver = *m_ChunkResolver; + + zen::compute::FunctionServiceSession FunctionSession(Resolver); + + std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); + FunctionSession.AddLocalRunner(Resolver, TempPath); + + return ExecUsingSession(FunctionSession); +} + +int +ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSession) +{ + struct JobTracker + { + public: + inline void Insert(int LsnField) + { + RwLock::ExclusiveLockScope _(Lock); + PendingJobs.insert(LsnField); + } + + inline bool IsEmpty() const + { + RwLock::SharedLockScope _(Lock); + return PendingJobs.empty(); + } + + inline void Remove(int CompleteLsn) + { + RwLock::ExclusiveLockScope _(Lock); + PendingJobs.erase(CompleteLsn); + } + + inline size_t GetSize() const + { + RwLock::SharedLockScope _(Lock); + return PendingJobs.size(); + } + + private: + mutable RwLock Lock; + std::unordered_set PendingJobs; + }; + + JobTracker PendingJobs; + + std::atomic IsDraining{0}; + + auto DrainCompletedJobs = [&] { + if (IsDraining.exchange(1)) + { + return; + } + + auto _ = MakeGuard([&] { IsDraining.store(0, std::memory_order_release); }); + + CbObjectWriter Cbo; + FunctionSession.GetCompleted(Cbo); + + if (CbObject Completed = Cbo.Save()) + { + for (auto& It : Completed["completed"sv]) + { + int32_t CompleteLsn = It.AsInt32(); + + CbPackage ResultPackage; + HttpResponseCode Response = FunctionSession.GetActionResult(CompleteLsn, /* out */ ResultPackage); + + if (Response == HttpResponseCode::OK) + { + PendingJobs.Remove(CompleteLsn); + + ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, PendingJobs.GetSize()); + } + } + } + }; + + // Describe workers + + ZEN_CONSOLE("describing {} workers", m_WorkerMap.size()); + + for (auto Kv : m_WorkerMap) + { + CbPackage WorkerDesc = Kv.second; + + FunctionSession.RegisterWorker(WorkerDesc); + } + + // Then submit work items + + int FailedWorkCounter = 0; + size_t RemainingWorkItems = m_RecordingReader->GetActionCount(); + int SubmittedWorkItems = 0; + + ZEN_CONSOLE("submitting {} work items", RemainingWorkItems); + + int OffsetCounter = m_Offset; + int StrideCounter = m_Stride; + + auto ShouldSchedule = [&]() -> bool { + if (m_Limit && SubmittedWorkItems >= m_Limit) + { + // Limit reached, ignore + + return false; + } + + if (OffsetCounter && OffsetCounter--) + { + // Still in offset, ignore + + return false; + } + + if (--StrideCounter == 0) + { + StrideCounter = m_Stride; + + return true; + } + + return false; + }; + + m_RecordingReader->IterateActions( + [&](CbObject ActionObject, const IoHash& ActionId) { + // Enqueue job + + Stopwatch SubmitTimer; + + const int Priority = 0; + + if (ShouldSchedule()) + { + if (m_VerboseLogging) + { + int AttachmentCount = 0; + uint64_t AttachmentBytes = 0; + eastl::hash_set ReferencedChunks; + + ActionObject.IterateAttachments([&](CbFieldView Field) { + IoHash AttachData = Field.AsAttachment(); + + ReferencedChunks.insert(AttachData); + ++AttachmentCount; + + if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData)) + { + AttachmentBytes += ChunkData.GetSize(); + } + }); + + zen::ExtendableStringBuilder<1024> ObjStr; + zen::CompactBinaryToJson(ActionObject, ObjStr); + ZEN_CONSOLE("work item {} ({} attachments, {} bytes): {}", + ActionId, + AttachmentCount, + NiceBytes(AttachmentBytes), + ObjStr); + } + + if (zen::compute::FunctionServiceSession::EnqueueResult EnqueueResult = + FunctionSession.EnqueueAction(ActionObject, Priority)) + { + const int32_t LsnField = EnqueueResult.Lsn; + + --RemainingWorkItems; + ++SubmittedWorkItems; + + if (!m_QuietLogging) + { + ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining", + SubmittedWorkItems, + LsnField, + NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()), + RemainingWorkItems); + } + + PendingJobs.Insert(LsnField); + } + else + { + if (!m_QuietLogging) + { + std::string_view FunctionName = ActionObject["Function"sv].AsString(); + const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid(); + const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid(); + + ZEN_ERROR( + "failed to resolve function for work with (Function:{},FunctionVersion:{},BuildSystemVersion:{}). Work " + "descriptor " + "at: 'file://{}'", + std::string(FunctionName), + FunctionVersion, + BuildSystemVersion, + ""); + + EmitFunctionListOnce(m_FunctionList); + } + + ++FailedWorkCounter; + } + } + + // Check for completed work + + DrainCompletedJobs(); + }, + 8); + + // Wait until all pending work is complete + + while (!PendingJobs.IsEmpty()) + { + // TODO: improve this logic + zen::Sleep(500); + + DrainCompletedJobs(); + } + + if (FailedWorkCounter) + { + return 1; + } + + return 0; +} + +int +ExecCommand::LocalMessagingExecute() +{ + // Non-HTTP work submission path + + // To be reimplemented using final transport + + return 0; +} + +////////////////////////////////////////////////////////////////////////// + +int +ExecCommand::HttpExecute() +{ + ZEN_ASSERT(m_ChunkResolver); + ChunkResolver& Resolver = *m_ChunkResolver; + + std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); + + zen::compute::FunctionServiceSession FunctionSession(Resolver); + FunctionSession.AddRemoteRunner(Resolver, TempPath, m_HostName); + + return ExecUsingSession(FunctionSession); +} + +int +ExecCommand::BeaconExecute() +{ + ZEN_ASSERT(m_ChunkResolver); + ChunkResolver& Resolver = *m_ChunkResolver; + std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); + + zen::compute::FunctionServiceSession FunctionSession(Resolver); + FunctionSession.AddRemoteRunner(Resolver, TempPath, "http://localhost:8558"); + // FunctionSession.AddRemoteRunner(Resolver, TempPath, "http://10.99.9.246:8558"); + + return ExecUsingSession(FunctionSession); +} + +////////////////////////////////////////////////////////////////////////// + +void +ExecCommand::RegisterWorkerFunctionsFromDescription(const CbObject& WorkerDesc, const IoHash& WorkerId) +{ + const Guid WorkerBuildSystemVersion = WorkerDesc["buildsystem_version"sv].AsUuid(); + + for (auto& Item : WorkerDesc["functions"sv]) + { + CbObjectView Function = Item.AsObjectView(); + + std::string_view FunctionName = Function["name"sv].AsString(); + const Guid FunctionVersion = Function["version"sv].AsUuid(); + + m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName}, + .FunctionVersion = FunctionVersion, + .BuildSystemVersion = WorkerBuildSystemVersion, + .WorkerId = WorkerId}); + } +} + +void +ExecCommand::EmitFunctionListOnce(const std::vector& FunctionList) +{ + if (m_FunctionListEmittedOnce == false) + { + EmitFunctionList(FunctionList); + + m_FunctionListEmittedOnce = true; + } +} + +int +ExecCommand::DumpWorkItems() +{ + std::atomic EmittedCount{0}; + + eastl::hash_map SeenAttachments; // Attachment CID -> count of references + + m_RecordingReader->IterateActions( + [&](CbObject ActionObject, const IoHash& ActionId) { + eastl::hash_map Attachments; + + uint64_t AttachmentBytes = 0; + uint64_t UncompressedAttachmentBytes = 0; + + ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) { + const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); + IoBuffer AttachmentData = m_ChunkResolver->FindChunkByCid(AttachmentCid); + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); + Attachments[AttachmentCid] = CompressedData; + + AttachmentBytes += CompressedData.GetCompressedSize(); + UncompressedAttachmentBytes += CompressedData.DecodeRawSize(); + + if (auto [Iter, Inserted] = SeenAttachments.insert({AttachmentCid, 1}); !Inserted) + { + ++Iter->second; + } + }); + + zen::ExtendableStringBuilder<1024> ObjStr; + +# if 0 + zen::CompactBinaryToJson(ActionObject, ObjStr); + ZEN_CONSOLE("work item {} ({} attachments): {}", ActionId, Attachments.size(), ObjStr); +# else + zen::CompactBinaryToYaml(ActionObject, ObjStr); + ZEN_CONSOLE("work item {} ({} attachments, {}->{} bytes):\n{}", + ActionId, + Attachments.size(), + AttachmentBytes, + UncompressedAttachmentBytes, + ObjStr); +# endif + + ++EmittedCount; + }, + 1); + + ZEN_CONSOLE("emitted: {} actions", EmittedCount.load()); + + eastl::map> ReferenceHistogram; + + for (const auto& [K, V] : SeenAttachments) + { + if (V > 1) + { + ReferenceHistogram[V].push_back(K); + } + } + + for (const auto& [RefCount, Cids] : ReferenceHistogram) + { + ZEN_CONSOLE("{} attachments with {} references", Cids.size(), RefCount); + } + + return 0; +} + +////////////////////////////////////////////////////////////////////////// + +int +ExecCommand::BuildActionsLog() +{ + ZEN_ASSERT(m_ChunkResolver); + ChunkResolver& Resolver = *m_ChunkResolver; + + if (m_RecordingPath.empty()) + { + throw OptionParseException("need to specify recording path", m_Options.help()); + } + + if (std::filesystem::exists(m_RecordingLogPath)) + { + throw OptionParseException(fmt::format("recording log directory '{}' already exists!", m_RecordingLogPath), m_Options.help()); + } + + ZEN_NOT_IMPLEMENTED("build log generation not implemented yet!"); + + std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); + + zen::compute::FunctionServiceSession FunctionSession(Resolver); + FunctionSession.StartRecording(Resolver, m_RecordingLogPath); + + return ExecUsingSession(FunctionSession); +} + +void +ExecCommand::EmitFunctionList(const std::vector& FunctionList) +{ + ZEN_CONSOLE("=== Known functions:\n==========================="); + + ZEN_CONSOLE("{:30} {:36} {:36} {}", "function", "version", "build system", "worker id"); + + for (const FunctionDefinition& Func : FunctionList) + { + ZEN_CONSOLE("{:30} {:36} {:36} {}", Func.FunctionName, Func.FunctionVersion, Func.BuildSystemVersion, Func.WorkerId); + } + + ZEN_CONSOLE("==========================="); +} + +} // namespace zen + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zen/cmds/exec_cmd.h b/src/zen/cmds/exec_cmd.h new file mode 100644 index 000000000..43d092144 --- /dev/null +++ b/src/zen/cmds/exec_cmd.h @@ -0,0 +1,97 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "../zen.h" + +#include +#include +#include +#include + +#include +#include +#include + +namespace zen { +class CbPackage; +class CbObject; +struct IoHash; +class ChunkResolver; +} // namespace zen + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen::compute { +class FunctionServiceSession; +} + +namespace zen { + +/** + * Zen CLI command for executing functions from a recording + * + * Mostly for testing and debugging purposes + */ + +class ExecCommand : public ZenCmdBase +{ +public: + ExecCommand(); + ~ExecCommand(); + + static constexpr char Name[] = "exec"; + static constexpr char Description[] = "Execute functions from a recording"; + + virtual void Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options& Options() override { return m_Options; } + +private: + cxxopts::Options m_Options{Name, Description}; + std::string m_HostName; + std::filesystem::path m_BeaconPath; + std::filesystem::path m_RecordingPath; + std::filesystem::path m_RecordingLogPath; + int m_Offset = 0; + int m_Stride = 1; + int m_Limit = 0; + bool m_Quiet = false; + std::string m_Mode{"http"}; + + struct FunctionDefinition + { + std::string FunctionName; + zen::Guid FunctionVersion; + zen::Guid BuildSystemVersion; + zen::IoHash WorkerId; + }; + + bool m_FunctionListEmittedOnce = false; + void EmitFunctionListOnce(const std::vector& FunctionList); + void EmitFunctionList(const std::vector& FunctionList); + + std::unordered_map m_WorkerMap; + std::vector m_FunctionList; + bool m_VerboseLogging = false; + bool m_QuietLogging = false; + + zen::ChunkResolver* m_ChunkResolver = nullptr; + zen::compute::RecordingReaderBase* m_RecordingReader = nullptr; + + void RegisterWorkerFunctionsFromDescription(const zen::CbObject& WorkerDesc, const zen::IoHash& WorkerId); + + int ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSession); + + // Execution modes + + int DumpWorkItems(); + int HttpExecute(); + int InProcessExecute(); + int LocalMessagingExecute(); + int BeaconExecute(); + int BuildActionsLog(); +}; + +} // namespace zen + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zen/xmake.lua b/src/zen/xmake.lua index ab094fef3..f889c3296 100644 --- a/src/zen/xmake.lua +++ b/src/zen/xmake.lua @@ -6,15 +6,12 @@ target("zen") add_files("**.cpp") add_files("zen.cpp", {unity_ignored = true }) add_deps("zencore", "zenhttp", "zenremotestore", "zenstore", "zenutil") + add_deps("zencompute", "zennet") add_deps("cxxopts", "fmt") add_packages("json11") add_includedirs(".") set_symbols("debug") - if is_mode("release") then - set_optimize("fastest") - end - if is_plat("windows") then add_files("zen.rc") add_ldflags("/subsystem:console,5.02") diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp index 25245c3d2..018f77738 100644 --- a/src/zen/zen.cpp +++ b/src/zen/zen.cpp @@ -11,6 +11,7 @@ #include "cmds/cache_cmd.h" #include "cmds/copy_cmd.h" #include "cmds/dedup_cmd.h" +#include "cmds/exec_cmd.h" #include "cmds/info_cmd.h" #include "cmds/print_cmd.h" #include "cmds/projectstore_cmd.h" @@ -316,22 +317,25 @@ main(int argc, char** argv) } #endif // ZEN_WITH_TRACE - AttachCommand AttachCmd; - BenchCommand BenchCmd; - BuildsCommand BuildsCmd; - CacheDetailsCommand CacheDetailsCmd; - CacheGetCommand CacheGetCmd; - CacheGenerateCommand CacheGenerateCmd; - CacheInfoCommand CacheInfoCmd; - CacheStatsCommand CacheStatsCmd; - CopyCommand CopyCmd; - CopyStateCommand CopyStateCmd; - CreateOplogCommand CreateOplogCmd; - CreateProjectCommand CreateProjectCmd; - DedupCommand DedupCmd; - DownCommand DownCmd; - DropCommand DropCmd; - DropProjectCommand ProjectDropCmd; + AttachCommand AttachCmd; + BenchCommand BenchCmd; + BuildsCommand BuildsCmd; + CacheDetailsCommand CacheDetailsCmd; + CacheGetCommand CacheGetCmd; + CacheGenerateCommand CacheGenerateCmd; + CacheInfoCommand CacheInfoCmd; + CacheStatsCommand CacheStatsCmd; + CopyCommand CopyCmd; + CopyStateCommand CopyStateCmd; + CreateOplogCommand CreateOplogCmd; + CreateProjectCommand CreateProjectCmd; + DedupCommand DedupCmd; + DownCommand DownCmd; + DropCommand DropCmd; + DropProjectCommand ProjectDropCmd; +#if ZEN_WITH_COMPUTE_SERVICES + ExecCommand ExecCmd; +#endif // ZEN_WITH_COMPUTE_SERVICES ExportOplogCommand ExportOplogCmd; FlushCommand FlushCmd; GcCommand GcCmd; @@ -388,6 +392,9 @@ main(int argc, char** argv) {"dedup", &DedupCmd, "Dedup files"}, {"down", &DownCmd, "Bring zen server down"}, {"drop", &DropCmd, "Drop cache namespace or bucket"}, +#if ZEN_WITH_COMPUTE_SERVICES + {ExecCommand::Name, &ExecCmd, ExecCommand::Description}, +#endif {"gc-status", &GcStatusCmd, "Garbage collect zen storage status check"}, {"gc-stop", &GcStopCmd, "Request cancel of running garbage collection in zen storage"}, {"gc", &GcCmd, "Garbage collect zen storage"}, diff --git a/src/zencompute-test/xmake.lua b/src/zencompute-test/xmake.lua new file mode 100644 index 000000000..64a3c7703 --- /dev/null +++ b/src/zencompute-test/xmake.lua @@ -0,0 +1,9 @@ +-- Copyright Epic Games, Inc. All Rights Reserved. + +target("zencompute-test") + set_kind("binary") + set_group("tests") + add_headerfiles("**.h") + add_files("*.cpp") + add_deps("zencompute", "zencore") + add_packages("vcpkg::doctest") diff --git a/src/zencompute-test/zencompute-test.cpp b/src/zencompute-test/zencompute-test.cpp new file mode 100644 index 000000000..237812e12 --- /dev/null +++ b/src/zencompute-test/zencompute-test.cpp @@ -0,0 +1,32 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include +#include +#include +#include + +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC +# include +# include +# include +#endif + +#if ZEN_WITH_TESTS +# define ZEN_TEST_WITH_RUNNER 1 +# include +#endif + +int +main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) +{ +#if ZEN_WITH_TESTS + zen::zencompute_forcelinktests(); + + zen::logging::InitializeLogging(); + zen::MaximizeOpenFileCount(); + + return ZEN_RUN_TESTS(argc, argv); +#else + return 0; +#endif +} diff --git a/src/zencompute/actionrecorder.cpp b/src/zencompute/actionrecorder.cpp new file mode 100644 index 000000000..04c4b5141 --- /dev/null +++ b/src/zencompute/actionrecorder.cpp @@ -0,0 +1,258 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "actionrecorder.h" + +#include "functionrunner.h" + +#include +#include +#include +#include +#include +#include + +#if ZEN_PLATFORM_WINDOWS +# include +# define ZEN_CONCRT_AVAILABLE 1 +#else +# define ZEN_CONCRT_AVAILABLE 0 +#endif + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen::compute { + +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +RecordingFileWriter::RecordingFileWriter() +{ +} + +RecordingFileWriter::~RecordingFileWriter() +{ + Close(); +} + +void +RecordingFileWriter::Open(std::filesystem::path FilePath) +{ + using namespace std::literals; + + m_File.Open(FilePath, BasicFile::Mode::kTruncate); + m_File.Write("----DDC2----DATA", 16, 0); + m_FileOffset = 16; + + std::filesystem::path TocPath = FilePath.replace_extension(".ztoc"); + m_TocFile.Open(TocPath, BasicFile::Mode::kTruncate); + + m_TocWriter << "version"sv << 1; + m_TocWriter.BeginArray("toc"sv); +} + +void +RecordingFileWriter::Close() +{ + m_TocWriter.EndArray(); + CbObject Toc = m_TocWriter.Save(); + + std::error_code Ec; + m_TocFile.WriteAll(Toc.GetBuffer().AsIoBuffer(), Ec); +} + +void +RecordingFileWriter::AppendObject(const CbObject& Object, const IoHash& ObjectHash) +{ + RwLock::ExclusiveLockScope _(m_FileLock); + + MemoryView ObjectView = Object.GetBuffer().GetView(); + + std::error_code Ec; + m_File.Write(ObjectView, m_FileOffset, Ec); + + if (Ec) + { + throw std::system_error(Ec, "failed writing to archive"); + } + + m_TocWriter.BeginArray(); + m_TocWriter.AddHash(ObjectHash); + m_TocWriter.AddInteger(m_FileOffset); + m_TocWriter.AddInteger(gsl::narrow(ObjectView.GetSize())); + m_TocWriter.EndArray(); + + m_FileOffset += ObjectView.GetSize(); +} + +////////////////////////////////////////////////////////////////////////// + +ActionRecorder::ActionRecorder(ChunkResolver& InChunkResolver, const std::filesystem::path& RecordingLogPath) +: m_ChunkResolver(InChunkResolver) +, m_RecordingLogDir(RecordingLogPath) +{ + std::error_code Ec; + CreateDirectories(m_RecordingLogDir, Ec); + + if (Ec) + { + ZEN_WARN("Could not create directory '{}': {}", m_RecordingLogDir, Ec.message()); + } + + CleanDirectory(m_RecordingLogDir, /* ForceRemoveReadOnlyFiles */ true, Ec); + + if (Ec) + { + ZEN_WARN("Could not clean directory '{}': {}", m_RecordingLogDir, Ec.message()); + } + + m_WorkersFile.Open(m_RecordingLogDir / "workers.zdat"); + m_ActionsFile.Open(m_RecordingLogDir / "actions.zdat"); + + CidStoreConfiguration CidConfig; + CidConfig.RootDirectory = m_RecordingLogDir / "cid"; + CidConfig.HugeValueThreshold = 128 * 1024 * 1024; + + m_CidStore.Initialize(CidConfig); +} + +ActionRecorder::~ActionRecorder() +{ + Shutdown(); +} + +void +ActionRecorder::Shutdown() +{ + m_CidStore.Flush(); +} + +void +ActionRecorder::RegisterWorker(const CbPackage& WorkerPackage) +{ + const IoHash WorkerId = WorkerPackage.GetObjectHash(); + + m_WorkersFile.AppendObject(WorkerPackage.GetObject(), WorkerId); + + std::unordered_set AddedChunks; + uint64_t AddedBytes = 0; + + // First add all attachments from the worker package itself + + for (const CbAttachment& Attachment : WorkerPackage.GetAttachments()) + { + CompressedBuffer Buffer = Attachment.AsCompressedBinary(); + IoBuffer Data = Buffer.GetCompressed().Flatten().AsIoBuffer(); + + const IoHash ChunkHash = Buffer.DecodeRawHash(); + + CidStore::InsertResult Result = m_CidStore.AddChunk(Data, ChunkHash, CidStore::InsertMode::kCopyOnly); + + AddedChunks.insert(ChunkHash); + + if (Result.New) + { + AddedBytes += Data.GetSize(); + } + } + + // Not all attachments will be present in the worker package, so we need to add + // all referenced chunks to ensure that the recording is self-contained and not + // referencing data in the main CID store + + CbObject WorkerDescriptor = WorkerPackage.GetObject(); + + WorkerDescriptor.IterateAttachments([&](const CbFieldView AttachmentField) { + const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); + + if (!AddedChunks.contains(AttachmentCid)) + { + IoBuffer AttachmentData = m_ChunkResolver.FindChunkByCid(AttachmentCid); + + if (AttachmentData) + { + CidStore::InsertResult Result = m_CidStore.AddChunk(AttachmentData, AttachmentCid, CidStore::InsertMode::kCopyOnly); + + if (Result.New) + { + AddedBytes += AttachmentData.GetSize(); + } + } + else + { + ZEN_WARN("RegisterWorker: could not resolve attachment chunk {} for worker {}", AttachmentCid, WorkerId); + } + + AddedChunks.insert(AttachmentCid); + } + }); + + ZEN_INFO("recorded worker {} with {} attachments ({} bytes)", WorkerId, AddedChunks.size(), AddedBytes); +} + +bool +ActionRecorder::RecordAction(Ref Action) +{ + bool AllGood = true; + + Action->ActionObj.IterateAttachments([&](CbFieldView Field) { + IoHash AttachData = Field.AsHash(); + IoBuffer ChunkData = m_ChunkResolver.FindChunkByCid(AttachData); + + if (ChunkData) + { + if (ChunkData.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash DecompressedHash; + uint64_t RawSize = 0; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(ChunkData), /* out */ DecompressedHash, /* out*/ RawSize); + + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + uint64_t BlockSize = 0; + if (Compressed.TryGetCompressParameters(/* out */ Compressor, /* out */ CompressionLevel, /* out */ BlockSize)) + { + if (Compressor == OodleCompressor::NotSet) + { + CompositeBuffer Decompressed = Compressed.DecompressToComposite(); + CompressedBuffer NewCompressed = CompressedBuffer::Compress(std::move(Decompressed), + OodleCompressor::Mermaid, + OodleCompressionLevel::Fast, + BlockSize); + + ChunkData = NewCompressed.GetCompressed().Flatten().AsIoBuffer(); + } + } + } + + const uint64_t ChunkSize = ChunkData.GetSize(); + + m_CidStore.AddChunk(ChunkData, AttachData, CidStore::InsertMode::kCopyOnly); + ++m_ChunkCounter; + m_ChunkBytesCounter.fetch_add(ChunkSize); + } + else + { + AllGood = false; + + ZEN_WARN("could not resolve chunk {}", AttachData); + } + }); + + if (AllGood) + { + m_ActionsFile.AppendObject(Action->ActionObj, Action->ActionId); + ++m_ActionsCounter; + + return true; + } + else + { + return false; + } +} + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/actionrecorder.h b/src/zencompute/actionrecorder.h new file mode 100644 index 000000000..9cc2b44a2 --- /dev/null +++ b/src/zencompute/actionrecorder.h @@ -0,0 +1,91 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace zen { +class CbObject; +class CbPackage; +struct IoHash; +} // namespace zen + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen::compute { + +////////////////////////////////////////////////////////////////////////// + +struct RecordingFileWriter +{ + RecordingFileWriter(RecordingFileWriter&&) = delete; + RecordingFileWriter& operator=(RecordingFileWriter&&) = delete; + + RwLock m_FileLock; + BasicFile m_File; + uint64_t m_FileOffset = 0; + CbObjectWriter m_TocWriter; + BasicFile m_TocFile; + + RecordingFileWriter(); + ~RecordingFileWriter(); + + void Open(std::filesystem::path FilePath); + void Close(); + void AppendObject(const CbObject& Object, const IoHash& ObjectHash); +}; + +////////////////////////////////////////////////////////////////////////// + +/** + * Recording "runner" implementation + * + * This class writes out all actions and their attachments to a recording directory + * in a format that can be read back by the RecordingReader. + * + * The contents of the recording directory will be self-contained, with all referenced + * attachments stored in the recording directory itself, so that the recording can be + * moved or shared without needing to maintain references to the main CID store. + * + */ + +class ActionRecorder +{ +public: + ActionRecorder(ChunkResolver& InChunkResolver, const std::filesystem::path& RecordingLogPath); + ~ActionRecorder(); + + ActionRecorder(const ActionRecorder&) = delete; + ActionRecorder& operator=(const ActionRecorder&) = delete; + + void Shutdown(); + void RegisterWorker(const CbPackage& WorkerPackage); + bool RecordAction(Ref Action); + +private: + ChunkResolver& m_ChunkResolver; + std::filesystem::path m_RecordingLogDir; + + RecordingFileWriter m_WorkersFile; + RecordingFileWriter m_ActionsFile; + GcManager m_Gc; + CidStore m_CidStore{m_Gc}; + std::atomic m_ChunkCounter{0}; + std::atomic m_ChunkBytesCounter{0}; + std::atomic m_ActionsCounter{0}; +}; + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/functionrunner.cpp b/src/zencompute/functionrunner.cpp new file mode 100644 index 000000000..8e7c12b2b --- /dev/null +++ b/src/zencompute/functionrunner.cpp @@ -0,0 +1,112 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "functionrunner.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include +# include + +# include +# include + +namespace zen::compute { + +FunctionRunner::FunctionRunner(std::filesystem::path BasePath) : m_ActionsPath(BasePath / "actions") +{ +} + +FunctionRunner::~FunctionRunner() = default; + +size_t +FunctionRunner::QueryCapacity() +{ + return 1; +} + +std::vector +FunctionRunner::SubmitActions(const std::vector>& Actions) +{ + std::vector Results; + Results.reserve(Actions.size()); + + for (const Ref& Action : Actions) + { + Results.push_back(SubmitAction(Action)); + } + + return Results; +} + +void +FunctionRunner::MaybeDumpAction(int ActionLsn, const CbObject& ActionObject) +{ + if (m_DumpActions) + { + std::string UniqueId = fmt::format("{}.ddb", ActionLsn); + std::filesystem::path Path = m_ActionsPath / UniqueId; + + zen::WriteFile(Path, IoBuffer(ActionObject.GetBuffer().AsIoBuffer())); + } +} + +////////////////////////////////////////////////////////////////////////// + +RunnerAction::RunnerAction(FunctionServiceSession* OwnerSession) : m_OwnerSession(OwnerSession) +{ + this->Timestamps[static_cast(State::New)] = DateTime::Now().GetTicks(); +} + +RunnerAction::~RunnerAction() +{ +} + +void +RunnerAction::SetActionState(State NewState) +{ + ZEN_ASSERT(NewState < State::_Count); + this->Timestamps[static_cast(NewState)] = DateTime::Now().GetTicks(); + + do + { + if (State CurrentState = m_ActionState.load(); CurrentState == NewState) + { + // No state change + return; + } + else + { + if (NewState <= CurrentState) + { + // Cannot transition to an earlier or same state + return; + } + + if (m_ActionState.compare_exchange_strong(CurrentState, NewState)) + { + // Successful state change + + m_OwnerSession->PostUpdate(this); + + return; + } + } + } while (true); +} + +void +RunnerAction::SetResult(CbPackage&& Result) +{ + m_Result = std::move(Result); +} + +CbPackage& +RunnerAction::GetResult() +{ + ZEN_ASSERT(IsCompleted()); + return m_Result; +} + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES \ No newline at end of file diff --git a/src/zencompute/functionrunner.h b/src/zencompute/functionrunner.h new file mode 100644 index 000000000..6fd0d84cc --- /dev/null +++ b/src/zencompute/functionrunner.h @@ -0,0 +1,207 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include + +#if ZEN_WITH_COMPUTE_SERVICES + +# include +# include + +namespace zen::compute { + +struct SubmitResult +{ + bool IsAccepted = false; + std::string Reason; +}; + +/** Base interface for classes implementing a remote execution "runner" + */ +class FunctionRunner : public RefCounted +{ + FunctionRunner(FunctionRunner&&) = delete; + FunctionRunner& operator=(FunctionRunner&&) = delete; + +public: + FunctionRunner(std::filesystem::path BasePath); + virtual ~FunctionRunner() = 0; + + virtual void Shutdown() = 0; + virtual void RegisterWorker(const CbPackage& WorkerPackage) = 0; + + [[nodiscard]] virtual SubmitResult SubmitAction(Ref Action) = 0; + [[nodiscard]] virtual size_t GetSubmittedActionCount() = 0; + [[nodiscard]] virtual bool IsHealthy() = 0; + [[nodiscard]] virtual size_t QueryCapacity(); + [[nodiscard]] virtual std::vector SubmitActions(const std::vector>& Actions); + +protected: + std::filesystem::path m_ActionsPath; + bool m_DumpActions = false; + void MaybeDumpAction(int ActionLsn, const CbObject& ActionObject); +}; + +template +struct RunnerGroup +{ + void AddRunner(RunnerType* Runner) + { + m_RunnersLock.WithExclusiveLock([&] { m_Runners.emplace_back(Runner); }); + } + size_t QueryCapacity() + { + size_t TotalCapacity = 0; + m_RunnersLock.WithSharedLock([&] { + for (const auto& Runner : m_Runners) + { + TotalCapacity += Runner->QueryCapacity(); + } + }); + return TotalCapacity; + } + + SubmitResult SubmitAction(Ref Action) + { + RwLock::SharedLockScope _(m_RunnersLock); + + const int InitialIndex = m_NextSubmitIndex.load(std::memory_order_acquire); + int Index = InitialIndex; + const int RunnerCount = gsl::narrow(m_Runners.size()); + + if (RunnerCount == 0) + { + return {.IsAccepted = false, .Reason = "No runners available"}; + } + + do + { + while (Index >= RunnerCount) + { + Index -= RunnerCount; + } + + auto& Runner = m_Runners[Index++]; + + SubmitResult Result = Runner->SubmitAction(Action); + + if (Result.IsAccepted == true) + { + m_NextSubmitIndex = Index % RunnerCount; + + return Result; + } + + while (Index >= RunnerCount) + { + Index -= RunnerCount; + } + } while (Index != InitialIndex); + + return {.IsAccepted = false}; + } + + size_t GetSubmittedActionCount() + { + RwLock::SharedLockScope _(m_RunnersLock); + + size_t TotalCount = 0; + + for (const auto& Runner : m_Runners) + { + TotalCount += Runner->GetSubmittedActionCount(); + } + + return TotalCount; + } + + void RegisterWorker(CbPackage Worker) + { + RwLock::SharedLockScope _(m_RunnersLock); + + for (auto& Runner : m_Runners) + { + Runner->RegisterWorker(Worker); + } + } + + void Shutdown() + { + RwLock::SharedLockScope _(m_RunnersLock); + + for (auto& Runner : m_Runners) + { + Runner->Shutdown(); + } + } + +private: + RwLock m_RunnersLock; + std::vector> m_Runners; + std::atomic m_NextSubmitIndex{0}; +}; + +/** + * This represents an action going through different stages of scheduling and execution. + */ +struct RunnerAction : public RefCounted +{ + explicit RunnerAction(FunctionServiceSession* OwnerSession); + ~RunnerAction(); + + int ActionLsn = 0; + WorkerDesc Worker; + IoHash ActionId; + CbObject ActionObj; + int Priority = 0; + + enum class State + { + New, + Pending, + Running, + Completed, + Failed, + _Count + }; + + static const char* ToString(State _) + { + switch (_) + { + case State::New: + return "New"; + case State::Pending: + return "Pending"; + case State::Running: + return "Running"; + case State::Completed: + return "Completed"; + case State::Failed: + return "Failed"; + default: + return "Unknown"; + } + } + + uint64_t Timestamps[static_cast(State::_Count)] = {}; + + State ActionState() const { return m_ActionState; } + void SetActionState(State NewState); + + bool IsSuccess() const { return ActionState() == State::Completed; } + bool IsCompleted() const { return ActionState() == State::Completed || ActionState() == State::Failed; } + + void SetResult(CbPackage&& Result); + CbPackage& GetResult(); + +private: + std::atomic m_ActionState = State::New; + FunctionServiceSession* m_OwnerSession = nullptr; + CbPackage m_Result; +}; + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES \ No newline at end of file diff --git a/src/zencompute/functionservice.cpp b/src/zencompute/functionservice.cpp new file mode 100644 index 000000000..0698449e9 --- /dev/null +++ b/src/zencompute/functionservice.cpp @@ -0,0 +1,957 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zencompute/functionservice.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include "functionrunner.h" +# include "actionrecorder.h" +# include "localrunner.h" +# include "remotehttprunner.h" + +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include + +# include +# include +# include +# include +# include + +ZEN_THIRD_PARTY_INCLUDES_START +# include +ZEN_THIRD_PARTY_INCLUDES_END + +using namespace std::literals; + +namespace zen::compute { + +////////////////////////////////////////////////////////////////////////// + +struct FunctionServiceSession::Impl +{ + FunctionServiceSession* m_FunctionServiceSession; + ChunkResolver& m_ChunkResolver; + LoggerRef m_Log{logging::Get("apply")}; + + Impl(FunctionServiceSession* InFunctionServiceSession, ChunkResolver& InChunkResolver) + : m_FunctionServiceSession(InFunctionServiceSession) + , m_ChunkResolver(InChunkResolver) + { + m_SchedulingThread = std::thread{&Impl::MonitorThreadFunction, this}; + } + + void Shutdown(); + bool IsHealthy(); + + LoggerRef Log() { return m_Log; } + + std::atomic_bool m_AcceptActions = true; + + struct FunctionDefinition + { + std::string FunctionName; + Guid FunctionVersion; + Guid BuildSystemVersion; + IoHash WorkerId; + }; + + void EmitStats(CbObjectWriter& Cbo) + { + m_WorkerLock.WithSharedLock([&] { Cbo << "worker_count"sv << m_WorkerMap.size(); }); + m_ResultsLock.WithSharedLock([&] { Cbo << "actions_complete"sv << m_ResultsMap.size(); }); + m_PendingLock.WithSharedLock([&] { Cbo << "actions_pending"sv << m_PendingActions.size(); }); + Cbo << "actions_submitted"sv << GetSubmittedActionCount(); + EmitSnapshot("actions_retired"sv, m_ResultRate, Cbo); + } + + void RegisterWorker(CbPackage Worker); + WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId); + + std::atomic m_ActionsCounter = 0; // sequence number + + RwLock m_PendingLock; + std::map> m_PendingActions; + + RwLock m_RunningLock; + std::unordered_map> m_RunningMap; + + RwLock m_ResultsLock; + std::unordered_map> m_ResultsMap; + metrics::Meter m_ResultRate; + std::atomic m_RetiredCount{0}; + + HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage); + HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage); + + std::atomic m_ShutdownRequested{false}; + + std::thread m_SchedulingThread; + std::atomic m_SchedulingThreadEnabled{true}; + Event m_SchedulingThreadEvent; + + void MonitorThreadFunction(); + void SchedulePendingActions(); + + // Workers + + RwLock m_WorkerLock; + std::unordered_map m_WorkerMap; + std::vector m_FunctionList; + std::vector GetKnownWorkerIds(); + + // Runners + + RunnerGroup m_LocalRunnerGroup; + RunnerGroup m_RemoteRunnerGroup; + + EnqueueResult EnqueueAction(CbObject ActionObject, int Priority); + EnqueueResult EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority); + + void GetCompleted(CbWriter& Cbo); + + // Recording + + void StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath); + void StopRecording(); + + std::unique_ptr m_Recorder; + + // History tracking + + RwLock m_ActionHistoryLock; + std::deque m_ActionHistory; + size_t m_HistoryLimit = 1000; + + std::vector GetActionHistory(int Limit); + + // + + [[nodiscard]] size_t QueryCapacity(); + + [[nodiscard]] SubmitResult SubmitAction(Ref Action); + [[nodiscard]] std::vector SubmitActions(const std::vector>& Actions); + [[nodiscard]] size_t GetSubmittedActionCount(); + + // Updates + + RwLock m_UpdatedActionsLock; + std::vector> m_UpdatedActions; + + void HandleActionUpdates(); + void PostUpdate(RunnerAction* Action); + + void ShutdownRunners(); +}; + +bool +FunctionServiceSession::Impl::IsHealthy() +{ + return true; +} + +void +FunctionServiceSession::Impl::Shutdown() +{ + m_AcceptActions = false; + m_ShutdownRequested = true; + + m_SchedulingThreadEnabled = false; + m_SchedulingThreadEvent.Set(); + if (m_SchedulingThread.joinable()) + { + m_SchedulingThread.join(); + } + + ShutdownRunners(); +} + +void +FunctionServiceSession::Impl::ShutdownRunners() +{ + m_LocalRunnerGroup.Shutdown(); + m_RemoteRunnerGroup.Shutdown(); +} + +void +FunctionServiceSession::Impl::StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath) +{ + ZEN_INFO("starting recording to '{}'", RecordingPath); + + m_Recorder = std::make_unique(InCidStore, RecordingPath); + + ZEN_INFO("started recording to '{}'", RecordingPath); +} + +void +FunctionServiceSession::Impl::StopRecording() +{ + ZEN_INFO("stopping recording"); + + m_Recorder = nullptr; + + ZEN_INFO("stopped recording"); +} + +std::vector +FunctionServiceSession::Impl::GetActionHistory(int Limit) +{ + RwLock::SharedLockScope _(m_ActionHistoryLock); + + if (Limit > 0 && static_cast(Limit) < m_ActionHistory.size()) + { + return std::vector(m_ActionHistory.end() - Limit, m_ActionHistory.end()); + } + + return std::vector(m_ActionHistory.begin(), m_ActionHistory.end()); +} + +void +FunctionServiceSession::Impl::RegisterWorker(CbPackage Worker) +{ + RwLock::ExclusiveLockScope _(m_WorkerLock); + + const IoHash& WorkerId = Worker.GetObject().GetHash(); + + if (m_WorkerMap.insert_or_assign(WorkerId, Worker).second) + { + // Note that since the convention currently is that WorkerId is equal to the hash + // of the worker descriptor there is no chance that we get a second write with a + // different descriptor. Thus we only need to call this the first time, when the + // worker is added + + m_LocalRunnerGroup.RegisterWorker(Worker); + m_RemoteRunnerGroup.RegisterWorker(Worker); + + if (m_Recorder) + { + m_Recorder->RegisterWorker(Worker); + } + + CbObject WorkerObj = Worker.GetObject(); + + // Populate worker database + + const Guid WorkerBuildSystemVersion = WorkerObj["buildsystem_version"sv].AsUuid(); + + for (auto& Item : WorkerObj["functions"sv]) + { + CbObjectView Function = Item.AsObjectView(); + + std::string_view FunctionName = Function["name"sv].AsString(); + const Guid FunctionVersion = Function["version"sv].AsUuid(); + + m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName}, + .FunctionVersion = FunctionVersion, + .BuildSystemVersion = WorkerBuildSystemVersion, + .WorkerId = WorkerId}); + } + } +} + +WorkerDesc +FunctionServiceSession::Impl::GetWorkerDescriptor(const IoHash& WorkerId) +{ + RwLock::SharedLockScope _(m_WorkerLock); + + if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end()) + { + const CbPackage& Desc = It->second; + return {Desc, WorkerId}; + } + + return {}; +} + +std::vector +FunctionServiceSession::Impl::GetKnownWorkerIds() +{ + std::vector WorkerIds; + WorkerIds.reserve(m_WorkerMap.size()); + + m_WorkerLock.WithSharedLock([&] { + for (const auto& [WorkerId, _] : m_WorkerMap) + { + WorkerIds.push_back(WorkerId); + } + }); + + return WorkerIds; +} + +FunctionServiceSession::EnqueueResult +FunctionServiceSession::Impl::EnqueueAction(CbObject ActionObject, int Priority) +{ + // Resolve function to worker + + IoHash WorkerId{IoHash::Zero}; + + std::string_view FunctionName = ActionObject["Function"sv].AsString(); + const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid(); + const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid(); + + for (const FunctionDefinition& FuncDef : m_FunctionList) + { + if (FuncDef.FunctionName == FunctionName && FuncDef.FunctionVersion == FunctionVersion && + FuncDef.BuildSystemVersion == BuildSystemVersion) + { + WorkerId = FuncDef.WorkerId; + + break; + } + } + + if (WorkerId == IoHash::Zero) + { + CbObjectWriter Writer; + + Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion; + Writer << "error" + << "no worker matches the action specification"; + + return {0, Writer.Save()}; + } + + if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end()) + { + CbPackage WorkerPackage = It->second; + + return EnqueueResolvedAction(WorkerDesc{WorkerPackage, WorkerId}, ActionObject, Priority); + } + + CbObjectWriter Writer; + + Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion; + Writer << "error" + << "no worker found despite match"; + + return {0, Writer.Save()}; +} + +FunctionServiceSession::EnqueueResult +FunctionServiceSession::Impl::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority) +{ + const int ActionLsn = ++m_ActionsCounter; + + Ref Pending{new RunnerAction(m_FunctionServiceSession)}; + + Pending->ActionLsn = ActionLsn; + Pending->Worker = Worker; + Pending->ActionId = ActionObj.GetHash(); + Pending->ActionObj = ActionObj; + Pending->Priority = RequestPriority; + + SubmitResult SubResult = SubmitAction(Pending); + + if (SubResult.IsAccepted) + { + // Great, the job is being taken care of by the runner + ZEN_DEBUG("direct schedule LSN {}", Pending->ActionLsn); + } + else + { + ZEN_DEBUG("action {} ({}) PENDING", Pending->ActionId, Pending->ActionLsn); + + Pending->SetActionState(RunnerAction::State::Pending); + } + + if (m_Recorder) + { + m_Recorder->RecordAction(Pending); + } + + CbObjectWriter Writer; + Writer << "lsn" << Pending->ActionLsn; + Writer << "worker" << Pending->Worker.WorkerId; + Writer << "action" << Pending->ActionId; + + return {Pending->ActionLsn, Writer.Save()}; +} + +SubmitResult +FunctionServiceSession::Impl::SubmitAction(Ref Action) +{ + // Loosely round-robin scheduling of actions across runners. + // + // It's not entirely clear what this means given that submits + // can come in across multiple threads, but it's probably better + // than always starting with the first runner. + // + // Longer term we should track the state of the individual + // runners and make decisions accordingly. + + SubmitResult Result = m_LocalRunnerGroup.SubmitAction(Action); + if (Result.IsAccepted) + { + return Result; + } + + return m_RemoteRunnerGroup.SubmitAction(Action); +} + +size_t +FunctionServiceSession::Impl::GetSubmittedActionCount() +{ + return m_LocalRunnerGroup.GetSubmittedActionCount() + m_RemoteRunnerGroup.GetSubmittedActionCount(); +} + +HttpResponseCode +FunctionServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResultPackage) +{ + // This lock is held for the duration of the function since we need to + // be sure that the action doesn't change state while we are checking the + // different data structures + + RwLock::ExclusiveLockScope _(m_ResultsLock); + + if (auto It = m_ResultsMap.find(ActionLsn); It != m_ResultsMap.end()) + { + OutResultPackage = std::move(It->second->GetResult()); + + m_ResultsMap.erase(It); + + return HttpResponseCode::OK; + } + + { + RwLock::SharedLockScope __(m_PendingLock); + + if (auto FindIt = m_PendingActions.find(ActionLsn); FindIt != m_PendingActions.end()) + { + return HttpResponseCode::Accepted; + } + } + + // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must + // always be taken after m_ResultsLock if both are needed + + { + RwLock::SharedLockScope __(m_RunningLock); + + if (m_RunningMap.find(ActionLsn) != m_RunningMap.end()) + { + return HttpResponseCode::Accepted; + } + } + + return HttpResponseCode::NotFound; +} + +HttpResponseCode +FunctionServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage) +{ + // This lock is held for the duration of the function since we need to + // be sure that the action doesn't change state while we are checking the + // different data structures + + RwLock::ExclusiveLockScope _(m_ResultsLock); + + for (auto It = begin(m_ResultsMap), End = end(m_ResultsMap); It != End; ++It) + { + if (It->second->ActionId == ActionId) + { + OutResultPackage = std::move(It->second->GetResult()); + + m_ResultsMap.erase(It); + + return HttpResponseCode::OK; + } + } + + { + RwLock::SharedLockScope __(m_PendingLock); + + for (const auto& [K, Pending] : m_PendingActions) + { + if (Pending->ActionId == ActionId) + { + return HttpResponseCode::Accepted; + } + } + } + + // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must + // always be taken after m_ResultsLock if both are needed + + { + RwLock::SharedLockScope __(m_RunningLock); + + for (const auto& [K, v] : m_RunningMap) + { + if (v->ActionId == ActionId) + { + return HttpResponseCode::Accepted; + } + } + } + + return HttpResponseCode::NotFound; +} + +void +FunctionServiceSession::Impl::GetCompleted(CbWriter& Cbo) +{ + Cbo.BeginArray("completed"); + + m_ResultsLock.WithSharedLock([&] { + for (auto& Kv : m_ResultsMap) + { + Cbo << Kv.first; + } + }); + + Cbo.EndArray(); +} + +# define ZEN_BATCH_SCHEDULER 1 + +void +FunctionServiceSession::Impl::SchedulePendingActions() +{ + int ScheduledCount = 0; + size_t RunningCount = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); }); + size_t PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }); + size_t ResultCount = m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }); + + static Stopwatch DumpRunningTimer; + + auto _ = MakeGuard([&] { + ZEN_INFO("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results", + ScheduledCount, + RunningCount, + m_RetiredCount.load(), + PendingCount, + ResultCount); + + if (DumpRunningTimer.GetElapsedTimeMs() > 30000) + { + DumpRunningTimer.Reset(); + + std::set RunningList; + m_RunningLock.WithSharedLock([&] { + for (auto& [K, V] : m_RunningMap) + { + RunningList.insert(K); + } + }); + + ExtendableStringBuilder<1024> RunningString; + for (int i : RunningList) + { + if (RunningString.Size()) + { + RunningString << ", "; + } + + RunningString.Append(IntNum(i)); + } + + ZEN_INFO("running: {}", RunningString); + } + }); + +# if ZEN_BATCH_SCHEDULER + size_t Capacity = QueryCapacity(); + + if (!Capacity) + { + _.Dismiss(); + + return; + } + + std::vector> ActionsToSchedule; + + // Pull actions to schedule from the pending queue, we will try to submit these to the runner outside of the lock + + m_PendingLock.WithExclusiveLock([&] { + if (m_ShutdownRequested) + { + return; + } + + if (m_PendingActions.empty()) + { + return; + } + + size_t NumActionsToSchedule = std::min(Capacity, m_PendingActions.size()); + + auto PendingIt = m_PendingActions.begin(); + const auto PendingEnd = m_PendingActions.end(); + + while (NumActionsToSchedule && PendingIt != PendingEnd) + { + const Ref& Pending = PendingIt->second; + + switch (Pending->ActionState()) + { + case RunnerAction::State::Pending: + ActionsToSchedule.push_back(Pending); + break; + + case RunnerAction::State::Running: + case RunnerAction::State::Completed: + case RunnerAction::State::Failed: + break; + + default: + case RunnerAction::State::New: + ZEN_WARN("unexpected state {} for pending action {}", static_cast(Pending->ActionState()), Pending->ActionLsn); + break; + } + + ++PendingIt; + --NumActionsToSchedule; + } + + PendingCount = m_PendingActions.size(); + }); + + if (ActionsToSchedule.empty()) + { + _.Dismiss(); + return; + } + + ZEN_INFO("attempting schedule of {} pending actions", ActionsToSchedule.size()); + + auto SubmitResults = SubmitActions(ActionsToSchedule); + + // Move successfully scheduled actions to the running map and remove + // from pending queue. It's actually possible that by the time we get + // to this stage some of the actions may have already completed, so + // they should not always be added to the running map + + eastl::hash_set ScheduledActions; + + for (size_t i = 0; i < ActionsToSchedule.size(); ++i) + { + const Ref& Pending = ActionsToSchedule[i]; + const SubmitResult& SubResult = SubmitResults[i]; + + if (SubResult.IsAccepted) + { + ScheduledActions.insert(Pending->ActionLsn); + } + } + + ScheduledCount += (int)ActionsToSchedule.size(); + +# else + m_PendingLock.WithExclusiveLock([&] { + while (!m_PendingActions.empty()) + { + if (m_ShutdownRequested) + { + return; + } + + // Here it would be good if we could decide to pop immediately to avoid + // holding the lock while creating processes etc + const Ref& Pending = m_PendingActions.begin()->second; + FunctionRunner::SubmitResult SubResult = SubmitAction(Pending); + + if (SubResult.IsAccepted) + { + // Great, the job is being taken care of by the runner + + ZEN_DEBUG("action {} ({}) PENDING -> RUNNING", Pending->ActionId, Pending->ActionLsn); + + m_RunningLock.WithExclusiveLock([&] { + m_RunningMap.insert({Pending->ActionLsn, Pending}); + + RunningCount = m_RunningMap.size(); + }); + + m_PendingActions.pop_front(); + + PendingCount = m_PendingActions.size(); + ++ScheduledCount; + } + else + { + // Runner could not accept the job, leave it on the pending queue + + return; + } + } + }); +# endif +} + +void +FunctionServiceSession::Impl::MonitorThreadFunction() +{ + SetCurrentThreadName("FunctionServiceSession_Monitor"); + + auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); }); + + do + { + int TimeoutMs = 1000; + + if (m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); })) + { + TimeoutMs = 100; + } + + const bool Timedout = m_SchedulingThreadEvent.Wait(TimeoutMs); + + if (m_SchedulingThreadEnabled == false) + { + return; + } + + HandleActionUpdates(); + + // Schedule pending actions + + SchedulePendingActions(); + + if (!Timedout) + { + m_SchedulingThreadEvent.Reset(); + } + } while (m_SchedulingThreadEnabled); +} + +void +FunctionServiceSession::Impl::PostUpdate(RunnerAction* Action) +{ + m_UpdatedActionsLock.WithExclusiveLock([&] { m_UpdatedActions.emplace_back(Action); }); +} + +void +FunctionServiceSession::Impl::HandleActionUpdates() +{ + std::vector> UpdatedActions; + + m_UpdatedActionsLock.WithExclusiveLock([&] { std::swap(UpdatedActions, m_UpdatedActions); }); + + std::unordered_set SeenLsn; + std::unordered_set RunningLsn; + + for (Ref& Action : UpdatedActions) + { + const int ActionLsn = Action->ActionLsn; + + if (auto [It, Inserted] = SeenLsn.insert(ActionLsn); Inserted) + { + switch (Action->ActionState()) + { + case RunnerAction::State::Pending: + m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); }); + break; + + case RunnerAction::State::Running: + m_PendingLock.WithExclusiveLock([&] { + m_RunningLock.WithExclusiveLock([&] { + m_RunningMap.insert({ActionLsn, Action}); + m_PendingActions.erase(ActionLsn); + }); + }); + ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn); + break; + + case RunnerAction::State::Completed: + case RunnerAction::State::Failed: + m_ResultsLock.WithExclusiveLock([&] { + m_ResultsMap[ActionLsn] = Action; + + m_PendingLock.WithExclusiveLock([&] { + m_RunningLock.WithExclusiveLock([&] { + if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end()) + { + m_PendingActions.erase(ActionLsn); + } + else + { + m_RunningMap.erase(FindIt); + } + }); + }); + + m_ActionHistoryLock.WithExclusiveLock([&] { + ActionHistoryEntry Entry{.Lsn = ActionLsn, + .ActionId = Action->ActionId, + .WorkerId = Action->Worker.WorkerId, + .ActionDescriptor = Action->ActionObj, + .Succeeded = Action->ActionState() == RunnerAction::State::Completed}; + + std::copy(std::begin(Action->Timestamps), std::end(Action->Timestamps), std::begin(Entry.Timestamps)); + + m_ActionHistory.push_back(std::move(Entry)); + + if (m_ActionHistory.size() > m_HistoryLimit) + { + m_ActionHistory.pop_front(); + } + }); + }); + m_RetiredCount.fetch_add(1); + m_ResultRate.Mark(1); + ZEN_DEBUG("action {} ({}) RUNNING -> COMPLETED with {}", + Action->ActionId, + ActionLsn, + Action->ActionState() == RunnerAction::State::Completed ? "SUCCESS" : "FAILURE"); + break; + } + } + } +} + +size_t +FunctionServiceSession::Impl::QueryCapacity() +{ + return m_LocalRunnerGroup.QueryCapacity() + m_RemoteRunnerGroup.QueryCapacity(); +} + +std::vector +FunctionServiceSession::Impl::SubmitActions(const std::vector>& Actions) +{ + std::vector Results; + + for (const Ref& Action : Actions) + { + Results.push_back(SubmitAction(Action)); + } + + return Results; +} + +////////////////////////////////////////////////////////////////////////// + +FunctionServiceSession::FunctionServiceSession(ChunkResolver& InChunkResolver) +{ + m_Impl = std::make_unique(this, InChunkResolver); +} + +FunctionServiceSession::~FunctionServiceSession() +{ + Shutdown(); +} + +bool +FunctionServiceSession::IsHealthy() +{ + return m_Impl->IsHealthy(); +} + +void +FunctionServiceSession::Shutdown() +{ + m_Impl->Shutdown(); +} + +void +FunctionServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath) +{ + m_Impl->StartRecording(InResolver, RecordingPath); +} + +void +FunctionServiceSession::StopRecording() +{ + m_Impl->StopRecording(); +} + +void +FunctionServiceSession::EmitStats(CbObjectWriter& Cbo) +{ + m_Impl->EmitStats(Cbo); +} + +std::vector +FunctionServiceSession::GetKnownWorkerIds() +{ + return m_Impl->GetKnownWorkerIds(); +} + +WorkerDesc +FunctionServiceSession::GetWorkerDescriptor(const IoHash& WorkerId) +{ + return m_Impl->GetWorkerDescriptor(WorkerId); +} + +void +FunctionServiceSession::AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath) +{ + m_Impl->m_LocalRunnerGroup.AddRunner(new LocalProcessRunner(InChunkResolver, BasePath)); +} + +void +FunctionServiceSession::AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName) +{ + m_Impl->m_RemoteRunnerGroup.AddRunner(new RemoteHttpRunner(InChunkResolver, BasePath, HostName)); +} + +FunctionServiceSession::EnqueueResult +FunctionServiceSession::EnqueueAction(CbObject ActionObject, int Priority) +{ + return m_Impl->EnqueueAction(ActionObject, Priority); +} + +FunctionServiceSession::EnqueueResult +FunctionServiceSession::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority) +{ + return m_Impl->EnqueueResolvedAction(Worker, ActionObj, RequestPriority); +} + +void +FunctionServiceSession::RegisterWorker(CbPackage Worker) +{ + m_Impl->RegisterWorker(Worker); +} + +HttpResponseCode +FunctionServiceSession::GetActionResult(int ActionLsn, CbPackage& OutResultPackage) +{ + return m_Impl->GetActionResult(ActionLsn, OutResultPackage); +} + +HttpResponseCode +FunctionServiceSession::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage) +{ + return m_Impl->FindActionResult(ActionId, OutResultPackage); +} + +std::vector +FunctionServiceSession::GetActionHistory(int Limit) +{ + return m_Impl->GetActionHistory(Limit); +} + +void +FunctionServiceSession::GetCompleted(CbWriter& Cbo) +{ + m_Impl->GetCompleted(Cbo); +} + +void +FunctionServiceSession::PostUpdate(RunnerAction* Action) +{ + m_Impl->PostUpdate(Action); +} + +////////////////////////////////////////////////////////////////////////// + +void +function_forcelink() +{ +} + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/httpfunctionservice.cpp b/src/zencompute/httpfunctionservice.cpp new file mode 100644 index 000000000..09a9684a7 --- /dev/null +++ b/src/zencompute/httpfunctionservice.cpp @@ -0,0 +1,709 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zencompute/httpfunctionservice.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include "functionrunner.h" + +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include + +# include + +using namespace std::literals; + +namespace zen::compute { + +constinit AsciiSet g_DecimalSet("0123456789"); +auto DecimalMatcher = [](std::string_view Str) { return AsciiSet::HasOnly(Str, g_DecimalSet); }; + +constinit AsciiSet g_HexSet("0123456789abcdefABCDEF"); +auto IoHashMatcher = [](std::string_view Str) { return Str.size() == 40 && AsciiSet::HasOnly(Str, g_HexSet); }; + +HttpFunctionService::HttpFunctionService(CidStore& InCidStore, + IHttpStatsService& StatsService, + [[maybe_unused]] const std::filesystem::path& BaseDir) +: m_CidStore(InCidStore) +, m_StatsService(StatsService) +, m_Log(logging::Get("apply")) +, m_BaseDir(BaseDir) +, m_FunctionService(InCidStore) +{ + m_FunctionService.AddLocalRunner(InCidStore, m_BaseDir / "local"); + + m_StatsService.RegisterHandler("apply", *this); + + m_Router.AddMatcher("lsn", DecimalMatcher); + m_Router.AddMatcher("worker", IoHashMatcher); + m_Router.AddMatcher("action", IoHashMatcher); + + m_Router.RegisterRoute( + "ready", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + if (m_FunctionService.IsHealthy()) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok"); + } + + return HttpReq.WriteResponse(HttpResponseCode::ServiceUnavailable); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "workers", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + CbObjectWriter Cbo; + Cbo.BeginArray("workers"sv); + for (const IoHash& WorkerId : m_FunctionService.GetKnownWorkerIds()) + { + Cbo << WorkerId; + } + Cbo.EndArray(); + + return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "workers/{worker}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + if (WorkerDesc Desc = m_FunctionService.GetWorkerDescriptor(WorkerId)) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor.GetObject()); + } + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + + case HttpVerb::kPost: + { + switch (HttpReq.RequestContentType()) + { + case HttpContentType::kCbObject: + { + CbObject WorkerSpec = HttpReq.ReadPayloadObject(); + + // Determine which pieces are missing and need to be transmitted + + HashKeySet ChunkSet; + + WorkerSpec.IterateAttachments([&](CbFieldView Field) { + const IoHash Hash = Field.AsHash(); + ChunkSet.AddHashToSet(Hash); + }); + + CbPackage WorkerPackage; + WorkerPackage.SetObject(WorkerSpec); + + m_CidStore.FilterChunks(ChunkSet); + + if (ChunkSet.IsEmpty()) + { + ZEN_DEBUG("worker {}: all attachments already available", WorkerId); + m_FunctionService.RegisterWorker(WorkerPackage); + return HttpReq.WriteResponse(HttpResponseCode::NoContent); + } + + CbObjectWriter ResponseWriter; + ResponseWriter.BeginArray("need"); + + ChunkSet.IterateHashes([&](const IoHash& Hash) { + ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); + ResponseWriter.AddHash(Hash); + }); + + ResponseWriter.EndArray(); + + ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize()); + + return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); + } + break; + + case HttpContentType::kCbPackage: + { + CbPackage WorkerSpecPackage = HttpReq.ReadPayloadPackage(); + CbObject WorkerSpec = WorkerSpecPackage.GetObject(); + + std::span Attachments = WorkerSpecPackage.GetAttachments(); + + int AttachmentCount = 0; + int NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + + for (const CbAttachment& Attachment : Attachments) + { + ZEN_ASSERT(Attachment.IsCompressedBinary()); + + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer Buffer = Attachment.AsCompressedBinary(); + + ZEN_UNUSED(DataHash); + TotalAttachmentBytes += Buffer.GetCompressedSize(); + ++AttachmentCount; + + const CidStore::InsertResult InsertResult = + m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash); + + if (InsertResult.New) + { + TotalNewBytes += Buffer.GetCompressedSize(); + ++NewAttachmentCount; + } + } + + ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments", + WorkerId, + zen::NiceBytes(TotalAttachmentBytes), + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + NewAttachmentCount); + + m_FunctionService.RegisterWorker(WorkerSpecPackage); + + return HttpReq.WriteResponse(HttpResponseCode::NoContent); + } + break; + + default: + break; + } + } + break; + + default: + break; + } + }, + HttpVerb::kGet | HttpVerb::kPost); + + m_Router.RegisterRoute( + "jobs/completed", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + CbObjectWriter Cbo; + m_FunctionService.GetCompleted(Cbo); + + SystemMetrics Sm = GetSystemMetricsForReporting(); + Cbo.BeginObject("metrics"); + Describe(Sm, Cbo); + Cbo.EndObject(); + + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "jobs/history", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto QueryParams = HttpReq.GetQueryParams(); + + int QueryLimit = 50; + + if (auto LimitParam = QueryParams.GetValue("limit"); LimitParam.empty() == false) + { + QueryLimit = ParseInt(LimitParam).value_or(50); + } + + CbObjectWriter Cbo; + Cbo.BeginArray("history"); + for (const auto& Entry : m_FunctionService.GetActionHistory(QueryLimit)) + { + Cbo.BeginObject(); + Cbo << "lsn"sv << Entry.Lsn; + Cbo << "actionId"sv << Entry.ActionId; + Cbo << "workerId"sv << Entry.WorkerId; + Cbo << "succeeded"sv << Entry.Succeeded; + Cbo << "actionDescriptor"sv << Entry.ActionDescriptor; + + for (const auto& Timestamp : Entry.Timestamps) + { + Cbo.AddInteger( + fmt::format("time_{}"sv, RunnerAction::ToString(static_cast(&Timestamp - Entry.Timestamps))), + Timestamp); + } + Cbo.EndObject(); + } + Cbo.EndArray(); + + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "jobs/{lsn}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const int ActionLsn = std::stoi(std::string{Req.GetCapture(1)}); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + { + CbPackage Output; + HttpResponseCode ResponseCode = m_FunctionService.GetActionResult(ActionLsn, Output); + + if (ResponseCode == HttpResponseCode::OK) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + } + + return HttpReq.WriteResponse(ResponseCode); + } + break; + + case HttpVerb::kPost: + { + // Add support for cancellation, priority changes + } + break; + + default: + break; + } + }, + HttpVerb::kGet | HttpVerb::kPost); + + m_Router.RegisterRoute( + "jobs/{worker}/{action}", // This route is inefficient, and is only here for backwards compatibility. The preferred path is the + // one which uses the scheduled action lsn for lookups + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2)); + + CbPackage Output; + if (HttpResponseCode ResponseCode = m_FunctionService.FindActionResult(ActionId, /* out */ Output); + ResponseCode != HttpResponseCode::OK) + { + ZEN_TRACE("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode)) + + if (ResponseCode == HttpResponseCode::NotFound) + { + return HttpReq.WriteResponse(ResponseCode); + } + + return HttpReq.WriteResponse(ResponseCode); + } + + ZEN_DEBUG("jobs/{}/{}: OK", Req.GetCapture(1), Req.GetCapture(2)) + + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "jobs/{worker}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + + WorkerDesc Worker = m_FunctionService.GetWorkerDescriptor(WorkerId); + + if (!Worker) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + const auto QueryParams = Req.ServerRequest().GetQueryParams(); + + int RequestPriority = -1; + + if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false) + { + RequestPriority = ParseInt(PriorityParam).value_or(-1); + } + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + // TODO: return status of all pending or executing jobs + break; + + case HttpVerb::kPost: + switch (HttpReq.RequestContentType()) + { + case HttpContentType::kCbObject: + { + // This operation takes the proposed job spec and identifies which + // chunks are not present on this server. This list is then returned in + // the "need" list in the response + + IoBuffer Payload = HttpReq.ReadPayload(); + CbObject ActionObj = LoadCompactBinaryObject(Payload); + + std::vector NeedList; + + ActionObj.IterateAttachments([&](CbFieldView Field) { + const IoHash FileHash = Field.AsHash(); + + if (!m_CidStore.ContainsChunk(FileHash)) + { + NeedList.push_back(FileHash); + } + }); + + if (NeedList.empty()) + { + // We already have everything, enqueue the action for execution + + if (FunctionServiceSession::EnqueueResult Result = + m_FunctionService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority)) + { + ZEN_DEBUG("action {} accepted (lsn {})", ActionObj.GetHash(), Result.Lsn); + + HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); + } + + return; + } + + CbObjectWriter Cbo; + Cbo.BeginArray("need"); + + for (const IoHash& Hash : NeedList) + { + Cbo << Hash; + } + + Cbo.EndArray(); + CbObject Response = Cbo.Save(); + + return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); + } + break; + + case HttpContentType::kCbPackage: + { + CbPackage Action = HttpReq.ReadPayloadPackage(); + CbObject ActionObj = Action.GetObject(); + + std::span Attachments = Action.GetAttachments(); + + int AttachmentCount = 0; + int NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + + for (const CbAttachment& Attachment : Attachments) + { + ZEN_ASSERT(Attachment.IsCompressedBinary()); + + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer DataView = Attachment.AsCompressedBinary(); + + ZEN_UNUSED(DataHash); + + const uint64_t CompressedSize = DataView.GetCompressedSize(); + + TotalAttachmentBytes += CompressedSize; + ++AttachmentCount; + + const CidStore::InsertResult InsertResult = + m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); + + if (InsertResult.New) + { + TotalNewBytes += CompressedSize; + ++NewAttachmentCount; + } + } + + if (FunctionServiceSession::EnqueueResult Result = + m_FunctionService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority)) + { + ZEN_DEBUG("accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)", + ActionObj.GetHash(), + Result.Lsn, + zen::NiceBytes(TotalAttachmentBytes), + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + NewAttachmentCount); + + HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); + } + + return; + } + break; + + default: + break; + } + break; + + default: + break; + } + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "jobs", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto QueryParams = HttpReq.GetQueryParams(); + + int RequestPriority = -1; + + if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false) + { + RequestPriority = ParseInt(PriorityParam).value_or(-1); + } + + // Resolve worker + + // + + switch (HttpReq.RequestContentType()) + { + case HttpContentType::kCbObject: + { + // This operation takes the proposed job spec and identifies which + // chunks are not present on this server. This list is then returned in + // the "need" list in the response + + IoBuffer Payload = HttpReq.ReadPayload(); + CbObject ActionObj = LoadCompactBinaryObject(Payload); + + std::vector NeedList; + + ActionObj.IterateAttachments([&](CbFieldView Field) { + const IoHash FileHash = Field.AsHash(); + + if (!m_CidStore.ContainsChunk(FileHash)) + { + NeedList.push_back(FileHash); + } + }); + + if (NeedList.empty()) + { + // We already have everything, enqueue the action for execution + + if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueAction(ActionObj, RequestPriority)) + { + ZEN_DEBUG("action accepted (lsn {})", Result.Lsn); + + return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); + } + else + { + // Could not resolve? + return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage); + } + } + + CbObjectWriter Cbo; + Cbo.BeginArray("need"); + + for (const IoHash& Hash : NeedList) + { + Cbo << Hash; + } + + Cbo.EndArray(); + CbObject Response = Cbo.Save(); + + return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); + } + + case HttpContentType::kCbPackage: + { + CbPackage Action = HttpReq.ReadPayloadPackage(); + CbObject ActionObj = Action.GetObject(); + + std::span Attachments = Action.GetAttachments(); + + int AttachmentCount = 0; + int NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + + for (const CbAttachment& Attachment : Attachments) + { + ZEN_ASSERT(Attachment.IsCompressedBinary()); + + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer DataView = Attachment.AsCompressedBinary(); + + ZEN_UNUSED(DataHash); + + const uint64_t CompressedSize = DataView.GetCompressedSize(); + + TotalAttachmentBytes += CompressedSize; + ++AttachmentCount; + + const CidStore::InsertResult InsertResult = + m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); + + if (InsertResult.New) + { + TotalNewBytes += CompressedSize; + ++NewAttachmentCount; + } + } + + if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueAction(ActionObj, RequestPriority)) + { + ZEN_DEBUG("accepted action (lsn {}): {} in {} attachments. {} new ({} attachments)", + Result.Lsn, + zen::NiceBytes(TotalAttachmentBytes), + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + NewAttachmentCount); + + HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); + } + else + { + // Could not resolve? + return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage); + } + } + return; + } + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "workers/all", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + std::vector WorkerIds = m_FunctionService.GetKnownWorkerIds(); + + CbObjectWriter Cbo; + Cbo.BeginArray("workers"); + + for (const IoHash& WorkerId : WorkerIds) + { + Cbo.BeginObject(); + + Cbo << "id" << WorkerId; + + const auto& Descriptor = m_FunctionService.GetWorkerDescriptor(WorkerId); + + Cbo << "descriptor" << Descriptor.Descriptor.GetObject(); + + Cbo.EndObject(); + } + + Cbo.EndArray(); + + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "sysinfo", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + SystemMetrics Sm = GetSystemMetricsForReporting(); + + CbObjectWriter Cbo; + Describe(Sm, Cbo); + + Cbo << "cpu_usage" << Sm.CpuUsagePercent; + Cbo << "memory_total" << Sm.SystemMemoryMiB * 1024 * 1024; + Cbo << "memory_used" << (Sm.SystemMemoryMiB - Sm.AvailSystemMemoryMiB) * 1024 * 1024; + Cbo << "disk_used" << 100 * 1024; + Cbo << "disk_total" << 100 * 1024 * 1024; + + return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "record/start", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + m_FunctionService.StartRecording(m_CidStore, m_BaseDir / "recording"); + + return HttpReq.WriteResponse(HttpResponseCode::OK); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "record/stop", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + m_FunctionService.StopRecording(); + + return HttpReq.WriteResponse(HttpResponseCode::OK); + }, + HttpVerb::kPost); +} + +HttpFunctionService::~HttpFunctionService() +{ + m_StatsService.UnregisterHandler("apply", *this); +} + +void +HttpFunctionService::Shutdown() +{ + m_FunctionService.Shutdown(); +} + +const char* +HttpFunctionService::BaseUri() const +{ + return "/apply/"; +} + +void +HttpFunctionService::HandleRequest(HttpServerRequest& Request) +{ + metrics::OperationTiming::Scope $(m_HttpRequests); + + if (m_Router.HandleRequest(Request) == false) + { + ZEN_WARN("No route found for {0}", Request.RelativeUri()); + } +} + +void +HttpFunctionService::HandleStatsRequest(HttpServerRequest& Request) +{ + CbObjectWriter Cbo; + m_FunctionService.EmitStats(Cbo); + + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +////////////////////////////////////////////////////////////////////////// + +void +httpfunction_forcelink() +{ +} + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/httporchestrator.cpp b/src/zencompute/httporchestrator.cpp new file mode 100644 index 000000000..39e7e60d7 --- /dev/null +++ b/src/zencompute/httporchestrator.cpp @@ -0,0 +1,81 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zencompute/httporchestrator.h" + +#include +#include + +namespace zen::compute { + +HttpOrchestratorService::HttpOrchestratorService() : m_Log(logging::Get("orch")) +{ + m_Router.RegisterRoute( + "provision", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + CbObjectWriter Cbo; + Cbo.BeginArray("workers"); + + m_KnownWorkersLock.WithSharedLock([&] { + for (const auto& [WorkerId, Worker] : m_KnownWorkers) + { + Cbo.BeginObject(); + Cbo << "uri" << Worker.BaseUri; + Cbo << "dt" << Worker.LastSeen.GetElapsedTimeMs(); + Cbo.EndObject(); + } + }); + + Cbo.EndArray(); + + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "announce", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + CbObject Data = HttpReq.ReadPayloadObject(); + + std::string_view WorkerId = Data["id"].AsString(""); + std::string_view WorkerUri = Data["uri"].AsString(""); + + if (WorkerId.empty() || WorkerUri.empty()) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + m_KnownWorkersLock.WithExclusiveLock([&] { + auto& Worker = m_KnownWorkers[std::string(WorkerId)]; + Worker.BaseUri = WorkerUri; + Worker.LastSeen.Reset(); + }); + + HttpReq.WriteResponse(HttpResponseCode::OK); + }, + HttpVerb::kPost); +} + +HttpOrchestratorService::~HttpOrchestratorService() +{ +} + +const char* +HttpOrchestratorService::BaseUri() const +{ + return "/orch/"; +} + +void +HttpOrchestratorService::HandleRequest(HttpServerRequest& Request) +{ + if (m_Router.HandleRequest(Request) == false) + { + ZEN_WARN("No route found for {0}", Request.RelativeUri()); + } +} + +} // namespace zen::compute diff --git a/src/zencompute/include/zencompute/functionservice.h b/src/zencompute/include/zencompute/functionservice.h new file mode 100644 index 000000000..1deb99fd5 --- /dev/null +++ b/src/zencompute/include/zencompute/functionservice.h @@ -0,0 +1,132 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include + +#if !defined(ZEN_WITH_COMPUTE_SERVICES) +# define ZEN_WITH_COMPUTE_SERVICES 1 +#endif + +#if ZEN_WITH_COMPUTE_SERVICES + +# include +# include +# include +# include +# include + +# include + +namespace zen { +class ChunkResolver; +class CbObjectWriter; +} // namespace zen + +namespace zen::compute { + +class ActionRecorder; +class FunctionServiceSession; +class IActionResultHandler; +class LocalProcessRunner; +class RemoteHttpRunner; +struct RunnerAction; +struct SubmitResult; + +struct WorkerDesc +{ + CbPackage Descriptor; + IoHash WorkerId{IoHash::Zero}; + + inline operator bool() const { return WorkerId != IoHash::Zero; } +}; + +/** + * Lambda style compute function service + * + * The responsibility of this class is to accept function execution requests, and + * schedule them using one or more FunctionRunner instances. It will basically always + * accept requests, queueing them if necessary, and then hand them off to runners + * as they become available. + * + * This is typically fronted by an API service that handles communication with clients. + */ +class FunctionServiceSession final +{ +public: + FunctionServiceSession(ChunkResolver& InChunkResolver); + ~FunctionServiceSession(); + + void Shutdown(); + bool IsHealthy(); + + // Worker registration and discovery + + void RegisterWorker(CbPackage Worker); + [[nodiscard]] WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId); + [[nodiscard]] std::vector GetKnownWorkerIds(); + + // Action runners + + void AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath); + void AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName); + + // Action submission + + struct EnqueueResult + { + int Lsn; + CbObject ResponseMessage; + + inline operator bool() const { return Lsn != 0; } + }; + + [[nodiscard]] EnqueueResult EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int Priority); + [[nodiscard]] EnqueueResult EnqueueAction(CbObject ActionObject, int Priority); + + // Completed action tracking + + [[nodiscard]] HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage); + [[nodiscard]] HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage); + + void GetCompleted(CbWriter&); + + // Action history tracking (note that this is separate from completed action tracking, and + // will include actions which have been retired and no longer have their results available) + + struct ActionHistoryEntry + { + int Lsn; + IoHash ActionId; + IoHash WorkerId; + CbObject ActionDescriptor; + bool Succeeded; + uint64_t Timestamps[5] = {}; + }; + + [[nodiscard]] std::vector GetActionHistory(int Limit = 100); + + // Stats reporting + + void EmitStats(CbObjectWriter& Cbo); + + // Recording + + void StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath); + void StopRecording(); + +private: + void PostUpdate(RunnerAction* Action); + + friend class FunctionRunner; + friend struct RunnerAction; + + struct Impl; + std::unique_ptr m_Impl; +}; + +void function_forcelink(); + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/include/zencompute/httpfunctionservice.h b/src/zencompute/include/zencompute/httpfunctionservice.h new file mode 100644 index 000000000..6e2344ae6 --- /dev/null +++ b/src/zencompute/include/zencompute/httpfunctionservice.h @@ -0,0 +1,73 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include + +#if !defined(ZEN_WITH_COMPUTE_SERVICES) +# define ZEN_WITH_COMPUTE_SERVICES 1 +#endif + +#if ZEN_WITH_COMPUTE_SERVICES + +# include "zencompute/functionservice.h" + +# include +# include +# include +# include +# include +# include + +# include +# include +# include + +namespace zen { +class CidStore; +} + +namespace zen::compute { + +class HttpFunctionService; +class FunctionService; + +/** + * HTTP interface for compute function service + */ +class HttpFunctionService : public HttpService, public IHttpStatsProvider +{ +public: + HttpFunctionService(CidStore& InCidStore, IHttpStatsService& StatsService, const std::filesystem::path& BaseDir); + ~HttpFunctionService(); + + void Shutdown(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(HttpServerRequest& Request) override; + + // IHttpStatsProvider + + virtual void HandleStatsRequest(HttpServerRequest& Request) override; + +protected: + CidStore& m_CidStore; + IHttpStatsService& m_StatsService; + LoggerRef Log() { return m_Log; } + +private: + LoggerRef m_Log; + std::filesystem ::path m_BaseDir; + HttpRequestRouter m_Router; + FunctionServiceSession m_FunctionService; + + // Metrics + + metrics::OperationTiming m_HttpRequests; +}; + +void httpfunction_forcelink(); + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/include/zencompute/httporchestrator.h b/src/zencompute/include/zencompute/httporchestrator.h new file mode 100644 index 000000000..168c6d7fe --- /dev/null +++ b/src/zencompute/include/zencompute/httporchestrator.h @@ -0,0 +1,44 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include +#include +#include +#include + +#include + +namespace zen::compute { + +/** + * Mock orchestrator service, for testing dynamic provisioning + */ + +class HttpOrchestratorService : public HttpService +{ +public: + HttpOrchestratorService(); + ~HttpOrchestratorService(); + + HttpOrchestratorService(const HttpOrchestratorService&) = delete; + HttpOrchestratorService& operator=(const HttpOrchestratorService&) = delete; + + virtual const char* BaseUri() const override; + virtual void HandleRequest(HttpServerRequest& Request) override; + +private: + HttpRequestRouter m_Router; + LoggerRef m_Log; + + struct KnownWorker + { + std::string_view BaseUri; + Stopwatch LastSeen; + }; + + RwLock m_KnownWorkersLock; + std::unordered_map m_KnownWorkers; +}; + +} // namespace zen::compute diff --git a/src/zencompute/include/zencompute/recordingreader.h b/src/zencompute/include/zencompute/recordingreader.h new file mode 100644 index 000000000..bf1aff125 --- /dev/null +++ b/src/zencompute/include/zencompute/recordingreader.h @@ -0,0 +1,127 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace zen { +class CbObject; +class CbPackage; +struct IoHash; +} // namespace zen + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen::compute { + +////////////////////////////////////////////////////////////////////////// + +class RecordingReaderBase +{ + RecordingReaderBase(const RecordingReaderBase&) = delete; + RecordingReaderBase& operator=(const RecordingReaderBase&) = delete; + +public: + RecordingReaderBase() = default; + virtual ~RecordingReaderBase() = 0; + virtual std::unordered_map ReadWorkers() = 0; + virtual void IterateActions(std::function&& Callback, int TargetParallelism) = 0; + virtual size_t GetActionCount() const = 0; +}; + +////////////////////////////////////////////////////////////////////////// + +/** + * Reader for recordings done via the zencompute recording system, which + * have a shared chunk store and a log of actions with pointers into the + * chunk store for their data. + */ +class RecordingReader : public RecordingReaderBase, public ChunkResolver +{ +public: + explicit RecordingReader(const std::filesystem::path& RecordingPath); + ~RecordingReader(); + + virtual std::unordered_map ReadWorkers() override; + + virtual void IterateActions(std::function&& Callback, + int TargetParallelism) override; + virtual size_t GetActionCount() const override; + +private: + std::filesystem::path m_RecordingLogDir; + BasicFile m_WorkerDataFile; + BasicFile m_ActionDataFile; + GcManager m_Gc; + CidStore m_CidStore{m_Gc}; + + // ChunkResolver interface + virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; + + struct ActionEntry + { + IoHash ActionId; + uint64_t Offset; + uint64_t Size; + }; + + std::vector m_Actions; + + void ScanActions(); +}; + +////////////////////////////////////////////////////////////////////////// + +struct LocalResolver : public ChunkResolver +{ + LocalResolver(const LocalResolver&) = delete; + LocalResolver& operator=(const LocalResolver&) = delete; + + LocalResolver() = default; + ~LocalResolver() = default; + + virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; + void Add(const IoHash& Cid, IoBuffer Data); + +private: + RwLock MapLock; + std::unordered_map Attachments; +}; + +/** + * This is a reader for UE/DDB recordings, which have a different layout on + * disk (no shared chunk store) + */ +class UeRecordingReader : public RecordingReaderBase, public ChunkResolver +{ +public: + explicit UeRecordingReader(const std::filesystem::path& RecordingPath); + ~UeRecordingReader(); + + virtual std::unordered_map ReadWorkers() override; + virtual void IterateActions(std::function&& Callback, + int TargetParallelism) override; + virtual size_t GetActionCount() const override; + virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; + +private: + std::filesystem::path m_RecordingDir; + LocalResolver m_LocalResolver; + std::vector m_WorkDirs; + + CbPackage ReadAction(std::filesystem::path WorkDir); +}; + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/include/zencompute/zencompute.h b/src/zencompute/include/zencompute/zencompute.h new file mode 100644 index 000000000..6dc32eeea --- /dev/null +++ b/src/zencompute/include/zencompute/zencompute.h @@ -0,0 +1,11 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include + +namespace zen { + +void zencompute_forcelinktests(); + +} diff --git a/src/zencompute/localrunner.cpp b/src/zencompute/localrunner.cpp new file mode 100644 index 000000000..9a27f3f3d --- /dev/null +++ b/src/zencompute/localrunner.cpp @@ -0,0 +1,722 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "localrunner.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include + +# include + +namespace zen::compute { + +using namespace std::literals; + +LocalProcessRunner::LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir) +: FunctionRunner(BaseDir) +, m_Log(logging::Get("local_exec")) +, m_ChunkResolver(Resolver) +, m_WorkerPath(std::filesystem::weakly_canonical(BaseDir / "workers")) +, m_SandboxPath(std::filesystem::weakly_canonical(BaseDir / "scratch")) +{ + SystemMetrics Sm = GetSystemMetricsForReporting(); + + m_MaxRunningActions = Sm.LogicalProcessorCount * 2; + + ZEN_INFO("Max concurrent action count: {}", m_MaxRunningActions); + + bool DidCleanup = false; + + if (std::filesystem::is_directory(m_ActionsPath)) + { + ZEN_INFO("Cleaning '{}'", m_ActionsPath); + + std::error_code Ec; + CleanDirectory(m_ActionsPath, /* ForceRemoveReadOnlyFiles */ true, Ec); + + if (Ec) + { + ZEN_WARN("Unable to clean '{}': {}", m_ActionsPath, Ec.message()); + } + + DidCleanup = true; + } + + if (std::filesystem::is_directory(m_SandboxPath)) + { + ZEN_INFO("Cleaning '{}'", m_SandboxPath); + std::error_code Ec; + CleanDirectory(m_SandboxPath, /* ForceRemoveReadOnlyFiles */ true, Ec); + + if (Ec) + { + ZEN_WARN("Unable to clean '{}': {}", m_SandboxPath, Ec.message()); + } + + DidCleanup = true; + } + + // We clean out all workers on startup since we can't know they are good. They could be bad + // due to tampering, malware (which I also mean to include AV and antimalware software) or + // other processes we have no control over + if (std::filesystem::is_directory(m_WorkerPath)) + { + ZEN_INFO("Cleaning '{}'", m_WorkerPath); + std::error_code Ec; + CleanDirectory(m_WorkerPath, /* ForceRemoveReadOnlyFiles */ true, Ec); + + if (Ec) + { + ZEN_WARN("Unable to clean '{}': {}", m_WorkerPath, Ec.message()); + } + + DidCleanup = true; + } + + if (DidCleanup) + { + ZEN_INFO("Cleanup complete"); + } + + m_MonitorThread = std::thread{&LocalProcessRunner::MonitorThreadFunction, this}; + +# if ZEN_PLATFORM_WINDOWS + // Suppress any error dialogs caused by missing dependencies + UINT OldMode = ::SetErrorMode(0); + ::SetErrorMode(OldMode | SEM_FAILCRITICALERRORS); +# endif + + m_AcceptNewActions = true; +} + +LocalProcessRunner::~LocalProcessRunner() +{ + try + { + Shutdown(); + } + catch (std::exception& Ex) + { + ZEN_WARN("exception during local process runner shutdown: {}", Ex.what()); + } +} + +void +LocalProcessRunner::Shutdown() +{ + m_AcceptNewActions = false; + + m_MonitorThreadEnabled = false; + m_MonitorThreadEvent.Set(); + if (m_MonitorThread.joinable()) + { + m_MonitorThread.join(); + } + + CancelRunningActions(); +} + +std::filesystem::path +LocalProcessRunner::CreateNewSandbox() +{ + std::string UniqueId = std::to_string(++m_SandboxCounter); + std::filesystem::path Path = m_SandboxPath / UniqueId; + zen::CreateDirectories(Path); + + return Path; +} + +void +LocalProcessRunner::RegisterWorker(const CbPackage& WorkerPackage) +{ + if (m_DumpActions) + { + CbObject WorkerDescriptor = WorkerPackage.GetObject(); + const IoHash& WorkerId = WorkerPackage.GetObjectHash(); + + std::string UniqueId = fmt::format("worker_{}"sv, WorkerId); + std::filesystem::path Path = m_ActionsPath / UniqueId; + + zen::WriteFile(Path / "worker.ucb", WorkerDescriptor.GetBuffer().AsIoBuffer()); + + ManifestWorker(WorkerPackage, Path / "tree", [&](const IoHash& Cid, CompressedBuffer& ChunkBuffer) { + std::filesystem::path ChunkPath = Path / "chunks" / Cid.ToHexString(); + zen::WriteFile(ChunkPath, ChunkBuffer.GetCompressed()); + }); + + ZEN_INFO("dumped worker '{}' to 'file://{}'", WorkerId, Path); + } +} + +size_t +LocalProcessRunner::QueryCapacity() +{ + // Estimate how much more work we're ready to accept + + RwLock::SharedLockScope _{m_RunningLock}; + + if (!m_AcceptNewActions) + { + return 0; + } + + size_t RunningCount = m_RunningMap.size(); + + if (RunningCount >= size_t(m_MaxRunningActions)) + { + return 0; + } + + return m_MaxRunningActions - RunningCount; +} + +std::vector +LocalProcessRunner::SubmitActions(const std::vector>& Actions) +{ + std::vector Results; + + for (const Ref& Action : Actions) + { + Results.push_back(SubmitAction(Action)); + } + + return Results; +} + +SubmitResult +LocalProcessRunner::SubmitAction(Ref Action) +{ + // Verify whether we can accept more work + + { + RwLock::SharedLockScope _{m_RunningLock}; + + if (!m_AcceptNewActions) + { + return SubmitResult{.IsAccepted = false}; + } + + if (m_RunningMap.size() >= size_t(m_MaxRunningActions)) + { + return SubmitResult{.IsAccepted = false}; + } + } + + using namespace std::literals; + + // Each enqueued action is assigned an integer index (logical sequence number), + // which we use as a key for tracking data structures and as an opaque id which + // may be used by clients to reference the scheduled action + + const int32_t ActionLsn = Action->ActionLsn; + const CbObject& ActionObj = Action->ActionObj; + const IoHash ActionId = ActionObj.GetHash(); + + MaybeDumpAction(ActionLsn, ActionObj); + + std::filesystem::path SandboxPath = CreateNewSandbox(); + + CbPackage WorkerPackage = Action->Worker.Descriptor; + + std::filesystem::path WorkerPath = ManifestWorker(Action->Worker); + + // Write out action + + zen::WriteFile(SandboxPath / "build.action", ActionObj.GetBuffer().AsIoBuffer()); + + // Manifest inputs in sandbox + + ActionObj.IterateAttachments([&](CbFieldView Field) { + const IoHash Cid = Field.AsHash(); + std::filesystem::path FilePath{SandboxPath / "Inputs"sv / Cid.ToHexString()}; + IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(Cid); + + if (!DataBuffer) + { + throw std::runtime_error(fmt::format("input CID chunk '{}' missing", Cid)); + } + + zen::WriteFile(FilePath, DataBuffer); + }); + +# if ZEN_PLATFORM_WINDOWS + // Set up environment variables + + StringBuilder<1024> EnvironmentBlock; + + CbObject WorkerDescription = WorkerPackage.GetObject(); + + for (auto& It : WorkerDescription["environment"sv]) + { + EnvironmentBlock.Append(It.AsString()); + EnvironmentBlock.Append('\0'); + } + EnvironmentBlock.Append('\0'); + EnvironmentBlock.Append('\0'); + + // Execute process - this spawns the child process immediately without waiting + // for completion + + std::string_view ExecPath = WorkerDescription["path"sv].AsString(); + std::filesystem::path ExePath = WorkerPath / std::filesystem::path(ExecPath).make_preferred(); + + ExtendableWideStringBuilder<512> CommandLine; + CommandLine.Append(L'"'); + CommandLine.Append(ExePath.c_str()); + CommandLine.Append(L'"'); + CommandLine.Append(L" -Build=build.action"); + + LPSECURITY_ATTRIBUTES lpProcessAttributes = nullptr; + LPSECURITY_ATTRIBUTES lpThreadAttributes = nullptr; + BOOL bInheritHandles = FALSE; + DWORD dwCreationFlags = 0; + + STARTUPINFO StartupInfo{}; + StartupInfo.cb = sizeof StartupInfo; + + PROCESS_INFORMATION ProcessInformation{}; + + ZEN_DEBUG("Executing: {}", WideToUtf8(CommandLine.c_str())); + + CommandLine.EnsureNulTerminated(); + + BOOL Success = CreateProcessW(nullptr, + CommandLine.Data(), + lpProcessAttributes, + lpThreadAttributes, + bInheritHandles, + dwCreationFlags, + (LPVOID)EnvironmentBlock.Data(), // Environment block + SandboxPath.c_str(), // Current directory + &StartupInfo, + /* out */ &ProcessInformation); + + if (!Success) + { + // TODO: this is probably not the best way to report failure. The return + // object should include a failure state and context + + zen::ThrowLastError("Unable to launch process" /* TODO: Add context */); + } + + CloseHandle(ProcessInformation.hThread); + + Ref NewAction{new RunningAction()}; + NewAction->Action = Action; + NewAction->ProcessHandle = ProcessInformation.hProcess; + NewAction->SandboxPath = std::move(SandboxPath); + + { + RwLock::ExclusiveLockScope _(m_RunningLock); + + m_RunningMap[ActionLsn] = std::move(NewAction); + } + + Action->SetActionState(RunnerAction::State::Running); +# else + ZEN_UNUSED(ActionId); + + ZEN_NOT_IMPLEMENTED(); + + int ExitCode = 0; +# endif + + return SubmitResult{.IsAccepted = true}; +} + +size_t +LocalProcessRunner::GetSubmittedActionCount() +{ + RwLock::SharedLockScope _(m_RunningLock); + return m_RunningMap.size(); +} + +std::filesystem::path +LocalProcessRunner::ManifestWorker(const WorkerDesc& Worker) +{ + RwLock::SharedLockScope _(m_WorkerLock); + + std::filesystem::path WorkerDir = m_WorkerPath / fmt::format("runner_{}", Worker.WorkerId); + + if (!std::filesystem::exists(WorkerDir)) + { + _.ReleaseNow(); + + RwLock::ExclusiveLockScope $(m_WorkerLock); + + if (!std::filesystem::exists(WorkerDir)) + { + ManifestWorker(Worker.Descriptor, WorkerDir, [](const IoHash&, CompressedBuffer&) {}); + } + } + + return WorkerDir; +} + +void +LocalProcessRunner::DecompressAttachmentToFile(const CbPackage& FromPackage, + CbObjectView FileEntry, + const std::filesystem::path& SandboxRootPath, + std::function& ChunkReferenceCallback) +{ + std::string_view Name = FileEntry["name"sv].AsString(); + const IoHash ChunkHash = FileEntry["hash"sv].AsHash(); + const uint64_t Size = FileEntry["size"sv].AsUInt64(); + + CompressedBuffer Compressed; + + if (const CbAttachment* Attachment = FromPackage.FindAttachment(ChunkHash)) + { + Compressed = Attachment->AsCompressedBinary(); + } + else + { + IoBuffer DataBuffer = m_ChunkResolver.FindChunkByCid(ChunkHash); + + if (!DataBuffer) + { + throw std::runtime_error(fmt::format("worker chunk '{}' missing", ChunkHash)); + } + + uint64_t DataRawSize = 0; + IoHash DataRawHash; + Compressed = CompressedBuffer::FromCompressed(SharedBuffer{DataBuffer}, DataRawHash, DataRawSize); + + if (DataRawSize != Size) + { + throw std::runtime_error( + fmt::format("worker chunk '{}' size: {}, action spec expected {}", ChunkHash, DataBuffer.Size(), Size)); + } + } + + ChunkReferenceCallback(ChunkHash, Compressed); + + std::filesystem::path FilePath{SandboxRootPath / std::filesystem::path(Name).make_preferred()}; + + SharedBuffer Decompressed = Compressed.Decompress(); + zen::WriteFile(FilePath, Decompressed.AsIoBuffer()); +} + +void +LocalProcessRunner::ManifestWorker(const CbPackage& WorkerPackage, + const std::filesystem::path& SandboxPath, + std::function&& ChunkReferenceCallback) +{ + CbObject WorkerDescription = WorkerPackage.GetObject(); + + // Manifest worker in Sandbox + + for (auto& It : WorkerDescription["executables"sv]) + { + DecompressAttachmentToFile(WorkerPackage, It.AsObjectView(), SandboxPath, ChunkReferenceCallback); + } + + for (auto& It : WorkerDescription["dirs"sv]) + { + std::string_view Name = It.AsString(); + std::filesystem::path DirPath{SandboxPath / std::filesystem::path(Name).make_preferred()}; + zen::CreateDirectories(DirPath); + } + + for (auto& It : WorkerDescription["files"sv]) + { + DecompressAttachmentToFile(WorkerPackage, It.AsObjectView(), SandboxPath, ChunkReferenceCallback); + } + + WriteFile(SandboxPath / "worker.zcb", WorkerDescription.GetBuffer().AsIoBuffer()); +} + +CbPackage +LocalProcessRunner::GatherActionOutputs(std::filesystem::path SandboxPath) +{ + std::filesystem::path OutputFile = SandboxPath / "build.output"; + FileContents OutputData = zen::ReadFile(OutputFile); + + if (OutputData.ErrorCode) + { + throw std::system_error(OutputData.ErrorCode, fmt::format("Failed to read build output file '{}'", OutputFile)); + } + + CbPackage OutputPackage; + CbObject Output = zen::LoadCompactBinaryObject(OutputData.Flatten()); + + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalRawAttachmentBytes = 0; + + Output.IterateAttachments([&](CbFieldView Field) { + IoHash Hash = Field.AsHash(); + std::filesystem::path OutputPath{SandboxPath / "Outputs" / Hash.ToHexString()}; + FileContents ChunkData = zen::ReadFile(OutputPath); + + if (ChunkData.ErrorCode) + { + throw std::system_error(ChunkData.ErrorCode, fmt::format("Failed to read build output file '{}'", OutputPath)); + } + + uint64_t ChunkDataRawSize = 0; + IoHash ChunkDataHash; + CompressedBuffer AttachmentBuffer = + CompressedBuffer::FromCompressed(SharedBuffer(ChunkData.Flatten()), ChunkDataHash, ChunkDataRawSize); + + if (!AttachmentBuffer) + { + throw std::runtime_error("Invalid output encountered (not valid CompressedBuffer format)"); + } + + TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); + TotalRawAttachmentBytes += ChunkDataRawSize; + + CbAttachment Attachment(std::move(AttachmentBuffer), ChunkDataHash); + OutputPackage.AddAttachment(Attachment); + }); + + OutputPackage.SetObject(Output); + + ZEN_DEBUG("Action completed with {} attachments ({} compressed, {} uncompressed)", + OutputPackage.GetAttachments().size(), + NiceBytes(TotalAttachmentBytes), + NiceBytes(TotalRawAttachmentBytes)); + + return OutputPackage; +} + +void +LocalProcessRunner::MonitorThreadFunction() +{ + SetCurrentThreadName("LocalProcessRunner_Monitor"); + + auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); }); + + do + { + // On Windows it's possible to wait on process handles, so we wait for either a process to exit + // or for the monitor event to be signaled (which indicates we should check for cancellation + // or shutdown). This could be further improved by using a completion port and registering process + // handles with it, but this is a reasonable first implementation given that we shouldn't be dealing + // with an enormous number of concurrent processes. + // + // On other platforms we just wait on the monitor event and poll for process exits at intervals. +# if ZEN_PLATFORM_WINDOWS + auto WaitOnce = [&] { + HANDLE WaitHandles[MAXIMUM_WAIT_OBJECTS]; + + uint32_t NumHandles = 0; + + WaitHandles[NumHandles++] = m_MonitorThreadEvent.GetWindowsHandle(); + + m_RunningLock.WithSharedLock([&] { + for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd && NumHandles < MAXIMUM_WAIT_OBJECTS; ++It) + { + Ref Action = It->second; + + WaitHandles[NumHandles++] = Action->ProcessHandle; + } + }); + + DWORD WaitResult = WaitForMultipleObjects(NumHandles, WaitHandles, FALSE, 1000); + + // return true if a handle was signaled + return (WaitResult <= NumHandles); + }; +# else + auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(1000); }; +# endif + + while (!WaitOnce()) + { + if (m_MonitorThreadEnabled == false) + { + return; + } + + SweepRunningActions(); + } + + // Signal received + + SweepRunningActions(); + } while (m_MonitorThreadEnabled); +} + +void +LocalProcessRunner::CancelRunningActions() +{ + Stopwatch Timer; + std::unordered_map> RunningMap; + + m_RunningLock.WithExclusiveLock([&] { std::swap(RunningMap, m_RunningMap); }); + + if (RunningMap.empty()) + { + return; + } + + ZEN_INFO("cancelling all running actions"); + + // For expedience we initiate the process termination for all known + // processes before attempting to wait for them to exit. + + std::vector TerminatedLsnList; + + for (const auto& Kv : RunningMap) + { + Ref Action = Kv.second; + + // Terminate running process + +# if ZEN_PLATFORM_WINDOWS + BOOL Success = TerminateProcess(Action->ProcessHandle, 222); + + if (Success) + { + TerminatedLsnList.push_back(Kv.first); + } + else + { + DWORD LastError = GetLastError(); + + if (LastError != ERROR_ACCESS_DENIED) + { + ZEN_WARN("TerminateProcess for LSN {} not successful: {}", Action->Action->ActionLsn, GetSystemErrorAsString(LastError)); + } + } +# else + ZEN_NOT_IMPLEMENTED("need to implement process termination"); +# endif + } + + // We only post results for processes we have terminated, in order + // to avoid multiple results getting posted for the same action + + for (int Lsn : TerminatedLsnList) + { + if (auto It = RunningMap.find(Lsn); It != RunningMap.end()) + { + Ref Running = It->second; + +# if ZEN_PLATFORM_WINDOWS + if (Running->ProcessHandle != INVALID_HANDLE_VALUE) + { + DWORD WaitResult = WaitForSingleObject(Running->ProcessHandle, 2000); + + if (WaitResult != WAIT_OBJECT_0) + { + ZEN_WARN("wait for LSN {}: process exit did not succeed, result = {}", Running->Action->ActionLsn, WaitResult); + } + else + { + ZEN_DEBUG("LSN {}: process exit OK", Running->Action->ActionLsn); + } + } +# endif + + // Clean up and post error result + + DeleteDirectories(Running->SandboxPath); + Running->Action->SetActionState(RunnerAction::State::Failed); + } + } + + ZEN_INFO("DONE - cancelled {} running processes (took {})", TerminatedLsnList.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); +} + +void +LocalProcessRunner::SweepRunningActions() +{ + std::vector> CompletedActions; + + m_RunningLock.WithExclusiveLock([&] { + // TODO: It would be good to not hold the exclusive lock while making + // system calls and other expensive operations. + + for (auto It = begin(m_RunningMap), ItEnd = end(m_RunningMap); It != ItEnd;) + { + Ref Action = It->second; + +# if ZEN_PLATFORM_WINDOWS + DWORD ExitCode = 0; + BOOL IsSuccess = GetExitCodeProcess(Action->ProcessHandle, &ExitCode); + + if (IsSuccess && ExitCode != STILL_ACTIVE) + { + CloseHandle(Action->ProcessHandle); + Action->ProcessHandle = INVALID_HANDLE_VALUE; + + CompletedActions.push_back(std::move(Action)); + It = m_RunningMap.erase(It); + } + else + { + ++It; + } +# else + // TODO: implement properly for Mac/Linux + + ZEN_UNUSED(Action); +# endif + } + }); + + // Notify outer. Note that this has to be done without holding any local locks + // otherwise we may end up with deadlocks. + + for (Ref Running : CompletedActions) + { + const int ActionLsn = Running->Action->ActionLsn; + + if (Running->ExitCode == 0) + { + try + { + // Gather outputs + + CbPackage OutputPackage = GatherActionOutputs(Running->SandboxPath); + + Running->Action->SetResult(std::move(OutputPackage)); + Running->Action->SetActionState(RunnerAction::State::Completed); + + // We can delete the files at this point + if (!DeleteDirectories(Running->SandboxPath)) + { + ZEN_WARN("Unable to delete directory '{}', this will continue to exist until service restart", Running->SandboxPath); + } + + // Success -- continue with next iteration of the loop + continue; + } + catch (std::exception& Ex) + { + ZEN_ERROR("Encountered failure while gathering outputs for action lsn {}, '{}'", ActionLsn, Ex.what()); + } + } + + // Failed - for now this is indicated with an empty package in + // the results map. We can clean out the sandbox directory immediately. + + std::error_code Ec; + DeleteDirectories(Running->SandboxPath, Ec); + + if (Ec) + { + ZEN_WARN("Unable to delete sandbox directory '{}': {}", Running->SandboxPath, Ec.message()); + } + + Running->Action->SetActionState(RunnerAction::State::Failed); + } +} + +} // namespace zen::compute + +#endif diff --git a/src/zencompute/localrunner.h b/src/zencompute/localrunner.h new file mode 100644 index 000000000..35f464805 --- /dev/null +++ b/src/zencompute/localrunner.h @@ -0,0 +1,100 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "zencompute/functionservice.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include "functionrunner.h" + +# include +# include +# include +# include +# include + +# include +# include +# include + +namespace zen { +class CbPackage; +} + +namespace zen::compute { + +/** Direct process spawner + + This runner simply sets up a directory structure for each job and + creates a process to perform the computation in it. It is not very + efficient and is intended mostly for testing. + + */ + +class LocalProcessRunner : public FunctionRunner +{ + LocalProcessRunner(LocalProcessRunner&&) = delete; + LocalProcessRunner& operator=(LocalProcessRunner&&) = delete; + +public: + LocalProcessRunner(ChunkResolver& Resolver, const std::filesystem::path& BaseDir); + ~LocalProcessRunner(); + + virtual void Shutdown() override; + virtual void RegisterWorker(const CbPackage& WorkerPackage) override; + [[nodiscard]] virtual SubmitResult SubmitAction(Ref Action) override; + [[nodiscard]] virtual bool IsHealthy() override { return true; } + [[nodiscard]] virtual size_t GetSubmittedActionCount() override; + [[nodiscard]] virtual size_t QueryCapacity() override; + [[nodiscard]] virtual std::vector SubmitActions(const std::vector>& Actions) override; + +protected: + LoggerRef Log() { return m_Log; } + + LoggerRef m_Log; + + struct RunningAction : public RefCounted + { + Ref Action; + void* ProcessHandle = nullptr; + int ExitCode = 0; + std::filesystem::path SandboxPath; + }; + + std::atomic_bool m_AcceptNewActions; + ChunkResolver& m_ChunkResolver; + RwLock m_WorkerLock; + std::filesystem::path m_WorkerPath; + std::atomic m_SandboxCounter = 0; + std::filesystem::path m_SandboxPath; + int32_t m_MaxRunningActions = 64; // arbitrary limit for testing + + // if used in conjuction with m_ResultsLock, this lock must be taken *after* + // m_ResultsLock to avoid deadlocks + RwLock m_RunningLock; + std::unordered_map> m_RunningMap; + + std::thread m_MonitorThread; + std::atomic m_MonitorThreadEnabled{true}; + Event m_MonitorThreadEvent; + void MonitorThreadFunction(); + void SweepRunningActions(); + void CancelRunningActions(); + + std::filesystem::path CreateNewSandbox(); + void ManifestWorker(const CbPackage& WorkerPackage, + const std::filesystem::path& SandboxPath, + std::function&& ChunkReferenceCallback); + std::filesystem::path ManifestWorker(const WorkerDesc& Worker); + CbPackage GatherActionOutputs(std::filesystem::path SandboxPath); + + void DecompressAttachmentToFile(const CbPackage& FromPackage, + CbObjectView FileEntry, + const std::filesystem::path& SandboxRootPath, + std::function& ChunkReferenceCallback); +}; + +} // namespace zen::compute + +#endif diff --git a/src/zencompute/recordingreader.cpp b/src/zencompute/recordingreader.cpp new file mode 100644 index 000000000..1c1a119cf --- /dev/null +++ b/src/zencompute/recordingreader.cpp @@ -0,0 +1,335 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zencompute/recordingreader.h" + +#include +#include +#include +#include +#include +#include + +#if ZEN_PLATFORM_WINDOWS +# include +# define ZEN_CONCRT_AVAILABLE 1 +#else +# define ZEN_CONCRT_AVAILABLE 0 +#endif + +#if ZEN_WITH_COMPUTE_SERVICES + +namespace zen::compute { + +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +# if ZEN_PLATFORM_WINDOWS +# define ZEN_BUILD_ACTION L"Build.action" +# define ZEN_WORKER_UCB L"worker.ucb" +# else +# define ZEN_BUILD_ACTION "Build.action" +# define ZEN_WORKER_UCB "worker.ucb" +# endif + +////////////////////////////////////////////////////////////////////////// + +struct RecordingTreeVisitor : public FileSystemTraversal::TreeVisitor +{ + virtual void VisitFile(const std::filesystem::path& Parent, + const path_view& File, + uint64_t FileSize, + uint32_t NativeModeOrAttributes, + uint64_t NativeModificationTick) + { + ZEN_UNUSED(Parent, File, FileSize, NativeModeOrAttributes, NativeModificationTick); + + if (File.compare(path_view(ZEN_BUILD_ACTION)) == 0) + { + WorkDirs.push_back(Parent); + } + else if (File.compare(path_view(ZEN_WORKER_UCB)) == 0) + { + WorkerDirs.push_back(Parent); + } + } + + virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& DirectoryName, uint32_t NativeModeOrAttributes) + { + ZEN_UNUSED(Parent, DirectoryName, NativeModeOrAttributes); + + return true; + } + + std::vector WorkerDirs; + std::vector WorkDirs; +}; + +////////////////////////////////////////////////////////////////////////// + +void +IterateOverArray(auto Array, auto Func, int TargetParallelism) +{ +# if ZEN_CONCRT_AVAILABLE + if (TargetParallelism > 1) + { + concurrency::simple_partitioner Chunker(Array.size() / TargetParallelism); + concurrency::parallel_for_each(begin(Array), end(Array), [&](const auto& Item) { Func(Item); }); + + return; + } +# else + ZEN_UNUSED(TargetParallelism); +# endif + + for (const auto& Item : Array) + { + Func(Item); + } +} + +////////////////////////////////////////////////////////////////////////// + +RecordingReaderBase::~RecordingReaderBase() = default; + +////////////////////////////////////////////////////////////////////////// + +RecordingReader::RecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingLogDir(RecordingPath) +{ + CidStoreConfiguration CidConfig; + CidConfig.RootDirectory = m_RecordingLogDir / "cid"; + CidConfig.HugeValueThreshold = 128 * 1024 * 1024; + + m_CidStore.Initialize(CidConfig); +} + +RecordingReader::~RecordingReader() +{ + m_CidStore.Flush(); +} + +size_t +RecordingReader::GetActionCount() const +{ + return m_Actions.size(); +} + +IoBuffer +RecordingReader::FindChunkByCid(const IoHash& DecompressedId) +{ + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(DecompressedId)) + { + return Chunk; + } + + ZEN_ERROR("failed lookup of chunk with CID '{}'", DecompressedId); + + return {}; +} + +std::unordered_map +RecordingReader::ReadWorkers() +{ + std::unordered_map WorkerMap; + + { + CbObjectFromFile TocFile = LoadCompactBinaryObject(m_RecordingLogDir / "workers.ztoc"); + CbObject Toc = TocFile.Object; + + m_WorkerDataFile.Open(m_RecordingLogDir / "workers.zdat", BasicFile::Mode::kRead); + + ZEN_ASSERT(Toc["version"sv].AsInt32() == 1); + + for (auto& It : Toc["toc"]) + { + CbArrayView Entry = It.AsArrayView(); + CbFieldViewIterator Vit = Entry.CreateViewIterator(); + + const IoHash WorkerId = Vit++->AsHash(); + const uint64_t Offset = Vit++->AsInt64(0); + const uint64_t Size = Vit++->AsInt64(0); + + IoBuffer WorkerRange = m_WorkerDataFile.ReadRange(Offset, Size); + CbObject WorkerDesc = LoadCompactBinaryObject(WorkerRange); + CbPackage& WorkerPkg = WorkerMap[WorkerId]; + WorkerPkg.SetObject(WorkerDesc); + + WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) { + const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); + IoBuffer AttachmentData = m_CidStore.FindChunkByCid(AttachmentCid); + + if (AttachmentData) + { + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); + WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash)); + } + }); + } + } + + // Scan actions as well (this should be called separately, ideally) + + ScanActions(); + + return WorkerMap; +} + +void +RecordingReader::ScanActions() +{ + CbObjectFromFile TocFile = LoadCompactBinaryObject(m_RecordingLogDir / "actions.ztoc"); + CbObject Toc = TocFile.Object; + + m_ActionDataFile.Open(m_RecordingLogDir / "actions.zdat", BasicFile::Mode::kRead); + + ZEN_ASSERT(Toc["version"sv].AsInt32() == 1); + + for (auto& It : Toc["toc"]) + { + CbArrayView ArrayEntry = It.AsArrayView(); + CbFieldViewIterator Vit = ArrayEntry.CreateViewIterator(); + + ActionEntry Entry; + Entry.ActionId = Vit++->AsHash(); + Entry.Offset = Vit++->AsInt64(0); + Entry.Size = Vit++->AsInt64(0); + + m_Actions.push_back(Entry); + } +} + +void +RecordingReader::IterateActions(std::function&& Callback, int TargetParallelism) +{ + IterateOverArray( + m_Actions, + [&](const ActionEntry& Entry) { + CbObject ActionDesc = LoadCompactBinaryObject(m_ActionDataFile.ReadRange(Entry.Offset, Entry.Size)); + + Callback(ActionDesc, Entry.ActionId); + }, + TargetParallelism); +} + +////////////////////////////////////////////////////////////////////////// + +IoBuffer +LocalResolver::FindChunkByCid(const IoHash& DecompressedId) +{ + RwLock::SharedLockScope _(MapLock); + if (auto It = Attachments.find(DecompressedId); It != Attachments.end()) + { + return It->second; + } + + return {}; +} + +void +LocalResolver::Add(const IoHash& Cid, IoBuffer Data) +{ + RwLock::ExclusiveLockScope _(MapLock); + Data.SetContentType(ZenContentType::kCompressedBinary); + Attachments[Cid] = Data; +} + +/// + +UeRecordingReader::UeRecordingReader(const std::filesystem::path& RecordingPath) : m_RecordingDir(RecordingPath) +{ +} + +UeRecordingReader::~UeRecordingReader() +{ +} + +size_t +UeRecordingReader::GetActionCount() const +{ + return m_WorkDirs.size(); +} + +IoBuffer +UeRecordingReader::FindChunkByCid(const IoHash& DecompressedId) +{ + return m_LocalResolver.FindChunkByCid(DecompressedId); +} + +std::unordered_map +UeRecordingReader::ReadWorkers() +{ + std::unordered_map WorkerMap; + + FileSystemTraversal Traversal; + RecordingTreeVisitor Visitor; + Traversal.TraverseFileSystem(m_RecordingDir, Visitor); + + m_WorkDirs = std::move(Visitor.WorkDirs); + + for (const std::filesystem::path& WorkerDir : Visitor.WorkerDirs) + { + CbObjectFromFile WorkerFile = LoadCompactBinaryObject(WorkerDir / "worker.ucb"); + CbObject WorkerDesc = WorkerFile.Object; + const IoHash& WorkerId = WorkerFile.Hash; + CbPackage& WorkerPkg = WorkerMap[WorkerId]; + WorkerPkg.SetObject(WorkerDesc); + + WorkerDesc.IterateAttachments([&](const zen::CbFieldView AttachmentField) { + const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); + IoBuffer AttachmentData = ReadFile(WorkerDir / "chunks" / AttachmentCid.ToHexString()).Flatten(); + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); + WorkerPkg.AddAttachment(CbAttachment(CompressedData, RawHash)); + }); + } + + return WorkerMap; +} + +void +UeRecordingReader::IterateActions(std::function&& Callback, int ParallelismTarget) +{ + IterateOverArray( + m_WorkDirs, + [&](const std::filesystem::path& WorkDir) { + CbPackage WorkPackage = ReadAction(WorkDir); + CbObject ActionObject = WorkPackage.GetObject(); + const IoHash& ActionId = WorkPackage.GetObjectHash(); + + Callback(ActionObject, ActionId); + }, + ParallelismTarget); +} + +CbPackage +UeRecordingReader::ReadAction(std::filesystem::path WorkDir) +{ + CbPackage WorkPackage; + std::filesystem::path WorkDescPath = WorkDir / "Build.action"; + CbObjectFromFile ActionFile = LoadCompactBinaryObject(WorkDescPath); + CbObject& ActionObject = ActionFile.Object; + + WorkPackage.SetObject(ActionObject); + + ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) { + const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); + IoBuffer AttachmentData = ReadFile(WorkDir / "inputs" / AttachmentCid.ToHexString()).Flatten(); + + m_LocalResolver.Add(AttachmentCid, AttachmentData); + + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); + ZEN_ASSERT(AttachmentCid == RawHash); + WorkPackage.AddAttachment(CbAttachment(CompressedData, RawHash)); + }); + + return WorkPackage; +} + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zencompute/remotehttprunner.cpp b/src/zencompute/remotehttprunner.cpp new file mode 100644 index 000000000..98ced5fe8 --- /dev/null +++ b/src/zencompute/remotehttprunner.cpp @@ -0,0 +1,457 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "remotehttprunner.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include + +# include + +////////////////////////////////////////////////////////////////////////// + +namespace zen::compute { + +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver, const std::filesystem::path& BaseDir, std::string_view HostName) +: FunctionRunner(BaseDir) +, m_Log(logging::Get("http_exec")) +, m_ChunkResolver{InChunkResolver} +, m_BaseUrl{fmt::format("{}/apply", HostName)} +, m_Http(m_BaseUrl) +{ + m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this}; +} + +RemoteHttpRunner::~RemoteHttpRunner() +{ + Shutdown(); +} + +void +RemoteHttpRunner::Shutdown() +{ + // TODO: should cleanly drain/cancel pending work + + m_MonitorThreadEnabled = false; + m_MonitorThreadEvent.Set(); + if (m_MonitorThread.joinable()) + { + m_MonitorThread.join(); + } +} + +void +RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage) +{ + const IoHash WorkerId = WorkerPackage.GetObjectHash(); + CbPackage WorkerDesc = WorkerPackage; + + std::string WorkerUrl = fmt::format("/workers/{}", WorkerId); + + HttpClient::Response WorkerResponse = m_Http.Get(WorkerUrl); + + if (WorkerResponse.StatusCode == HttpResponseCode::NotFound) + { + HttpClient::Response DescResponse = m_Http.Post(WorkerUrl, WorkerDesc.GetObject()); + + if (DescResponse.StatusCode == HttpResponseCode::NotFound) + { + CbPackage Pkg = WorkerDesc; + + // Build response package by sending only the attachments + // the other end needs. We start with the full package and + // remove the attachments which are not needed. + + { + std::unordered_set Needed; + + CbObject Response = DescResponse.AsObject(); + + for (auto& Item : Response["need"sv]) + { + const IoHash NeedHash = Item.AsHash(); + + Needed.insert(NeedHash); + } + + std::unordered_set ToRemove; + + for (const CbAttachment& Attachment : Pkg.GetAttachments()) + { + const IoHash& Hash = Attachment.GetHash(); + + if (Needed.find(Hash) == Needed.end()) + { + ToRemove.insert(Hash); + } + } + + for (const IoHash& Hash : ToRemove) + { + int RemovedCount = Pkg.RemoveAttachment(Hash); + + ZEN_ASSERT(RemovedCount == 1); + } + } + + // Post resulting package + + HttpClient::Response PayloadResponse = m_Http.Post(WorkerUrl, Pkg); + + if (!IsHttpSuccessCode(PayloadResponse.StatusCode)) + { + ZEN_ERROR("ERROR: unable to register payloads for worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl); + + // TODO: propagate error + } + } + else if (!IsHttpSuccessCode(DescResponse.StatusCode)) + { + ZEN_ERROR("ERROR: unable to register worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl); + + // TODO: propagate error + } + else + { + ZEN_ASSERT(DescResponse.StatusCode == HttpResponseCode::NoContent); + } + } + else if (WorkerResponse.StatusCode == HttpResponseCode::OK) + { + // Already known from a previous run + } + else if (!IsHttpSuccessCode(WorkerResponse.StatusCode)) + { + ZEN_ERROR("ERROR: unable to look up worker {} at {}{} (error: {} {})", + WorkerId, + m_Http.GetBaseUri(), + WorkerUrl, + (int)WorkerResponse.StatusCode, + ToString(WorkerResponse.StatusCode)); + + // TODO: propagate error + } +} + +size_t +RemoteHttpRunner::QueryCapacity() +{ + // Estimate how much more work we're ready to accept + + RwLock::SharedLockScope _{m_RunningLock}; + + size_t RunningCount = m_RemoteRunningMap.size(); + + if (RunningCount >= size_t(m_MaxRunningActions)) + { + return 0; + } + + return m_MaxRunningActions - RunningCount; +} + +std::vector +RemoteHttpRunner::SubmitActions(const std::vector>& Actions) +{ + std::vector Results; + + for (const Ref& Action : Actions) + { + Results.push_back(SubmitAction(Action)); + } + + return Results; +} + +SubmitResult +RemoteHttpRunner::SubmitAction(Ref Action) +{ + // Verify whether we can accept more work + + { + RwLock::SharedLockScope _{m_RunningLock}; + if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions)) + { + return SubmitResult{.IsAccepted = false}; + } + } + + using namespace std::literals; + + // Each enqueued action is assigned an integer index (logical sequence number), + // which we use as a key for tracking data structures and as an opaque id which + // may be used by clients to reference the scheduled action + + const int32_t ActionLsn = Action->ActionLsn; + const CbObject& ActionObj = Action->ActionObj; + const IoHash ActionId = ActionObj.GetHash(); + + MaybeDumpAction(ActionLsn, ActionObj); + + // Enqueue job + + CbObject Result; + + HttpClient::Response WorkResponse = m_Http.Post("/jobs", ActionObj); + HttpResponseCode WorkResponseCode = WorkResponse.StatusCode; + + if (WorkResponseCode == HttpResponseCode::OK) + { + Result = WorkResponse.AsObject(); + } + else if (WorkResponseCode == HttpResponseCode::NotFound) + { + // Not all attachments are present + + // Build response package including all required attachments + + CbPackage Pkg; + Pkg.SetObject(ActionObj); + + CbObject Response = WorkResponse.AsObject(); + + for (auto& Item : Response["need"sv]) + { + const IoHash NeedHash = Item.AsHash(); + + if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash)) + { + uint64_t DataRawSize = 0; + IoHash DataRawHash; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize); + + ZEN_ASSERT(DataRawHash == NeedHash); + + Pkg.AddAttachment(CbAttachment(Compressed, NeedHash)); + } + else + { + // No such attachment + + return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)}; + } + } + + // Post resulting package + + HttpClient::Response PayloadResponse = m_Http.Post("/jobs", Pkg); + + if (!PayloadResponse) + { + ZEN_WARN("unable to register payloads for action {} at {}/jobs", ActionId, m_Http.GetBaseUri()); + + // TODO: include more information about the failure in the response + + return {.IsAccepted = false, .Reason = "HTTP request failed"}; + } + else if (PayloadResponse.StatusCode == HttpResponseCode::OK) + { + Result = PayloadResponse.AsObject(); + } + else + { + // Unexpected response + + const int ResponseStatusCode = (int)PayloadResponse.StatusCode; + + ZEN_WARN("unable to register payloads for action {} at {}/jobs (error: {} {})", + ActionId, + m_Http.GetBaseUri(), + ResponseStatusCode, + ToString(ResponseStatusCode)); + + return {.IsAccepted = false, + .Reason = fmt::format("unexpected response code {} {} from {}/jobs", + ResponseStatusCode, + ToString(ResponseStatusCode), + m_Http.GetBaseUri())}; + } + } + + if (Result) + { + if (const int32_t LsnField = Result["lsn"].AsInt32(0)) + { + HttpRunningAction NewAction; + NewAction.Action = Action; + NewAction.RemoteActionLsn = LsnField; + + { + RwLock::ExclusiveLockScope _(m_RunningLock); + + m_RemoteRunningMap[LsnField] = std::move(NewAction); + } + + ZEN_DEBUG("scheduled action {} with remote LSN {} (local LSN {})", ActionId, LsnField, ActionLsn); + + Action->SetActionState(RunnerAction::State::Running); + + return SubmitResult{.IsAccepted = true}; + } + } + + return {}; +} + +bool +RemoteHttpRunner::IsHealthy() +{ + if (HttpClient::Response Ready = m_Http.Get("/ready")) + { + return true; + } + else + { + // TODO: use response to propagate context + return false; + } +} + +size_t +RemoteHttpRunner::GetSubmittedActionCount() +{ + RwLock::SharedLockScope _(m_RunningLock); + return m_RemoteRunningMap.size(); +} + +void +RemoteHttpRunner::MonitorThreadFunction() +{ + SetCurrentThreadName("RemoteHttpRunner_Monitor"); + + do + { + const int NormalWaitingTime = 1000; + int WaitTimeMs = NormalWaitingTime; + auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(WaitTimeMs); }; + auto SweepOnce = [&] { + const size_t RetiredCount = SweepRunningActions(); + + m_RunningLock.WithSharedLock([&] { + if (m_RemoteRunningMap.size() > 16) + { + WaitTimeMs = NormalWaitingTime / 4; + } + else + { + if (RetiredCount) + { + WaitTimeMs = NormalWaitingTime / 2; + } + else + { + WaitTimeMs = NormalWaitingTime; + } + } + }); + }; + + while (!WaitOnce()) + { + SweepOnce(); + } + + // Signal received - this may mean we should quit + + SweepOnce(); + } while (m_MonitorThreadEnabled); +} + +size_t +RemoteHttpRunner::SweepRunningActions() +{ + std::vector CompletedActions; + + // Poll remote for list of completed actions + + HttpClient::Response ResponseCompleted = m_Http.Get("/jobs/completed"sv); + + if (CbObject Completed = ResponseCompleted.AsObject()) + { + for (auto& FieldIt : Completed["completed"sv]) + { + const int32_t CompleteLsn = FieldIt.AsInt32(); + + if (HttpClient::Response ResponseJob = m_Http.Get(fmt::format("/jobs/{}"sv, CompleteLsn))) + { + m_RunningLock.WithExclusiveLock([&] { + if (auto CompleteIt = m_RemoteRunningMap.find(CompleteLsn); CompleteIt != m_RemoteRunningMap.end()) + { + HttpRunningAction CompletedAction = std::move(CompleteIt->second); + CompletedAction.ActionResults = ResponseJob.AsPackage(); + CompletedAction.Success = true; + + CompletedActions.push_back(std::move(CompletedAction)); + m_RemoteRunningMap.erase(CompleteIt); + } + else + { + // we received a completion notice for an action we don't know about, + // this can happen if the runner is used by multiple upstream schedulers, + // or if this compute node was recently restarted and lost track of + // previously scheduled actions + } + }); + } + } + + if (CbObjectView Metrics = Completed["metrics"sv].AsObjectView()) + { + // if (const size_t CpuCount = Metrics["core_count"].AsInt32(0)) + if (const int32_t CpuCount = Metrics["lp_count"].AsInt32(0)) + { + const int32_t NewCap = zen::Max(4, CpuCount); + + if (m_MaxRunningActions > NewCap) + { + ZEN_DEBUG("capping {} to {} actions (was {})", m_BaseUrl, NewCap, m_MaxRunningActions); + + m_MaxRunningActions = NewCap; + } + } + } + } + + // Notify outer. Note that this has to be done without holding any local locks + // otherwise we may end up with deadlocks. + + for (HttpRunningAction& HttpAction : CompletedActions) + { + const int ActionLsn = HttpAction.Action->ActionLsn; + + if (HttpAction.Success) + { + ZEN_DEBUG("completed: {} LSN {} (remote LSN {})", HttpAction.Action->ActionId, ActionLsn, HttpAction.RemoteActionLsn); + + HttpAction.Action->SetActionState(RunnerAction::State::Completed); + + HttpAction.Action->SetResult(std::move(HttpAction.ActionResults)); + } + else + { + HttpAction.Action->SetActionState(RunnerAction::State::Failed); + } + } + + return CompletedActions.size(); +} + +} // namespace zen::compute + +#endif diff --git a/src/zencompute/remotehttprunner.h b/src/zencompute/remotehttprunner.h new file mode 100644 index 000000000..1e885da3d --- /dev/null +++ b/src/zencompute/remotehttprunner.h @@ -0,0 +1,80 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "zencompute/functionservice.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include "functionrunner.h" + +# include +# include +# include +# include + +# include +# include +# include + +namespace zen { +class CidStore; +} + +namespace zen::compute { + +/** HTTP-based runner + + This implements a DDC remote compute execution strategy via REST API + + */ + +class RemoteHttpRunner : public FunctionRunner +{ + RemoteHttpRunner(RemoteHttpRunner&&) = delete; + RemoteHttpRunner& operator=(RemoteHttpRunner&&) = delete; + +public: + RemoteHttpRunner(ChunkResolver& InChunkResolver, const std::filesystem::path& BaseDir, std::string_view HostName); + ~RemoteHttpRunner(); + + virtual void Shutdown() override; + virtual void RegisterWorker(const CbPackage& WorkerPackage) override; + [[nodiscard]] virtual SubmitResult SubmitAction(Ref Action) override; + [[nodiscard]] virtual bool IsHealthy() override; + [[nodiscard]] virtual size_t GetSubmittedActionCount() override; + [[nodiscard]] virtual size_t QueryCapacity() override; + [[nodiscard]] virtual std::vector SubmitActions(const std::vector>& Actions) override; + +protected: + LoggerRef Log() { return m_Log; } + +private: + LoggerRef m_Log; + ChunkResolver& m_ChunkResolver; + std::string m_BaseUrl; + HttpClient m_Http; + + int32_t m_MaxRunningActions = 256; // arbitrary limit for testing + + struct HttpRunningAction + { + Ref Action; + int RemoteActionLsn = 0; // Remote LSN + bool Success = false; + CbPackage ActionResults; + }; + + RwLock m_RunningLock; + std::unordered_map m_RemoteRunningMap; // Note that this is keyed on the *REMOTE* lsn + + std::thread m_MonitorThread; + std::atomic m_MonitorThreadEnabled{true}; + Event m_MonitorThreadEvent; + void MonitorThreadFunction(); + size_t SweepRunningActions(); +}; + +} // namespace zen::compute + +#endif diff --git a/src/zencompute/xmake.lua b/src/zencompute/xmake.lua new file mode 100644 index 000000000..c710b662d --- /dev/null +++ b/src/zencompute/xmake.lua @@ -0,0 +1,11 @@ +-- Copyright Epic Games, Inc. All Rights Reserved. + +target('zencompute') + set_kind("static") + set_group("libs") + add_headerfiles("**.h") + add_files("**.cpp") + add_includedirs("include", {public=true}) + add_deps("zencore", "zenstore", "zenutil", "zennet", "zenhttp") + add_packages("vcpkg::gsl-lite") + add_packages("vcpkg::spdlog", "vcpkg::cxxopts") diff --git a/src/zencompute/zencompute.cpp b/src/zencompute/zencompute.cpp new file mode 100644 index 000000000..633250f4e --- /dev/null +++ b/src/zencompute/zencompute.cpp @@ -0,0 +1,12 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zencompute/zencompute.h" + +namespace zen { + +void +zencompute_forcelinktests() +{ +} + +} // namespace zen diff --git a/src/zennet/beacon.cpp b/src/zennet/beacon.cpp new file mode 100644 index 000000000..394a4afbb --- /dev/null +++ b/src/zennet/beacon.cpp @@ -0,0 +1,170 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace zen { + +////////////////////////////////////////////////////////////////////////// + +struct FsBeacon::Impl +{ + Impl(std::filesystem::path ShareRoot); + ~Impl(); + + void EnsureValid(); + + void AddGroup(std::string_view GroupId, CbObject Metadata); + void ScanGroup(std::string_view GroupId, std::vector& OutSessions); + void ReadMetadata(std::string_view GroupId, const std::vector& InSessions, std::vector& OutMetadata); + +private: + std::filesystem::path m_ShareRoot; + zen::Oid m_SessionId; + + struct GroupData + { + CbObject Metadata; + BasicFile LockFile; + }; + + std::map m_Registration; + + std::filesystem::path GetSessionMarkerPath(std::string_view GroupId, const Oid& SessionId) + { + Oid::String_t SessionIdString; + SessionId.ToString(SessionIdString); + + return m_ShareRoot / GroupId / SessionIdString; + } +}; + +FsBeacon::Impl::Impl(std::filesystem::path ShareRoot) : m_ShareRoot(ShareRoot), m_SessionId(GetSessionId()) +{ +} + +FsBeacon::Impl::~Impl() +{ +} + +void +FsBeacon::Impl::EnsureValid() +{ +} + +void +FsBeacon::Impl::AddGroup(std::string_view GroupId, CbObject Metadata) +{ + zen::CreateDirectories(m_ShareRoot / GroupId); + std::filesystem::path MarkerFile = GetSessionMarkerPath(GroupId, m_SessionId); + + GroupData& Group = m_Registration[std::string(GroupId)]; + + Group.Metadata = Metadata; + + std::error_code Ec; + Group.LockFile.Open(MarkerFile, + BasicFile::Mode::kTruncate | BasicFile::Mode::kPreventDelete | + BasicFile::Mode::kPreventWrite /* | BasicFile::Mode::kDeleteOnClose */, + Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to open beacon marker file '{}' for write", MarkerFile)); + } + + Group.LockFile.WriteAll(Metadata.GetBuffer().AsIoBuffer(), Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to write to beacon marker file '{}'", MarkerFile)); + } + + Group.LockFile.Flush(); +} + +void +FsBeacon::Impl::ScanGroup(std::string_view GroupId, std::vector& OutSessions) +{ + DirectoryContent Dc; + zen::GetDirectoryContent(m_ShareRoot / GroupId, zen::DirectoryContentFlags::IncludeFiles, /* out */ Dc); + + for (const std::filesystem::path& FilePath : Dc.Files) + { + std::filesystem::path File = FilePath.filename(); + + std::error_code Ec; + if (std::filesystem::remove(FilePath, Ec) == false) + { + auto FileString = File.generic_string(); + + if (FileString.length() != Oid::StringLength) + continue; + + if (const Oid SessionId = Oid::FromHexString(FileString)) + { + if (std::filesystem::file_size(File, Ec) > 0) + { + OutSessions.push_back(SessionId); + } + } + } + } +} + +void +FsBeacon::Impl::ReadMetadata(std::string_view GroupId, const std::vector& InSessions, std::vector& OutMetadata) +{ + for (const Oid& SessionId : InSessions) + { + const std::filesystem::path MarkerFile = GetSessionMarkerPath(GroupId, SessionId); + + if (CbObject Metadata = LoadCompactBinaryObject(MarkerFile).Object) + { + OutMetadata.push_back(std::move(Metadata)); + } + } +} + +////////////////////////////////////////////////////////////////////////// + +FsBeacon::FsBeacon(std::filesystem::path ShareRoot) : m_Impl(std::make_unique(ShareRoot)) +{ +} + +FsBeacon::~FsBeacon() +{ +} + +void +FsBeacon::AddGroup(std::string_view GroupId, CbObject Metadata) +{ + m_Impl->AddGroup(GroupId, Metadata); +} + +void +FsBeacon::ScanGroup(std::string_view GroupId, std::vector& OutSessions) +{ + m_Impl->ScanGroup(GroupId, OutSessions); +} + +void +FsBeacon::ReadMetadata(std::string_view GroupId, const std::vector& InSessions, std::vector& OutMetadata) +{ + m_Impl->ReadMetadata(GroupId, InSessions, OutMetadata); +} + +////////////////////////////////////////////////////////////////////////// + +} // namespace zen diff --git a/src/zennet/include/zennet/beacon.h b/src/zennet/include/zennet/beacon.h new file mode 100644 index 000000000..a8d4805cb --- /dev/null +++ b/src/zennet/include/zennet/beacon.h @@ -0,0 +1,38 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include + +#include + +#include +#include +#include +#include + +namespace zen { + +class CbObject; + +/** File-system based peer discovery + + Intended to be used with an SMB file share as the root. + */ + +class FsBeacon +{ +public: + FsBeacon(std::filesystem::path ShareRoot); + ~FsBeacon(); + + void AddGroup(std::string_view GroupId, CbObject Metadata); + void ScanGroup(std::string_view GroupId, std::vector& OutSessions); + void ReadMetadata(std::string_view GroupId, const std::vector& InSessions, std::vector& OutMetadata); + +private: + struct Impl; + std::unique_ptr m_Impl; +}; + +} // namespace zen diff --git a/src/zennet/include/zennet/statsdclient.h b/src/zennet/include/zennet/statsdclient.h index c378e49ce..7688c132c 100644 --- a/src/zennet/include/zennet/statsdclient.h +++ b/src/zennet/include/zennet/statsdclient.h @@ -8,6 +8,8 @@ #include #include +#undef SendMessage + namespace zen { class StatsTransportBase diff --git a/src/zennet/statsdclient.cpp b/src/zennet/statsdclient.cpp index fe5ca4dda..a0e8cb6ce 100644 --- a/src/zennet/statsdclient.cpp +++ b/src/zennet/statsdclient.cpp @@ -12,6 +12,7 @@ ZEN_THIRD_PARTY_INCLUDES_START #include #include +#undef SendMessage ZEN_THIRD_PARTY_INCLUDES_END namespace zen { diff --git a/src/zenserver-test/function-tests.cpp b/src/zenserver-test/function-tests.cpp new file mode 100644 index 000000000..559387fa2 --- /dev/null +++ b/src/zenserver-test/function-tests.cpp @@ -0,0 +1,34 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include + +#if ZEN_WITH_TESTS + +# include +# include +# include +# include +# include + +# include "zenserver-test.h" + +namespace zen::tests { + +using namespace std::literals; + +TEST_CASE("function.run") +{ + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + + ZenServerInstance Instance(TestEnv); + Instance.SetDataDir(TestDir); + Instance.SpawnServer(13337); + + ZEN_INFO("Waiting..."); + + Instance.WaitUntilReady(); +} + +} // namespace zen::tests + +#endif diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp new file mode 100644 index 000000000..173f56386 --- /dev/null +++ b/src/zenserver/compute/computeserver.cpp @@ -0,0 +1,330 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "computeserver.h" +#include +#include "computeservice.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include + +ZEN_THIRD_PARTY_INCLUDES_START +# include +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +void +ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) +{ + Options.add_option("compute", + "", + "upstream-notification-endpoint", + "Endpoint URL for upstream notifications", + cxxopts::value(m_ServerOptions.UpstreamNotificationEndpoint)->default_value(""), + ""); + + Options.add_option("compute", + "", + "instance-id", + "Instance ID for use in notifications", + cxxopts::value(m_ServerOptions.InstanceId)->default_value(""), + ""); +} + +void +ZenComputeServerConfigurator::AddConfigOptions(LuaConfig::Options& Options) +{ + ZEN_UNUSED(Options); +} + +void +ZenComputeServerConfigurator::ApplyOptions(cxxopts::Options& Options) +{ + ZEN_UNUSED(Options); +} + +void +ZenComputeServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions) +{ + ZEN_UNUSED(LuaOptions); +} + +void +ZenComputeServerConfigurator::ValidateOptions() +{ +} + +/////////////////////////////////////////////////////////////////////////// + +ZenComputeServer::ZenComputeServer() +{ +} + +ZenComputeServer::~ZenComputeServer() +{ + Cleanup(); +} + +int +ZenComputeServer::Initialize(const ZenComputeServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry) +{ + ZEN_TRACE_CPU("ZenComputeServer::Initialize"); + ZEN_MEMSCOPE(GetZenserverTag()); + + ZEN_INFO(ZEN_APP_NAME " initializing in HUB server mode"); + + const int EffectiveBasePort = ZenServerBase::Initialize(ServerConfig, ServerEntry); + if (EffectiveBasePort < 0) + { + return EffectiveBasePort; + } + + // This is a workaround to make sure we can have automated tests. Without + // this the ranges for different child zen hub processes could overlap with + // the main test range. + ZenServerEnvironment::SetBaseChildId(1000); + + m_DebugOptionForcedCrash = ServerConfig.ShouldCrash; + + InitializeState(ServerConfig); + InitializeServices(ServerConfig); + RegisterServices(ServerConfig); + + ZenServerBase::Finalize(); + + return EffectiveBasePort; +} + +void +ZenComputeServer::Cleanup() +{ + ZEN_TRACE_CPU("ZenStorageServer::Cleanup"); + ZEN_INFO(ZEN_APP_NAME " cleaning up"); + try + { + m_IoContext.stop(); + if (m_IoRunner.joinable()) + { + m_IoRunner.join(); + } + + if (m_Http) + { + m_Http->Close(); + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); + } +} + +void +ZenComputeServer::InitializeState(const ZenComputeServerConfig& ServerConfig) +{ + ZEN_UNUSED(ServerConfig); +} + +void +ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) +{ + ZEN_INFO("initializing storage"); + + CidStoreConfiguration Config; + Config.RootDirectory = m_DataRoot / "cas"; + + m_CidStore = std::make_unique(m_GcManager); + m_CidStore->Initialize(Config); + + ZEN_INFO("instantiating API service"); + m_ApiService = std::make_unique(*m_Http); + + ZEN_INFO("instantiating compute service"); + m_ComputeService = std::make_unique(ServerConfig.DataDir / "compute"); + + // Ref Runner; + // Runner = zen::compute::CreateLocalRunner(*m_CidStore, ServerConfig.DataDir / "runner"); + + // TODO: (re)implement default configuration here + + ZEN_INFO("instantiating function service"); + m_FunctionService = + std::make_unique(*m_CidStore, m_StatsService, ServerConfig.DataDir / "functions"); +} + +void +ZenComputeServer::RegisterServices(const ZenComputeServerConfig& ServerConfig) +{ + ZEN_UNUSED(ServerConfig); + + if (m_ComputeService) + { + m_Http->RegisterService(*m_ComputeService); + } + + if (m_ApiService) + { + m_Http->RegisterService(*m_ApiService); + } + + if (m_FunctionService) + { + m_Http->RegisterService(*m_FunctionService); + } +} + +void +ZenComputeServer::Run() +{ + if (m_ProcessMonitor.IsActive()) + { + CheckOwnerPid(); + } + + if (!m_TestMode) + { + // clang-format off + ZEN_INFO( R"(__________ _________ __ )" "\n" + R"(\____ /____ ____ \_ ___ \ ____ _____ ______ __ ___/ |_ ____ )" "\n" + R"( / // __ \ / \/ \ \/ / _ \ / \\____ \| | \ __\/ __ \ )" "\n" + R"( / /\ ___/| | \ \___( <_> ) Y Y \ |_> > | /| | \ ___/ )" "\n" + R"(/_______ \___ >___| /\______ /\____/|__|_| / __/|____/ |__| \___ >)" "\n" + R"( \/ \/ \/ \/ \/|__| \/ )"); + // clang-format on + + ExtendableStringBuilder<256> BuildOptions; + GetBuildOptions(BuildOptions, '\n'); + ZEN_INFO("Build options ({}/{}):\n{}", GetOperatingSystemName(), GetCpuName(), BuildOptions); + } + + ZEN_INFO(ZEN_APP_NAME " now running as COMPUTE (pid: {})", GetCurrentProcessId()); + +# if ZEN_PLATFORM_WINDOWS + if (zen::windows::IsRunningOnWine()) + { + ZEN_INFO("detected Wine session - " ZEN_APP_NAME " is not formally tested on Wine and may therefore not work or perform well"); + } +# endif + +# if ZEN_USE_SENTRY + ZEN_INFO("sentry crash handler {}", m_UseSentry ? "ENABLED" : "DISABLED"); + if (m_UseSentry) + { + SentryIntegration::ClearCaches(); + } +# endif + + if (m_DebugOptionForcedCrash) + { + ZEN_DEBUG_BREAK(); + } + + const bool IsInteractiveMode = IsInteractiveSession(); // &&!m_TestMode; + + SetNewState(kRunning); + + OnReady(); + + m_Http->Run(IsInteractiveMode); + + SetNewState(kShuttingDown); + + ZEN_INFO(ZEN_APP_NAME " exiting"); +} + +////////////////////////////////////////////////////////////////////////////////// + +ZenComputeServerMain::ZenComputeServerMain(ZenComputeServerConfig& ServerOptions) +: ZenServerMain(ServerOptions) +, m_ServerOptions(ServerOptions) +{ +} + +void +ZenComputeServerMain::DoRun(ZenServerState::ZenServerEntry* Entry) +{ + ZenComputeServer Server; + Server.SetDataRoot(m_ServerOptions.DataDir); + Server.SetContentRoot(m_ServerOptions.ContentDir); + Server.SetTestMode(m_ServerOptions.IsTest); + Server.SetDedicatedMode(m_ServerOptions.IsDedicated); + + const int EffectiveBasePort = Server.Initialize(m_ServerOptions, Entry); + if (EffectiveBasePort == -1) + { + // Server.Initialize has already logged what the issue is - just exit with failure code here. + std::exit(1); + } + + Entry->EffectiveListenPort = uint16_t(EffectiveBasePort); + if (EffectiveBasePort != m_ServerOptions.BasePort) + { + ZEN_INFO(ZEN_APP_NAME " - relocated to base port {}", EffectiveBasePort); + m_ServerOptions.BasePort = EffectiveBasePort; + } + + std::unique_ptr ShutdownThread; + std::unique_ptr ShutdownEvent; + + ExtendableStringBuilder<64> ShutdownEventName; + ShutdownEventName << "Zen_" << m_ServerOptions.BasePort << "_Shutdown"; + ShutdownEvent.reset(new NamedEvent{ShutdownEventName}); + + // Monitor shutdown signals + + ShutdownThread.reset(new std::thread{[&] { + SetCurrentThreadName("shutdown_mon"); + + ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}' for process {}", ShutdownEventName, zen::GetCurrentProcessId()); + + if (ShutdownEvent->Wait()) + { + ZEN_INFO("shutdown signal for pid {} received", zen::GetCurrentProcessId()); + Server.RequestExit(0); + } + else + { + ZEN_INFO("shutdown signal wait() failed"); + } + }}); + + auto CleanupShutdown = MakeGuard([&ShutdownEvent, &ShutdownThread] { + ReportServiceStatus(ServiceStatus::Stopping); + + if (ShutdownEvent) + { + ShutdownEvent->Set(); + } + if (ShutdownThread && ShutdownThread->joinable()) + { + ShutdownThread->join(); + } + }); + + // If we have a parent process, establish the mechanisms we need + // to be able to communicate readiness with the parent + + Server.SetIsReadyFunc([&] { + std::error_code Ec; + m_LockFile.Update(MakeLockData(true), Ec); + ReportServiceStatus(ServiceStatus::Running); + NotifyReady(); + }); + + Server.Run(); +} + +} // namespace zen + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zenserver/compute/computeserver.h b/src/zenserver/compute/computeserver.h new file mode 100644 index 000000000..625140b23 --- /dev/null +++ b/src/zenserver/compute/computeserver.h @@ -0,0 +1,106 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "zenserver.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include + +namespace cxxopts { +class Options; +} +namespace zen::LuaConfig { +struct Options; +} + +namespace zen::compute { +class HttpFunctionService; +} + +namespace zen { + +class CidStore; +class HttpApiService; +class HttpComputeService; + +struct ZenComputeServerConfig : public ZenServerConfig +{ + std::string UpstreamNotificationEndpoint; + std::string InstanceId; // For use in notifications +}; + +struct ZenComputeServerConfigurator : public ZenServerConfiguratorBase +{ + ZenComputeServerConfigurator(ZenComputeServerConfig& ServerOptions) + : ZenServerConfiguratorBase(ServerOptions) + , m_ServerOptions(ServerOptions) + { + } + + ~ZenComputeServerConfigurator() = default; + +private: + virtual void AddCliOptions(cxxopts::Options& Options) override; + virtual void AddConfigOptions(LuaConfig::Options& Options) override; + virtual void ApplyOptions(cxxopts::Options& Options) override; + virtual void OnConfigFileParsed(LuaConfig::Options& LuaOptions) override; + virtual void ValidateOptions() override; + + ZenComputeServerConfig& m_ServerOptions; +}; + +class ZenComputeServerMain : public ZenServerMain +{ +public: + ZenComputeServerMain(ZenComputeServerConfig& ServerOptions); + virtual void DoRun(ZenServerState::ZenServerEntry* Entry) override; + + ZenComputeServerMain(const ZenComputeServerMain&) = delete; + ZenComputeServerMain& operator=(const ZenComputeServerMain&) = delete; + + typedef ZenComputeServerConfig Config; + typedef ZenComputeServerConfigurator Configurator; + +private: + ZenComputeServerConfig& m_ServerOptions; +}; + +/** + * The compute server handles DDC build function execution requests + * only. It's intended to be used on a pure compute resource and does + * not handle any storage tasks. The actual scheduling happens upstream + * in a storage server instance. + */ + +class ZenComputeServer : public ZenServerBase +{ + ZenComputeServer& operator=(ZenComputeServer&&) = delete; + ZenComputeServer(ZenComputeServer&&) = delete; + +public: + ZenComputeServer(); + ~ZenComputeServer(); + + int Initialize(const ZenComputeServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry); + void Run(); + void Cleanup(); + +private: + HttpStatsService m_StatsService; + GcManager m_GcManager; + GcScheduler m_GcScheduler{m_GcManager}; + std::unique_ptr m_CidStore; + std::unique_ptr m_ComputeService; + std::unique_ptr m_ApiService; + std::unique_ptr m_FunctionService; + + void InitializeState(const ZenComputeServerConfig& ServerConfig); + void InitializeServices(const ZenComputeServerConfig& ServerConfig); + void RegisterServices(const ZenComputeServerConfig& ServerConfig); +}; + +} // namespace zen + +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zenserver/compute/computeservice.cpp b/src/zenserver/compute/computeservice.cpp new file mode 100644 index 000000000..2c0bc0ae9 --- /dev/null +++ b/src/zenserver/compute/computeservice.cpp @@ -0,0 +1,100 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "computeservice.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include +# include +# include +# include +# include +# include + +ZEN_THIRD_PARTY_INCLUDES_START +# include +# include +ZEN_THIRD_PARTY_INCLUDES_END + +# include + +namespace zen { + +////////////////////////////////////////////////////////////////////////// + +struct ResourceMetrics +{ + uint64_t DiskUsageBytes = 0; + uint64_t MemoryUsageBytes = 0; +}; + +////////////////////////////////////////////////////////////////////////// + +struct HttpComputeService::Impl +{ + Impl(const Impl&) = delete; + Impl& operator=(const Impl&) = delete; + + Impl(); + ~Impl(); + + void Initialize(std::filesystem::path BaseDir) { ZEN_UNUSED(BaseDir); } + + void Cleanup() {} + +private: +}; + +HttpComputeService::Impl::Impl() +{ +} + +HttpComputeService::Impl::~Impl() +{ +} + +/////////////////////////////////////////////////////////////////////////// + +HttpComputeService::HttpComputeService(std::filesystem::path BaseDir) : m_Impl(std::make_unique()) +{ + using namespace std::literals; + + m_Impl->Initialize(BaseDir); + + m_Router.RegisterRoute( + "status", + [this](HttpRouterRequest& Req) { + CbObjectWriter Obj; + Obj.BeginArray("modules"); + Obj.EndArray(); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "stats", + [this](HttpRouterRequest& Req) { + CbObjectWriter Obj; + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + }, + HttpVerb::kGet); +} + +HttpComputeService::~HttpComputeService() +{ +} + +const char* +HttpComputeService::BaseUri() const +{ + return "/compute/"; +} + +void +HttpComputeService::HandleRequest(zen::HttpServerRequest& Request) +{ + m_Router.HandleRequest(Request); +} + +} // namespace zen +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zenserver/compute/computeservice.h b/src/zenserver/compute/computeservice.h new file mode 100644 index 000000000..339200dd8 --- /dev/null +++ b/src/zenserver/compute/computeservice.h @@ -0,0 +1,36 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include + +#if ZEN_WITH_COMPUTE_SERVICES +namespace zen { + +/** ZenServer Compute Service + * + * Manages a set of compute workers for use in UEFN content worker + * + */ +class HttpComputeService : public zen::HttpService +{ +public: + HttpComputeService(std::filesystem::path BaseDir); + ~HttpComputeService(); + + HttpComputeService(const HttpComputeService&) = delete; + HttpComputeService& operator=(const HttpComputeService&) = delete; + + virtual const char* BaseUri() const override; + virtual void HandleRequest(zen::HttpServerRequest& Request) override; + +private: + HttpRequestRouter m_Router; + + struct Impl; + + std::unique_ptr m_Impl; +}; + +} // namespace zen +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/src/zenserver/frontend/html/compute.html b/src/zenserver/frontend/html/compute.html new file mode 100644 index 000000000..668189fe5 --- /dev/null +++ b/src/zenserver/frontend/html/compute.html @@ -0,0 +1,991 @@ + + + + + + Zen Compute Dashboard + + + + +
+
+
+

Zen Compute Dashboard

+
Last updated: Never
+
+
+
+ Checking... +
+
+ +
+ + +
Action Queue
+
+
+
Pending Actions
+
-
+
Waiting to be scheduled
+
+
+
Running Actions
+
-
+
Currently executing
+
+
+
Completed Actions
+
-
+
Results available
+
+
+ + +
+
Action Queue History
+
+ +
+
+ + +
Performance Metrics
+
+
Completion Rate
+
+
+
-
+
1 min rate
+
+
+
-
+
5 min rate
+
+
+
-
+
15 min rate
+
+
+
+
+ Total Retired + - +
+
+ Mean Rate + - +
+
+
+ + +
Workers
+
+
Worker Status
+
+ Registered Workers + - +
+ +
+ + +
Recent Actions
+
+
Action History
+
No actions recorded yet.
+ +
+ + +
System Resources
+
+
+
CPU Usage
+
-
+
Percent
+
+
+
+
+ +
+
+
+ Packages + - +
+
+ Physical Cores + - +
+
+ Logical Processors + - +
+
+
+
+
Memory
+
+ Used + - +
+
+ Total + - +
+
+
+
+
+
+
Disk
+
+ Used + - +
+
+ Total + - +
+
+
+
+
+
+
+ + + + diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp index 1a929b026..ee783d2a6 100644 --- a/src/zenserver/main.cpp +++ b/src/zenserver/main.cpp @@ -23,6 +23,9 @@ #include #include "diag/logging.h" + +#include "compute/computeserver.h" + #include "storage/storageconfig.h" #include "storage/zenstorageserver.h" @@ -61,11 +64,19 @@ namespace zen { #if ZEN_PLATFORM_WINDOWS -template +/** Windows Service wrapper for Zen servers + * + * This class wraps a Zen server main entry point (the Main template parameter) + * into a Windows Service by implementing the WindowsService interface. + * + * The Main type needs to implement the virtual functions from the ZenServerMain + * base class, which provides the actual server logic. + */ +template class ZenWindowsService : public WindowsService { public: - ZenWindowsService(typename T::Config& ServerOptions) : m_EntryPoint(ServerOptions) {} + ZenWindowsService(typename Main::Config& ServerOptions) : m_EntryPoint(ServerOptions) {} ZenWindowsService(const ZenWindowsService&) = delete; ZenWindowsService& operator=(const ZenWindowsService&) = delete; @@ -73,7 +84,7 @@ public: virtual int Run() override { return m_EntryPoint.Run(); } private: - T m_EntryPoint; + Main m_EntryPoint; }; #endif // ZEN_PLATFORM_WINDOWS @@ -84,6 +95,23 @@ private: namespace zen { +/** Application main entry point template + * + * This function handles common application startup tasks while allowing + * different server types to be plugged in via the Main template parameter. + * + * On Windows, this function also handles platform-specific service + * installation and uninstallation. + * + * The Main type needs to implement the virtual functions from the ZenServerMain + * base class, which provides the actual server logic. + * + * The Main type is also expected to provide the following members: + * + * typedef Config -- Server configuration type, derived from ZenServerConfig + * typedef Configurator -- Server configuration handler type, implements ZenServerConfiguratorBase + * + */ template int AppMain(int argc, char* argv[]) @@ -241,7 +269,12 @@ main(int argc, char* argv[]) auto _ = zen::MakeGuard([] { // Allow some time for worker threads to unravel, in an effort - // to prevent shutdown races in TLS object destruction + // to prevent shutdown races in TLS object destruction, mainly due to + // threads which we don't directly control (Windows thread pool) and + // therefore can't join. + // + // This isn't a great solution, but for now it seems to help reduce + // shutdown crashes observed in some situations. WaitForThreads(1000); }); @@ -249,6 +282,7 @@ main(int argc, char* argv[]) { kHub, kStore, + kCompute, kTest } ServerMode = kStore; @@ -258,10 +292,14 @@ main(int argc, char* argv[]) { ServerMode = kHub; } - else if (argv[1] == "store"sv) + else if ((argv[1] == "store"sv) || (argv[1] == "storage"sv)) { ServerMode = kStore; } + else if (argv[1] == "compute"sv) + { + ServerMode = kCompute; + } else if (argv[1] == "test"sv) { ServerMode = kTest; @@ -280,6 +318,13 @@ main(int argc, char* argv[]) break; case kHub: return AppMain(argc, argv); + case kCompute: +#if ZEN_WITH_COMPUTE_SERVICES + return AppMain(argc, argv); +#else + fprintf(stderr, "compute services are not compiled in!\n"); + exit(5); +#endif default: case kStore: return AppMain(argc, argv); diff --git a/src/zenserver/storage/storageconfig.cpp b/src/zenserver/storage/storageconfig.cpp index 0f8ab1e98..089b6b572 100644 --- a/src/zenserver/storage/storageconfig.cpp +++ b/src/zenserver/storage/storageconfig.cpp @@ -797,6 +797,7 @@ ZenStorageServerCmdLineOptions::AddCacheOptions(cxxopts::Options& options, ZenSt cxxopts::value(ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds)->default_value("86400"), ""); + options.add_option("compute", "", "lie-cpus", "Lie to upstream about CPU capabilities", cxxopts::value(ServerOptions.LieCpu), ""); options.add_option("cache", "", "cache-bucket-maxblocksize", diff --git a/src/zenserver/storage/storageconfig.h b/src/zenserver/storage/storageconfig.h index d59d05cf6..b408b0c26 100644 --- a/src/zenserver/storage/storageconfig.h +++ b/src/zenserver/storage/storageconfig.h @@ -156,6 +156,7 @@ struct ZenStorageServerConfig : public ZenServerConfig ZenWorkspacesConfig WorksSpacesConfig; std::filesystem::path PluginsConfigFile; // Path to plugins config file bool ObjectStoreEnabled = false; + bool ComputeEnabled = true; std::string ScrubOptions; }; diff --git a/src/zenserver/storage/zenstorageserver.cpp b/src/zenserver/storage/zenstorageserver.cpp index 2b74395c3..ff854b72d 100644 --- a/src/zenserver/storage/zenstorageserver.cpp +++ b/src/zenserver/storage/zenstorageserver.cpp @@ -182,6 +182,13 @@ ZenStorageServer::RegisterServices() #endif // ZEN_WITH_VFS m_Http->RegisterService(*m_AdminService); + +#if ZEN_WITH_COMPUTE_SERVICES + if (m_HttpFunctionService) + { + m_Http->RegisterService(*m_HttpFunctionService); + } +#endif } void @@ -267,6 +274,16 @@ ZenStorageServer::InitializeServices(const ZenStorageServerConfig& ServerOptions m_BuildStoreService = std::make_unique(m_StatusService, m_StatsService, *m_BuildStore); } +#if ZEN_WITH_COMPUTE_SERVICES + if (ServerOptions.ComputeEnabled) + { + ZEN_OTEL_SPAN("InitializeComputeService"); + + m_HttpFunctionService = + std::make_unique(*m_CidStore, m_StatsService, ServerOptions.DataDir / "functions"); + } +#endif + #if ZEN_WITH_VFS m_VfsServiceImpl = std::make_unique(); m_VfsServiceImpl->AddService(Ref(m_ProjectStore)); @@ -805,6 +822,10 @@ ZenStorageServer::Cleanup() Flush(); +#if ZEN_WITH_COMPUTE_SERVICES + m_HttpFunctionService.reset(); +#endif + m_AdminService.reset(); m_VfsService.reset(); m_VfsServiceImpl.reset(); diff --git a/src/zenserver/storage/zenstorageserver.h b/src/zenserver/storage/zenstorageserver.h index 5ccb587d6..456447a2a 100644 --- a/src/zenserver/storage/zenstorageserver.h +++ b/src/zenserver/storage/zenstorageserver.h @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -23,6 +24,10 @@ #include "vfs/vfsservice.h" #include "workspaces/httpworkspaces.h" +#if ZEN_WITH_COMPUTE_SERVICES +# include +#endif + namespace zen { class ZenStorageServer : public ZenServerBase @@ -34,11 +39,6 @@ public: ZenStorageServer(); ~ZenStorageServer(); - void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; } - void SetTestMode(bool State) { m_TestMode = State; } - void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; } - void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } - int Initialize(const ZenStorageServerConfig& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry); void Run(); void Cleanup(); @@ -48,14 +48,9 @@ private: void InitializeStructuredCache(const ZenStorageServerConfig& ServerOptions); void Flush(); - bool m_IsDedicatedMode = false; - bool m_TestMode = false; - bool m_DebugOptionForcedCrash = false; - std::string m_StartupScrubOptions; - CbObject m_RootManifest; - std::filesystem::path m_DataRoot; - std::filesystem::path m_ContentRoot; - asio::steady_timer m_StateMarkerTimer{m_IoContext}; + std::string m_StartupScrubOptions; + CbObject m_RootManifest; + asio::steady_timer m_StateMarkerTimer{m_IoContext}; void EnqueueStateMarkerTimer(); void CheckStateMarker(); @@ -95,6 +90,11 @@ private: std::unique_ptr m_BuildStoreService; std::unique_ptr m_VfsService; std::unique_ptr m_AdminService; + std::unique_ptr m_ApiService; + +#if ZEN_WITH_COMPUTE_SERVICES + std::unique_ptr m_HttpFunctionService; +#endif }; struct ZenStorageServerConfigurator; diff --git a/src/zenserver/xmake.lua b/src/zenserver/xmake.lua index 6ee80dc62..9ab51beb2 100644 --- a/src/zenserver/xmake.lua +++ b/src/zenserver/xmake.lua @@ -2,7 +2,11 @@ target("zenserver") set_kind("binary") + if enable_unity then + add_rules("c++.unity_build", {batchsize = 4}) + end add_deps("zencore", + "zencompute", "zenhttp", "zennet", "zenremotestore", diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 7f9bf56a9..7bf6126df 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -145,6 +146,13 @@ ZenServerBase::Initialize(const ZenServerConfig& ServerOptions, ZenServerState:: InitializeSecuritySettings(ServerOptions); + if (ServerOptions.LieCpu) + { + SetCpuCountForReporting(ServerOptions.LieCpu); + + ZEN_INFO("Reporting concurrency: {}", ServerOptions.LieCpu); + } + m_StatusService.RegisterHandler("status", *this); m_Http->RegisterService(m_StatusService); diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h index efa46f361..5a8a079c0 100644 --- a/src/zenserver/zenserver.h +++ b/src/zenserver/zenserver.h @@ -43,6 +43,11 @@ public: void SetIsReadyFunc(std::function&& IsReadyFunc) { m_IsReadyFunc = std::move(IsReadyFunc); } + void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; } + void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } + void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; } + void SetTestMode(bool State) { m_TestMode = State; } + protected: int Initialize(const ZenServerConfig& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry); void Finalize(); @@ -55,6 +60,10 @@ protected: bool m_UseSentry = false; bool m_IsPowerCycle = false; + bool m_IsDedicatedMode = false; + bool m_TestMode = false; + bool m_DebugOptionForcedCrash = false; + std::thread m_IoRunner; asio::io_context m_IoContext; void EnsureIoRunner(); @@ -72,6 +81,9 @@ protected: std::function m_IsReadyFunc; void OnReady(); + std::filesystem::path m_DataRoot; // Root directory for server state + std::filesystem::path m_ContentRoot; // Root directory for frontend content + Ref m_Http; std::unique_ptr m_HttpRequestFilter; @@ -114,7 +126,6 @@ protected: private: void InitializeSecuritySettings(const ZenServerConfig& ServerOptions); }; - class ZenServerMain { public: diff --git a/src/zentest-appstub/xmake.lua b/src/zentest-appstub/xmake.lua index 97615e322..db3ff2e2d 100644 --- a/src/zentest-appstub/xmake.lua +++ b/src/zentest-appstub/xmake.lua @@ -5,6 +5,9 @@ target("zentest-appstub") set_group("tests") add_headerfiles("**.h") add_files("*.cpp") + add_deps("zencore") + add_packages("vcpkg::gsl-lite") -- this should ideally be propagated by the zencore dependency + add_packages("vcpkg::mimalloc") if is_os("linux") then add_syslinks("pthread") diff --git a/src/zentest-appstub/zentest-appstub.cpp b/src/zentest-appstub/zentest-appstub.cpp index 24cf21e97..926580d96 100644 --- a/src/zentest-appstub/zentest-appstub.cpp +++ b/src/zentest-appstub/zentest-appstub.cpp @@ -1,33 +1,408 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include +#include +#include +#include +#include +#include +#include + +#if ZEN_WITH_TESTS +# define ZEN_TEST_WITH_RUNNER 1 +# include +#endif + +#include + #include +#include #include #include #include +#include +#include +#include #include -using namespace std::chrono_literals; +using namespace std::literals; +using namespace zen; + +#if !defined(_MSC_VER) +# define _strnicmp strncasecmp // TEMPORARY WORKAROUND - should not be using this +#endif + +// Some basic functions to implement some test "compute" functions + +std::string +Rot13Function(std::string_view InputString) +{ + std::string OutputString{InputString}; + + std::transform(OutputString.begin(), + OutputString.end(), + OutputString.begin(), + [](std::string::value_type c) -> std::string::value_type { + if (c >= 'a' && c <= 'z') + { + return 'a' + (c - 'a' + 13) % 26; + } + else if (c >= 'A' && c <= 'Z') + { + return 'A' + (c - 'A' + 13) % 26; + } + else + { + return c; + } + }); + + return OutputString; +} + +std::string +ReverseFunction(std::string_view InputString) +{ + std::string OutputString{InputString}; + std::reverse(OutputString.begin(), OutputString.end()); + return OutputString; +} + +std::string +IdentityFunction(std::string_view InputString) +{ + return std::string{InputString}; +} + +std::string +NullFunction(std::string_view) +{ + return {}; +} + +zen::CbObject +DescribeFunctions() +{ + CbObjectWriter Versions; + Versions << "BuildSystemVersion" << Guid::FromString("17fe280d-ccd8-4be8-a9d1-89c944a70969"sv); + + Versions.BeginArray("Functions"sv); + Versions.BeginObject(); + Versions << "Name"sv + << "Null"sv; + Versions << "Version"sv << Guid::FromString("00000000-0000-0000-0000-000000000000"sv); + Versions.EndObject(); + Versions.BeginObject(); + Versions << "Name"sv + << "Identity"sv; + Versions << "Version"sv << Guid::FromString("11111111-1111-1111-1111-111111111111"sv); + Versions.EndObject(); + Versions.BeginObject(); + Versions << "Name"sv + << "Rot13"sv; + Versions << "Version"sv << Guid::FromString("13131313-1313-1313-1313-131313131313"sv); + Versions.EndObject(); + Versions.BeginObject(); + Versions << "Name"sv + << "Reverse"sv; + Versions << "Version"sv << Guid::FromString("31313131-3131-3131-3131-313131313131"sv); + Versions.EndObject(); + Versions.EndArray(); + + return Versions.Save(); +} + +struct ContentResolver +{ + std::filesystem::path InputsRoot; + + CompressedBuffer ResolveChunk(IoHash Hash, uint64_t ExpectedSize) + { + std::filesystem::path ChunkPath = InputsRoot / Hash.ToHexString(); + IoBuffer ChunkBuffer = IoBufferBuilder::MakeFromFile(ChunkPath); + + IoHash RawHash; + uint64_t RawSize = 0; + CompressedBuffer AsCompressed = CompressedBuffer::FromCompressed(SharedBuffer(ChunkBuffer), RawHash, RawSize); + + if (RawSize != ExpectedSize) + { + throw std::runtime_error( + fmt::format("chunk size mismatch - expected {}, got {} for '{}'", ExpectedSize, ChunkBuffer.Size(), ChunkPath)); + } + if (RawHash != Hash) + { + throw std::runtime_error(fmt::format("chunk hash mismatch - expected {}, got {} for '{}'", Hash, RawHash, ChunkPath)); + } + + return AsCompressed; + } +}; + +zen::CbPackage +ExecuteFunction(CbObject Action, ContentResolver ChunkResolver) +{ + auto Apply = [&](auto Func) { + zen::CbPackage Result; + auto Source = Action["Inputs"sv].AsObjectView()["Source"sv].AsObjectView(); + + IoHash InputRawHash = Source["RawHash"sv].AsHash(); + uint64_t InputRawSize = Source["RawSize"sv].AsUInt64(); + + zen::CompressedBuffer InputData = ChunkResolver.ResolveChunk(InputRawHash, InputRawSize); + SharedBuffer Input = InputData.Decompress(); + + std::string Output = Func(std::string_view(static_cast(Input.GetData()), Input.GetSize())); + zen::CompressedBuffer OutputData = + zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Output), OodleCompressor::Selkie, OodleCompressionLevel::HyperFast4); + IoHash OutputRawHash = OutputData.DecodeRawHash(); + + CbAttachment OutputAttachment(std::move(OutputData), OutputRawHash); + + CbObjectWriter Cbo; + Cbo.BeginArray("Values"sv); + Cbo.BeginObject(); + Cbo << "Id" << Oid{1, 2, 3}; + Cbo.AddAttachment("RawHash", OutputAttachment); + Cbo << "RawSize" << Output.size(); + Cbo.EndObject(); + Cbo.EndArray(); + + Result.SetObject(Cbo.Save()); + Result.AddAttachment(std::move(OutputAttachment)); + return Result; + }; + + std::string_view Function = Action["Function"sv].AsString(); + + if (Function == "Rot13"sv) + { + return Apply(Rot13Function); + } + else if (Function == "Reverse"sv) + { + return Apply(ReverseFunction); + } + else if (Function == "Identity"sv) + { + return Apply(IdentityFunction); + } + else if (Function == "Null"sv) + { + return Apply(NullFunction); + } + else + { + return {}; + } +} + +/* This implements a minimal application to help testing of process launch-related + functionality + + It also mimics the DDC2 worker command line interface, so it may be used to + exercise compute infrastructure. + */ int main(int argc, char* argv[]) { int ExitCode = 0; - for (int i = 0; i < argc; ++i) + try { - if (std::strncmp(argv[i], "-t=", 3) == 0) + std::filesystem::path BasePath = std::filesystem::current_path(); + std::filesystem::path InputPath = std::filesystem::current_path() / "Inputs"; + std::filesystem::path OutputPath = std::filesystem::current_path() / "Outputs"; + std::filesystem::path VersionPath = std::filesystem::current_path() / "Versions"; + std::vector ActionPaths; + + /* + GetSwitchValues(TEXT("-B="), ActionPathPatterns); + GetSwitchValues(TEXT("-Build="), ActionPathPatterns); + + GetSwitchValues(TEXT("-I="), InputDirectoryPaths); + GetSwitchValues(TEXT("-Input="), InputDirectoryPaths); + + GetSwitchValues(TEXT("-O="), OutputDirectoryPaths); + GetSwitchValues(TEXT("-Output="), OutputDirectoryPaths); + + GetSwitchValues(TEXT("-V="), VersionPaths); + GetSwitchValues(TEXT("-Version="), VersionPaths); + */ + + auto SplitArg = [](const char* Arg) -> std::string_view { + std::string_view ArgView{Arg}; + if (auto SplitPos = ArgView.find_first_of('='); SplitPos != std::string_view::npos) + { + return ArgView.substr(SplitPos + 1); + } + else + { + return {}; + } + }; + + auto ParseIntArg = [](std::string_view Arg) -> int { + int Rv = 0; + const auto Result = std::from_chars(Arg.data(), Arg.data() + Arg.size(), Rv); + + if (Result.ec != std::errc{}) + { + throw std::invalid_argument(fmt::format("bad argument (not an integer): {}", Arg).c_str()); + } + + return Rv; + }; + + for (int i = 1; i < argc; ++i) + { + std::string_view Arg = argv[i]; + + if (Arg.compare(0, 1, "-")) + { + continue; + } + + if (std::strncmp(argv[i], "-t=", 3) == 0) + { + const int SleepTime = std::atoi(argv[i] + 3); + + printf("[zentest] sleeping for %ds...\n", SleepTime); + + std::this_thread::sleep_for(SleepTime * 1s); + } + else if (std::strncmp(argv[i], "-f=", 3) == 0) + { + // Force a "failure" process exit code to return to the invoker + + // This may throw for invalid arguments, which makes this useful for + // testing exception handling + std::string_view ErrorArg = SplitArg(argv[i]); + ExitCode = ParseIntArg(ErrorArg); + } + else if ((_strnicmp(argv[i], "-input=", 7) == 0) || (_strnicmp(argv[i], "-i=", 3) == 0)) + { + /* mimic DDC2 + + GetSwitchValues(TEXT("-I="), InputDirectoryPaths); + GetSwitchValues(TEXT("-Input="), InputDirectoryPaths); + */ + + std::string_view InputArg = SplitArg(argv[i]); + InputPath = InputArg; + } + else if ((_strnicmp(argv[i], "-output=", 8) == 0) || (_strnicmp(argv[i], "-o=", 3) == 0)) + { + /* mimic DDC2 handling of where files storing output chunk files are directed + + GetSwitchValues(TEXT("-O="), OutputDirectoryPaths); + GetSwitchValues(TEXT("-Output="), OutputDirectoryPaths); + */ + + std::string_view OutputArg = SplitArg(argv[i]); + OutputPath = OutputArg; + } + else if ((_strnicmp(argv[i], "-version=", 8) == 0) || (_strnicmp(argv[i], "-v=", 3) == 0)) + { + /* mimic DDC2 + + GetSwitchValues(TEXT("-V="), VersionPaths); + GetSwitchValues(TEXT("-Version="), VersionPaths); + */ + + std::string_view VersionArg = SplitArg(argv[i]); + VersionPath = VersionArg; + } + else if ((_strnicmp(argv[i], "-build=", 7) == 0) || (_strnicmp(argv[i], "-b=", 3) == 0)) + { + /* mimic DDC2 + + GetSwitchValues(TEXT("-B="), ActionPathPatterns); + GetSwitchValues(TEXT("-Build="), ActionPathPatterns); + */ + + std::string_view BuildActionArg = SplitArg(argv[i]); + std::filesystem::path ActionPath{BuildActionArg}; + ActionPaths.push_back(ActionPath); + + ExitCode = 0; + } + } + + // Emit version information + + if (!VersionPath.empty()) { - const int SleepTime = std::atoi(argv[i] + 3); + CbObjectWriter Version; + + Version << "BuildSystemVersion" << Guid::FromString("17fe280d-ccd8-4be8-a9d1-89c944a70969"sv); + + Version.BeginArray("Functions"); + + Version.BeginObject(); + Version << "Name" + << "Rot13" + << "Version" << Guid::FromString("13131313-1313-1313-1313-131313131313"sv); + Version.EndObject(); - printf("[zentest] sleeping for %ds...\n", SleepTime); + Version.BeginObject(); + Version << "Name" + << "Reverse" + << "Version" << Guid::FromString("98765432-1000-0000-0000-000000000000"sv); + Version.EndObject(); - std::this_thread::sleep_for(SleepTime * 1s); + Version.BeginObject(); + Version << "Name" + << "Identity" + << "Version" << Guid::FromString("11111111-1111-1111-1111-111111111111"sv); + Version.EndObject(); + + Version.BeginObject(); + Version << "Name" + << "Null" + << "Version" << Guid::FromString("00000000-0000-0000-0000-000000000000"sv); + Version.EndObject(); + + Version.EndArray(); + CbObject VersionObject = Version.Save(); + + BinaryWriter Writer; + zen::SaveCompactBinary(Writer, VersionObject); + zen::WriteFile(VersionPath, IoBufferBuilder::MakeFromMemory(Writer.GetView())); } - else if (std::strncmp(argv[i], "-f=", 3) == 0) + + // Evaluate actions + + ContentResolver Resolver; + Resolver.InputsRoot = InputPath; + + for (std::filesystem::path ActionPath : ActionPaths) { - ExitCode = std::atoi(argv[i] + 3); + IoBuffer ActionDescBuffer = ReadFile(ActionPath).Flatten(); + CbObject ActionDesc = LoadCompactBinaryObject(ActionDescBuffer); + CbPackage Result = ExecuteFunction(ActionDesc, Resolver); + CbObject ResultObject = Result.GetObject(); + + BinaryWriter Writer; + zen::SaveCompactBinary(Writer, ResultObject); + zen::WriteFile(ActionPath.replace_extension(".output"), IoBufferBuilder::MakeFromMemory(Writer.GetView())); + + // Also marshal outputs + + for (const auto& Attachment : Result.GetAttachments()) + { + const CompositeBuffer& AttachmentBuffer = Attachment.AsCompressedBinary().GetCompressed(); + zen::WriteFile(OutputPath / Attachment.GetHash().ToHexString(), AttachmentBuffer.Flatten().AsIoBuffer()); + } } } + catch (std::exception& Ex) + { + printf("[zentest] exception caught in main: '%s'\n", Ex.what()); + + ExitCode = 99; + } printf("[zentest] exiting with exit code: %d\n", ExitCode); diff --git a/thirdparty/xmake.lua b/thirdparty/xmake.lua index f079d803d..07605a016 100644 --- a/thirdparty/xmake.lua +++ b/thirdparty/xmake.lua @@ -86,7 +86,7 @@ target("blake3") if is_os("windows") then add_cflags("/experimental:c11atomics") - add_cflags("/wd4245") -- conversion from 'type1' to 'type2', possible loss of data + add_cflags("/wd4245", {force = true}) -- conversion from 'type1' to 'type2', possible loss of data elseif is_os("macosx") then add_cflags("-Wno-unused-function") end diff --git a/xmake.lua b/xmake.lua index 5d3162e46..3537c618d 100644 --- a/xmake.lua +++ b/xmake.lua @@ -120,6 +120,9 @@ if has_config("zensentry") and not use_asan then add_requires("sentry-native 0.12.1", {configs = {backend = "crashpad"}}) end end + +enable_unity = false + --add_rules("c++.unity_build") if is_mode("release") then @@ -240,6 +243,14 @@ else add_defines("ZEN_WITH_HTTPSYS=0") end +option("zencompute") + set_default(false) + set_showmenu(true) + set_description("Enable compute services endpoint") +option_end() +add_define_by_config("ZEN_WITH_COMPUTE_SERVICES", "zencompute") + + if is_os("windows") then add_defines("UE_MEMORY_TRACE_AVAILABLE=1") option("zenmemtrack") @@ -272,6 +283,7 @@ includes("src/zencore", "src/zencore-test") includes("src/zenhttp", "src/zenhttp-test") includes("src/zennet", "src/zennet-test") includes("src/zenremotestore", "src/zenremotestore-test") +includes("src/zencompute", "src/zencompute-test") includes("src/zenstore", "src/zenstore-test") includes("src/zentelemetry", "src/zentelemetry-test") includes("src/zenutil", "src/zenutil-test") -- cgit v1.2.3