diff options
| author | Dan Engelbrecht <[email protected]> | 2022-04-26 10:46:38 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-04-26 10:46:38 +0200 |
| commit | bd5b1a15f65f2fde3721acd16ce3a316c03583ea (patch) | |
| tree | dfa0acea7237697cbdf14a1fa2bc121a060f6ec7 | |
| parent | Merge branch 'main' into de/cache-with-block-store (diff) | |
| parent | Compute tweaks (#78) (diff) | |
| download | zen-bd5b1a15f65f2fde3721acd16ce3a316c03583ea.tar.xz zen-bd5b1a15f65f2fde3721acd16ce3a316c03583ea.zip | |
Merge remote-tracking branch 'origin/main' into de/cache-with-block-store
| -rw-r--r-- | .github/workflows/self_host_build.yml (renamed from .github/workflows/on_prem_windows.yml) | 8 | ||||
| -rw-r--r-- | .github/workflows/update_release.yml | 124 | ||||
| -rw-r--r-- | zencore/compactbinary.cpp | 19 | ||||
| -rw-r--r-- | zencore/filesystem.cpp | 17 | ||||
| -rw-r--r-- | zencore/include/zencore/compactbinary.h | 1 | ||||
| -rw-r--r-- | zenserver/compute/function.cpp | 19 | ||||
| -rw-r--r-- | zenserver/config.cpp | 65 | ||||
| -rw-r--r-- | zenserver/config.h | 8 | ||||
| -rw-r--r-- | zenserver/upstream/hordecompute.cpp | 1374 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 46 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 7 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 1321 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.h | 45 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 19 | ||||
| -rw-r--r-- | zenstore-test/zenstore-test.cpp | 2 |
15 files changed, 1725 insertions, 1350 deletions
diff --git a/.github/workflows/on_prem_windows.yml b/.github/workflows/self_host_build.yml index 04036c38c..2645f9738 100644 --- a/.github/workflows/on_prem_windows.yml +++ b/.github/workflows/self_host_build.yml @@ -1,10 +1,8 @@ -name: Validate +name: Validate Build on: - push: - branches: - - main pull_request: + types: [opened, reopened] branches: [ main ] jobs: @@ -34,6 +32,7 @@ jobs: windows-build: name: Build Windows + needs: clang-format runs-on: [self-hosted, windows, x64] strategy: matrix: @@ -82,6 +81,7 @@ jobs: linux-build: name: Build Linux + needs: clang-format runs-on: [self-hosted, linux, x64] strategy: matrix: diff --git a/.github/workflows/update_release.yml b/.github/workflows/update_release.yml new file mode 100644 index 000000000..62568d1c0 --- /dev/null +++ b/.github/workflows/update_release.yml @@ -0,0 +1,124 @@ +name: Build release + +on: + # push + pull_request: + types: [closed] + branches: [ main ] + +jobs: + windows-build: + # if: github.event.pull_request.merged == true + name: Build Windows + runs-on: [self-hosted, windows, x64] + strategy: + matrix: + config: + - 'release' + arch: + - 'x64' + env: + VCPKG_VERSION: 2022.03.10 + + steps: + - uses: actions/checkout@v2 + + - name: Setup xmake + uses: xmake-io/github-action-setup-xmake@v1 + with: + xmake-version: 2.6.4 + + - name: Installing vcpkg + run: | + git clone -b ${{env.VCPKG_VERSION}} --single-branch https://github.com/Microsoft/vcpkg.git .vcpkg + cd .vcpkg + .\bootstrap-vcpkg.bat + .\vcpkg.exe integrate install + cd .. + + - name: Cache vcpkg + uses: actions/cache@v2 + with: + path: | + ${{ github.workspace }}\.vcpkg\installed + key: ${{ runner.os }}-${{ matrix.config }}-${{env.VCPKG_VERSION}}-${{ hashFiles('xmake.lua') }}-${{ matrix.arch }}-v5 + + - name: Config + run: | + xmake config -v -y -m ${{ matrix.config }} --arch=${{ matrix.arch }} + env: + VCPKG_ROOT: ${{ github.workspace }}/.vcpkg + + - name: Build + run: | + xmake build -v -y + env: + VCPKG_ROOT: ${{ github.workspace }}/.vcpkg + + # - name: Create Archive + # run: | + # cd .\build\windows\${{ matrix.arch }}\${{ matrix.config }} + # C:\'Program Files'\7-Zip\7z.exe a -r ..\..\..\..\windows-${{ matrix.arch }}-${{ matrix.config }}.zip * + # cd ..\..\..\.. + + - name: Create Archive + run: | + cd .\build\windows\${{ matrix.arch }}\${{ matrix.config }} + C:\'Program Files'\7-Zip\7z.exe a -r ..\..\..\..\zenserver-win64.zip zenserver.exe + cd ..\..\..\.. + + - name: Get current release version info + run: | + $repo = "EpicGames/zen" + $releases = "https://api.github.com/repos/$repo/releases/latest" + Write-Host Determining latest release + $latest = (Invoke-WebRequest -Headers @{"Accept"="application/vnd.github.v3+json";"Authorization"="token ${{ secrets.GITHUB_TOKEN }}"} $releases | ConvertFrom-Json)[0] + $current_version_tag = [version]$latest.tag_name.replace('v','') + echo "Current version" $current_version_tag + $new_version_tag = [version]::New($current_version_tag.Major,$current_version_tag.Minor,$current_version_tag.Build,$current_version_tag.Revision+1).toString() + echo $new_version_tag + echo "new_version_tag=$new_version_tag" | Out-File -FilePath $env:GITHUB_ENV -Encoding utf8 -Append + + - name: Create Release + id: create_release + uses: actions/create-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tag_name: v${{ env.new_version_tag }} + release_name: Release + draft: false + prerelease: false + + # - name: Create Release + # id: create_release + # uses: actions/create-release@v1 + # env: + # GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + # with: + # tag_name: ${{ github.ref_name }} + # release_name: Release ${{ github.head_ref }} + # draft: false + # prerelease: false + + # - name: Upload Release Asset + # id: upload-release-asset + # uses: actions/upload-release-asset@v1 + # env: + # GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + # with: + # upload_url: ${{ steps.create_release.outputs.upload_url }} # This pulls from the CREATE RELEASE step above, referencing it's ID to get its outputs object, which include a `upload_url`. See this blog post for more info: https://jasonet.co/posts/new-features-of-github-actions/#passing-data-to-future-steps + # asset_path: .\windows-${{ matrix.arch }}-${{ matrix.config }}.zip + # asset_name: windows-${{ matrix.arch }}-${{ matrix.config }} + # asset_content_type: application/zip + - name: Upload Release Asset + id: upload-release-asset + uses: actions/upload-release-asset@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + upload_url: ${{ steps.create_release.outputs.upload_url }} + asset_path: .\zenserver-win64.zip + asset_name: zenserver-win64.zip + asset_content_type: application/zip + diff --git a/zencore/compactbinary.cpp b/zencore/compactbinary.cpp index 31d449c41..ffc1da10c 100644 --- a/zencore/compactbinary.cpp +++ b/zencore/compactbinary.cpp @@ -60,22 +60,27 @@ GetPlatformToDateTimeBiasInSeconds() return uint64_t(double(PlatformEpochYear - DateTimeEpochYear) * 365.2425) * 86400; } -DateTime -DateTime::Now() +uint64_t +DateTime::NowTicks() { - static const uint64_t EpochBias = GetPlatformToDateTimeBiasInSeconds(); - static const uint64_t SecsTo100nsTicks = int64_t(10e9 / 100); + static constexpr uint64_t EpochBias = GetPlatformToDateTimeBiasInSeconds(); #if ZEN_PLATFORM_WINDOWS FILETIME SysTime; - GetSystemTimeAsFileTime(&SysTime); - return DateTime{(EpochBias * SecsTo100nsTicks) + (uint64_t(SysTime.dwHighDateTime) << 32) | SysTime.dwLowDateTime}; + GetSystemTimePreciseAsFileTime(&SysTime); + return (EpochBias * TimeSpan::TicksPerSecond) + ((uint64_t(SysTime.dwHighDateTime) << 32) | SysTime.dwLowDateTime); #else int64_t SecondsSinceUnixEpoch = time(nullptr); - return DateTime{(EpochBias + SecondsSinceUnixEpoch) * SecsTo100nsTicks}; + return (EpochBias + SecondsSinceUnixEpoch) * TimeSpan::TicksPerSecond; #endif } +DateTime +DateTime::Now() +{ + return DateTime{NowTicks()}; +} + void DateTime::Set(int Year, int Month, int Day, int Hour, int Minute, int Second, int MilliSecond) { diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp index 7ff0dc45c..437741161 100644 --- a/zencore/filesystem.cpp +++ b/zencore/filesystem.cpp @@ -993,7 +993,7 @@ MaximizeOpenFileCount() { #if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC struct rlimit Limit; - int Error = getrlimit(RLIMIT_NOFILE, &Limit); + int Error = getrlimit(RLIMIT_NOFILE, &Limit); if (Error) { ZEN_WARN("failed getting rlimit RLIMIT_NOFILE, reason '{}'", zen::MakeErrorCode(Error).message()); @@ -1001,13 +1001,22 @@ MaximizeOpenFileCount() else { struct rlimit NewLimit = Limit; - NewLimit.rlim_cur = NewLimit.rlim_max; - ZEN_INFO("changing RLIMIT_NOFILE from rlim_cur = {}, rlim_max {} to rlim_cur = {}, rlim_max {}", Limit.rlim_cur, Limit.rlim_max, NewLimit.rlim_cur, NewLimit.rlim_max); + NewLimit.rlim_cur = NewLimit.rlim_max; + ZEN_INFO("changing RLIMIT_NOFILE from rlim_cur = {}, rlim_max {} to rlim_cur = {}, rlim_max {}", + Limit.rlim_cur, + Limit.rlim_max, + NewLimit.rlim_cur, + NewLimit.rlim_max); Error = setrlimit(RLIMIT_NOFILE, &NewLimit); if (Error != 0) { - ZEN_WARN("failed to set RLIMIT_NOFILE limits from rlim_cur = {}, rlim_max {} to rlim_cur = {}, rlim_max {}, reason '{}'", Limit.rlim_cur, Limit.rlim_max, NewLimit.rlim_cur, NewLimit.rlim_max, zen::MakeErrorCode(Error).message()); + ZEN_WARN("failed to set RLIMIT_NOFILE limits from rlim_cur = {}, rlim_max {} to rlim_cur = {}, rlim_max {}, reason '{}'", + Limit.rlim_cur, + Limit.rlim_max, + NewLimit.rlim_cur, + NewLimit.rlim_max, + zen::MakeErrorCode(Error).message()); } } #endif diff --git a/zencore/include/zencore/compactbinary.h b/zencore/include/zencore/compactbinary.h index 25fd4a7b2..19f1597dc 100644 --- a/zencore/include/zencore/compactbinary.h +++ b/zencore/include/zencore/compactbinary.h @@ -43,6 +43,7 @@ public: inline uint64_t GetTicks() const { return Ticks; } + static uint64_t NowTicks(); static DateTime Now(); int GetYear() const; diff --git a/zenserver/compute/function.cpp b/zenserver/compute/function.cpp index 7c5f33e78..dd31013ef 100644 --- a/zenserver/compute/function.cpp +++ b/zenserver/compute/function.cpp @@ -54,6 +54,16 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, m_Router.AddPattern("action", "([[:xdigit:]]{40})"); m_Router.RegisterRoute( + "ready", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + // Todo: check upstream health + return HttpReq.WriteResponse(HttpResponseCode::OK); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( "workers/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); @@ -532,6 +542,15 @@ HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, CbObject& ResultObject.AddString("stdout"sv, Completed.StdOut); ResultObject.AddString("stderr"sv, Completed.StdErr); ResultObject.AddInteger("exitcode"sv, Completed.Error.ErrorCode); + ResultObject.BeginArray("stats"sv); + for (const auto& Timepoint : Completed.Timepoints) + { + ResultObject.BeginObject(); + ResultObject.AddString("name"sv, Timepoint.first); + ResultObject.AddDateTimeTicks("time"sv, Timepoint.second); + ResultObject.EndObject(); + } + ResultObject.EndArray(); ResultObject.BeginArray("files"sv); for (const auto& File : Completed.OutputFiles) diff --git a/zenserver/config.cpp b/zenserver/config.cpp index ac0f863cc..be91ae4f8 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -389,6 +389,49 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) options.add_option("compute", "", + "upstream-horde-storage-url", + "URL to a Horde Storage instance.", + cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageUrl)->default_value(""), + ""); + + options.add_option("compute", + "", + "upstream-horde-storage-oauth-url", + "URL to the OAuth provier", + cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthUrl)->default_value(""), + ""); + + options.add_option("compute", + "", + "upstream-horde-storage-oauth-clientid", + "The OAuth client ID", + cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthClientId)->default_value(""), + ""); + + options.add_option( + "compute", + "", + "upstream-horde-storage-oauth-clientsecret", + "The OAuth client secret", + cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthClientSecret)->default_value(""), + ""); + + options.add_option("compute", + "", + "upstream-horde-storage-openid-provider", + "Name of a registered Open ID provider", + cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOpenIdProvider)->default_value(""), + ""); + + options.add_option("compute", + "", + "upstream-horde-storage-token", + "A static authentication token", + cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageAccessToken)->default_value(""), + ""); + + options.add_option("compute", + "", "upstream-horde-cluster", "The Horde compute cluster id", cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.Cluster)->default_value(""), @@ -700,6 +743,28 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio std::string_view("namespace"), ServerOptions.UpstreamCacheConfig.HordeConfig.Namespace); }; + + if (auto StorageConfig = UpstreamConfig->get<sol::optional<sol::table>>("storage")) + { + UpdateStringValueFromConfig(StorageConfig.value(), + std::string_view("url"), + ServerOptions.UpstreamCacheConfig.HordeConfig.StorageUrl); + UpdateStringValueFromConfig(StorageConfig.value(), + std::string_view("oauthprovider"), + ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthUrl); + UpdateStringValueFromConfig(StorageConfig.value(), + std::string_view("oauthclientid"), + ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthClientId); + UpdateStringValueFromConfig(StorageConfig.value(), + std::string_view("oauthclientsecret"), + ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthClientSecret); + UpdateStringValueFromConfig(StorageConfig.value(), + std::string_view("openidprovider"), + ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOpenIdProvider); + UpdateStringValueFromConfig(StorageConfig.value(), + std::string_view("token"), + ServerOptions.UpstreamCacheConfig.HordeConfig.StorageAccessToken); + }; } } diff --git a/zenserver/config.h b/zenserver/config.h index 9f1b3645c..49f039d8d 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -38,6 +38,14 @@ struct ZenUpstreamHordeConfig std::string OAuthClientSecret; std::string OpenIdProvider; std::string AccessToken; + + std::string StorageUrl; + std::string StorageOAuthUrl; + std::string StorageOAuthClientId; + std::string StorageOAuthClientSecret; + std::string StorageOpenIdProvider; + std::string StorageAccessToken; + std::string Cluster; std::string Namespace; }; diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp new file mode 100644 index 000000000..dbf86cc13 --- /dev/null +++ b/zenserver/upstream/hordecompute.cpp @@ -0,0 +1,1374 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "upstreamapply.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include "jupiter.h" + +# include <zencore/compactbinary.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/compactbinaryvalidation.h> +# include <zencore/fmtutils.h> +# include <zencore/session.h> +# include <zencore/stream.h> +# include <zencore/thread.h> +# include <zencore/timer.h> +# include <zencore/workthreadpool.h> + +# include <zenstore/cas.h> +# include <zenstore/cidstore.h> + +# include <auth/authmgr.h> +# include <upstream/upstreamcache.h> + +# include "cache/structuredcachestore.h" +# include "diag/logging.h" + +# include <fmt/format.h> + +# include <algorithm> +# include <atomic> +# include <set> +# include <stack> + +namespace zen { + +using namespace std::literals; + +static const IoBuffer EmptyBuffer; +static const IoHash EmptyBufferId = IoHash::HashBuffer(EmptyBuffer); + +namespace detail { + + class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint + { + public: + HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& ComputeOptions, + const UpstreamAuthConfig& ComputeAuthConfig, + const CloudCacheClientOptions& StorageOptions, + const UpstreamAuthConfig& StorageAuthConfig, + CasStore& CasStore, + CidStore& CidStore, + AuthMgr& Mgr) + : m_Log(logging::Get("upstream-apply")) + , m_CasStore(CasStore) + , m_CidStore(CidStore) + , m_AuthMgr(Mgr) + { + m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl); + m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString()); + + { + std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + + if (ComputeAuthConfig.OAuthUrl.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = ComputeAuthConfig.OAuthUrl, + .ClientId = ComputeAuthConfig.OAuthClientId, + .ClientSecret = ComputeAuthConfig.OAuthClientSecret}); + } + else if (ComputeAuthConfig.OpenIdProvider.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(ComputeAuthConfig.OpenIdProvider)]() { + AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + else + { + CloudCacheAccessToken AccessToken{.Value = std::string(ComputeAuthConfig.AccessToken), + .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; + TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); + } + + m_Client = new CloudCacheClient(ComputeOptions, std::move(TokenProvider)); + } + + { + std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + + if (StorageAuthConfig.OAuthUrl.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = StorageAuthConfig.OAuthUrl, + .ClientId = StorageAuthConfig.OAuthClientId, + .ClientSecret = StorageAuthConfig.OAuthClientSecret}); + } + else if (StorageAuthConfig.OpenIdProvider.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(StorageAuthConfig.OpenIdProvider)]() { + AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + else + { + CloudCacheAccessToken AccessToken{.Value = std::string(StorageAuthConfig.AccessToken), + .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; + TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); + } + + m_StorageClient = new CloudCacheClient(StorageOptions, std::move(TokenProvider)); + } + } + + virtual ~HordeUpstreamApplyEndpoint() = default; + + virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); } + + virtual bool IsHealthy() const override { return m_HealthOk.load(); } + + virtual UpstreamEndpointHealth CheckHealth() override + { + try + { + CloudCacheSession Session(m_Client); + CloudCacheResult Result = Session.Authenticate(); + + m_HealthOk = Result.ErrorCode == 0; + + return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; + } + catch (std::exception& Err) + { + return {.Reason = Err.what(), .Ok = false}; + } + } + + virtual std::string_view DisplayName() const override { return m_DisplayName; } + + virtual PostUpstreamApplyResult PostApply(UpstreamApplyRecord ApplyRecord) override + { + PostUpstreamApplyResult ApplyResult{}; + ApplyResult.Timepoints.merge(ApplyRecord.Timepoints); + + try + { + UpstreamData UpstreamData; + if (!ProcessApplyKey(ApplyRecord, UpstreamData)) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed to generate task data"}}; + } + + { + ApplyResult.Timepoints["zen-storage-build-ref"] = DateTime::NowTicks(); + std::scoped_lock Lock(m_TaskMutex); + if (m_PendingTasks.contains(UpstreamData.TaskId)) + { + // Pending task is already queued, return success + ApplyResult.Success = true; + return ApplyResult; + } + m_PendingTasks[UpstreamData.TaskId] = std::move(ApplyRecord); + } + + CloudCacheSession ComputeSession(m_Client); + CloudCacheSession StorageSession(m_StorageClient); + + { + CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs); + ApplyResult.Bytes += Result.Bytes; + ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; + ApplyResult.Timepoints["zen-storage-upload-blobs"] = DateTime::NowTicks(); + if (!Result.Success) + { + ApplyResult.Error = {.ErrorCode = Result.ErrorCode, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload blobs"}; + return ApplyResult; + } + UpstreamData.Blobs.clear(); + } + + { + CloudCacheResult Result = BatchPutObjectsIfMissing(StorageSession, UpstreamData.Objects); + ApplyResult.Bytes += Result.Bytes; + ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; + ApplyResult.Timepoints["zen-storage-upload-objects"] = DateTime::NowTicks(); + if (!Result.Success) + { + ApplyResult.Error = {.ErrorCode = Result.ErrorCode, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload objects"}; + return ApplyResult; + } + } + + { + PutRefResult RefResult = StorageSession.PutRef("requests"sv, + UpstreamData.TaskId, + UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(), + ZenContentType::kCbObject); + Log().debug("Put ref {} Need={} Bytes={} Duration={}s Result={}", + UpstreamData.TaskId, + RefResult.Needs.size(), + RefResult.Bytes, + RefResult.ElapsedSeconds, + RefResult.Success); + ApplyResult.Bytes += RefResult.Bytes; + ApplyResult.ElapsedSeconds += RefResult.ElapsedSeconds; + ApplyResult.Timepoints["zen-storage-put-ref"] = DateTime::NowTicks(); + + if (RefResult.Needs.size() > 0) + { + Log().error("Failed to add task ref {} due to {} missing blobs", UpstreamData.TaskId, RefResult.Needs.size()); + for (const auto& Hash : RefResult.Needs) + { + Log().debug("Task ref {} missing blob {}", UpstreamData.TaskId, Hash); + } + + ApplyResult.Error = {.ErrorCode = RefResult.ErrorCode, + .Reason = !RefResult.Reason.empty() ? std::move(RefResult.Reason) + : "Failed to add task ref due to missing blob"}; + return ApplyResult; + } + + if (!RefResult.Success) + { + ApplyResult.Error = {.ErrorCode = RefResult.ErrorCode, + .Reason = !RefResult.Reason.empty() ? std::move(RefResult.Reason) : "Failed to add task ref"}; + return ApplyResult; + } + UpstreamData.Objects.clear(); + } + + { + CbObjectWriter Writer; + Writer.AddString("c"sv, m_ChannelId); + Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId); + Writer.BeginArray("t"sv); + Writer.AddObjectAttachment(UpstreamData.TaskId); + Writer.EndArray(); + CbObject TasksObject = Writer.Save(); + IoBuffer TasksData = TasksObject.GetBuffer().AsIoBuffer(); + + CloudCacheResult Result = ComputeSession.PostComputeTasks(TasksData); + Log().debug("Post compute task {} Bytes={} Duration={}s Result={}", + TasksObject.GetHash(), + Result.Bytes, + Result.ElapsedSeconds, + Result.Success); + ApplyResult.Bytes += Result.Bytes; + ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; + ApplyResult.Timepoints["zen-horde-post-task"] = DateTime::NowTicks(); + if (!Result.Success) + { + { + std::scoped_lock Lock(m_TaskMutex); + m_PendingTasks.erase(UpstreamData.TaskId); + } + + ApplyResult.Error = {.ErrorCode = Result.ErrorCode, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to post compute task"}; + return ApplyResult; + } + } + + Log().info("Task posted {}", UpstreamData.TaskId); + ApplyResult.Success = true; + return ApplyResult; + } + catch (std::exception& Err) + { + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; + } + } + + [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, const std::map<IoHash, IoBuffer>& Blobs) + { + if (Blobs.size() == 0) + { + return {.Success = true}; + } + + int64_t Bytes{}; + double ElapsedSeconds{}; + + // Batch check for missing blobs + std::set<IoHash> Keys; + std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); + + CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys); + Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}", + Keys.size(), + ExistsResult.Needs.size(), + ExistsResult.ElapsedSeconds, + ExistsResult.Success); + ElapsedSeconds += ExistsResult.ElapsedSeconds; + if (!ExistsResult.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, + .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if blobs exist"}; + } + + for (const auto& Hash : ExistsResult.Needs) + { + CloudCacheResult Result = Session.PutBlob(Hash, Blobs.at(Hash)); + Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put blobs"}; + } + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + + [[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map<IoHash, CbObject>& Objects) + { + if (Objects.size() == 0) + { + return {.Success = true}; + } + + int64_t Bytes{}; + double ElapsedSeconds{}; + + // Batch check for missing objects + std::set<IoHash> Keys; + std::transform(Objects.begin(), Objects.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); + + CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys); + Log().debug("Queried {} missing objects Need={} Duration={}s Result={}", + Keys.size(), + ExistsResult.Needs.size(), + ExistsResult.ElapsedSeconds, + ExistsResult.Success); + ElapsedSeconds += ExistsResult.ElapsedSeconds; + if (!ExistsResult.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, + .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if objects exist"}; + } + + for (const auto& Hash : ExistsResult.Needs) + { + CloudCacheResult Result = Session.PutObject(Hash, Objects.at(Hash).GetBuffer().AsIoBuffer()); + Log().debug("Put object {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put objects"}; + } + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + + enum class ComputeTaskState : int32_t + { + Queued = 0, + Executing = 1, + Complete = 2, + }; + + enum class ComputeTaskOutcome : int32_t + { + Success = 0, + Failed = 1, + Cancelled = 2, + NoResult = 3, + Exipred = 4, + BlobNotFound = 5, + Exception = 6, + }; + + [[nodiscard]] static std::string_view ComputeTaskStateToString(const ComputeTaskState Outcome) + { + switch (Outcome) + { + case ComputeTaskState::Queued: + return "Queued"sv; + case ComputeTaskState::Executing: + return "Executing"sv; + case ComputeTaskState::Complete: + return "Complete"sv; + }; + return "Unknown"sv; + } + + [[nodiscard]] static std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome) + { + switch (Outcome) + { + case ComputeTaskOutcome::Success: + return "Success"sv; + case ComputeTaskOutcome::Failed: + return "Failed"sv; + case ComputeTaskOutcome::Cancelled: + return "Cancelled"sv; + case ComputeTaskOutcome::NoResult: + return "NoResult"sv; + case ComputeTaskOutcome::Exipred: + return "Exipred"sv; + case ComputeTaskOutcome::BlobNotFound: + return "BlobNotFound"sv; + case ComputeTaskOutcome::Exception: + return "Exception"sv; + }; + return "Unknown"sv; + } + + virtual GetUpstreamApplyUpdatesResult GetUpdates(WorkerThreadPool& ThreadPool) override + { + int64_t Bytes{}; + double ElapsedSeconds{}; + + { + std::scoped_lock Lock(m_TaskMutex); + if (m_PendingTasks.empty()) + { + if (m_CompletedTasks.empty()) + { + // Nothing to do. + return {.Success = true}; + } + + UpstreamApplyCompleted CompletedTasks; + std::swap(CompletedTasks, m_CompletedTasks); + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; + } + } + + try + { + CloudCacheSession ComputeSession(m_Client); + + CloudCacheResult UpdatesResult = ComputeSession.GetComputeUpdates(m_ChannelId); + Log().debug("Get compute updates Bytes={} Duration={}s Result={}", + UpdatesResult.Bytes, + UpdatesResult.ElapsedSeconds, + UpdatesResult.Success); + Bytes += UpdatesResult.Bytes; + ElapsedSeconds += UpdatesResult.ElapsedSeconds; + if (!UpdatesResult.Success) + { + return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + if (!UpdatesResult.Success) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed get task updates"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; + } + + CbObject TaskStatus = LoadCompactBinaryObject(std::move(UpdatesResult.Response)); + + for (auto& It : TaskStatus["u"sv]) + { + CbObjectView Status = It.AsObjectView(); + IoHash TaskId = Status["h"sv].AsHash(); + const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32(); + const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)Status["o"sv].AsInt32(); + + Log().info("Task {} State={}", TaskId, ComputeTaskStateToString(State)); + + // Only completed tasks need to be processed + if (State != ComputeTaskState::Complete) + { + continue; + } + + IoHash WorkerId{}; + IoHash ActionId{}; + UpstreamApplyType ApplyType{}; + + { + std::scoped_lock Lock(m_TaskMutex); + auto TaskIt = m_PendingTasks.find(TaskId); + if (TaskIt != m_PendingTasks.end()) + { + WorkerId = TaskIt->second.WorkerDescriptor.GetHash(); + ActionId = TaskIt->second.Action.GetHash(); + ApplyType = TaskIt->second.Type; + m_PendingTasks.erase(TaskIt); + } + } + + if (WorkerId == IoHash::Zero) + { + Log().warn("Task {} missing from pending tasks", TaskId); + continue; + } + + std::map<std::string, uint64_t> Timepoints; + ProcessQueueTimings(Status["qs"sv].AsObjectView(), Timepoints); + ProcessExecuteTimings(Status["es"sv].AsObjectView(), Timepoints); + + if (Outcome != ComputeTaskOutcome::Success) + { + const std::string_view Detail = Status["d"sv].AsString(); + { + std::scoped_lock Lock(m_TaskMutex); + m_CompletedTasks[WorkerId][ActionId] = { + .Error{.ErrorCode = -1, .Reason = fmt::format("Task {} {}", ComputeTaskOutcomeToString(Outcome), Detail)}, + .Timepoints = std::move(Timepoints)}; + } + continue; + } + + Timepoints["zen-complete-queue-added"] = DateTime::NowTicks(); + ThreadPool.ScheduleWork([this, + ApplyType, + ResultHash = Status["r"sv].AsHash(), + Timepoints = std::move(Timepoints), + TaskId = std::move(TaskId), + WorkerId = std::move(WorkerId), + ActionId = std::move(ActionId)]() mutable { + Timepoints["zen-complete-queue-dispatched"] = DateTime::NowTicks(); + GetUpstreamApplyResult Result = ProcessTaskStatus(ApplyType, ResultHash); + Timepoints["zen-complete-queue-complete"] = DateTime::NowTicks(); + Result.Timepoints.merge(Timepoints); + + Log().debug("Task Processed {} Files={} Attachments={} ExitCode={}", + TaskId, + Result.OutputFiles.size(), + Result.OutputPackage.GetAttachments().size(), + Result.Error.ErrorCode); + { + std::scoped_lock Lock(m_TaskMutex); + m_CompletedTasks[WorkerId][ActionId] = std::move(Result); + } + }); + } + + { + std::scoped_lock Lock(m_TaskMutex); + if (m_CompletedTasks.empty()) + { + // Nothing to do. + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + UpstreamApplyCompleted CompletedTasks; + std::swap(CompletedTasks, m_CompletedTasks); + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; + } + } + catch (std::exception& Err) + { + m_HealthOk = false; + return { + .Error{.ErrorCode = -1, .Reason = Err.what()}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + }; + } + } + + virtual UpstreamApplyEndpointStats& Stats() override { return m_Stats; } + + private: + spdlog::logger& Log() { return m_Log; } + + spdlog::logger& m_Log; + CasStore& m_CasStore; + CidStore& m_CidStore; + AuthMgr& m_AuthMgr; + std::string m_DisplayName; + RefPtr<CloudCacheClient> m_Client; + RefPtr<CloudCacheClient> m_StorageClient; + UpstreamApplyEndpointStats m_Stats; + std::atomic_bool m_HealthOk{false}; + std::string m_ChannelId; + + std::mutex m_TaskMutex; + std::unordered_map<IoHash, UpstreamApplyRecord> m_PendingTasks; + UpstreamApplyCompleted m_CompletedTasks; + + struct UpstreamData + { + std::map<IoHash, IoBuffer> Blobs; + std::map<IoHash, CbObject> Objects; + IoHash TaskId; + IoHash RequirementsId; + }; + + struct UpstreamDirectory + { + std::filesystem::path Path; + std::map<std::string, UpstreamDirectory> Directories; + std::set<std::string> Files; + }; + + static void ProcessQueueTimings(CbObjectView QueueStats, std::map<std::string, uint64_t>& Timepoints) + { + uint64_t Ticks = QueueStats["t"sv].AsDateTimeTicks(); + if (Ticks == 0) + { + return; + } + + // Scope is an array of miliseconds after start time + // TODO: cleanup + Timepoints["horde-queue-added"] = Ticks; + int Index = 0; + for (auto& Item : QueueStats["s"sv].AsArrayView()) + { + Ticks += Item.AsInt32() * TimeSpan::TicksPerMillisecond; + switch (Index) + { + case 0: + Timepoints["horde-queue-dispatched"] = Ticks; + break; + case 1: + Timepoints["horde-queue-complete"] = Ticks; + break; + } + Index++; + } + } + + static void ProcessExecuteTimings(CbObjectView ExecutionStats, std::map<std::string, uint64_t>& Timepoints) + { + uint64_t Ticks = ExecutionStats["t"sv].AsDateTimeTicks(); + if (Ticks == 0) + { + return; + } + + // Scope is an array of miliseconds after start time + // TODO: cleanup + Timepoints["horde-execution-start"] = Ticks; + int Index = 0; + for (auto& Item : ExecutionStats["s"sv].AsArrayView()) + { + Ticks += Item.AsInt32() * TimeSpan::TicksPerMillisecond; + switch (Index) + { + case 0: + Timepoints["horde-execution-download-ref"] = Ticks; + break; + case 1: + Timepoints["horde-execution-download-input"] = Ticks; + break; + case 2: + Timepoints["horde-execution-execute"] = Ticks; + break; + case 3: + Timepoints["horde-execution-upload-log"] = Ticks; + break; + case 4: + Timepoints["horde-execution-upload-output"] = Ticks; + break; + case 5: + Timepoints["horde-execution-upload-ref"] = Ticks; + break; + } + Index++; + } + } + + [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const UpstreamApplyType ApplyType, const IoHash& ResultHash) + { + try + { + CloudCacheSession Session(m_StorageClient); + + GetUpstreamApplyResult ApplyResult{}; + + IoHash StdOutHash; + IoHash StdErrHash; + IoHash OutputHash; + + std::map<IoHash, IoBuffer> BinaryData; + + { + CloudCacheResult ObjectRefResult = Session.GetRef("responses"sv, ResultHash, ZenContentType::kCbObject); + Log().debug("Get ref {} Bytes={} Duration={}s Result={}", + ResultHash, + ObjectRefResult.Bytes, + ObjectRefResult.ElapsedSeconds, + ObjectRefResult.Success); + ApplyResult.Bytes += ObjectRefResult.Bytes; + ApplyResult.ElapsedSeconds += ObjectRefResult.ElapsedSeconds; + ApplyResult.Timepoints["zen-storage-get-ref"] = DateTime::NowTicks(); + + if (!ObjectRefResult.Success) + { + ApplyResult.Error.Reason = "Failed to get result object data"; + return ApplyResult; + } + + CbObject ResultObject = LoadCompactBinaryObject(ObjectRefResult.Response); + ApplyResult.Error.ErrorCode = ResultObject["e"sv].AsInt32(); + StdOutHash = ResultObject["so"sv].AsBinaryAttachment(); + StdErrHash = ResultObject["se"sv].AsBinaryAttachment(); + OutputHash = ResultObject["o"sv].AsObjectAttachment(); + } + + { + std::set<IoHash> NeededData; + if (OutputHash != IoHash::Zero) + { + GetObjectReferencesResult ObjectReferenceResult = Session.GetObjectReferences(OutputHash); + Log().debug("Get object references {} References={} Bytes={} Duration={}s Result={}", + ResultHash, + ObjectReferenceResult.References.size(), + ObjectReferenceResult.Bytes, + ObjectReferenceResult.ElapsedSeconds, + ObjectReferenceResult.Success); + ApplyResult.Bytes += ObjectReferenceResult.Bytes; + ApplyResult.ElapsedSeconds += ObjectReferenceResult.ElapsedSeconds; + ApplyResult.Timepoints["zen-storage-get-object-references"] = DateTime::NowTicks(); + + if (!ObjectReferenceResult.Success) + { + ApplyResult.Error.Reason = "Failed to get result object references"; + return ApplyResult; + } + + NeededData = std::move(ObjectReferenceResult.References); + } + + NeededData.insert(OutputHash); + NeededData.insert(StdOutHash); + NeededData.insert(StdErrHash); + + for (const auto& Hash : NeededData) + { + if (Hash == IoHash::Zero) + { + continue; + } + CloudCacheResult BlobResult = Session.GetBlob(Hash); + Log().debug("Get blob {} Bytes={} Duration={}s Result={}", + Hash, + BlobResult.Bytes, + BlobResult.ElapsedSeconds, + BlobResult.Success); + ApplyResult.Bytes += BlobResult.Bytes; + ApplyResult.ElapsedSeconds += BlobResult.ElapsedSeconds; + if (!BlobResult.Success) + { + ApplyResult.Error.Reason = "Failed to get blob"; + return ApplyResult; + } + BinaryData[Hash] = std::move(BlobResult.Response); + } + ApplyResult.Timepoints["zen-storage-get-blobs"] = DateTime::NowTicks(); + } + + ApplyResult.StdOut = StdOutHash != IoHash::Zero + ? std::string((const char*)BinaryData[StdOutHash].GetData(), BinaryData[StdOutHash].GetSize()) + : ""; + ApplyResult.StdErr = StdErrHash != IoHash::Zero + ? std::string((const char*)BinaryData[StdErrHash].GetData(), BinaryData[StdErrHash].GetSize()) + : ""; + + if (OutputHash == IoHash::Zero) + { + ApplyResult.Error.Reason = "Task completed with no output object"; + return ApplyResult; + } + + CbObject OutputObject = LoadCompactBinaryObject(BinaryData[OutputHash]); + + switch (ApplyType) + { + case UpstreamApplyType::Simple: + { + ResolveMerkleTreeDirectory(""sv, OutputHash, BinaryData, ApplyResult.OutputFiles); + for (const auto& Pair : BinaryData) + { + ApplyResult.FileData[Pair.first] = std::move(BinaryData.at(Pair.first)); + } + + ApplyResult.Success = ApplyResult.Error.ErrorCode == 0; + return ApplyResult; + } + break; + case UpstreamApplyType::Asset: + { + if (ApplyResult.Error.ErrorCode != 0) + { + ApplyResult.Error.Reason = "Task completed with errors"; + return ApplyResult; + } + + // Get build.output + IoHash BuildOutputId; + IoBuffer BuildOutput; + for (auto& It : OutputObject["f"sv]) + { + const CbObjectView FileObject = It.AsObjectView(); + if (FileObject["n"sv].AsString() == "Build.output"sv) + { + BuildOutputId = FileObject["h"sv].AsBinaryAttachment(); + BuildOutput = BinaryData[BuildOutputId]; + break; + } + } + + if (BuildOutput.GetSize() == 0) + { + ApplyResult.Error.Reason = "Build.output file not found in task results"; + return ApplyResult; + } + + // Get Output directory node + IoBuffer OutputDirectoryTree; + for (auto& It : OutputObject["d"sv]) + { + const CbObjectView DirectoryObject = It.AsObjectView(); + if (DirectoryObject["n"sv].AsString() == "Outputs"sv) + { + OutputDirectoryTree = BinaryData[DirectoryObject["h"sv].AsObjectAttachment()]; + break; + } + } + + if (OutputDirectoryTree.GetSize() == 0) + { + ApplyResult.Error.Reason = "Outputs directory not found in task results"; + return ApplyResult; + } + + // load build.output as CbObject + + // Move Outputs from Horde to CbPackage + + std::unordered_map<IoHash, IoHash> CidToCompressedId; + CbPackage OutputPackage; + CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree); + + for (auto& It : OutputDirectoryTreeObject["f"sv]) + { + CbObjectView FileObject = It.AsObjectView(); + // Name is the uncompressed hash + IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString()); + // Hash is the compressed data hash, and how it is stored in Horde + IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment(); + + if (!BinaryData.contains(CompressedId)) + { + Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId); + ApplyResult.Error.Reason = "Object attachment chunk not retrieved from Horde"; + return ApplyResult; + } + CidToCompressedId[DecompressedId] = CompressedId; + } + + // Iterate attachments, verify all chunks exist, and add to CbPackage + bool AnyErrors = false; + CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput); + BuildOutputObject.IterateAttachments([&](CbFieldView Field) { + const IoHash DecompressedId = Field.AsHash(); + if (!CidToCompressedId.contains(DecompressedId)) + { + Log().warn("Attachment not found {}", DecompressedId); + AnyErrors = true; + return; + } + const IoHash& CompressedId = CidToCompressedId.at(DecompressedId); + + if (!BinaryData.contains(CompressedId)) + { + Log().warn("Missing output {} compressed {} uncompressed", CompressedId, DecompressedId); + AnyErrors = true; + return; + } + + CompressedBuffer AttachmentBuffer = + CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId])); + + if (!AttachmentBuffer) + { + Log().warn( + "Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed", + CompressedId, + DecompressedId); + AnyErrors = true; + return; + } + + ApplyResult.TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); + ApplyResult.TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize(); + + CbAttachment Attachment(AttachmentBuffer); + OutputPackage.AddAttachment(Attachment); + }); + + if (AnyErrors) + { + ApplyResult.Error.Reason = "Failed to get result object attachment data"; + return ApplyResult; + } + + OutputPackage.SetObject(BuildOutputObject); + ApplyResult.OutputPackage = std::move(OutputPackage); + + ApplyResult.Success = ApplyResult.Error.ErrorCode == 0; + return ApplyResult; + } + break; + } + + ApplyResult.Error.Reason = "Unknown apply type"; + return ApplyResult; + } + catch (std::exception& Err) + { + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; + } + } + + [[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data) + { + std::string ExecutablePath; + std::string WorkingDirectory; + std::vector<std::string> Arguments; + std::map<std::string, std::string> Environment; + std::set<std::filesystem::path> InputFiles; + std::set<std::string> Outputs; + std::map<std::filesystem::path, IoHash> InputFileHashes; + + ExecutablePath = ApplyRecord.WorkerDescriptor["path"sv].AsString(); + if (ExecutablePath.empty()) + { + Log().warn("process apply upstream FAILED, '{}', path missing from worker descriptor", + ApplyRecord.WorkerDescriptor.GetHash()); + return false; + } + + WorkingDirectory = ApplyRecord.WorkerDescriptor["workdir"sv].AsString(); + + for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv]) + { + CbObjectView FileEntry = It.AsObjectView(); + if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs)) + { + return false; + } + } + + for (auto& It : ApplyRecord.WorkerDescriptor["files"sv]) + { + CbObjectView FileEntry = It.AsObjectView(); + if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs)) + { + return false; + } + } + + for (auto& It : ApplyRecord.WorkerDescriptor["dirs"sv]) + { + std::string_view Directory = It.AsString(); + std::string DummyFile = fmt::format("{}/.zen_empty_file", Directory); + InputFiles.insert(DummyFile); + Data.Blobs[EmptyBufferId] = EmptyBuffer; + InputFileHashes[DummyFile] = EmptyBufferId; + } + + if (!WorkingDirectory.empty()) + { + std::string DummyFile = fmt::format("{}/.zen_empty_file", WorkingDirectory); + InputFiles.insert(DummyFile); + Data.Blobs[EmptyBufferId] = EmptyBuffer; + InputFileHashes[DummyFile] = EmptyBufferId; + } + + for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv]) + { + std::string_view Env = It.AsString(); + auto Index = Env.find('='); + if (Index == std::string_view::npos) + { + Log().warn("process apply upstream FAILED, environment '{}' malformed", Env); + return false; + } + + Environment[std::string(Env.substr(0, Index))] = Env.substr(Index + 1); + } + + switch (ApplyRecord.Type) + { + case UpstreamApplyType::Simple: + { + for (auto& It : ApplyRecord.WorkerDescriptor["arguments"sv]) + { + Arguments.push_back(std::string(It.AsString())); + } + + for (auto& It : ApplyRecord.WorkerDescriptor["outputs"sv]) + { + Outputs.insert(std::string(It.AsString())); + } + } + break; + case UpstreamApplyType::Asset: + { + static const std::filesystem::path BuildActionPath = "Build.action"sv; + static const std::filesystem::path InputPath = "Inputs"sv; + const IoHash ActionId = ApplyRecord.Action.GetHash(); + + Arguments.push_back("-Build=build.action"); + Outputs.insert("Build.output"); + Outputs.insert("Outputs"); + + InputFiles.insert(BuildActionPath); + InputFileHashes[BuildActionPath] = ActionId; + Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(), + ApplyRecord.Action.GetBuffer().GetSize()); + + bool AnyErrors = false; + ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) { + const IoHash Cid = Field.AsHash(); + const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()}; + IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid); + + if (!DataBuffer) + { + Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid); + AnyErrors = true; + return; + } + + if (InputFiles.contains(FilePath)) + { + return; + } + + const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize()); + + InputFiles.insert(FilePath); + InputFileHashes[FilePath] = CompressedId; + Data.Blobs[CompressedId] = std::move(DataBuffer); + }); + + if (AnyErrors) + { + return false; + } + } + break; + } + + const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles); + + CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Blobs, Data.Objects); + const IoHash SandboxHash = Sandbox.GetHash(); + Data.Objects[SandboxHash] = std::move(Sandbox); + + { + std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString(); + if (HostPlatform.empty()) + { + Log().warn("process apply upstream FAILED, 'host' platform not provided"); + return false; + } + + int32_t LogicalCores = ApplyRecord.WorkerDescriptor["cores"sv].AsInt32(); + int64_t Memory = ApplyRecord.WorkerDescriptor["memory"sv].AsInt64(); + bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool(); + + std::string Condition = fmt::format("Platform == '{}'", HostPlatform); + if (HostPlatform == "Win64") + { + // TODO + // Condition += " && Pool == 'Win-RemoteExec'"; + } + + std::map<std::string_view, int64_t> Resources; + if (LogicalCores > 0) + { + Resources["LogicalCores"sv] = LogicalCores; + } + if (Memory > 0) + { + Resources["RAM"sv] = std::max(Memory / 1024LL / 1024LL / 1024LL, 1LL); + } + + CbObject Requirements = BuildRequirements(Condition, Resources, Exclusive); + const IoHash RequirementsId = Requirements.GetHash(); + Data.Objects[RequirementsId] = std::move(Requirements); + Data.RequirementsId = RequirementsId; + } + + CbObject Task = BuildTask(ExecutablePath, Arguments, Environment, WorkingDirectory, SandboxHash, Data.RequirementsId, Outputs); + + const IoHash TaskId = Task.GetHash(); + Data.Objects[TaskId] = std::move(Task); + Data.TaskId = TaskId; + + return true; + } + + [[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry, + std::set<std::filesystem::path>& InputFiles, + std::map<std::filesystem::path, IoHash>& InputFileHashes, + std::map<IoHash, IoBuffer>& Blobs) + { + const std::filesystem::path FilePath = FileEntry["name"sv].AsString(); + const IoHash ChunkId = FileEntry["hash"sv].AsHash(); + const uint64_t Size = FileEntry["size"sv].AsUInt64(); + IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkId); + + if (!DataBuffer) + { + Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId); + return false; + } + + if (DataBuffer.Size() != Size) + { + Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {}, action spec expected {}", + ChunkId, + DataBuffer.Size(), + Size); + return false; + } + + if (InputFiles.contains(FilePath)) + { + Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath); + return false; + } + + InputFiles.insert(FilePath); + InputFileHashes[FilePath] = ChunkId; + Blobs[ChunkId] = std::move(DataBuffer); + return true; + } + + [[nodiscard]] UpstreamDirectory BuildDirectoryTree(const std::set<std::filesystem::path>& InputFiles) + { + static const std::filesystem::path RootPath; + std::map<std::filesystem::path, UpstreamDirectory*> AllDirectories; + UpstreamDirectory RootDirectory = {.Path = RootPath}; + + AllDirectories[RootPath] = &RootDirectory; + + // Build tree from flat list + for (const auto& Path : InputFiles) + { + if (Path.has_parent_path()) + { + if (!AllDirectories.contains(Path.parent_path())) + { + std::stack<std::string> PathSplit; + { + std::filesystem::path ParentPath = Path.parent_path(); + PathSplit.push(ParentPath.filename().string()); + while (ParentPath.has_parent_path()) + { + ParentPath = ParentPath.parent_path(); + PathSplit.push(ParentPath.filename().string()); + } + } + UpstreamDirectory* ParentPtr = &RootDirectory; + while (!PathSplit.empty()) + { + if (!ParentPtr->Directories.contains(PathSplit.top())) + { + std::filesystem::path NewParentPath = {ParentPtr->Path / PathSplit.top()}; + ParentPtr->Directories[PathSplit.top()] = {.Path = NewParentPath}; + AllDirectories[NewParentPath] = &ParentPtr->Directories[PathSplit.top()]; + } + ParentPtr = &ParentPtr->Directories[PathSplit.top()]; + PathSplit.pop(); + } + } + + AllDirectories[Path.parent_path()]->Files.insert(Path.filename().string()); + } + else + { + RootDirectory.Files.insert(Path.filename().string()); + } + } + + return RootDirectory; + } + + [[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory, + const std::map<std::filesystem::path, IoHash>& InputFileHashes, + const std::map<IoHash, IoBuffer>& Blobs, + std::map<IoHash, CbObject>& Objects) + { + CbObjectWriter DirectoryTreeWriter; + + if (!RootDirectory.Files.empty()) + { + DirectoryTreeWriter.BeginArray("f"sv); + for (const auto& File : RootDirectory.Files) + { + const std::filesystem::path FilePath = {RootDirectory.Path / File}; + const IoHash& FileHash = InputFileHashes.at(FilePath); + const uint64_t FileSize = Blobs.at(FileHash).Size(); + DirectoryTreeWriter.BeginObject(); + DirectoryTreeWriter.AddString("n"sv, File); + DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash); + DirectoryTreeWriter.AddInteger("s"sv, FileSize); // Size + // DirectoryTreeWriter.AddInteger("a"sv, 0); // Attributes Currently unneeded + DirectoryTreeWriter.EndObject(); + } + DirectoryTreeWriter.EndArray(); + } + + if (!RootDirectory.Directories.empty()) + { + DirectoryTreeWriter.BeginArray("d"sv); + for (const auto& Item : RootDirectory.Directories) + { + CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Blobs, Objects); + const IoHash DirectoryHash = Directory.GetHash(); + Objects[DirectoryHash] = std::move(Directory); + + DirectoryTreeWriter.BeginObject(); + DirectoryTreeWriter.AddString("n"sv, Item.first); + DirectoryTreeWriter.AddObjectAttachment("h"sv, DirectoryHash); + DirectoryTreeWriter.EndObject(); + } + DirectoryTreeWriter.EndArray(); + } + + return DirectoryTreeWriter.Save(); + } + + void ResolveMerkleTreeDirectory(const std::filesystem::path& ParentDirectory, + const IoHash& DirectoryHash, + const std::map<IoHash, IoBuffer>& Objects, + std::map<std::filesystem::path, IoHash>& OutputFiles) + { + CbObject Directory = LoadCompactBinaryObject(Objects.at(DirectoryHash)); + + for (auto& It : Directory["f"sv]) + { + const CbObjectView FileObject = It.AsObjectView(); + const std::filesystem::path Path = ParentDirectory / FileObject["n"sv].AsString(); + + OutputFiles[Path] = FileObject["h"sv].AsBinaryAttachment(); + } + + for (auto& It : Directory["d"sv]) + { + const CbObjectView DirectoryObject = It.AsObjectView(); + + ResolveMerkleTreeDirectory(ParentDirectory / DirectoryObject["n"sv].AsString(), + DirectoryObject["h"sv].AsObjectAttachment(), + Objects, + OutputFiles); + } + } + + [[nodiscard]] CbObject BuildRequirements(const std::string_view Condition, + const std::map<std::string_view, int64_t>& Resources, + const bool Exclusive) + { + CbObjectWriter Writer; + Writer.AddString("c", Condition); + if (!Resources.empty()) + { + Writer.BeginArray("r"); + for (const auto& Resource : Resources) + { + Writer.BeginArray(); + Writer.AddString(Resource.first); + Writer.AddInteger(Resource.second); + Writer.EndArray(); + } + Writer.EndArray(); + } + Writer.AddBool("e", Exclusive); + return Writer.Save(); + } + + [[nodiscard]] CbObject BuildTask(const std::string_view Executable, + const std::vector<std::string>& Arguments, + const std::map<std::string, std::string>& Environment, + const std::string_view WorkingDirectory, + const IoHash& SandboxHash, + const IoHash& RequirementsId, + const std::set<std::string>& Outputs) + { + CbObjectWriter TaskWriter; + TaskWriter.AddString("e"sv, Executable); + + if (!Arguments.empty()) + { + TaskWriter.BeginArray("a"sv); + for (const auto& Argument : Arguments) + { + TaskWriter.AddString(Argument); + } + TaskWriter.EndArray(); + } + + if (!Environment.empty()) + { + TaskWriter.BeginArray("v"sv); + for (const auto& Env : Environment) + { + TaskWriter.BeginArray(); + TaskWriter.AddString(Env.first); + TaskWriter.AddString(Env.second); + TaskWriter.EndArray(); + } + TaskWriter.EndArray(); + } + + if (!WorkingDirectory.empty()) + { + TaskWriter.AddString("w"sv, WorkingDirectory); + } + + TaskWriter.AddObjectAttachment("s"sv, SandboxHash); + TaskWriter.AddObjectAttachment("r"sv, RequirementsId); + + // Outputs + if (!Outputs.empty()) + { + TaskWriter.BeginArray("o"sv); + for (const auto& Output : Outputs) + { + TaskWriter.AddString(Output); + } + TaskWriter.EndArray(); + } + + return TaskWriter.Save(); + } + }; +} // namespace detail + +////////////////////////////////////////////////////////////////////////// + +std::unique_ptr<UpstreamApplyEndpoint> +UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions, + const UpstreamAuthConfig& ComputeAuthConfig, + const CloudCacheClientOptions& StorageOptions, + const UpstreamAuthConfig& StorageAuthConfig, + CasStore& CasStore, + CidStore& CidStore, + AuthMgr& Mgr) +{ + return std::make_unique<detail::HordeUpstreamApplyEndpoint>(ComputeOptions, + ComputeAuthConfig, + StorageOptions, + StorageAuthConfig, + CasStore, + CidStore, + Mgr); +} + +} // namespace zen + +#endif // ZEN_WITH_COMPUTE_SERVICES
\ No newline at end of file diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 7eef96556..4bec41a29 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -540,6 +540,50 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } +GetObjectReferencesResult +CloudCacheSession::GetObjectReferences(const IoHash& Key) +{ + ZEN_TRACE_CPU("HordeClient::GetObjectReferences"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString() + << "/references"; + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); + Session.SetOption(cpr::Body{}); + + cpr::Response Response = Session.Get(); + ZEN_DEBUG("GET {}", Response); + + if (Response.error) + { + return {CloudCacheResult{.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}}; + } + + GetObjectReferencesResult Result{ + CloudCacheResult{.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}}; + + if (Result.Success) + { + IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + const CbObject ReferencesResponse = LoadCompactBinaryObject(Buffer); + for (auto& Item : ReferencesResponse["references"sv]) + { + Result.References.insert(Item.AsHash()); + } + } + + return Result; +} + CloudCacheResult CloudCacheSession::BlobExists(const IoHash& Key) { @@ -603,7 +647,7 @@ CloudCacheSession::PostComputeTasks(IoBuffer TasksData) return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } - return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } CloudCacheResult diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index bc0d84506..cff9a9ef1 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -76,6 +76,11 @@ struct CloudCacheExistsResult : CloudCacheResult std::set<IoHash> Needs; }; +struct GetObjectReferencesResult : CloudCacheResult +{ + std::set<IoHash> References; +}; + /** * Context for performing Jupiter operations * @@ -108,6 +113,8 @@ public: CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key); + GetObjectReferencesResult GetObjectReferences(const IoHash& Key); + CloudCacheResult BlobExists(const IoHash& Key); CloudCacheResult CompressedBlobExists(const IoHash& Key); CloudCacheResult ObjectExists(const IoHash& Key); diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp index 17a6bb3cf..9758e7565 100644 --- a/zenserver/upstream/upstreamapply.cpp +++ b/zenserver/upstream/upstreamapply.cpp @@ -4,1303 +4,26 @@ #if ZEN_WITH_COMPUTE_SERVICES -# include "jupiter.h" -# include "zen.h" - # include <zencore/compactbinary.h> # include <zencore/compactbinarybuilder.h> -# include <zencore/compactbinarypackage.h> -# include <zencore/compactbinaryvalidation.h> -# include <zencore/compress.h> # include <zencore/fmtutils.h> -# include <zencore/session.h> -# include <zencore/stats.h> # include <zencore/stream.h> -# include <zencore/thread.h> # include <zencore/timer.h> # include <zencore/workthreadpool.h> # include <zenstore/cas.h> # include <zenstore/cidstore.h> -# include <auth/authmgr.h> -# include <upstream/upstreamcache.h> - -# include "cache/structuredcachestore.h" # include "diag/logging.h" # include <fmt/format.h> -# include <algorithm> # include <atomic> -# include <set> -# include <stack> -# include <thread> -# include <unordered_map> namespace zen { using namespace std::literals; -static const IoBuffer EmptyBuffer; -static const IoHash EmptyBufferId = IoHash::HashBuffer(EmptyBuffer); - -namespace detail { - - class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint - { - public: - HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& ComputeOptions, - const UpstreamAuthConfig& ComputeAuthConfig, - const CloudCacheClientOptions& StorageOptions, - const UpstreamAuthConfig& StorageAuthConfig, - CasStore& CasStore, - CidStore& CidStore, - AuthMgr& Mgr) - : m_Log(logging::Get("upstream-apply")) - , m_CasStore(CasStore) - , m_CidStore(CidStore) - , m_AuthMgr(Mgr) - { - m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl); - m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString()); - - { - std::unique_ptr<CloudCacheTokenProvider> TokenProvider; - - if (ComputeAuthConfig.OAuthUrl.empty() == false) - { - TokenProvider = - CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = ComputeAuthConfig.OAuthUrl, - .ClientId = ComputeAuthConfig.OAuthClientId, - .ClientSecret = ComputeAuthConfig.OAuthClientSecret}); - } - else if (ComputeAuthConfig.OpenIdProvider.empty() == false) - { - TokenProvider = - CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(ComputeAuthConfig.OpenIdProvider)]() { - AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); - return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; - }); - } - else - { - CloudCacheAccessToken AccessToken{.Value = std::string(ComputeAuthConfig.AccessToken), - .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; - TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); - } - - m_Client = new CloudCacheClient(ComputeOptions, std::move(TokenProvider)); - } - - { - std::unique_ptr<CloudCacheTokenProvider> TokenProvider; - - if (StorageAuthConfig.OAuthUrl.empty() == false) - { - TokenProvider = - CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = StorageAuthConfig.OAuthUrl, - .ClientId = StorageAuthConfig.OAuthClientId, - .ClientSecret = StorageAuthConfig.OAuthClientSecret}); - } - else if (StorageAuthConfig.OpenIdProvider.empty() == false) - { - TokenProvider = - CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(StorageAuthConfig.OpenIdProvider)]() { - AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); - return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; - }); - } - else - { - CloudCacheAccessToken AccessToken{.Value = std::string(StorageAuthConfig.AccessToken), - .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; - TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); - } - - m_StorageClient = new CloudCacheClient(StorageOptions, std::move(TokenProvider)); - } - } - - virtual ~HordeUpstreamApplyEndpoint() = default; - - virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); } - - virtual bool IsHealthy() const override { return m_HealthOk.load(); } - - virtual UpstreamEndpointHealth CheckHealth() override - { - try - { - CloudCacheSession Session(m_Client); - CloudCacheResult Result = Session.Authenticate(); - - m_HealthOk = Result.ErrorCode == 0; - - return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; - } - catch (std::exception& Err) - { - return {.Reason = Err.what(), .Ok = false}; - } - } - - virtual std::string_view DisplayName() const override { return m_DisplayName; } - - virtual PostUpstreamApplyResult PostApply(UpstreamApplyRecord ApplyRecord) override - { - int64_t Bytes{}; - double ElapsedSeconds{}; - - try - { - UpstreamData UpstreamData; - if (!ProcessApplyKey(ApplyRecord, UpstreamData)) - { - return {.Error{.ErrorCode = -1, .Reason = "Failed to generate task data"}}; - } - - { - std::scoped_lock Lock(m_TaskMutex); - if (m_PendingTasks.contains(UpstreamData.TaskId)) - { - // Pending task is already queued, return success - return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; - } - m_PendingTasks[UpstreamData.TaskId] = std::move(ApplyRecord); - } - - CloudCacheSession ComputeSession(m_Client); - CloudCacheSession StorageSession(m_StorageClient); - - { - CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs); - Bytes += Result.Bytes; - ElapsedSeconds += Result.ElapsedSeconds; - if (!Result.Success) - { - return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, - .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload blobs"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - UpstreamData.Blobs.clear(); - } - - { - CloudCacheResult Result = BatchPutObjectsIfMissing(StorageSession, UpstreamData.Objects); - Bytes += Result.Bytes; - ElapsedSeconds += Result.ElapsedSeconds; - if (!Result.Success) - { - return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, - .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload objects"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - - PutRefResult RefResult = StorageSession.PutRef("requests"sv, - UpstreamData.TaskId, - UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(), - ZenContentType::kCbObject); - Log().debug("Put ref {} Need={} Bytes={} Duration={}s Result={}", - UpstreamData.TaskId, - RefResult.Needs.size(), - RefResult.Bytes, - RefResult.ElapsedSeconds, - RefResult.Success); - - Bytes += RefResult.Bytes; - ElapsedSeconds += RefResult.ElapsedSeconds; - if (!RefResult.Success) - { - return {.Error{.ErrorCode = RefResult.ErrorCode ? RefResult.ErrorCode : -1, - .Reason = !RefResult.Reason.empty() ? std::move(RefResult.Reason) : "Failed to add task ref"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - UpstreamData.Objects.clear(); - } - - CbObjectWriter Writer; - Writer.AddString("c"sv, m_ChannelId); - Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId); - Writer.BeginArray("t"sv); - Writer.AddObjectAttachment(UpstreamData.TaskId); - Writer.EndArray(); - CbObject TasksObject = Writer.Save(); - IoBuffer TasksData = TasksObject.GetBuffer().AsIoBuffer(); - - CloudCacheResult Result = ComputeSession.PostComputeTasks(TasksData); - Log().debug("Post compute task {} Bytes={} Duration={}s Result={}", - TasksObject.GetHash(), - Result.Bytes, - Result.ElapsedSeconds, - Result.Success); - Bytes += Result.Bytes; - ElapsedSeconds += Result.ElapsedSeconds; - if (!Result.Success) - { - { - std::scoped_lock Lock(m_TaskMutex); - m_PendingTasks.erase(UpstreamData.TaskId); - } - - return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, - .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to post compute task"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - - Log().info("Task posted {}", UpstreamData.TaskId); - return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; - } - catch (std::exception& Err) - { - m_HealthOk = false; - return {.Error{.ErrorCode = -1, .Reason = Err.what()}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; - } - } - - [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, const std::map<IoHash, IoBuffer>& Blobs) - { - if (Blobs.size() == 0) - { - return {.Success = true}; - } - - int64_t Bytes{}; - double ElapsedSeconds{}; - - // Batch check for missing blobs - std::set<IoHash> Keys; - std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); - - CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys); - Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}", - Keys.size(), - ExistsResult.Needs.size(), - ExistsResult.ElapsedSeconds, - ExistsResult.Success); - ElapsedSeconds += ExistsResult.ElapsedSeconds; - if (!ExistsResult.Success) - { - return {.Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, - .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if blobs exist"}; - } - - // TODO: Batch upload missing blobs - - for (const auto& Hash : ExistsResult.Needs) - { - CloudCacheResult Result = Session.PutBlob(Hash, Blobs.at(Hash)); - Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); - Bytes += Result.Bytes; - ElapsedSeconds += Result.ElapsedSeconds; - if (!Result.Success) - { - return {.Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, - .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put blobs"}; - } - } - - return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; - } - - [[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map<IoHash, CbObject>& Objects) - { - if (Objects.size() == 0) - { - return {.Success = true}; - } - - int64_t Bytes{}; - double ElapsedSeconds{}; - - // Batch check for missing objects - std::set<IoHash> Keys; - std::transform(Objects.begin(), Objects.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); - - // Todo: Endpoint doesn't exist for objects - CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys); - Log().debug("Queried {} missing objects Need={} Duration={}s Result={}", - Keys.size(), - ExistsResult.Needs.size(), - ExistsResult.ElapsedSeconds, - ExistsResult.Success); - ElapsedSeconds += ExistsResult.ElapsedSeconds; - if (!ExistsResult.Success) - { - return {.Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, - .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if objects exist"}; - } - - // TODO: Batch upload missing objects - - for (const auto& Hash : ExistsResult.Needs) - { - CloudCacheResult Result = Session.PutObject(Hash, Objects.at(Hash).GetBuffer().AsIoBuffer()); - Log().debug("Put object {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); - Bytes += Result.Bytes; - ElapsedSeconds += Result.ElapsedSeconds; - if (!Result.Success) - { - return {.Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, - .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put objects"}; - } - } - - return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; - } - - enum class ComputeTaskState : int32_t - { - Queued = 0, - Executing = 1, - Complete = 2, - }; - - enum class ComputeTaskOutcome : int32_t - { - Success = 0, - Failed = 1, - Cancelled = 2, - NoResult = 3, - Exipred = 4, - BlobNotFound = 5, - Exception = 6, - }; - - [[nodiscard]] static std::string_view ComputeTaskStateToString(const ComputeTaskState Outcome) - { - switch (Outcome) - { - case ComputeTaskState::Queued: - return "Queued"sv; - case ComputeTaskState::Executing: - return "Executing"sv; - case ComputeTaskState::Complete: - return "Complete"sv; - }; - return "Unknown"sv; - } - - [[nodiscard]] static std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome) - { - switch (Outcome) - { - case ComputeTaskOutcome::Success: - return "Success"sv; - case ComputeTaskOutcome::Failed: - return "Failed"sv; - case ComputeTaskOutcome::Cancelled: - return "Cancelled"sv; - case ComputeTaskOutcome::NoResult: - return "NoResult"sv; - case ComputeTaskOutcome::Exipred: - return "Exipred"sv; - case ComputeTaskOutcome::BlobNotFound: - return "BlobNotFound"sv; - case ComputeTaskOutcome::Exception: - return "Exception"sv; - }; - return "Unknown"sv; - } - - virtual GetUpstreamApplyUpdatesResult GetUpdates(WorkerThreadPool& ThreadPool) override - { - int64_t Bytes{}; - double ElapsedSeconds{}; - - { - std::scoped_lock Lock(m_TaskMutex); - if (m_PendingTasks.empty()) - { - if (m_CompletedTasks.empty()) - { - // Nothing to do. - return {.Success = true}; - } - - UpstreamApplyCompleted CompletedTasks; - std::swap(CompletedTasks, m_CompletedTasks); - return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; - } - } - - try - { - CloudCacheSession ComputeSession(m_Client); - - CloudCacheResult UpdatesResult = ComputeSession.GetComputeUpdates(m_ChannelId); - Log().debug("Get compute updates Bytes={} Duration={}s Result={}", - UpdatesResult.Bytes, - UpdatesResult.ElapsedSeconds, - UpdatesResult.Success); - Bytes += UpdatesResult.Bytes; - ElapsedSeconds += UpdatesResult.ElapsedSeconds; - if (!UpdatesResult.Success) - { - return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - - if (!UpdatesResult.Success) - { - return {.Error{.ErrorCode = -1, .Reason = "Failed get task updates"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; - } - - CbObject TaskStatus = LoadCompactBinaryObject(std::move(UpdatesResult.Response)); - - for (auto& It : TaskStatus["u"sv]) - { - CbObjectView Status = It.AsObjectView(); - IoHash TaskId = Status["h"sv].AsHash(); - const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32(); - const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)Status["o"sv].AsInt32(); - - Log().info("Task {} State={}", TaskId, ComputeTaskStateToString(State)); - - // Only completed tasks need to be processed - if (State != ComputeTaskState::Complete) - { - continue; - } - - IoHash WorkerId{}; - IoHash ActionId{}; - UpstreamApplyType ApplyType{}; - - { - std::scoped_lock Lock(m_TaskMutex); - auto TaskIt = m_PendingTasks.find(TaskId); - if (TaskIt != m_PendingTasks.end()) - { - WorkerId = TaskIt->second.WorkerDescriptor.GetHash(); - ActionId = TaskIt->second.Action.GetHash(); - ApplyType = TaskIt->second.Type; - m_PendingTasks.erase(TaskIt); - } - } - - if (WorkerId == IoHash::Zero) - { - Log().warn("Task {} missing from pending tasks", TaskId); - continue; - } - - if (Outcome != ComputeTaskOutcome::Success) - { - const std::string_view Detail = Status["d"sv].AsString(); - { - std::scoped_lock Lock(m_TaskMutex); - m_CompletedTasks[WorkerId][ActionId] = { - .Error{.ErrorCode = -1, .Reason = fmt::format("Task {} {}", ComputeTaskOutcomeToString(Outcome), Detail)}}; - } - continue; - } - - ThreadPool.ScheduleWork([this, - ApplyType, - ResultHash = Status["r"sv].AsHash(), - TaskId = std::move(TaskId), - WorkerId = std::move(WorkerId), - ActionId = std::move(ActionId)]() { - GetUpstreamApplyResult Result = ProcessTaskStatus(ApplyType, ResultHash); - Log().debug("Task Processed {} Files={} Attachments={} ExitCode={}", - TaskId, - Result.OutputFiles.size(), - Result.OutputPackage.GetAttachments().size(), - Result.Error.ErrorCode); - { - std::scoped_lock Lock(m_TaskMutex); - m_CompletedTasks[WorkerId][ActionId] = std::move(Result); - } - }); - } - - { - std::scoped_lock Lock(m_TaskMutex); - if (m_CompletedTasks.empty()) - { - // Nothing to do. - return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; - } - UpstreamApplyCompleted CompletedTasks; - std::swap(CompletedTasks, m_CompletedTasks); - return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; - } - } - catch (std::exception& Err) - { - m_HealthOk = false; - return { - .Error{.ErrorCode = -1, .Reason = Err.what()}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - }; - } - } - - virtual UpstreamApplyEndpointStats& Stats() override { return m_Stats; } - - private: - spdlog::logger& Log() { return m_Log; } - - spdlog::logger& m_Log; - CasStore& m_CasStore; - CidStore& m_CidStore; - AuthMgr& m_AuthMgr; - std::string m_DisplayName; - RefPtr<CloudCacheClient> m_Client; - RefPtr<CloudCacheClient> m_StorageClient; - UpstreamApplyEndpointStats m_Stats; - std::atomic_bool m_HealthOk{false}; - std::string m_ChannelId; - - std::mutex m_TaskMutex; - std::unordered_map<IoHash, UpstreamApplyRecord> m_PendingTasks; - UpstreamApplyCompleted m_CompletedTasks; - - struct UpstreamData - { - std::map<IoHash, IoBuffer> Blobs; - std::map<IoHash, CbObject> Objects; - IoHash TaskId; - IoHash RequirementsId; - }; - - struct UpstreamDirectory - { - std::filesystem::path Path; - std::map<std::string, UpstreamDirectory> Directories; - std::set<std::string> Files; - }; - - [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const UpstreamApplyType ApplyType, const IoHash& ResultHash) - { - try - { - CloudCacheSession Session(m_StorageClient); - - int64_t Bytes{}; - double ElapsedSeconds{}; - - // Get Result object and all Object Attachments + Binary Attachment IDs - CloudCacheResult ObjectRefResult = Session.GetRef("responses"sv, ResultHash, ZenContentType::kCbObject); - Log().debug("Get ref {} Bytes={} Duration={}s Result={}", - ResultHash, - ObjectRefResult.Bytes, - ObjectRefResult.ElapsedSeconds, - ObjectRefResult.Success); - Bytes += ObjectRefResult.Bytes; - ElapsedSeconds += ObjectRefResult.ElapsedSeconds; - - if (!ObjectRefResult.Success) - { - return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object data"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - - std::vector<IoHash> ObjectsToIterate; - std::map<IoHash, IoBuffer> ObjectData; - std::map<IoHash, IoBuffer> BinaryData; - - ObjectData[ResultHash] = ObjectRefResult.Response; - CbObject Object = LoadCompactBinaryObject(ObjectData[ResultHash]); - Object.IterateAttachments([&](CbFieldView Field) { - if (Field.IsObjectAttachment()) - { - const IoHash AttachmentHash = Field.AsObjectAttachment(); - if (!ObjectData.contains(AttachmentHash)) - { - ObjectsToIterate.push_back(AttachmentHash); - } - } - else if (Field.IsBinaryAttachment()) - { - const IoHash AttachmentHash = Field.AsBinaryAttachment(); - BinaryData[AttachmentHash] = {}; - } - }); - - while (!ObjectsToIterate.empty()) - { - const IoHash Hash = ObjectsToIterate.back(); - ObjectsToIterate.pop_back(); - - CloudCacheResult ObjectResult = Session.GetObject(Hash); - Log().debug("Get object {} Bytes={} Duration={}s Result={}", - Hash, - ObjectResult.Bytes, - ObjectResult.ElapsedSeconds, - ObjectResult.Success); - Bytes += ObjectRefResult.Bytes; - ElapsedSeconds += ObjectRefResult.ElapsedSeconds; - if (!ObjectResult.Success) - { - return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - ObjectData[Hash] = std::move(ObjectResult.Response); - - CbObject IterateObject = LoadCompactBinaryObject(ObjectData[Hash]); - IterateObject.IterateAttachments([&](CbFieldView Field) { - if (Field.IsObjectAttachment()) - { - const IoHash AttachmentHash = Field.AsObjectAttachment(); - if (!ObjectData.contains(AttachmentHash)) - { - ObjectsToIterate.push_back(AttachmentHash); - } - } - else if (Field.IsBinaryAttachment()) - { - const IoHash AttachmentHash = Field.AsBinaryAttachment(); - BinaryData[AttachmentHash] = {}; - } - }); - } - - // Batch load all binary data - for (auto& It : BinaryData) - { - CloudCacheResult BlobResult = Session.GetBlob(It.first); - Log().debug("Get blob {} Bytes={} Duration={}s Result={}", - It.first, - BlobResult.Bytes, - BlobResult.ElapsedSeconds, - BlobResult.Success); - Bytes += ObjectRefResult.Bytes; - ElapsedSeconds += ObjectRefResult.ElapsedSeconds; - if (!BlobResult.Success) - { - return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - It.second = std::move(BlobResult.Response); - } - - CbObject ResultObject = LoadCompactBinaryObject(ObjectData[ResultHash]); - int32_t ExitCode = ResultObject["e"sv].AsInt32(); - IoHash StdOutHash = ResultObject["so"sv].AsBinaryAttachment(); - IoHash StdErrHash = ResultObject["se"sv].AsBinaryAttachment(); - IoHash OutputHash = ResultObject["o"sv].AsObjectAttachment(); - - std::string StdOut = std::string((const char*)BinaryData[StdOutHash].GetData(), BinaryData[StdOutHash].GetSize()); - std::string StdErr = std::string((const char*)BinaryData[StdErrHash].GetData(), BinaryData[StdErrHash].GetSize()); - - if (OutputHash == IoHash::Zero) - { - return {.Error{.ErrorCode = ExitCode, .Reason = "Task completed with no output object"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .StdOut = std::move(StdOut), - .StdErr = std::move(StdErr)}; - } - - CbObject OutputObject = LoadCompactBinaryObject(ObjectData[OutputHash]); - - switch (ApplyType) - { - case UpstreamApplyType::Simple: - { - std::map<std::filesystem::path, IoHash> OutputFiles; - - ResolveMerkleTreeDirectory(""sv, OutputHash, ObjectData, OutputFiles); - - return {.OutputFiles = std::move(OutputFiles), - .FileData = std::move(BinaryData), - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .StdOut = std::move(StdOut), - .StdErr = std::move(StdErr), - .Success = true}; - } - break; - case UpstreamApplyType::Asset: - { - if (ExitCode != 0) - { - return {.Error{.ErrorCode = ExitCode, .Reason = "Task completed with errors"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .StdOut = std::move(StdOut), - .StdErr = std::move(StdErr)}; - } - - // Get build.output - IoHash BuildOutputId; - IoBuffer BuildOutput; - for (auto& It : OutputObject["f"sv]) - { - const CbObjectView FileObject = It.AsObjectView(); - if (FileObject["n"sv].AsString() == "Build.output"sv) - { - BuildOutputId = FileObject["h"sv].AsBinaryAttachment(); - BuildOutput = BinaryData[BuildOutputId]; - break; - } - } - - if (BuildOutput.GetSize() == 0) - { - return {.Error{.ErrorCode = ExitCode, .Reason = "Build.output file not found in task results"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .StdOut = std::move(StdOut), - .StdErr = std::move(StdErr)}; - } - - // Get Output directory node - IoBuffer OutputDirectoryTree; - for (auto& It : OutputObject["d"sv]) - { - const CbObjectView DirectoryObject = It.AsObjectView(); - if (DirectoryObject["n"sv].AsString() == "Outputs"sv) - { - OutputDirectoryTree = ObjectData[DirectoryObject["h"sv].AsObjectAttachment()]; - break; - } - } - - if (OutputDirectoryTree.GetSize() == 0) - { - return {.Error{.ErrorCode = ExitCode, .Reason = "Outputs directory not found in task results"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .StdOut = std::move(StdOut), - .StdErr = std::move(StdErr)}; - } - - // load build.output as CbObject - - // Move Outputs from Horde to CbPackage - - std::unordered_map<IoHash, IoHash> CidToCompressedId; - CbPackage OutputPackage; - CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree); - int64_t TotalAttachmentBytes = 0; - int64_t TotalRawAttachmentBytes = 0; - - for (auto& It : OutputDirectoryTreeObject["f"sv]) - { - CbObjectView FileObject = It.AsObjectView(); - // Name is the uncompressed hash - IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString()); - // Hash is the compressed data hash, and how it is stored in Horde - IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment(); - - if (!BinaryData.contains(CompressedId)) - { - Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId); - return {.Error{.ErrorCode = -1, .Reason = "Object attachment chunk not retrieved from Horde"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - CidToCompressedId[DecompressedId] = CompressedId; - } - - // Iterate attachments, verify all chunks exist, and add to CbPackage - bool AnyErrors = false; - CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput); - BuildOutputObject.IterateAttachments([&](CbFieldView Field) { - const IoHash DecompressedId = Field.AsHash(); - if (!CidToCompressedId.contains(DecompressedId)) - { - Log().warn("Attachment not found {}", DecompressedId); - AnyErrors = true; - return; - } - const IoHash& CompressedId = CidToCompressedId.at(DecompressedId); - - if (!BinaryData.contains(CompressedId)) - { - Log().warn("Missing output {} compressed {} uncompressed", CompressedId, DecompressedId); - AnyErrors = true; - return; - } - - CompressedBuffer AttachmentBuffer = - CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId])); - - if (!AttachmentBuffer) - { - Log().warn( - "Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed", - CompressedId, - DecompressedId); - AnyErrors = true; - return; - } - - TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); - TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize(); - - CbAttachment Attachment(AttachmentBuffer); - OutputPackage.AddAttachment(Attachment); - }); - - if (AnyErrors) - { - return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .StdOut = std::move(StdOut), - .StdErr = std::move(StdErr)}; - } - - OutputPackage.SetObject(BuildOutputObject); - - return {.OutputPackage = std::move(OutputPackage), - .TotalAttachmentBytes = TotalAttachmentBytes, - .TotalRawAttachmentBytes = TotalRawAttachmentBytes, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .StdOut = std::move(StdOut), - .StdErr = std::move(StdErr), - .Success = true}; - } - break; - } - - return {.Error{.ErrorCode = ExitCode, .Reason = "Unknown apply type"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .StdOut = std::move(StdOut), - .StdErr = std::move(StdErr)}; - } - catch (std::exception& Err) - { - return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; - } - } - - [[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data) - { - std::string ExecutablePath; - std::string WorkingDirectory; - std::vector<std::string> Arguments; - std::map<std::string, std::string> Environment; - std::set<std::filesystem::path> InputFiles; - std::set<std::string> Outputs; - std::map<std::filesystem::path, IoHash> InputFileHashes; - - ExecutablePath = ApplyRecord.WorkerDescriptor["path"sv].AsString(); - if (ExecutablePath.empty()) - { - Log().warn("process apply upstream FAILED, '{}', path missing from worker descriptor", - ApplyRecord.WorkerDescriptor.GetHash()); - return false; - } - - WorkingDirectory = ApplyRecord.WorkerDescriptor["workdir"sv].AsString(); - - for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv]) - { - CbObjectView FileEntry = It.AsObjectView(); - if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs)) - { - return false; - } - } - - for (auto& It : ApplyRecord.WorkerDescriptor["files"sv]) - { - CbObjectView FileEntry = It.AsObjectView(); - if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs)) - { - return false; - } - } - - for (auto& It : ApplyRecord.WorkerDescriptor["dirs"sv]) - { - std::string_view Directory = It.AsString(); - std::string DummyFile = fmt::format("{}/.zen_empty_file", Directory); - InputFiles.insert(DummyFile); - Data.Blobs[EmptyBufferId] = EmptyBuffer; - InputFileHashes[DummyFile] = EmptyBufferId; - } - - for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv]) - { - std::string_view Env = It.AsString(); - auto Index = Env.find('='); - if (Index == std::string_view::npos) - { - Log().warn("process apply upstream FAILED, environment '{}' malformed", Env); - return false; - } - - Environment[std::string(Env.substr(0, Index))] = Env.substr(Index + 1); - } - - switch (ApplyRecord.Type) - { - case UpstreamApplyType::Simple: - { - for (auto& It : ApplyRecord.WorkerDescriptor["arguments"sv]) - { - Arguments.push_back(std::string(It.AsString())); - } - - for (auto& It : ApplyRecord.WorkerDescriptor["outputs"sv]) - { - Outputs.insert(std::string(It.AsString())); - } - } - break; - case UpstreamApplyType::Asset: - { - static const std::filesystem::path BuildActionPath = "Build.action"sv; - static const std::filesystem::path InputPath = "Inputs"sv; - const IoHash ActionId = ApplyRecord.Action.GetHash(); - - Arguments.push_back("-Build=build.action"); - Outputs.insert("Build.output"); - Outputs.insert("Outputs"); - - InputFiles.insert(BuildActionPath); - InputFileHashes[BuildActionPath] = ActionId; - Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(), - ApplyRecord.Action.GetBuffer().GetSize()); - - bool AnyErrors = false; - ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) { - const IoHash Cid = Field.AsHash(); - const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()}; - IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid); - - if (!DataBuffer) - { - Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid); - AnyErrors = true; - return; - } - - if (InputFiles.contains(FilePath)) - { - return; - } - - const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize()); - - InputFiles.insert(FilePath); - InputFileHashes[FilePath] = CompressedId; - Data.Blobs[CompressedId] = std::move(DataBuffer); - }); - - if (AnyErrors) - { - return false; - } - } - break; - } - - const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles); - - CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Blobs, Data.Objects); - const IoHash SandboxHash = Sandbox.GetHash(); - Data.Objects[SandboxHash] = std::move(Sandbox); - - { - std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString(); - if (HostPlatform.empty()) - { - Log().warn("process apply upstream FAILED, 'host' platform not provided"); - return false; - } - - int32_t LogicalCores = ApplyRecord.WorkerDescriptor["cores"sv].AsInt32(); - int64_t Memory = ApplyRecord.WorkerDescriptor["memory"sv].AsInt64(); - bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool(); - - std::string Condition = fmt::format("Platform == '{}'", HostPlatform); - if (HostPlatform == "Win64") - { - // TODO - // Condition += " && Pool == 'Win-RemoteExec'"; - } - - std::map<std::string_view, int64_t> Resources; - if (LogicalCores > 0) - { - Resources["LogicalCores"sv] = LogicalCores; - } - if (Memory > 0) - { - Resources["RAM"sv] = std::max(Memory / 1024LL / 1024LL / 1024LL, 1LL); - } - - CbObject Requirements = BuildRequirements(Condition, Resources, Exclusive); - const IoHash RequirementsId = Requirements.GetHash(); - Data.Objects[RequirementsId] = std::move(Requirements); - Data.RequirementsId = RequirementsId; - } - - CbObject Task = BuildTask(ExecutablePath, Arguments, Environment, WorkingDirectory, SandboxHash, Data.RequirementsId, Outputs); - - const IoHash TaskId = Task.GetHash(); - Data.Objects[TaskId] = std::move(Task); - Data.TaskId = TaskId; - - return true; - } - - [[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry, - std::set<std::filesystem::path>& InputFiles, - std::map<std::filesystem::path, IoHash>& InputFileHashes, - std::map<IoHash, IoBuffer>& Blobs) - { - const std::filesystem::path FilePath = FileEntry["name"sv].AsString(); - const IoHash ChunkId = FileEntry["hash"sv].AsHash(); - const uint64_t Size = FileEntry["size"sv].AsUInt64(); - IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkId); - - if (!DataBuffer) - { - Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId); - return false; - } - - if (DataBuffer.Size() != Size) - { - Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {}, action spec expected {}", - ChunkId, - DataBuffer.Size(), - Size); - return false; - } - - if (InputFiles.contains(FilePath)) - { - Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath); - return false; - } - - InputFiles.insert(FilePath); - InputFileHashes[FilePath] = ChunkId; - Blobs[ChunkId] = std::move(DataBuffer); - return true; - } - - [[nodiscard]] UpstreamDirectory BuildDirectoryTree(const std::set<std::filesystem::path>& InputFiles) - { - static const std::filesystem::path RootPath; - std::map<std::filesystem::path, UpstreamDirectory*> AllDirectories; - UpstreamDirectory RootDirectory = {.Path = RootPath}; - - AllDirectories[RootPath] = &RootDirectory; - - // Build tree from flat list - for (const auto& Path : InputFiles) - { - if (Path.has_parent_path()) - { - if (!AllDirectories.contains(Path.parent_path())) - { - std::stack<std::string> PathSplit; - { - std::filesystem::path ParentPath = Path.parent_path(); - PathSplit.push(ParentPath.filename().string()); - while (ParentPath.has_parent_path()) - { - ParentPath = ParentPath.parent_path(); - PathSplit.push(ParentPath.filename().string()); - } - } - UpstreamDirectory* ParentPtr = &RootDirectory; - while (!PathSplit.empty()) - { - if (!ParentPtr->Directories.contains(PathSplit.top())) - { - std::filesystem::path NewParentPath = {ParentPtr->Path / PathSplit.top()}; - ParentPtr->Directories[PathSplit.top()] = {.Path = NewParentPath}; - AllDirectories[NewParentPath] = &ParentPtr->Directories[PathSplit.top()]; - } - ParentPtr = &ParentPtr->Directories[PathSplit.top()]; - PathSplit.pop(); - } - } - - AllDirectories[Path.parent_path()]->Files.insert(Path.filename().string()); - } - else - { - RootDirectory.Files.insert(Path.filename().string()); - } - } - - return RootDirectory; - } - - [[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory, - const std::map<std::filesystem::path, IoHash>& InputFileHashes, - const std::map<IoHash, IoBuffer>& Blobs, - std::map<IoHash, CbObject>& Objects) - { - CbObjectWriter DirectoryTreeWriter; - - if (!RootDirectory.Files.empty()) - { - DirectoryTreeWriter.BeginArray("f"sv); - for (const auto& File : RootDirectory.Files) - { - const std::filesystem::path FilePath = {RootDirectory.Path / File}; - const IoHash& FileHash = InputFileHashes.at(FilePath); - const uint64_t FileSize = Blobs.at(FileHash).Size(); - DirectoryTreeWriter.BeginObject(); - DirectoryTreeWriter.AddString("n"sv, File); - DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash); - DirectoryTreeWriter.AddInteger("s"sv, FileSize); // Size - // DirectoryTreeWriter.AddInteger("a"sv, 0); // Attributes Currently unneeded - DirectoryTreeWriter.EndObject(); - } - DirectoryTreeWriter.EndArray(); - } - - if (!RootDirectory.Directories.empty()) - { - DirectoryTreeWriter.BeginArray("d"sv); - for (const auto& Item : RootDirectory.Directories) - { - CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Blobs, Objects); - const IoHash DirectoryHash = Directory.GetHash(); - Objects[DirectoryHash] = std::move(Directory); - - DirectoryTreeWriter.BeginObject(); - DirectoryTreeWriter.AddString("n"sv, Item.first); - DirectoryTreeWriter.AddObjectAttachment("h"sv, DirectoryHash); - DirectoryTreeWriter.EndObject(); - } - DirectoryTreeWriter.EndArray(); - } - - return DirectoryTreeWriter.Save(); - } - - void ResolveMerkleTreeDirectory(const std::filesystem::path& ParentDirectory, - const IoHash& DirectoryHash, - const std::map<IoHash, IoBuffer>& Objects, - std::map<std::filesystem::path, IoHash>& OutputFiles) - { - CbObject Directory = LoadCompactBinaryObject(Objects.at(DirectoryHash)); - - for (auto& It : Directory["f"sv]) - { - const CbObjectView FileObject = It.AsObjectView(); - const std::filesystem::path Path = ParentDirectory / FileObject["n"sv].AsString(); - - OutputFiles[Path] = FileObject["h"sv].AsBinaryAttachment(); - } - - for (auto& It : Directory["d"sv]) - { - const CbObjectView DirectoryObject = It.AsObjectView(); - - ResolveMerkleTreeDirectory(ParentDirectory / DirectoryObject["n"sv].AsString(), - DirectoryObject["h"sv].AsObjectAttachment(), - Objects, - OutputFiles); - } - } - - [[nodiscard]] CbObject BuildRequirements(const std::string_view Condition, - const std::map<std::string_view, int64_t>& Resources, - const bool Exclusive) - { - CbObjectWriter Writer; - Writer.AddString("c", Condition); - if (!Resources.empty()) - { - Writer.BeginArray("r"); - for (const auto& Resource : Resources) - { - Writer.BeginArray(); - Writer.AddString(Resource.first); - Writer.AddInteger(Resource.second); - Writer.EndArray(); - } - Writer.EndArray(); - } - Writer.AddBool("e", Exclusive); - return Writer.Save(); - } - - [[nodiscard]] CbObject BuildTask(const std::string_view Executable, - const std::vector<std::string>& Arguments, - const std::map<std::string, std::string>& Environment, - const std::string_view WorkingDirectory, - const IoHash& SandboxHash, - const IoHash& RequirementsId, - const std::set<std::string>& Outputs) - { - CbObjectWriter TaskWriter; - TaskWriter.AddString("e"sv, Executable); - - if (!Arguments.empty()) - { - TaskWriter.BeginArray("a"sv); - for (const auto& Argument : Arguments) - { - TaskWriter.AddString(Argument); - } - TaskWriter.EndArray(); - } - - if (!Environment.empty()) - { - TaskWriter.BeginArray("v"sv); - for (const auto& Env : Environment) - { - TaskWriter.BeginArray(); - TaskWriter.AddString(Env.first); - TaskWriter.AddString(Env.second); - TaskWriter.EndArray(); - } - TaskWriter.EndArray(); - } - - if (!WorkingDirectory.empty()) - { - TaskWriter.AddString("w"sv, WorkingDirectory); - } - - TaskWriter.AddObjectAttachment("s"sv, SandboxHash); - TaskWriter.AddObjectAttachment("r"sv, RequirementsId); - - // Outputs - if (!Outputs.empty()) - { - TaskWriter.BeginArray("o"sv); - for (const auto& Output : Outputs) - { - TaskWriter.AddString(Output); - } - TaskWriter.EndArray(); - } - - return TaskWriter.Save(); - } - }; -} // namespace detail - -////////////////////////////////////////////////////////////////////////// - struct UpstreamApplyStats { static constexpr uint64_t MaxSampleCount = 1000ull; @@ -1360,7 +83,8 @@ public: , m_CasStore(CasStore) , m_CidStore(CidStore) , m_Stats(Options.StatsEnabled) - , m_AsyncWorkPool(Options.ThreadCount) + , m_UpstreamAsyncWorkPool(Options.UpstreamThreadCount) + , m_DownstreamAsyncWorkPool(Options.DownstreamThreadCount) { } @@ -1423,7 +147,9 @@ public: m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)}; } - m_AsyncWorkPool.ScheduleWork([this, ApplyRecord = std::move(ApplyRecord)]() { ProcessApplyRecord(std::move(ApplyRecord)); }); + ApplyRecord.Timepoints["zen-queue-added"] = DateTime::NowTicks(); + m_UpstreamAsyncWorkPool.ScheduleWork( + [this, ApplyRecord = std::move(ApplyRecord)]() { ProcessApplyRecord(std::move(ApplyRecord)); }); return {.ApplyId = ActionId, .Success = true}; } @@ -1447,8 +173,10 @@ public: virtual void GetStatus(CbObjectWriter& Status) override { - Status << "worker_threads" << m_Options.ThreadCount; - Status << "queue_count" << m_AsyncWorkPool.PendingWork(); + Status << "upstream_worker_threads" << m_Options.UpstreamThreadCount; + Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWork(); + Status << "downstream_worker_threads" << m_Options.DownstreamThreadCount; + Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWork(); Status.BeginArray("endpoints"); for (const auto& Ep : m_Endpoints) @@ -1501,11 +229,14 @@ private: { if (Endpoint->IsHealthy()) { - PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord)); + ApplyRecord.Timepoints["zen-queue-dispatched"] = DateTime::NowTicks(); + PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord)); { std::scoped_lock Lock(m_ApplyTasksMutex); if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) { + Status->Timepoints.merge(Result.Timepoints); + if (Result.Success) { Status->State = UpstreamApplyState::Executing; @@ -1553,7 +284,7 @@ private: { if (Endpoint->IsHealthy()) { - GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_AsyncWorkPool); + GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_DownstreamAsyncWorkPool); m_Stats.Add(*Endpoint, Result); if (!Result.Success) @@ -1572,6 +303,9 @@ private: { Status->State = UpstreamApplyState::Complete; Status->Result = std::move(It2.second); + Status->Result.Timepoints.merge(Status->Timepoints); + Status->Result.Timepoints["zen-queue-complete"] = DateTime::NowTicks(); + Status->Timepoints.clear(); } } } @@ -1686,7 +420,8 @@ private: std::mutex m_ApplyTasksMutex; std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints; Event m_ShutdownEvent; - WorkerThreadPool m_AsyncWorkPool; + WorkerThreadPool m_UpstreamAsyncWorkPool; + WorkerThreadPool m_DownstreamAsyncWorkPool; std::thread m_UpstreamUpdatesThread; std::thread m_EndpointMonitorThread; RunState m_RunState; @@ -1700,24 +435,6 @@ UpstreamApply::Create(const UpstreamApplyOptions& Options, CasStore& CasStore, C return std::make_unique<UpstreamApplyImpl>(Options, CasStore, CidStore); } -std::unique_ptr<UpstreamApplyEndpoint> -UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions, - const UpstreamAuthConfig& ComputeAuthConfig, - const CloudCacheClientOptions& StorageOptions, - const UpstreamAuthConfig& StorageAuthConfig, - CasStore& CasStore, - CidStore& CidStore, - AuthMgr& Mgr) -{ - return std::make_unique<detail::HordeUpstreamApplyEndpoint>(ComputeOptions, - ComputeAuthConfig, - StorageOptions, - StorageAuthConfig, - CasStore, - CidStore, - Mgr); -} - } // namespace zen #endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h index 9cea88c56..c6e38142c 100644 --- a/zenserver/upstream/upstreamapply.h +++ b/zenserver/upstream/upstreamapply.h @@ -10,10 +10,8 @@ # include <zencore/stats.h> # include <zencore/zencore.h> -# include <atomic> # include <chrono> # include <map> -# include <memory> # include <unordered_map> # include <unordered_set> @@ -44,17 +42,19 @@ enum class UpstreamApplyType struct UpstreamApplyRecord { - CbObject WorkerDescriptor; - CbObject Action; - UpstreamApplyType Type; + CbObject WorkerDescriptor; + CbObject Action; + UpstreamApplyType Type; + std::map<std::string, uint64_t> Timepoints{}; }; struct UpstreamApplyOptions { std::chrono::seconds HealthCheckInterval{5}; std::chrono::seconds UpdatesInterval{5}; - uint32_t ThreadCount = 4; - bool StatsEnabled = false; + uint32_t UpstreamThreadCount = 4; + uint32_t DownstreamThreadCount = 4; + bool StatsEnabled = false; }; struct UpstreamApplyError @@ -67,31 +67,33 @@ struct UpstreamApplyError struct PostUpstreamApplyResult { - UpstreamApplyError Error{}; - int64_t Bytes{}; - double ElapsedSeconds{}; - bool Success = false; + UpstreamApplyError Error{}; + int64_t Bytes{}; + double ElapsedSeconds{}; + std::map<std::string, uint64_t> Timepoints{}; + bool Success = false; }; struct GetUpstreamApplyResult { // UpstreamApplyType::Simple - std::map<std::filesystem::path, IoHash> OutputFiles; - std::map<IoHash, IoBuffer> FileData; + std::map<std::filesystem::path, IoHash> OutputFiles{}; + std::map<IoHash, IoBuffer> FileData{}; // UpstreamApplyType::Asset CbPackage OutputPackage{}; int64_t TotalAttachmentBytes{}; int64_t TotalRawAttachmentBytes{}; - UpstreamApplyError Error{}; - int64_t Bytes{}; - double ElapsedSeconds{}; - std::string StdOut{}; - std::string StdErr{}; - std::string Agent{}; - std::string Detail{}; - bool Success = false; + UpstreamApplyError Error{}; + int64_t Bytes{}; + double ElapsedSeconds{}; + std::string StdOut{}; + std::string StdErr{}; + std::string Agent{}; + std::string Detail{}; + std::map<std::string, uint64_t> Timepoints{}; + bool Success = false; }; using UpstreamApplyCompleted = std::unordered_map<IoHash, std::unordered_map<IoHash, GetUpstreamApplyResult>>; @@ -110,6 +112,7 @@ struct UpstreamApplyStatus UpstreamApplyState State{}; GetUpstreamApplyResult Result{}; std::chrono::steady_clock::time_point ExpireTime{}; + std::map<std::string, uint64_t> Timepoints{}; }; using UpstreamApplyTasks = std::unordered_map<IoHash, std::unordered_map<IoHash, UpstreamApplyStatus>>; diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 3ac5b9992..abaec888a 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -850,7 +850,7 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions) const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; // Horde compute upstream - if (UpstreamConfig.HordeConfig.Url.empty() == false && UpstreamConfig.JupiterConfig.Url.empty() == false) + if (UpstreamConfig.HordeConfig.Url.empty() == false && UpstreamConfig.HordeConfig.StorageUrl.empty() == false) { ZEN_INFO("instantiating compute service"); @@ -861,8 +861,7 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions) .ServiceUrl = UpstreamConfig.HordeConfig.Url, .ComputeCluster = UpstreamConfig.HordeConfig.Cluster, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), - .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds), - .UseLegacyDdc = false}; + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; auto ComputeAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.OAuthUrl, .OAuthClientId = UpstreamConfig.HordeConfig.OAuthClientId, @@ -872,16 +871,16 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions) auto StorageOptions = zen::CloudCacheClientOptions{.Name = EndpointName, - .ServiceUrl = UpstreamConfig.JupiterConfig.Url, + .ServiceUrl = UpstreamConfig.HordeConfig.StorageUrl, .BlobStoreNamespace = UpstreamConfig.HordeConfig.Namespace, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; - auto StorageAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl, - .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId, - .OAuthClientSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret, - .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider, - .AccessToken = UpstreamConfig.JupiterConfig.AccessToken}; + auto StorageAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.StorageOAuthUrl, + .OAuthClientId = UpstreamConfig.HordeConfig.StorageOAuthClientId, + .OAuthClientSecret = UpstreamConfig.HordeConfig.StorageOAuthClientSecret, + .OpenIdProvider = UpstreamConfig.HordeConfig.StorageOpenIdProvider, + .AccessToken = UpstreamConfig.HordeConfig.StorageAccessToken}; m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore, *m_CidStore, @@ -893,7 +892,7 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions) } else { - ZEN_INFO("NOT instantiating compute service (missing Horde or Jupiter config)"); + ZEN_INFO("NOT instantiating compute service (missing Horde or Storage config)"); } } #endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/zenstore-test/zenstore-test.cpp b/zenstore-test/zenstore-test.cpp index 030b1159d..ebc0806d5 100644 --- a/zenstore-test/zenstore-test.cpp +++ b/zenstore-test/zenstore-test.cpp @@ -1,7 +1,7 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include <zencore/logging.h> #include <zencore/filesystem.h> +#include <zencore/logging.h> #include <zencore/zencore.h> #include <zenstore/zenstore.h> |