diff options
| author | Joe Kirchoff <[email protected]> | 2022-03-30 14:15:15 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-03-30 14:15:15 -0700 |
| commit | eb33c52b8e338b1bccf0d9d26b56d7ef611f6059 (patch) | |
| tree | 994b2af87e7b0cfba3250d41227c94d777738dc4 /zenserver/upstream/upstreamapply.cpp | |
| parent | Retain flags for small objects in structured cache (#68) (diff) | |
| download | zen-eb33c52b8e338b1bccf0d9d26b56d7ef611f6059.tar.xz zen-eb33c52b8e338b1bccf0d9d26b56d7ef611f6059.zip | |
Simple file-based compute (#65)
Diffstat (limited to 'zenserver/upstream/upstreamapply.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 682 |
1 files changed, 402 insertions, 280 deletions
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp index fd304adb8..17a6bb3cf 100644 --- a/zenserver/upstream/upstreamapply.cpp +++ b/zenserver/upstream/upstreamapply.cpp @@ -7,7 +7,6 @@ # include "jupiter.h" # include "zen.h" -# include <zencore/blockingqueue.h> # include <zencore/compactbinary.h> # include <zencore/compactbinarybuilder.h> # include <zencore/compactbinarypackage.h> @@ -19,6 +18,7 @@ # include <zencore/stream.h> # include <zencore/thread.h> # include <zencore/timer.h> +# include <zencore/workthreadpool.h> # include <zenstore/cas.h> # include <zenstore/cidstore.h> @@ -33,7 +33,6 @@ # include <algorithm> # include <atomic> -# include <map> # include <set> # include <stack> # include <thread> @@ -148,7 +147,7 @@ namespace detail { virtual std::string_view DisplayName() const override { return m_DisplayName; } - virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) override + virtual PostUpstreamApplyResult PostApply(UpstreamApplyRecord ApplyRecord) override { int64_t Bytes{}; double ElapsedSeconds{}; @@ -168,7 +167,7 @@ namespace detail { // Pending task is already queued, return success return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; } - m_PendingTasks[UpstreamData.TaskId] = ApplyRecord; + m_PendingTasks[UpstreamData.TaskId] = std::move(ApplyRecord); } CloudCacheSession ComputeSession(m_Client); @@ -200,17 +199,23 @@ namespace detail { .ElapsedSeconds = ElapsedSeconds}; } - Result = StorageSession.PutRef("requests"sv, - UpstreamData.TaskId, - UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(), - ZenContentType::kCbObject); - - Bytes += Result.Bytes; - ElapsedSeconds += Result.ElapsedSeconds; - if (!Result.Success) + PutRefResult RefResult = StorageSession.PutRef("requests"sv, + UpstreamData.TaskId, + UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(), + ZenContentType::kCbObject); + Log().debug("Put ref {} Need={} Bytes={} Duration={}s Result={}", + UpstreamData.TaskId, + RefResult.Needs.size(), + RefResult.Bytes, + RefResult.ElapsedSeconds, + RefResult.Success); + + Bytes += RefResult.Bytes; + ElapsedSeconds += RefResult.ElapsedSeconds; + if (!RefResult.Success) { - return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, - .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to add task ref"}, + return {.Error{.ErrorCode = RefResult.ErrorCode ? RefResult.ErrorCode : -1, + .Reason = !RefResult.Reason.empty() ? std::move(RefResult.Reason) : "Failed to add task ref"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } @@ -223,9 +228,15 @@ namespace detail { Writer.BeginArray("t"sv); Writer.AddObjectAttachment(UpstreamData.TaskId); Writer.EndArray(); - IoBuffer TasksData = Writer.Save().GetBuffer().AsIoBuffer(); + CbObject TasksObject = Writer.Save(); + IoBuffer TasksData = TasksObject.GetBuffer().AsIoBuffer(); CloudCacheResult Result = ComputeSession.PostComputeTasks(TasksData); + Log().debug("Post compute task {} Bytes={} Duration={}s Result={}", + TasksObject.GetHash(), + Result.Bytes, + Result.ElapsedSeconds, + Result.Success); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) @@ -241,6 +252,7 @@ namespace detail { .ElapsedSeconds = ElapsedSeconds}; } + Log().info("Task posted {}", UpstreamData.TaskId); return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; } catch (std::exception& Err) @@ -262,12 +274,14 @@ namespace detail { // Batch check for missing blobs std::set<IoHash> Keys; - for (const auto& It : Blobs) - { - Keys.insert(It.first); - } + std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys); + Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}", + Keys.size(), + ExistsResult.Needs.size(), + ExistsResult.ElapsedSeconds, + ExistsResult.Success); ElapsedSeconds += ExistsResult.ElapsedSeconds; if (!ExistsResult.Success) { @@ -279,14 +293,10 @@ namespace detail { // TODO: Batch upload missing blobs - for (const auto& It : Blobs) + for (const auto& Hash : ExistsResult.Needs) { - if (ExistsResult.Have.contains(It.first)) - { - continue; - } - - CloudCacheResult Result = Session.PutBlob(It.first, It.second); + CloudCacheResult Result = Session.PutBlob(Hash, Blobs.at(Hash)); + Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) @@ -313,12 +323,15 @@ namespace detail { // Batch check for missing objects std::set<IoHash> Keys; - for (const auto& It : Objects) - { - Keys.insert(It.first); - } + std::transform(Objects.begin(), Objects.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); + // Todo: Endpoint doesn't exist for objects CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys); + Log().debug("Queried {} missing objects Need={} Duration={}s Result={}", + Keys.size(), + ExistsResult.Needs.size(), + ExistsResult.ElapsedSeconds, + ExistsResult.Success); ElapsedSeconds += ExistsResult.ElapsedSeconds; if (!ExistsResult.Success) { @@ -330,14 +343,10 @@ namespace detail { // TODO: Batch upload missing objects - for (const auto& It : Objects) + for (const auto& Hash : ExistsResult.Needs) { - if (ExistsResult.Have.contains(It.first)) - { - continue; - } - - CloudCacheResult Result = Session.PutObject(It.first, It.second.GetBuffer().AsIoBuffer()); + CloudCacheResult Result = Session.PutObject(Hash, Objects.at(Hash).GetBuffer().AsIoBuffer()); + Log().debug("Put object {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) @@ -370,7 +379,21 @@ namespace detail { Exception = 6, }; - std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome) + [[nodiscard]] static std::string_view ComputeTaskStateToString(const ComputeTaskState Outcome) + { + switch (Outcome) + { + case ComputeTaskState::Queued: + return "Queued"sv; + case ComputeTaskState::Executing: + return "Executing"sv; + case ComputeTaskState::Complete: + return "Complete"sv; + }; + return "Unknown"sv; + } + + [[nodiscard]] static std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome) { switch (Outcome) { @@ -392,27 +415,36 @@ namespace detail { return "Unknown"sv; } - virtual GetUpstreamApplyUpdatesResult GetUpdates() override + virtual GetUpstreamApplyUpdatesResult GetUpdates(WorkerThreadPool& ThreadPool) override { - int64_t Bytes{}; - double ElapsedSeconds{}; - UpstreamApplyCompleted CompletedTasks; + int64_t Bytes{}; + double ElapsedSeconds{}; { std::scoped_lock Lock(m_TaskMutex); if (m_PendingTasks.empty()) { - // Nothing to do. - return {.Success = true}; + if (m_CompletedTasks.empty()) + { + // Nothing to do. + return {.Success = true}; + } + + UpstreamApplyCompleted CompletedTasks; + std::swap(CompletedTasks, m_CompletedTasks); + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; } } try { CloudCacheSession ComputeSession(m_Client); - CloudCacheSession StorageSession(m_StorageClient); CloudCacheResult UpdatesResult = ComputeSession.GetComputeUpdates(m_ChannelId); + Log().debug("Get compute updates Bytes={} Duration={}s Result={}", + UpdatesResult.Bytes, + UpdatesResult.ElapsedSeconds, + UpdatesResult.Success); Bytes += UpdatesResult.Bytes; ElapsedSeconds += UpdatesResult.ElapsedSeconds; if (!UpdatesResult.Success) @@ -427,44 +459,86 @@ namespace detail { return {.Error{.ErrorCode = -1, .Reason = "Failed get task updates"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } - CbObject TaskStatus = LoadCompactBinaryObject(UpdatesResult.Response); + CbObject TaskStatus = LoadCompactBinaryObject(std::move(UpdatesResult.Response)); for (auto& It : TaskStatus["u"sv]) { - CbObjectView Status = It.AsObjectView(); - const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32(); + CbObjectView Status = It.AsObjectView(); + IoHash TaskId = Status["h"sv].AsHash(); + const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32(); + const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)Status["o"sv].AsInt32(); + + Log().info("Task {} State={}", TaskId, ComputeTaskStateToString(State)); - // Only care about completed tasks + // Only completed tasks need to be processed if (State != ComputeTaskState::Complete) { continue; } - const IoHash TaskId = Status["h"sv].AsHash(); - - IoHash WorkerId; - IoHash ActionId; + IoHash WorkerId{}; + IoHash ActionId{}; + UpstreamApplyType ApplyType{}; { std::scoped_lock Lock(m_TaskMutex); auto TaskIt = m_PendingTasks.find(TaskId); - if (TaskIt == m_PendingTasks.end()) + if (TaskIt != m_PendingTasks.end()) { - continue; + WorkerId = TaskIt->second.WorkerDescriptor.GetHash(); + ActionId = TaskIt->second.Action.GetHash(); + ApplyType = TaskIt->second.Type; + m_PendingTasks.erase(TaskIt); } - WorkerId = TaskIt->second.WorkerDescriptor.GetHash(); - ActionId = TaskIt->second.Action.GetHash(); - m_PendingTasks.erase(TaskIt); } - GetUpstreamApplyResult Result = ProcessTaskStatus(Status, StorageSession); - Bytes += Result.Bytes; - ElapsedSeconds += Result.ElapsedSeconds; + if (WorkerId == IoHash::Zero) + { + Log().warn("Task {} missing from pending tasks", TaskId); + continue; + } - CompletedTasks[WorkerId][ActionId] = std::move(Result); + if (Outcome != ComputeTaskOutcome::Success) + { + const std::string_view Detail = Status["d"sv].AsString(); + { + std::scoped_lock Lock(m_TaskMutex); + m_CompletedTasks[WorkerId][ActionId] = { + .Error{.ErrorCode = -1, .Reason = fmt::format("Task {} {}", ComputeTaskOutcomeToString(Outcome), Detail)}}; + } + continue; + } + + ThreadPool.ScheduleWork([this, + ApplyType, + ResultHash = Status["r"sv].AsHash(), + TaskId = std::move(TaskId), + WorkerId = std::move(WorkerId), + ActionId = std::move(ActionId)]() { + GetUpstreamApplyResult Result = ProcessTaskStatus(ApplyType, ResultHash); + Log().debug("Task Processed {} Files={} Attachments={} ExitCode={}", + TaskId, + Result.OutputFiles.size(), + Result.OutputPackage.GetAttachments().size(), + Result.Error.ErrorCode); + { + std::scoped_lock Lock(m_TaskMutex); + m_CompletedTasks[WorkerId][ActionId] = std::move(Result); + } + }); } - return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; + { + std::scoped_lock Lock(m_TaskMutex); + if (m_CompletedTasks.empty()) + { + // Nothing to do. + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + UpstreamApplyCompleted CompletedTasks; + std::swap(CompletedTasks, m_CompletedTasks); + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; + } } catch (std::exception& Err) { @@ -473,7 +547,6 @@ namespace detail { .Error{.ErrorCode = -1, .Reason = Err.what()}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, - .Completed = std::move(CompletedTasks), }; } } @@ -496,6 +569,7 @@ namespace detail { std::mutex m_TaskMutex; std::unordered_map<IoHash, UpstreamApplyRecord> m_PendingTasks; + UpstreamApplyCompleted m_CompletedTasks; struct UpstreamData { @@ -512,30 +586,22 @@ namespace detail { std::set<std::string> Files; }; - [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const CbObjectView& TaskStatus, CloudCacheSession& Session) + [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const UpstreamApplyType ApplyType, const IoHash& ResultHash) { try { - const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)TaskStatus["o"sv].AsInt32(); - - if (Outcome != ComputeTaskOutcome::Success) - { - const std::string_view Detail = TaskStatus["d"sv].AsString(); - if (!Detail.empty()) - { - return {.Error{.ErrorCode = -1, - .Reason = fmt::format("Task {}: {}", ComputeTaskOutcomeToString(Outcome), std::string(Detail))}}; - } - return {.Error{.ErrorCode = -1, .Reason = fmt::format("Task {}", ComputeTaskOutcomeToString(Outcome))}}; - } - - const IoHash ResultHash = TaskStatus["r"sv].AsHash(); + CloudCacheSession Session(m_StorageClient); int64_t Bytes{}; double ElapsedSeconds{}; // Get Result object and all Object Attachments + Binary Attachment IDs CloudCacheResult ObjectRefResult = Session.GetRef("responses"sv, ResultHash, ZenContentType::kCbObject); + Log().debug("Get ref {} Bytes={} Duration={}s Result={}", + ResultHash, + ObjectRefResult.Bytes, + ObjectRefResult.ElapsedSeconds, + ObjectRefResult.Success); Bytes += ObjectRefResult.Bytes; ElapsedSeconds += ObjectRefResult.ElapsedSeconds; @@ -574,6 +640,11 @@ namespace detail { ObjectsToIterate.pop_back(); CloudCacheResult ObjectResult = Session.GetObject(Hash); + Log().debug("Get object {} Bytes={} Duration={}s Result={}", + Hash, + ObjectResult.Bytes, + ObjectResult.ElapsedSeconds, + ObjectResult.Success); Bytes += ObjectRefResult.Bytes; ElapsedSeconds += ObjectRefResult.ElapsedSeconds; if (!ObjectResult.Success) @@ -606,6 +677,11 @@ namespace detail { for (auto& It : BinaryData) { CloudCacheResult BlobResult = Session.GetBlob(It.first); + Log().debug("Get blob {} Bytes={} Duration={}s Result={}", + It.first, + BlobResult.Bytes, + BlobResult.ElapsedSeconds, + BlobResult.Success); Bytes += ObjectRefResult.Bytes; ElapsedSeconds += ObjectRefResult.ElapsedSeconds; if (!BlobResult.Success) @@ -626,9 +702,9 @@ namespace detail { std::string StdOut = std::string((const char*)BinaryData[StdOutHash].GetData(), BinaryData[StdOutHash].GetSize()); std::string StdErr = std::string((const char*)BinaryData[StdErrHash].GetData(), BinaryData[StdErrHash].GetSize()); - if (ExitCode != 0) + if (OutputHash == IoHash::Zero) { - return {.Error{.ErrorCode = ExitCode, .Reason = "Task completed with errors"}, + return {.Error{.ErrorCode = ExitCode, .Reason = "Task completed with no output object"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .StdOut = std::move(StdOut), @@ -637,137 +713,174 @@ namespace detail { CbObject OutputObject = LoadCompactBinaryObject(ObjectData[OutputHash]); - // Get build.output - IoHash BuildOutputId; - IoBuffer BuildOutput; - for (auto& It : OutputObject["f"sv]) + switch (ApplyType) { - const CbObjectView FileObject = It.AsObjectView(); - if (FileObject["n"sv].AsString() == "Build.output"sv) - { - BuildOutputId = FileObject["h"sv].AsBinaryAttachment(); - BuildOutput = BinaryData[BuildOutputId]; - break; - } - } + case UpstreamApplyType::Simple: + { + std::map<std::filesystem::path, IoHash> OutputFiles; - if (BuildOutput.GetSize() == 0) - { - return {.Error{.ErrorCode = ExitCode, .Reason = "Build.output file not found in task results"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .StdOut = std::move(StdOut), - .StdErr = std::move(StdErr)}; - } + ResolveMerkleTreeDirectory(""sv, OutputHash, ObjectData, OutputFiles); - // Get Output directory node - IoBuffer OutputDirectoryTree; - for (auto& It : OutputObject["d"sv]) - { - const CbObjectView DirectoryObject = It.AsObjectView(); - if (DirectoryObject["n"sv].AsString() == "Outputs"sv) - { - OutputDirectoryTree = ObjectData[DirectoryObject["h"sv].AsObjectAttachment()]; + return {.OutputFiles = std::move(OutputFiles), + .FileData = std::move(BinaryData), + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr), + .Success = true}; + } break; - } - } - - if (OutputDirectoryTree.GetSize() == 0) - { - return {.Error{.ErrorCode = ExitCode, .Reason = "Outputs directory not found in task results"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .StdOut = std::move(StdOut), - .StdErr = std::move(StdErr)}; - } - - // load build.output as CbObject + case UpstreamApplyType::Asset: + { + if (ExitCode != 0) + { + return {.Error{.ErrorCode = ExitCode, .Reason = "Task completed with errors"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr)}; + } - // Move Outputs from Horde to CbPackage + // Get build.output + IoHash BuildOutputId; + IoBuffer BuildOutput; + for (auto& It : OutputObject["f"sv]) + { + const CbObjectView FileObject = It.AsObjectView(); + if (FileObject["n"sv].AsString() == "Build.output"sv) + { + BuildOutputId = FileObject["h"sv].AsBinaryAttachment(); + BuildOutput = BinaryData[BuildOutputId]; + break; + } + } - std::unordered_map<IoHash, IoHash> CidToCompressedId; - CbPackage OutputPackage; - CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree); - int64_t TotalAttachmentBytes = 0; - int64_t TotalRawAttachmentBytes = 0; + if (BuildOutput.GetSize() == 0) + { + return {.Error{.ErrorCode = ExitCode, .Reason = "Build.output file not found in task results"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr)}; + } - for (auto& It : OutputDirectoryTreeObject["f"sv]) - { - CbObjectView FileObject = It.AsObjectView(); - // Name is the uncompressed hash - IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString()); - // Hash is the compressed data hash, and how it is stored in Horde - IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment(); + // Get Output directory node + IoBuffer OutputDirectoryTree; + for (auto& It : OutputObject["d"sv]) + { + const CbObjectView DirectoryObject = It.AsObjectView(); + if (DirectoryObject["n"sv].AsString() == "Outputs"sv) + { + OutputDirectoryTree = ObjectData[DirectoryObject["h"sv].AsObjectAttachment()]; + break; + } + } - if (!BinaryData.contains(CompressedId)) - { - Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId.ToHexString()); - return {.Error{.ErrorCode = -1, .Reason = "Object attachment chunk not retrieved from Horde"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - CidToCompressedId[DecompressedId] = CompressedId; - } + if (OutputDirectoryTree.GetSize() == 0) + { + return {.Error{.ErrorCode = ExitCode, .Reason = "Outputs directory not found in task results"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr)}; + } - // Iterate attachments, verify all chunks exist, and add to CbPackage - bool AnyErrors = false; - CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput); - BuildOutputObject.IterateAttachments([&](CbFieldView Field) { - const IoHash DecompressedId = Field.AsHash(); - if (!CidToCompressedId.contains(DecompressedId)) - { - Log().warn("Attachment not found {}", DecompressedId.ToHexString()); - AnyErrors = true; - return; - } - const IoHash& CompressedId = CidToCompressedId.at(DecompressedId); + // load build.output as CbObject - if (!BinaryData.contains(CompressedId)) - { - Log().warn("Missing output {} compressed {} uncompressed", - CompressedId.ToHexString(), - DecompressedId.ToHexString()); - AnyErrors = true; - return; - } + // Move Outputs from Horde to CbPackage - CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId])); + std::unordered_map<IoHash, IoHash> CidToCompressedId; + CbPackage OutputPackage; + CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree); + int64_t TotalAttachmentBytes = 0; + int64_t TotalRawAttachmentBytes = 0; - if (!AttachmentBuffer) - { - Log().warn("Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed", - CompressedId.ToHexString(), - DecompressedId.ToHexString()); - AnyErrors = true; - return; - } + for (auto& It : OutputDirectoryTreeObject["f"sv]) + { + CbObjectView FileObject = It.AsObjectView(); + // Name is the uncompressed hash + IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString()); + // Hash is the compressed data hash, and how it is stored in Horde + IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment(); + + if (!BinaryData.contains(CompressedId)) + { + Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId); + return {.Error{.ErrorCode = -1, .Reason = "Object attachment chunk not retrieved from Horde"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + CidToCompressedId[DecompressedId] = CompressedId; + } - TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); - TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize(); + // Iterate attachments, verify all chunks exist, and add to CbPackage + bool AnyErrors = false; + CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput); + BuildOutputObject.IterateAttachments([&](CbFieldView Field) { + const IoHash DecompressedId = Field.AsHash(); + if (!CidToCompressedId.contains(DecompressedId)) + { + Log().warn("Attachment not found {}", DecompressedId); + AnyErrors = true; + return; + } + const IoHash& CompressedId = CidToCompressedId.at(DecompressedId); + + if (!BinaryData.contains(CompressedId)) + { + Log().warn("Missing output {} compressed {} uncompressed", CompressedId, DecompressedId); + AnyErrors = true; + return; + } + + CompressedBuffer AttachmentBuffer = + CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId])); + + if (!AttachmentBuffer) + { + Log().warn( + "Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed", + CompressedId, + DecompressedId); + AnyErrors = true; + return; + } + + TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); + TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize(); + + CbAttachment Attachment(AttachmentBuffer); + OutputPackage.AddAttachment(Attachment); + }); + + if (AnyErrors) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr)}; + } - CbAttachment Attachment(AttachmentBuffer); - OutputPackage.AddAttachment(Attachment); - }); + OutputPackage.SetObject(BuildOutputObject); - if (AnyErrors) - { - return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .StdOut = std::move(StdOut), - .StdErr = std::move(StdErr)}; + return {.OutputPackage = std::move(OutputPackage), + .TotalAttachmentBytes = TotalAttachmentBytes, + .TotalRawAttachmentBytes = TotalRawAttachmentBytes, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr), + .Success = true}; + } + break; } - OutputPackage.SetObject(BuildOutputObject); - - return {.OutputPackage = std::move(OutputPackage), - .TotalAttachmentBytes = TotalAttachmentBytes, - .TotalRawAttachmentBytes = TotalRawAttachmentBytes, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds, - .StdOut = std::move(StdOut), - .StdErr = std::move(StdErr), - .Success = true}; + return {.Error{.ErrorCode = ExitCode, .Reason = "Unknown apply type"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr)}; } catch (std::exception& Err) { @@ -778,8 +891,11 @@ namespace detail { [[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data) { std::string ExecutablePath; + std::string WorkingDirectory; + std::vector<std::string> Arguments; std::map<std::string, std::string> Environment; std::set<std::filesystem::path> InputFiles; + std::set<std::string> Outputs; std::map<std::filesystem::path, IoHash> InputFileHashes; ExecutablePath = ApplyRecord.WorkerDescriptor["path"sv].AsString(); @@ -790,6 +906,8 @@ namespace detail { return false; } + WorkingDirectory = ApplyRecord.WorkerDescriptor["workdir"sv].AsString(); + for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv]) { CbObjectView FileEntry = It.AsObjectView(); @@ -830,45 +948,67 @@ namespace detail { Environment[std::string(Env.substr(0, Index))] = Env.substr(Index + 1); } + switch (ApplyRecord.Type) { - static const std::filesystem::path BuildActionPath = "Build.action"sv; - static const std::filesystem::path InputPath = "Inputs"sv; - const IoHash ActionId = ApplyRecord.Action.GetHash(); - - InputFiles.insert(BuildActionPath); - InputFileHashes[BuildActionPath] = ActionId; - Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(), - ApplyRecord.Action.GetBuffer().GetSize()); - - bool AnyErrors = false; - ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) { - const IoHash Cid = Field.AsHash(); - const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()}; - IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid); - - if (!DataBuffer) + case UpstreamApplyType::Simple: { - Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid); - AnyErrors = true; - return; - } + for (auto& It : ApplyRecord.WorkerDescriptor["arguments"sv]) + { + Arguments.push_back(std::string(It.AsString())); + } - if (InputFiles.contains(FilePath)) - { - return; + for (auto& It : ApplyRecord.WorkerDescriptor["outputs"sv]) + { + Outputs.insert(std::string(It.AsString())); + } } + break; + case UpstreamApplyType::Asset: + { + static const std::filesystem::path BuildActionPath = "Build.action"sv; + static const std::filesystem::path InputPath = "Inputs"sv; + const IoHash ActionId = ApplyRecord.Action.GetHash(); + + Arguments.push_back("-Build=build.action"); + Outputs.insert("Build.output"); + Outputs.insert("Outputs"); + + InputFiles.insert(BuildActionPath); + InputFileHashes[BuildActionPath] = ActionId; + Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(), + ApplyRecord.Action.GetBuffer().GetSize()); + + bool AnyErrors = false; + ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) { + const IoHash Cid = Field.AsHash(); + const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()}; + IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid); + + if (!DataBuffer) + { + Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid); + AnyErrors = true; + return; + } - const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize()); + if (InputFiles.contains(FilePath)) + { + return; + } - InputFiles.insert(FilePath); - InputFileHashes[FilePath] = CompressedId; - Data.Blobs[CompressedId] = std::move(DataBuffer); - }); + const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize()); - if (AnyErrors) - { - return false; - } + InputFiles.insert(FilePath); + InputFileHashes[FilePath] = CompressedId; + Data.Blobs[CompressedId] = std::move(DataBuffer); + }); + + if (AnyErrors) + { + return false; + } + } + break; } const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles); @@ -912,13 +1052,7 @@ namespace detail { Data.RequirementsId = RequirementsId; } - CbObject Task = BuildTask(ExecutablePath, - {"-Build=build.action"}, - Environment, - {}, - SandboxHash, - Data.RequirementsId, - {"Build.output", "Outputs"}); + CbObject Task = BuildTask(ExecutablePath, Arguments, Environment, WorkingDirectory, SandboxHash, Data.RequirementsId, Outputs); const IoHash TaskId = Task.GetHash(); Data.Objects[TaskId] = std::move(Task); @@ -1059,6 +1193,32 @@ namespace detail { return DirectoryTreeWriter.Save(); } + void ResolveMerkleTreeDirectory(const std::filesystem::path& ParentDirectory, + const IoHash& DirectoryHash, + const std::map<IoHash, IoBuffer>& Objects, + std::map<std::filesystem::path, IoHash>& OutputFiles) + { + CbObject Directory = LoadCompactBinaryObject(Objects.at(DirectoryHash)); + + for (auto& It : Directory["f"sv]) + { + const CbObjectView FileObject = It.AsObjectView(); + const std::filesystem::path Path = ParentDirectory / FileObject["n"sv].AsString(); + + OutputFiles[Path] = FileObject["h"sv].AsBinaryAttachment(); + } + + for (auto& It : Directory["d"sv]) + { + const CbObjectView DirectoryObject = It.AsObjectView(); + + ResolveMerkleTreeDirectory(ParentDirectory / DirectoryObject["n"sv].AsString(), + DirectoryObject["h"sv].AsObjectAttachment(), + Objects, + OutputFiles); + } + } + [[nodiscard]] CbObject BuildRequirements(const std::string_view Condition, const std::map<std::string_view, int64_t>& Resources, const bool Exclusive) @@ -1117,7 +1277,7 @@ namespace detail { if (!WorkingDirectory.empty()) { - TaskWriter.AddString("s"sv, WorkingDirectory); + TaskWriter.AddString("w"sv, WorkingDirectory); } TaskWriter.AddObjectAttachment("s"sv, SandboxHash); @@ -1200,6 +1360,7 @@ public: , m_CasStore(CasStore) , m_CidStore(CidStore) , m_Stats(Options.StatsEnabled) + , m_AsyncWorkPool(Options.ThreadCount) { } @@ -1226,11 +1387,6 @@ public: { m_ShutdownEvent.Reset(); - for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) - { - m_UpstreamThreads.emplace_back(&UpstreamApplyImpl::ProcessUpstreamQueue, this); - } - m_UpstreamUpdatesThread = std::thread(&UpstreamApplyImpl::ProcessUpstreamUpdates, this); m_EndpointMonitorThread = std::thread(&UpstreamApplyImpl::MonitorEndpoints, this); @@ -1267,14 +1423,7 @@ public: m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)}; } - if (!m_UpstreamThreads.empty()) - { - m_UpstreamQueue.Enqueue(std::move(ApplyRecord)); - } - else - { - ProcessApplyRecord(std::move(ApplyRecord)); - } + m_AsyncWorkPool.ScheduleWork([this, ApplyRecord = std::move(ApplyRecord)]() { ProcessApplyRecord(std::move(ApplyRecord)); }); return {.ApplyId = ActionId, .Success = true}; } @@ -1299,7 +1448,7 @@ public: virtual void GetStatus(CbObjectWriter& Status) override { Status << "worker_threads" << m_Options.ThreadCount; - Status << "queue_count" << m_UpstreamQueue.Size(); + Status << "queue_count" << m_AsyncWorkPool.PendingWork(); Status.BeginArray("endpoints"); for (const auto& Ep : m_Endpoints) @@ -1375,6 +1524,8 @@ private: } } + Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId); + { std::scoped_lock Lock(m_ApplyTasksMutex); if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) @@ -1382,7 +1533,6 @@ private: Status->State = UpstreamApplyState::Complete; Status->Result = {.Error{.ErrorCode = -1, .Reason = "No available endpoint"}}; } - Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId); } } catch (std::exception& e) @@ -1397,30 +1547,13 @@ private: } } - void ProcessUpstreamQueue() - { - for (;;) - { - UpstreamApplyRecord ApplyRecord; - if (m_UpstreamQueue.WaitAndDequeue(ApplyRecord)) - { - ProcessApplyRecord(std::move(ApplyRecord)); - } - - if (!m_RunState.IsRunning) - { - break; - } - } - } - void ProcessApplyUpdates() { for (auto& Endpoint : m_Endpoints) { if (Endpoint->IsHealthy()) { - GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(); + GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_AsyncWorkPool); m_Stats.Add(*Endpoint, Result); if (!Result.Success) @@ -1515,24 +1648,14 @@ private: if (m_RunState.Stop()) { m_ShutdownEvent.Set(); - - m_UpstreamQueue.CompleteAdding(); - for (std::thread& Thread : m_UpstreamThreads) - { - Thread.join(); - } - m_EndpointMonitorThread.join(); m_UpstreamUpdatesThread.join(); - m_UpstreamThreads.clear(); m_Endpoints.clear(); } } spdlog::logger& Log() { return m_Log; } - using UpstreamApplyQueue = BlockingQueue<UpstreamApplyRecord>; - struct RunState { std::mutex Mutex; @@ -1558,13 +1681,12 @@ private: UpstreamApplyOptions m_Options; CasStore& m_CasStore; CidStore& m_CidStore; - UpstreamApplyQueue m_UpstreamQueue; UpstreamApplyStats m_Stats; UpstreamApplyTasks m_ApplyTasks; std::mutex m_ApplyTasksMutex; std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints; Event m_ShutdownEvent; - std::vector<std::thread> m_UpstreamThreads; + WorkerThreadPool m_AsyncWorkPool; std::thread m_UpstreamUpdatesThread; std::thread m_EndpointMonitorThread; RunState m_RunState; |