diff options
Diffstat (limited to 'zenserver/upstream/upstreamapply.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 1492 |
1 files changed, 1492 insertions, 0 deletions
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp new file mode 100644 index 000000000..3f1b0d8f9 --- /dev/null +++ b/zenserver/upstream/upstreamapply.cpp @@ -0,0 +1,1492 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "upstreamapply.h" +#include "jupiter.h" +#include "zen.h" + +#include <zencore/blockingqueue.h> +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/compress.h> +#include <zencore/fmtutils.h> +#include <zencore/session.h> +#include <zencore/stats.h> +#include <zencore/stream.h> +#include <zencore/timer.h> + +#include <zenstore/cas.h> +#include <zenstore/cidstore.h> + +#include "cache/structuredcachestore.h" +#include "diag/logging.h" + +#include <fmt/format.h> + +#include <algorithm> +#include <atomic> +#include <map> +#include <set> +#include <stack> +#include <thread> +#include <unordered_map> + +namespace zen { + +using namespace std::literals; + +namespace detail { + + class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint + { + public: + HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& Options, CasStore& CasStore, CidStore& CidStore) + : m_Log(logging::Get("upstream-apply")) + , m_CasStore(CasStore) + , m_CidStore(CidStore) + { + using namespace fmt::literals; + m_DisplayName = "Horde - '{}'"_format(Options.ServiceUrl); + m_Client = new CloudCacheClient(Options); + m_ChannelId = "zen-{}"_format(zen::GetSessionIdString()); + } + + virtual ~HordeUpstreamApplyEndpoint() = default; + + virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); } + + virtual bool IsHealthy() const override { return m_HealthOk.load(); } + + virtual UpstreamEndpointHealth CheckHealth() override + { + try + { + CloudCacheSession Session(m_Client); + CloudCacheResult Result = Session.Authenticate(); + + m_HealthOk = Result.ErrorCode == 0; + 
+ return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; + } + catch (std::exception& Err) + { + return {.Reason = Err.what(), .Ok = false}; + } + } + + virtual std::string_view DisplayName() const override { return m_DisplayName; } + + virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) override + { + int64_t Bytes{}; + double ElapsedSeconds{}; + + try + { + UpstreamData UpstreamData; + if (!ProcessApplyKey(ApplyRecord, UpstreamData)) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed to generate task data"}}; + } + + { + std::scoped_lock Lock(m_TaskMutex); + if (m_PendingTasks.contains(UpstreamData.TaskId)) + { + // Pending task is already queued, return success + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + m_PendingTasks[UpstreamData.TaskId] = ApplyRecord; + } + + CloudCacheSession Session(m_Client); + + { + CloudCacheResult Result = BatchPutBlobsIfMissing(Session, UpstreamData.Blobs); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + UpstreamData.Blobs.clear(); + } + + { + CloudCacheResult Result = BatchPutObjectsIfMissing(Session, UpstreamData.Objects); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + UpstreamData.Objects.clear(); + } + + CbObjectWriter Writer; + Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId); + Writer.BeginArray("t"sv); + Writer.AddObjectAttachment(UpstreamData.TaskId); + Writer.EndArray(); + IoBuffer TasksData = Writer.Save().GetBuffer().AsIoBuffer(); + + CloudCacheResult Result = Session.PostComputeTasks(m_ChannelId, TasksData); + Bytes += Result.Bytes; + 
ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + { + std::scoped_lock Lock(m_TaskMutex); + m_PendingTasks.erase(UpstreamData.TaskId); + } + + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + catch (std::exception& Err) + { + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; + } + } + + [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, const std::map<IoHash, IoBuffer>& Blobs) + { + if (Blobs.size() == 0) + { + return {.Success = true}; + } + + int64_t Bytes{}; + double ElapsedSeconds{}; + + // Batch check for missing blobs + std::set<IoHash> Keys; + for (const auto& It : Blobs) + { + Keys.insert(It.first); + } + + CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys); + ElapsedSeconds += ExistsResult.ElapsedSeconds; + if (ExistsResult.ErrorCode != 0) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = ExistsResult.ErrorCode, + .Reason = std::move(ExistsResult.Reason)}; + } + + // TODO: Batch upload missing blobs + + for (const auto& It : Blobs) + { + if (ExistsResult.Have.contains(It.first)) + { + continue; + } + + CloudCacheResult Result = Session.PutBlob(It.first, It.second); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = Result.ErrorCode, + .Reason = std::move(Result.Reason)}; + } + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + + [[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map<IoHash, CbObject>& Objects) + { + if (Objects.size() == 0) + { + return {.Success = true}; + } + + int64_t Bytes{}; + double 
ElapsedSeconds{}; + + // Batch check for missing objects + std::set<IoHash> Keys; + for (const auto& It : Objects) + { + Keys.insert(It.first); + } + + CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys); + ElapsedSeconds += ExistsResult.ElapsedSeconds; + if (ExistsResult.ErrorCode != 0) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = ExistsResult.ErrorCode, + .Reason = std::move(ExistsResult.Reason)}; + } + + // TODO: Batch upload missing objects + + for (const auto& It : Objects) + { + if (ExistsResult.Have.contains(It.first)) + { + continue; + } + + CloudCacheResult Result = Session.PutObject(It.first, It.second.GetBuffer().AsIoBuffer()); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = Result.ErrorCode, + .Reason = std::move(Result.Reason)}; + } + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + + enum class ComputeTaskState : int32_t + { + Queued = 0, + Executing = 1, + Complete = 2, + }; + + enum class ComputeTaskOutcome : int32_t + { + Success = 0, + Failed = 1, + Cancelled = 2, + NoResult = 3, + Exipred = 4, + BlobNotFound = 5, + Exception = 6, + }; + + virtual GetUpstreamApplyUpdatesResult GetUpdates() override + { + int64_t Bytes{}; + double ElapsedSeconds{}; + UpstreamApplyCompleted CompletedTasks; + + { + std::scoped_lock Lock(m_TaskMutex); + if (m_PendingTasks.empty()) + { + // Nothing to do. 
+ return {.Success = true}; + } + } + + try + { + CloudCacheSession Session(m_Client); + + CloudCacheResult UpdatesResult = Session.GetComputeUpdates(m_ChannelId); + Bytes += UpdatesResult.Bytes; + ElapsedSeconds += UpdatesResult.ElapsedSeconds; + if (UpdatesResult.ErrorCode != 0) + { + return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + if (!UpdatesResult.Success) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed get task updates"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; + } + + CbObject TaskStatus = LoadCompactBinaryObject(UpdatesResult.Response); + + // zen::StringBuilder<4096> ObjStr; + // zen::CompactBinaryToJson(TaskStatus, ObjStr); + + for (auto& It : TaskStatus["u"sv]) + { + CbObjectView Status = It.AsObjectView(); + const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32(); + + const std::string_view AgentId = TaskStatus["a"sv].AsString(); + const std::string_view LeaseId = TaskStatus["l"sv].AsString(); + + // Only care about completed tasks + if (State != ComputeTaskState::Complete) + { + continue; + } + + const IoHash TaskId = Status["h"sv].AsObjectAttachment(); + + IoHash WorkerId; + IoHash ActionId; + + { + std::scoped_lock Lock(m_TaskMutex); + auto TaskIt = m_PendingTasks.find(TaskId); + if (TaskIt == m_PendingTasks.end()) + { + continue; + } + WorkerId = TaskIt->second.WorkerDescriptor.GetHash(); + ActionId = TaskIt->second.Action.GetHash(); + m_PendingTasks.erase(TaskIt); + } + + GetUpstreamApplyResult Result = ProcessTaskStatus(Status, Session); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + + CompletedTasks[WorkerId][ActionId] = std::move(Result); + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; + } + catch (std::exception& Err) + { + m_HealthOk = false; + return { + .Error{.ErrorCode = -1, .Reason = Err.what()}, 
+ .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .Completed = std::move(CompletedTasks), + }; + } + } + + virtual UpstreamApplyEndpointStats& Stats() override { return m_Stats; } + + private: + spdlog::logger& Log() { return m_Log; } + + CasStore& m_CasStore; + CidStore& m_CidStore; + spdlog::logger& m_Log; + std::string m_DisplayName; + RefPtr<CloudCacheClient> m_Client; + UpstreamApplyEndpointStats m_Stats; + std::atomic_bool m_HealthOk{false}; + std::string m_ChannelId; + + std::mutex m_TaskMutex; + std::unordered_map<IoHash, UpstreamApplyRecord> m_PendingTasks; + + struct UpstreamData + { + std::map<IoHash, IoBuffer> Blobs; + std::map<IoHash, CbObject> Objects; + IoHash TaskId; + IoHash RequirementsId; + }; + + struct UpstreamDirectory + { + std::filesystem::path Path; + std::map<std::string, UpstreamDirectory> Directories; + std::set<std::string> Files; + }; + + [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const CbObjectView& TaskStatus, CloudCacheSession& Session) + { + try + { + const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)TaskStatus["o"sv].AsInt32(); + + if (Outcome != ComputeTaskOutcome::Success) + { + const std::string_view Detail = TaskStatus["d"sv].AsString(); + return {.Error{.ErrorCode = -1, .Reason = std::string(Detail)}}; + } + + const IoHash TaskId = TaskStatus["h"sv].AsObjectAttachment(); + const DateTime Time = TaskStatus["t"sv].AsDateTime(); + const IoHash ResultHash = TaskStatus["r"sv].AsObjectAttachment(); + const std::string_view AgentId = TaskStatus["a"sv].AsString(); + const std::string_view LeaseId = TaskStatus["l"sv].AsString(); + + int64_t Bytes{}; + double ElapsedSeconds{}; + + // Get Result object and all Object Attachments + Binary Attachment IDs + CloudCacheResult ObjectTreeResult = Session.GetObjectTree(ResultHash); + Bytes += ObjectTreeResult.Bytes; + ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; + + if (ObjectTreeResult.ErrorCode != 0) + { + return {.Error{.ErrorCode = ObjectTreeResult.ErrorCode, 
.Reason = std::move(ObjectTreeResult.Reason)},
+                        .Bytes = Bytes,
+                        .ElapsedSeconds = ElapsedSeconds};
+            }
+
+            if (!ObjectTreeResult.Success)
+            {
+                return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object data"},
+                        .Bytes = Bytes,
+                        .ElapsedSeconds = ElapsedSeconds};
+            }
+
+            // Attachments referenced by the result tree, keyed by hash. An
+            // empty buffer means "known but not yet downloaded".
+            std::map<IoHash, IoBuffer> TreeObjectData;
+            std::map<IoHash, IoBuffer> TreeBinaryData;
+
+            // The tree response interleaves attachment-id fields with (optional)
+            // inline object payloads; walk it field by field.
+            MemoryView ResponseView = ObjectTreeResult.Response;
+            while (ResponseView.GetSize() > 0)
+            {
+                CbFieldView Field = CbFieldView(ResponseView.GetData());
+                ResponseView += Field.GetSize();
+                if (Field.IsObjectAttachment())
+                {
+                    const IoHash Hash = Field.AsObjectAttachment();
+                    Field = CbFieldView(ResponseView.GetData());
+                    ResponseView += Field.GetSize();
+                    if (!Field.IsObject())  // No data inline; fetch individually below
+                    {
+                        TreeObjectData[Hash] = {};
+                        continue;
+                    }
+                    MemoryView FieldView = Field.AsObjectView().GetView();
+
+                    TreeObjectData[Hash] = IoBuffer(IoBuffer::Wrap, FieldView.GetData(), FieldView.GetSize());
+                }
+                else if (Field.IsBinaryAttachment())
+                {
+                    // Binary attachments are never inlined; fetch individually below.
+                    const IoHash Hash = Field.AsBinaryAttachment();
+                    TreeBinaryData[Hash] = {};
+                }
+                else  // Unknown type: skip the field
+                {
+                }
+            }
+
+            // Download any object attachments that were not inlined in the tree.
+            for (auto& It : TreeObjectData)
+            {
+                if (It.second.GetSize() == 0)
+                {
+                    CloudCacheResult ObjectResult = Session.GetObject(It.first);
+                    // BUGFIX: accounting/error checks previously read ObjectTreeResult
+                    // here, so per-object download failures went undetected.
+                    Bytes += ObjectResult.Bytes;
+                    ElapsedSeconds += ObjectResult.ElapsedSeconds;
+                    if (ObjectResult.ErrorCode != 0)
+                    {
+                        return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)},
+                                .Bytes = Bytes,
+                                .ElapsedSeconds = ElapsedSeconds};
+                    }
+
+                    if (!ObjectResult.Success)
+                    {
+                        return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"},
+                                .Bytes = Bytes,
+                                .ElapsedSeconds = ElapsedSeconds};
+                    }
+                    It.second = std::move(ObjectResult.Response);
+                }
+            }
+
+            // Download all binary attachments.
+            for (auto& It : TreeBinaryData)
+            {
+                if (It.second.GetSize() == 0)
+                {
+                    CloudCacheResult BlobResult = Session.GetBlob(It.first);
+                    // BUGFIX: accounting/error checks previously read ObjectTreeResult
+                    // here instead of BlobResult.
+                    Bytes += BlobResult.Bytes;
+                    ElapsedSeconds += BlobResult.ElapsedSeconds;
+                    if (BlobResult.ErrorCode != 0)
+                    {
+                        return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)},
+                                .Bytes = Bytes,
+                                .ElapsedSeconds = ElapsedSeconds};
+                    }
+
+                    if (!BlobResult.Success)
+                    {
+                        return {.Error{.ErrorCode = -1, .Reason = "Failed to get result binary attachment data"},
+                                .Bytes = Bytes,
+                                .ElapsedSeconds = ElapsedSeconds};
+                    }
+                    It.second = std::move(BlobResult.Response);
+                }
+            }
+
+            // Decode the result object: exit code, stdout/stderr blobs, output tree.
+            CbObject ResultObject = LoadCompactBinaryObject(TreeObjectData[ResultHash]);
+            int32_t ExitCode = ResultObject["e"sv].AsInt32();
+            IoHash StdOutHash = ResultObject["so"sv].AsBinaryAttachment();
+            IoHash StdErrHash = ResultObject["se"sv].AsBinaryAttachment();
+            IoHash OutputHash = ResultObject["o"sv].AsObjectAttachment();
+
+            std::string StdOut = std::string((const char*)TreeBinaryData[StdOutHash].GetData(), TreeBinaryData[StdOutHash].GetSize());
+            std::string StdErr = std::string((const char*)TreeBinaryData[StdErrHash].GetData(), TreeBinaryData[StdErrHash].GetSize());
+
+            if (ExitCode != 0)
+            {
+                return {.Error{.ErrorCode = ExitCode, .Reason = "Task completed with errors"},
+                        .Bytes = Bytes,
+                        .ElapsedSeconds = ElapsedSeconds,
+                        .StdOut = std::move(StdOut),
+                        .StdErr = std::move(StdErr)};
+            }
+
+            CbObject OutputObject = LoadCompactBinaryObject(TreeObjectData[OutputHash]);
+
+            // Locate the Build.output file entry among the result's files.
+            IoHash BuildOutputId;
+            IoBuffer BuildOutput;
+            for (auto& It : OutputObject["f"sv])
+            {
+                const CbObjectView FileObject = It.AsObjectView();
+                if (FileObject["n"sv].AsString() == "Build.output"sv)
+                {
+                    BuildOutputId = FileObject["h"sv].AsBinaryAttachment();
+                    BuildOutput = TreeBinaryData[BuildOutputId];
+                    break;
+                }
+            }
+
+            if (BuildOutput.GetSize() == 0)
+            {
+                // NOTE(review): ExitCode is 0 on this path, so the error is
+                // reported with code 0 — confirm callers treat .Error presence,
+                // not the code, as the failure signal.
+                return {.Error{.ErrorCode = ExitCode, .Reason = "Build.output file not found in task results"},
+                        .Bytes = Bytes,
+                        .ElapsedSeconds = ElapsedSeconds};
+            }
+
+            // Locate the "Outputs" directory node in the result tree.
+            IoBuffer OutputDirectoryTree;
+            for (auto& It : OutputObject["d"sv])
+ { + const CbObjectView DirectoryObject = It.AsObjectView(); + if (DirectoryObject["n"sv].AsString() == "Outputs"sv) + { + OutputDirectoryTree = TreeObjectData[DirectoryObject["h"sv].AsObjectAttachment()]; + break; + } + } + + if (OutputDirectoryTree.GetSize() == 0) + { + return {.Error{.ErrorCode = ExitCode, .Reason = "Outputs directory not found in task results"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + // load build.output as CbObject + + // Move Outputs from Horde to CbPackage + + std::unordered_map<IoHash, IoHash> CidToCompressedId; + CbPackage OutputPackage; + CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree); + int64_t TotalAttachmentBytes = 0; + int64_t TotalRawAttachmentBytes = 0; + + for (auto& It : OutputDirectoryTreeObject["f"sv]) + { + CbObjectView FileObject = It.AsObjectView(); + // Name is the uncompressed hash + IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString()); + // Hash is the compressed data hash, and how it is stored in Horde + IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment(); + + if (!TreeBinaryData.contains(CompressedId)) + { + Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId.ToHexString()); + return {.Error{.ErrorCode = -1, .Reason = "Object attachment chunk not retrieved from Horde"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + CidToCompressedId[DecompressedId] = CompressedId; + } + + // Iterate attachments, verify all chunks exist, and add to CbPackage + bool AnyErrors = false; + CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput); + BuildOutputObject.IterateAttachments([&](CbFieldView Field) { + const IoHash DecompressedId = Field.AsHash(); + if (!CidToCompressedId.contains(DecompressedId)) + { + Log().warn("Attachment not found {}", DecompressedId.ToHexString()); + AnyErrors = true; + return; + } + const IoHash& CompressedId = CidToCompressedId.at(DecompressedId); + + if 
(!TreeBinaryData.contains(CompressedId)) + { + Log().warn("Missing output {} compressed {} uncompressed", + CompressedId.ToHexString(), + DecompressedId.ToHexString()); + AnyErrors = true; + return; + } + + CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(TreeBinaryData[CompressedId])); + + if (!AttachmentBuffer) + { + Log().warn("Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed", + CompressedId.ToHexString(), + DecompressedId.ToHexString()); + AnyErrors = true; + return; + } + + TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); + TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize(); + + CbAttachment Attachment(AttachmentBuffer); + OutputPackage.AddAttachment(Attachment); + }); + + if (AnyErrors) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + OutputPackage.SetObject(BuildOutputObject); + + return {.OutputPackage = std::move(OutputPackage), + .TotalAttachmentBytes = TotalAttachmentBytes, + .TotalRawAttachmentBytes = TotalRawAttachmentBytes, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr), + .Success = true}; + } + catch (std::exception& Err) + { + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; + } + } + + [[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data) + { + std::string ExecutablePath; + std::map<std::string, std::string> Environment; + std::set<std::filesystem::path> InputFiles; + std::map<std::filesystem::path, IoHash> InputFileHashes; + + ExecutablePath = ApplyRecord.WorkerDescriptor["path"sv].AsString(); + if (ExecutablePath.empty()) + { + Log().warn("process apply upstream FAILED, '{}', path missing from worker descriptor", + ApplyRecord.WorkerDescriptor.GetHash()); + return false; + } + + for (auto& It : 
ApplyRecord.WorkerDescriptor["executables"sv])
+        {
+            CbObjectView FileEntry = It.AsObjectView();
+            if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs))
+            {
+                return false;
+            }
+        }
+
+        // Additional data files shipped alongside the worker executables.
+        for (auto& It : ApplyRecord.WorkerDescriptor["files"sv])
+        {
+            CbObjectView FileEntry = It.AsObjectView();
+            if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs))
+            {
+                return false;
+            }
+        }
+
+        // Parse "KEY=VALUE" environment entries from the worker descriptor.
+        for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv])
+        {
+            std::string_view Env = It.AsString();
+            auto Index = Env.find('=');
+            // BUGFIX: find() returns an unsigned size_t, so the previous
+            // "Index < 0" check was always false and malformed entries
+            // (missing '=') were silently accepted. Compare against npos.
+            if (Index == std::string_view::npos)
+            {
+                Log().warn("process apply upstream FAILED, environment '{}' malformed", Env);
+                return false;
+            }
+
+            Environment[std::string(Env.substr(0, Index))] = Env.substr(Index + 1);
+        }
+
+        {
+            static const std::filesystem::path BuildActionPath = "Build.action"sv;
+            static const std::filesystem::path InputPath = "Inputs"sv;
+            const IoHash ActionId = ApplyRecord.Action.GetHash();
+
+            // The action object itself is shipped as the Build.action input file.
+            InputFiles.insert(BuildActionPath);
+            InputFileHashes[BuildActionPath] = ActionId;
+            Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(),
+                                                                        ApplyRecord.Action.GetBuffer().GetSize());
+
+            // Every attachment of the action is an input chunk that must be
+            // resolvable from the local CID store; each is placed under
+            // Inputs/<cid-hex> and keyed by the hash of its stored (compressed) form.
+            bool AnyErrors = false;
+            ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) {
+                const IoHash Cid = Field.AsHash();
+                const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()};
+                IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid);
+
+                if (!DataBuffer)
+                {
+                    Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid);
+                    AnyErrors = true;
+                    return;
+                }
+
+                if (InputFiles.contains(FilePath))
+                {
+                    return;  // Duplicate attachment reference; already staged
+                }
+
+                const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize());
+
+                InputFiles.insert(FilePath);
+                InputFileHashes[FilePath] = CompressedId;
+                Data.Blobs[CompressedId] = std::move(DataBuffer);
+            });
+
+            if (AnyErrors)
+            {
+                return false;
+            }
+        }
+
+        const UpstreamDirectory RootDirectory =
BuildDirectoryTree(InputFiles); + + CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Blobs, Data.Objects); + const IoHash SandboxHash = Sandbox.GetHash(); + Data.Objects[SandboxHash] = std::move(Sandbox); + + CbObject Requirements = BuildRequirements("OSFamily == 'Windows'"sv, {}, false); + const IoHash RequirementsId = Requirements.GetHash(); + Data.Objects[RequirementsId] = std::move(Requirements); + Data.RequirementsId = RequirementsId; + + CbObject Task = BuildTask(ExecutablePath, + {"-Build=build.action"}, + Environment, + {}, + SandboxHash, + RequirementsId, + {"Build.output", "Outputs"}); + + const IoHash TaskId = Task.GetHash(); + Data.Objects[TaskId] = std::move(Task); + Data.TaskId = TaskId; + + return true; + } + + [[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry, + std::set<std::filesystem::path>& InputFiles, + std::map<std::filesystem::path, IoHash>& InputFileHashes, + std::map<IoHash, IoBuffer>& Blobs) + { + const std::filesystem::path FilePath = FileEntry["name"sv].AsString(); + const IoHash ChunkId = FileEntry["hash"sv].AsHash(); + const uint64_t Size = FileEntry["size"sv].AsUInt64(); + IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkId); + + if (!DataBuffer) + { + Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId); + return false; + } + + if (DataBuffer.Size() != Size) + { + Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {}, action spec expected {}", + ChunkId, + DataBuffer.Size(), + Size); + return false; + } + + if (InputFiles.contains(FilePath)) + { + Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath); + return false; + } + + InputFiles.insert(FilePath); + InputFileHashes[FilePath] = ChunkId; + Blobs[ChunkId] = std::move(DataBuffer); + return true; + } + + [[nodiscard]] UpstreamDirectory BuildDirectoryTree(const std::set<std::filesystem::path>& InputFiles) + { + static 
const std::filesystem::path RootPath; + std::map<std::filesystem::path, UpstreamDirectory*> AllDirectories; + UpstreamDirectory RootDirectory = {.Path = RootPath}; + + AllDirectories[RootPath] = &RootDirectory; + + // Build tree from flat list + for (const auto& Path : InputFiles) + { + if (Path.has_parent_path()) + { + if (!AllDirectories.contains(Path.parent_path())) + { + std::stack<std::string> PathSplit; + { + std::filesystem::path ParentPath = Path.parent_path(); + PathSplit.push(ParentPath.filename().string()); + while (ParentPath.has_parent_path()) + { + ParentPath = ParentPath.parent_path(); + PathSplit.push(ParentPath.filename().string()); + } + } + UpstreamDirectory* ParentPtr = &RootDirectory; + while (!PathSplit.empty()) + { + if (!ParentPtr->Directories.contains(PathSplit.top())) + { + std::filesystem::path NewParentPath = {ParentPtr->Path / PathSplit.top()}; + ParentPtr->Directories[PathSplit.top()] = {.Path = NewParentPath}; + AllDirectories[NewParentPath] = &ParentPtr->Directories[PathSplit.top()]; + } + ParentPtr = &ParentPtr->Directories[PathSplit.top()]; + PathSplit.pop(); + } + } + + AllDirectories[Path.parent_path()]->Files.insert(Path.filename().string()); + } + else + { + RootDirectory.Files.insert(Path.filename().string()); + } + } + + return RootDirectory; + } + + [[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory, + const std::map<std::filesystem::path, IoHash>& InputFileHashes, + const std::map<IoHash, IoBuffer>& Blobs, + std::map<IoHash, CbObject>& Objects) + { + CbObjectWriter DirectoryTreeWriter; + + if (!RootDirectory.Files.empty()) + { + DirectoryTreeWriter.BeginArray("f"sv); + for (const auto& File : RootDirectory.Files) + { + const std::filesystem::path FilePath = {RootDirectory.Path / File}; + const IoHash& FileHash = InputFileHashes.at(FilePath); + const uint64_t FileSize = Blobs.at(FileHash).Size(); + DirectoryTreeWriter.BeginObject(); + DirectoryTreeWriter.AddString("n"sv, File); + 
DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash); + DirectoryTreeWriter.AddInteger("s"sv, FileSize); // Size + // DirectoryTreeWriter.AddInteger("a"sv, 0); // Attributes Currently unneeded + DirectoryTreeWriter.EndObject(); + } + DirectoryTreeWriter.EndArray(); + } + + if (!RootDirectory.Directories.empty()) + { + DirectoryTreeWriter.BeginArray("d"sv); + for (const auto& Item : RootDirectory.Directories) + { + CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Blobs, Objects); + const IoHash DirectoryHash = Directory.GetHash(); + Objects[DirectoryHash] = std::move(Directory); + + DirectoryTreeWriter.BeginObject(); + DirectoryTreeWriter.AddString("n"sv, Item.first); + DirectoryTreeWriter.AddObjectAttachment("h"sv, DirectoryHash); + DirectoryTreeWriter.EndObject(); + } + DirectoryTreeWriter.EndArray(); + } + + return std::move(DirectoryTreeWriter.Save()); + } + + [[nodiscard]] CbObject BuildRequirements(const std::string_view Condition, + const std::map<std::string_view, int32_t>& Resources, + const bool Exclusive) + { + CbObjectWriter Writer; + Writer.AddString("c", Condition); + if (!Resources.empty()) + { + Writer.BeginArray("r"); + for (const auto& Resource : Resources) + { + Writer.BeginArray(); + Writer.AddString(Resource.first); + Writer.AddInteger(Resource.second); + Writer.EndArray(); + } + Writer.EndArray(); + } + Writer.AddBool("e", Exclusive); + return std::move(Writer.Save()); + } + + [[nodiscard]] CbObject BuildTask(const std::string_view Executable, + const std::vector<std::string>& Arguments, + const std::map<std::string, std::string>& Environment, + const std::string_view WorkingDirectory, + const IoHash& SandboxHash, + const IoHash& RequirementsId, + const std::set<std::string>& Outputs) + { + CbObjectWriter TaskWriter; + TaskWriter.AddString("e"sv, Executable); + + if (!Arguments.empty()) + { + TaskWriter.BeginArray("a"sv); + for (const auto& Argument : Arguments) + { + TaskWriter.AddString(Argument); + } + 
TaskWriter.EndArray();
+        }
+
+        // Environment is encoded as an array of [key, value] pairs.
+        if (!Environment.empty())
+        {
+            TaskWriter.BeginArray("v"sv);
+            for (const auto& Env : Environment)
+            {
+                TaskWriter.BeginArray();
+                TaskWriter.AddString(Env.first);
+                TaskWriter.AddString(Env.second);
+                TaskWriter.EndArray();
+            }
+            TaskWriter.EndArray();
+        }
+
+        // NOTE(review): the working directory and the sandbox attachment both
+        // use field name "s" — one of them is likely meant to use a distinct
+        // key. Left unchanged here because the wire format cannot be verified
+        // from this file; confirm against the Horde task schema.
+        if (!WorkingDirectory.empty())
+        {
+            TaskWriter.AddString("s"sv, WorkingDirectory);
+        }
+
+        TaskWriter.AddObjectAttachment("s"sv, SandboxHash);
+        TaskWriter.AddObjectAttachment("r"sv, RequirementsId);
+
+        // Outputs the agent should collect after the task runs.
+        if (!Outputs.empty())
+        {
+            TaskWriter.BeginArray("o"sv);
+            for (const auto& Output : Outputs)
+            {
+                TaskWriter.AddString(Output);
+            }
+            TaskWriter.EndArray();
+        }
+
+        // Save() yields a prvalue; std::move here would only inhibit copy elision.
+        return TaskWriter.Save();
+    }
+    };
+}  // namespace detail
+
+//////////////////////////////////////////////////////////////////////////
+
+// Aggregates per-endpoint transfer/error counters and periodically dumps a
+// summary to the debug log (every MaxSampleCount-th Add call when enabled).
+struct UpstreamApplyStats
+{
+    static constexpr uint64_t MaxSampleCount = 1000ull;
+
+    UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {}
+
+    // Records the outcome of a PostApply call against Endpoint's counters.
+    void Add(spdlog::logger& Logger,
+             UpstreamApplyEndpoint& Endpoint,
+             const PostUpstreamApplyResult& Result,
+             const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints)
+    {
+        UpstreamApplyEndpointStats& Stats = Endpoint.Stats();
+
+        if (Result.Error)
+        {
+            Stats.ErrorCount++;
+        }
+        else if (Result.Success)
+        {
+            Stats.PostCount++;
+            Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
+            Stats.SecondsUp.fetch_add(Result.ElapsedSeconds);
+        }
+
+        // BUGFIX: the previous "m_SampleCount++ % MaxSampleCount" condition was
+        // truthy for 999 of every 1000 samples; dump every MaxSampleCount-th.
+        if (m_Enabled && (m_SampleCount++ % MaxSampleCount) == 0)
+        {
+            Dump(Logger, Endpoints);
+        }
+    }
+
+    // Records the outcome of a GetUpdates poll against Endpoint's counters.
+    void Add(spdlog::logger& Logger,
+             UpstreamApplyEndpoint& Endpoint,
+             const GetUpstreamApplyUpdatesResult& Result,
+             const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints)
+    {
+        UpstreamApplyEndpointStats& Stats = Endpoint.Stats();
+
+        if (Result.Error)
+        {
+            Stats.ErrorCount++;
+        }
+        else if (Result.Success)
+        {
+            Stats.UpdateCount++;
+            Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
+            Stats.SecondsDown.fetch_add(Result.ElapsedSeconds);
+            if (!Result.Completed.empty())
+            {
+                // Completed is keyed worker -> action; count every action.
+                uint64_t Completed = 0;
+                for (auto& It : Result.Completed)
+                {
+                    Completed += It.second.size();
+                }
+                Stats.CompleteCount.fetch_add(Completed);
+            }
+        }
+
+        // BUGFIX: same inverted sampling condition as the overload above.
+        if (m_Enabled && (m_SampleCount++ % MaxSampleCount) == 0)
+        {
+            Dump(Logger, Endpoints);
+        }
+    }
+
+    // Logs a one-line summary per endpoint at debug level.
+    void Dump(spdlog::logger& Logger, const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints)
+    {
+        for (auto& Ep : Endpoints)
+        {
+            // These stats will not be totally correct as the numbers are not captured atomically
+
+            UpstreamApplyEndpointStats& Stats = Ep->Stats();
+            const uint64_t PostCount = Stats.PostCount;
+            const uint64_t CompleteCount = Stats.CompleteCount;
+            const uint64_t UpdateCount = Stats.UpdateCount;
+            const double DownBytes = Stats.DownBytes;
+            const double SecondsDown = Stats.SecondsDown;
+            const double UpBytes = Stats.UpBytes;
+            const double SecondsUp = Stats.SecondsUp;
+
+            const double UpSpeed = UpBytes > 0 ? UpBytes / SecondsUp : 0.0;
+            const double DownSpeed = DownBytes > 0 ? DownBytes / SecondsDown : 0.0;
+            // NOTE(review): logged as a percentage but computed as posts per
+            // completion without the x100 scaling — confirm intended formula.
+            const double CompleteRate = CompleteCount > 0 ?
(double(PostCount) / double(CompleteCount)) : 0.0; + + Logger.debug("STATS - '{}', Complete rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", + Ep->DisplayName(), + CompleteRate, + DownBytes, + DownSpeed, + UpBytes, + UpSpeed); + } + } + + bool m_Enabled; + std::atomic_uint64_t m_SampleCount = {}; +}; + +////////////////////////////////////////////////////////////////////////// + +class DefaultUpstreamApply final : public UpstreamApply +{ +public: + DefaultUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) + : m_Log(logging::Get("upstream-apply")) + , m_Options(Options) + , m_CasStore(CasStore) + , m_CidStore(CidStore) + , m_Stats(Options.StatsEnabled) + { + } + + virtual ~DefaultUpstreamApply() { Shutdown(); } + + virtual bool Initialize() override + { + for (auto& Endpoint : m_Endpoints) + { + const UpstreamEndpointHealth Health = Endpoint->Initialize(); + if (Health.Ok) + { + Log().info("initialize endpoint '{}' OK", Endpoint->DisplayName()); + } + else + { + Log().warn("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + } + } + + m_RunState.IsRunning = !m_Endpoints.empty(); + + if (m_RunState.IsRunning) + { + for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) + { + m_UpstreamThreads.emplace_back(&DefaultUpstreamApply::ProcessUpstreamQueue, this); + } + + m_UpstreamUpdatesThread = std::thread(&DefaultUpstreamApply::ProcessUpstreamUpdates, this); + + m_EndpointMonitorThread = std::thread(&DefaultUpstreamApply::MonitorEndpoints, this); + } + + return m_RunState.IsRunning; + } + + virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) override + { + m_Endpoints.emplace_back(std::move(Endpoint)); + } + + virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) override + { + if (m_RunState.IsRunning) + { + const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash(); + const IoHash ActionId = 
				ApplyRecord.Action.GetHash();
			// Per-task timeout comes from the worker descriptor, defaulting to 300s.
			const uint32_t TimeoutSeconds = ApplyRecord.WorkerDescriptor["timeout"sv].AsInt32(300);

			{
				std::scoped_lock Lock(m_ApplyTasksMutex);
				if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
				{
					// Already in progress
					return {.ApplyId = ActionId, .Success = true};
				}

				// A timeout of zero means the task never expires.
				std::chrono::steady_clock::time_point ExpireTime =
					TimeoutSeconds > 0 ? std::chrono::steady_clock::now() + std::chrono::seconds(TimeoutSeconds)
									   : std::chrono::steady_clock::time_point::max();

				m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)};
			}

			// With no worker threads configured, process synchronously on the caller's
			// thread; otherwise hand off to the blocking queue.
			if (!m_UpstreamThreads.empty())
			{
				m_UpstreamQueue.Enqueue(std::move(ApplyRecord));
			}
			else
			{
				ProcessApplyRecord(std::move(ApplyRecord));
			}

			return {.ApplyId = ActionId, .Success = true};
		}

		// Not running: default-constructed result (Success == false).
		return {};
	}

	// Look up the status of a previously enqueued apply task. Returns a failed
	// (default) result when the service is stopped or the task is unknown.
	virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) override
	{
		if (m_RunState.IsRunning)
		{
			std::scoped_lock Lock(m_ApplyTasksMutex);
			if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
			{
				return {.Status = *Status, .Success = true};
			}
		}

		return {};
	}

	// Write a compact-binary status report: thread/queue counts plus per-endpoint
	// health and transfer statistics.
	virtual void GetStatus(CbObjectWriter& Status) override
	{
		Status << "worker_threads" << m_Options.ThreadCount;
		Status << "queue_count" << m_UpstreamQueue.Size();

		Status.BeginArray("endpoints");
		for (const auto& Ep : m_Endpoints)
		{
			Status.BeginObject();
			Status << "name" << Ep->DisplayName();
			Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv);

			// Snapshot the counters once; reads are not atomic as a group.
			UpstreamApplyEndpointStats& Stats = Ep->Stats();
			const uint64_t PostCount = Stats.PostCount;
			const uint64_t CompleteCount = Stats.CompleteCount;
			const uint64_t UpdateCount = Stats.UpdateCount;
			const double CompleteRate = CompleteCount > 0 ?
(double(PostCount) / double(CompleteCount)) : 0.0; + + Status << "post_count" << PostCount; + Status << "complete_count" << PostCount; + Status << "update_count" << Stats.UpdateCount; + + Status << "complete_ratio" << CompleteRate; + Status << "downloaded_mb" << Stats.DownBytes; + Status << "uploaded_mb" << Stats.UpBytes; + Status << "error_count" << Stats.ErrorCount; + + Status.EndObject(); + } + Status.EndArray(); + } + +private: + // The caller is responsible for locking if required + UpstreamApplyStatus* FindStatus(const IoHash& WorkerId, const IoHash& ActionId) + { + if (auto It = m_ApplyTasks.find(WorkerId); It != m_ApplyTasks.end()) + { + if (auto It2 = It->second.find(ActionId); It2 != It->second.end()) + { + return &It2->second; + } + } + return nullptr; + } + + void ProcessApplyRecord(UpstreamApplyRecord ApplyRecord) + { + const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash(); + const IoHash ActionId = ApplyRecord.Action.GetHash(); + try + { + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy()) + { + PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord)); + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + if (Result.Success) + { + Status->State = UpstreamApplyState::Executing; + } + else + { + Status->State = UpstreamApplyState::Complete; + Status->Result = {.Error = std::move(Result.Error), + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds}; + } + } + } + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + return; + } + } + + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + Status->State = UpstreamApplyState::Complete; + Status->Result = {.Error{.ErrorCode = -1, .Reason = "No available endpoint"}}; + } + Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId); + } + } + catch (std::exception& e) + { + 
			// Exception path: mark the task complete with the exception message so
			// callers polling GetStatus() are not left waiting forever.
			std::scoped_lock Lock(m_ApplyTasksMutex);
			if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
			{
				Status->State = UpstreamApplyState::Complete;
				Status->Result = {.Error{.ErrorCode = -1, .Reason = e.what()}};
			}
			Log().warn("process upstream apply ({}/{}) FAILED '{}'", WorkerId, ActionId, e.what());
		}
	}

	// Worker-thread loop: block on the queue, process each dequeued record, and
	// exit once the service stops running (CompleteAdding unblocks the wait).
	void ProcessUpstreamQueue()
	{
		for (;;)
		{
			UpstreamApplyRecord ApplyRecord;
			if (m_UpstreamQueue.WaitAndDequeue(ApplyRecord))
			{
				ProcessApplyRecord(std::move(ApplyRecord));
			}

			if (!m_RunState.IsRunning)
			{
				break;
			}
		}
	}

	// Poll every healthy endpoint for completed tasks and fold the results into
	// the task table.
	void ProcessApplyUpdates()
	{
		for (auto& Endpoint : m_Endpoints)
		{
			if (Endpoint->IsHealthy())
			{
				GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates();
				m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);

				if (!Result.Success)
				{
					Log().warn("process upstream apply updates FAILED '{}'", Result.Error.Reason);
				}

				// Result.Completed: worker id -> (action id -> apply result).
				if (!Result.Completed.empty())
				{
					for (auto& It : Result.Completed)
					{
						for (auto& It2 : It.second)
						{
							// NOTE(review): the mutex is re-acquired per completed item;
							// hoisting the lock above the loops would reduce lock churn —
							// confirm contention characteristics before changing.
							std::scoped_lock Lock(m_ApplyTasksMutex);
							if (auto Status = FindStatus(It.first, It2.first); Status != nullptr)
							{
								Status->State = UpstreamApplyState::Complete;
								Status->Result = std::move(It2.second);
							}
						}
					}
				}
			}
		}
	}

	// Background thread: every UpdatesInterval seconds, poll for updates and then
	// prune expired tasks and empty per-worker task lists.
	void ProcessUpstreamUpdates()
	{
		// Const-ref binding to a temporary extends its lifetime for this scope.
		const auto& UpdateSleep = std::chrono::seconds(m_Options.UpdatesInterval);
		for (;;)
		{
			std::this_thread::sleep_for(UpdateSleep);

			if (!m_RunState.IsRunning)
			{
				break;
			}

			ProcessApplyUpdates();

			// Remove any expired tasks, regardless of state
			{
				std::scoped_lock Lock(m_ApplyTasksMutex);
				for (auto& WorkerIt : m_ApplyTasks)
				{
					const auto Count = std::erase_if(WorkerIt.second, [](const auto& Item) {
						return Item.second.ExpireTime < std::chrono::steady_clock::now();
					});
					if (Count > 0)
					{
						Log().debug("Removed '{}' expired tasks", Count);
					}
				}
				// Drop workers whose task maps have become empty.
				const auto Count = std::erase_if(m_ApplyTasks, [](const auto& Item) { return Item.second.empty(); });
+ if (Count > 0) + { + Log().debug("Removed '{}' empty task lists", Count); + } + } + } + } + + void MonitorEndpoints() + { + for (;;) + { + { + std::unique_lock Lock(m_RunState.Mutex); + if (m_RunState.ExitSignal.wait_for(Lock, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); })) + { + break; + } + } + + for (auto& Endpoint : m_Endpoints) + { + if (!Endpoint->IsHealthy()) + { + if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) + { + Log().warn("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason); + } + else + { + Log().warn("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + } + } + } + } + } + + void Shutdown() + { + if (m_RunState.Stop()) + { + m_UpstreamQueue.CompleteAdding(); + for (std::thread& Thread : m_UpstreamThreads) + { + Thread.join(); + } + + m_EndpointMonitorThread.join(); + m_UpstreamUpdatesThread.join(); + m_UpstreamThreads.clear(); + m_Endpoints.clear(); + } + } + + spdlog::logger& Log() { return m_Log; } + + using UpstreamApplyQueue = BlockingQueue<UpstreamApplyRecord>; + + struct RunState + { + std::mutex Mutex; + std::condition_variable ExitSignal; + std::atomic_bool IsRunning{false}; + + bool Stop() + { + bool Stopped = false; + { + std::scoped_lock Lock(Mutex); + Stopped = IsRunning.exchange(false); + } + if (Stopped) + { + ExitSignal.notify_all(); + } + return Stopped; + } + }; + + spdlog::logger& m_Log; + UpstreamApplyOptions m_Options; + CasStore& m_CasStore; + CidStore& m_CidStore; + UpstreamApplyQueue m_UpstreamQueue; + UpstreamApplyStats m_Stats; + UpstreamApplyTasks m_ApplyTasks; + std::mutex m_ApplyTasksMutex; + std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints; + std::vector<std::thread> m_UpstreamThreads; + std::thread m_UpstreamUpdatesThread; + std::thread m_EndpointMonitorThread; + RunState m_RunState; +}; + +////////////////////////////////////////////////////////////////////////// + 
+std::unique_ptr<UpstreamApply> +MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) +{ + return std::make_unique<DefaultUpstreamApply>(Options, CasStore, CidStore); +} + +std::unique_ptr<UpstreamApplyEndpoint> +MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options, CasStore& CasStore, CidStore& CidStore) +{ + return std::make_unique<detail::HordeUpstreamApplyEndpoint>(Options, CasStore, CidStore); +} + +} // namespace zen |