about | summary | refs | log | tree | commit | diff
path: root/zenserver/upstream/upstreamapply.cpp
diff options
context:
space:
mode:
author Joe Kirchoff <[email protected]> 2022-03-30 14:15:15 -0700
committer GitHub <[email protected]> 2022-03-30 14:15:15 -0700
commit eb33c52b8e338b1bccf0d9d26b56d7ef611f6059 (patch)
tree 994b2af87e7b0cfba3250d41227c94d777738dc4 /zenserver/upstream/upstreamapply.cpp
parent Retain flags for small objects in structured cache (#68) (diff)
download zen-eb33c52b8e338b1bccf0d9d26b56d7ef611f6059.tar.xz
zen-eb33c52b8e338b1bccf0d9d26b56d7ef611f6059.zip
Simple file-based compute (#65)
Diffstat (limited to 'zenserver/upstream/upstreamapply.cpp')
-rw-r--r-- zenserver/upstream/upstreamapply.cpp 682
1 files changed, 402 insertions, 280 deletions
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp
index fd304adb8..17a6bb3cf 100644
--- a/zenserver/upstream/upstreamapply.cpp
+++ b/zenserver/upstream/upstreamapply.cpp
@@ -7,7 +7,6 @@
# include "jupiter.h"
# include "zen.h"
-# include <zencore/blockingqueue.h>
# include <zencore/compactbinary.h>
# include <zencore/compactbinarybuilder.h>
# include <zencore/compactbinarypackage.h>
@@ -19,6 +18,7 @@
# include <zencore/stream.h>
# include <zencore/thread.h>
# include <zencore/timer.h>
+# include <zencore/workthreadpool.h>
# include <zenstore/cas.h>
# include <zenstore/cidstore.h>
@@ -33,7 +33,6 @@
# include <algorithm>
# include <atomic>
-# include <map>
# include <set>
# include <stack>
# include <thread>
@@ -148,7 +147,7 @@ namespace detail {
virtual std::string_view DisplayName() const override { return m_DisplayName; }
- virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) override
+ virtual PostUpstreamApplyResult PostApply(UpstreamApplyRecord ApplyRecord) override
{
int64_t Bytes{};
double ElapsedSeconds{};
@@ -168,7 +167,7 @@ namespace detail {
// Pending task is already queued, return success
return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
}
- m_PendingTasks[UpstreamData.TaskId] = ApplyRecord;
+ m_PendingTasks[UpstreamData.TaskId] = std::move(ApplyRecord);
}
CloudCacheSession ComputeSession(m_Client);
@@ -200,17 +199,23 @@ namespace detail {
.ElapsedSeconds = ElapsedSeconds};
}
- Result = StorageSession.PutRef("requests"sv,
- UpstreamData.TaskId,
- UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(),
- ZenContentType::kCbObject);
-
- Bytes += Result.Bytes;
- ElapsedSeconds += Result.ElapsedSeconds;
- if (!Result.Success)
+ PutRefResult RefResult = StorageSession.PutRef("requests"sv,
+ UpstreamData.TaskId,
+ UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(),
+ ZenContentType::kCbObject);
+ Log().debug("Put ref {} Need={} Bytes={} Duration={}s Result={}",
+ UpstreamData.TaskId,
+ RefResult.Needs.size(),
+ RefResult.Bytes,
+ RefResult.ElapsedSeconds,
+ RefResult.Success);
+
+ Bytes += RefResult.Bytes;
+ ElapsedSeconds += RefResult.ElapsedSeconds;
+ if (!RefResult.Success)
{
- return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
- .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to add task ref"},
+ return {.Error{.ErrorCode = RefResult.ErrorCode ? RefResult.ErrorCode : -1,
+ .Reason = !RefResult.Reason.empty() ? std::move(RefResult.Reason) : "Failed to add task ref"},
.Bytes = Bytes,
.ElapsedSeconds = ElapsedSeconds};
}
@@ -223,9 +228,15 @@ namespace detail {
Writer.BeginArray("t"sv);
Writer.AddObjectAttachment(UpstreamData.TaskId);
Writer.EndArray();
- IoBuffer TasksData = Writer.Save().GetBuffer().AsIoBuffer();
+ CbObject TasksObject = Writer.Save();
+ IoBuffer TasksData = TasksObject.GetBuffer().AsIoBuffer();
CloudCacheResult Result = ComputeSession.PostComputeTasks(TasksData);
+ Log().debug("Post compute task {} Bytes={} Duration={}s Result={}",
+ TasksObject.GetHash(),
+ Result.Bytes,
+ Result.ElapsedSeconds,
+ Result.Success);
Bytes += Result.Bytes;
ElapsedSeconds += Result.ElapsedSeconds;
if (!Result.Success)
@@ -241,6 +252,7 @@ namespace detail {
.ElapsedSeconds = ElapsedSeconds};
}
+ Log().info("Task posted {}", UpstreamData.TaskId);
return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
}
catch (std::exception& Err)
@@ -262,12 +274,14 @@ namespace detail {
// Batch check for missing blobs
std::set<IoHash> Keys;
- for (const auto& It : Blobs)
- {
- Keys.insert(It.first);
- }
+ std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; });
CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys);
+ Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}",
+ Keys.size(),
+ ExistsResult.Needs.size(),
+ ExistsResult.ElapsedSeconds,
+ ExistsResult.Success);
ElapsedSeconds += ExistsResult.ElapsedSeconds;
if (!ExistsResult.Success)
{
@@ -279,14 +293,10 @@ namespace detail {
// TODO: Batch upload missing blobs
- for (const auto& It : Blobs)
+ for (const auto& Hash : ExistsResult.Needs)
{
- if (ExistsResult.Have.contains(It.first))
- {
- continue;
- }
-
- CloudCacheResult Result = Session.PutBlob(It.first, It.second);
+ CloudCacheResult Result = Session.PutBlob(Hash, Blobs.at(Hash));
+ Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success);
Bytes += Result.Bytes;
ElapsedSeconds += Result.ElapsedSeconds;
if (!Result.Success)
@@ -313,12 +323,15 @@ namespace detail {
// Batch check for missing objects
std::set<IoHash> Keys;
- for (const auto& It : Objects)
- {
- Keys.insert(It.first);
- }
+ std::transform(Objects.begin(), Objects.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; });
+ // Todo: Endpoint doesn't exist for objects
CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys);
+ Log().debug("Queried {} missing objects Need={} Duration={}s Result={}",
+ Keys.size(),
+ ExistsResult.Needs.size(),
+ ExistsResult.ElapsedSeconds,
+ ExistsResult.Success);
ElapsedSeconds += ExistsResult.ElapsedSeconds;
if (!ExistsResult.Success)
{
@@ -330,14 +343,10 @@ namespace detail {
// TODO: Batch upload missing objects
- for (const auto& It : Objects)
+ for (const auto& Hash : ExistsResult.Needs)
{
- if (ExistsResult.Have.contains(It.first))
- {
- continue;
- }
-
- CloudCacheResult Result = Session.PutObject(It.first, It.second.GetBuffer().AsIoBuffer());
+ CloudCacheResult Result = Session.PutObject(Hash, Objects.at(Hash).GetBuffer().AsIoBuffer());
+ Log().debug("Put object {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success);
Bytes += Result.Bytes;
ElapsedSeconds += Result.ElapsedSeconds;
if (!Result.Success)
@@ -370,7 +379,21 @@ namespace detail {
Exception = 6,
};
- std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome)
+ [[nodiscard]] static std::string_view ComputeTaskStateToString(const ComputeTaskState Outcome)
+ {
+ switch (Outcome)
+ {
+ case ComputeTaskState::Queued:
+ return "Queued"sv;
+ case ComputeTaskState::Executing:
+ return "Executing"sv;
+ case ComputeTaskState::Complete:
+ return "Complete"sv;
+ };
+ return "Unknown"sv;
+ }
+
+ [[nodiscard]] static std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome)
{
switch (Outcome)
{
@@ -392,27 +415,36 @@ namespace detail {
return "Unknown"sv;
}
- virtual GetUpstreamApplyUpdatesResult GetUpdates() override
+ virtual GetUpstreamApplyUpdatesResult GetUpdates(WorkerThreadPool& ThreadPool) override
{
- int64_t Bytes{};
- double ElapsedSeconds{};
- UpstreamApplyCompleted CompletedTasks;
+ int64_t Bytes{};
+ double ElapsedSeconds{};
{
std::scoped_lock Lock(m_TaskMutex);
if (m_PendingTasks.empty())
{
- // Nothing to do.
- return {.Success = true};
+ if (m_CompletedTasks.empty())
+ {
+ // Nothing to do.
+ return {.Success = true};
+ }
+
+ UpstreamApplyCompleted CompletedTasks;
+ std::swap(CompletedTasks, m_CompletedTasks);
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true};
}
}
try
{
CloudCacheSession ComputeSession(m_Client);
- CloudCacheSession StorageSession(m_StorageClient);
CloudCacheResult UpdatesResult = ComputeSession.GetComputeUpdates(m_ChannelId);
+ Log().debug("Get compute updates Bytes={} Duration={}s Result={}",
+ UpdatesResult.Bytes,
+ UpdatesResult.ElapsedSeconds,
+ UpdatesResult.Success);
Bytes += UpdatesResult.Bytes;
ElapsedSeconds += UpdatesResult.ElapsedSeconds;
if (!UpdatesResult.Success)
@@ -427,44 +459,86 @@ namespace detail {
return {.Error{.ErrorCode = -1, .Reason = "Failed get task updates"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds};
}
- CbObject TaskStatus = LoadCompactBinaryObject(UpdatesResult.Response);
+ CbObject TaskStatus = LoadCompactBinaryObject(std::move(UpdatesResult.Response));
for (auto& It : TaskStatus["u"sv])
{
- CbObjectView Status = It.AsObjectView();
- const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32();
+ CbObjectView Status = It.AsObjectView();
+ IoHash TaskId = Status["h"sv].AsHash();
+ const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32();
+ const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)Status["o"sv].AsInt32();
+
+ Log().info("Task {} State={}", TaskId, ComputeTaskStateToString(State));
- // Only care about completed tasks
+ // Only completed tasks need to be processed
if (State != ComputeTaskState::Complete)
{
continue;
}
- const IoHash TaskId = Status["h"sv].AsHash();
-
- IoHash WorkerId;
- IoHash ActionId;
+ IoHash WorkerId{};
+ IoHash ActionId{};
+ UpstreamApplyType ApplyType{};
{
std::scoped_lock Lock(m_TaskMutex);
auto TaskIt = m_PendingTasks.find(TaskId);
- if (TaskIt == m_PendingTasks.end())
+ if (TaskIt != m_PendingTasks.end())
{
- continue;
+ WorkerId = TaskIt->second.WorkerDescriptor.GetHash();
+ ActionId = TaskIt->second.Action.GetHash();
+ ApplyType = TaskIt->second.Type;
+ m_PendingTasks.erase(TaskIt);
}
- WorkerId = TaskIt->second.WorkerDescriptor.GetHash();
- ActionId = TaskIt->second.Action.GetHash();
- m_PendingTasks.erase(TaskIt);
}
- GetUpstreamApplyResult Result = ProcessTaskStatus(Status, StorageSession);
- Bytes += Result.Bytes;
- ElapsedSeconds += Result.ElapsedSeconds;
+ if (WorkerId == IoHash::Zero)
+ {
+ Log().warn("Task {} missing from pending tasks", TaskId);
+ continue;
+ }
- CompletedTasks[WorkerId][ActionId] = std::move(Result);
+ if (Outcome != ComputeTaskOutcome::Success)
+ {
+ const std::string_view Detail = Status["d"sv].AsString();
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ m_CompletedTasks[WorkerId][ActionId] = {
+ .Error{.ErrorCode = -1, .Reason = fmt::format("Task {} {}", ComputeTaskOutcomeToString(Outcome), Detail)}};
+ }
+ continue;
+ }
+
+ ThreadPool.ScheduleWork([this,
+ ApplyType,
+ ResultHash = Status["r"sv].AsHash(),
+ TaskId = std::move(TaskId),
+ WorkerId = std::move(WorkerId),
+ ActionId = std::move(ActionId)]() {
+ GetUpstreamApplyResult Result = ProcessTaskStatus(ApplyType, ResultHash);
+ Log().debug("Task Processed {} Files={} Attachments={} ExitCode={}",
+ TaskId,
+ Result.OutputFiles.size(),
+ Result.OutputPackage.GetAttachments().size(),
+ Result.Error.ErrorCode);
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ m_CompletedTasks[WorkerId][ActionId] = std::move(Result);
+ }
+ });
}
- return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true};
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ if (m_CompletedTasks.empty())
+ {
+ // Nothing to do.
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
+ }
+ UpstreamApplyCompleted CompletedTasks;
+ std::swap(CompletedTasks, m_CompletedTasks);
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true};
+ }
}
catch (std::exception& Err)
{
@@ -473,7 +547,6 @@ namespace detail {
.Error{.ErrorCode = -1, .Reason = Err.what()},
.Bytes = Bytes,
.ElapsedSeconds = ElapsedSeconds,
- .Completed = std::move(CompletedTasks),
};
}
}
@@ -496,6 +569,7 @@ namespace detail {
std::mutex m_TaskMutex;
std::unordered_map<IoHash, UpstreamApplyRecord> m_PendingTasks;
+ UpstreamApplyCompleted m_CompletedTasks;
struct UpstreamData
{
@@ -512,30 +586,22 @@ namespace detail {
std::set<std::string> Files;
};
- [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const CbObjectView& TaskStatus, CloudCacheSession& Session)
+ [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const UpstreamApplyType ApplyType, const IoHash& ResultHash)
{
try
{
- const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)TaskStatus["o"sv].AsInt32();
-
- if (Outcome != ComputeTaskOutcome::Success)
- {
- const std::string_view Detail = TaskStatus["d"sv].AsString();
- if (!Detail.empty())
- {
- return {.Error{.ErrorCode = -1,
- .Reason = fmt::format("Task {}: {}", ComputeTaskOutcomeToString(Outcome), std::string(Detail))}};
- }
- return {.Error{.ErrorCode = -1, .Reason = fmt::format("Task {}", ComputeTaskOutcomeToString(Outcome))}};
- }
-
- const IoHash ResultHash = TaskStatus["r"sv].AsHash();
+ CloudCacheSession Session(m_StorageClient);
int64_t Bytes{};
double ElapsedSeconds{};
// Get Result object and all Object Attachments + Binary Attachment IDs
CloudCacheResult ObjectRefResult = Session.GetRef("responses"sv, ResultHash, ZenContentType::kCbObject);
+ Log().debug("Get ref {} Bytes={} Duration={}s Result={}",
+ ResultHash,
+ ObjectRefResult.Bytes,
+ ObjectRefResult.ElapsedSeconds,
+ ObjectRefResult.Success);
Bytes += ObjectRefResult.Bytes;
ElapsedSeconds += ObjectRefResult.ElapsedSeconds;
@@ -574,6 +640,11 @@ namespace detail {
ObjectsToIterate.pop_back();
CloudCacheResult ObjectResult = Session.GetObject(Hash);
+ Log().debug("Get object {} Bytes={} Duration={}s Result={}",
+ Hash,
+ ObjectResult.Bytes,
+ ObjectResult.ElapsedSeconds,
+ ObjectResult.Success);
- Bytes += ObjectRefResult.Bytes;
- ElapsedSeconds += ObjectRefResult.ElapsedSeconds;
+ Bytes += ObjectResult.Bytes; // count this object's transfer, not the initial GetRef result again
+ ElapsedSeconds += ObjectResult.ElapsedSeconds; // same: per-object timing, matching the debug log above
if (!ObjectResult.Success)
@@ -606,6 +677,11 @@ namespace detail {
for (auto& It : BinaryData)
{
CloudCacheResult BlobResult = Session.GetBlob(It.first);
+ Log().debug("Get blob {} Bytes={} Duration={}s Result={}",
+ It.first,
+ BlobResult.Bytes,
+ BlobResult.ElapsedSeconds,
+ BlobResult.Success);
- Bytes += ObjectRefResult.Bytes;
- ElapsedSeconds += ObjectRefResult.ElapsedSeconds;
+ Bytes += BlobResult.Bytes; // count this blob's transfer, not the initial GetRef result again
+ ElapsedSeconds += BlobResult.ElapsedSeconds; // same: per-blob timing, matching the debug log above
if (!BlobResult.Success)
@@ -626,9 +702,9 @@ namespace detail {
std::string StdOut = std::string((const char*)BinaryData[StdOutHash].GetData(), BinaryData[StdOutHash].GetSize());
std::string StdErr = std::string((const char*)BinaryData[StdErrHash].GetData(), BinaryData[StdErrHash].GetSize());
- if (ExitCode != 0)
+ if (OutputHash == IoHash::Zero)
{
- return {.Error{.ErrorCode = ExitCode, .Reason = "Task completed with errors"},
+ return {.Error{.ErrorCode = ExitCode, .Reason = "Task completed with no output object"},
.Bytes = Bytes,
.ElapsedSeconds = ElapsedSeconds,
.StdOut = std::move(StdOut),
@@ -637,137 +713,174 @@ namespace detail {
CbObject OutputObject = LoadCompactBinaryObject(ObjectData[OutputHash]);
- // Get build.output
- IoHash BuildOutputId;
- IoBuffer BuildOutput;
- for (auto& It : OutputObject["f"sv])
+ switch (ApplyType)
{
- const CbObjectView FileObject = It.AsObjectView();
- if (FileObject["n"sv].AsString() == "Build.output"sv)
- {
- BuildOutputId = FileObject["h"sv].AsBinaryAttachment();
- BuildOutput = BinaryData[BuildOutputId];
- break;
- }
- }
+ case UpstreamApplyType::Simple:
+ {
+ std::map<std::filesystem::path, IoHash> OutputFiles;
- if (BuildOutput.GetSize() == 0)
- {
- return {.Error{.ErrorCode = ExitCode, .Reason = "Build.output file not found in task results"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .StdOut = std::move(StdOut),
- .StdErr = std::move(StdErr)};
- }
+ ResolveMerkleTreeDirectory(""sv, OutputHash, ObjectData, OutputFiles);
- // Get Output directory node
- IoBuffer OutputDirectoryTree;
- for (auto& It : OutputObject["d"sv])
- {
- const CbObjectView DirectoryObject = It.AsObjectView();
- if (DirectoryObject["n"sv].AsString() == "Outputs"sv)
- {
- OutputDirectoryTree = ObjectData[DirectoryObject["h"sv].AsObjectAttachment()];
+ return {.OutputFiles = std::move(OutputFiles),
+ .FileData = std::move(BinaryData),
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .StdOut = std::move(StdOut),
+ .StdErr = std::move(StdErr),
+ .Success = true};
+ }
break;
- }
- }
-
- if (OutputDirectoryTree.GetSize() == 0)
- {
- return {.Error{.ErrorCode = ExitCode, .Reason = "Outputs directory not found in task results"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .StdOut = std::move(StdOut),
- .StdErr = std::move(StdErr)};
- }
-
- // load build.output as CbObject
+ case UpstreamApplyType::Asset:
+ {
+ if (ExitCode != 0)
+ {
+ return {.Error{.ErrorCode = ExitCode, .Reason = "Task completed with errors"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .StdOut = std::move(StdOut),
+ .StdErr = std::move(StdErr)};
+ }
- // Move Outputs from Horde to CbPackage
+ // Get build.output
+ IoHash BuildOutputId;
+ IoBuffer BuildOutput;
+ for (auto& It : OutputObject["f"sv])
+ {
+ const CbObjectView FileObject = It.AsObjectView();
+ if (FileObject["n"sv].AsString() == "Build.output"sv)
+ {
+ BuildOutputId = FileObject["h"sv].AsBinaryAttachment();
+ BuildOutput = BinaryData[BuildOutputId];
+ break;
+ }
+ }
- std::unordered_map<IoHash, IoHash> CidToCompressedId;
- CbPackage OutputPackage;
- CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree);
- int64_t TotalAttachmentBytes = 0;
- int64_t TotalRawAttachmentBytes = 0;
+ if (BuildOutput.GetSize() == 0)
+ {
+ return {.Error{.ErrorCode = ExitCode, .Reason = "Build.output file not found in task results"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .StdOut = std::move(StdOut),
+ .StdErr = std::move(StdErr)};
+ }
- for (auto& It : OutputDirectoryTreeObject["f"sv])
- {
- CbObjectView FileObject = It.AsObjectView();
- // Name is the uncompressed hash
- IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString());
- // Hash is the compressed data hash, and how it is stored in Horde
- IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment();
+ // Get Output directory node
+ IoBuffer OutputDirectoryTree;
+ for (auto& It : OutputObject["d"sv])
+ {
+ const CbObjectView DirectoryObject = It.AsObjectView();
+ if (DirectoryObject["n"sv].AsString() == "Outputs"sv)
+ {
+ OutputDirectoryTree = ObjectData[DirectoryObject["h"sv].AsObjectAttachment()];
+ break;
+ }
+ }
- if (!BinaryData.contains(CompressedId))
- {
- Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId.ToHexString());
- return {.Error{.ErrorCode = -1, .Reason = "Object attachment chunk not retrieved from Horde"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
- }
- CidToCompressedId[DecompressedId] = CompressedId;
- }
+ if (OutputDirectoryTree.GetSize() == 0)
+ {
+ return {.Error{.ErrorCode = ExitCode, .Reason = "Outputs directory not found in task results"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .StdOut = std::move(StdOut),
+ .StdErr = std::move(StdErr)};
+ }
- // Iterate attachments, verify all chunks exist, and add to CbPackage
- bool AnyErrors = false;
- CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput);
- BuildOutputObject.IterateAttachments([&](CbFieldView Field) {
- const IoHash DecompressedId = Field.AsHash();
- if (!CidToCompressedId.contains(DecompressedId))
- {
- Log().warn("Attachment not found {}", DecompressedId.ToHexString());
- AnyErrors = true;
- return;
- }
- const IoHash& CompressedId = CidToCompressedId.at(DecompressedId);
+ // load build.output as CbObject
- if (!BinaryData.contains(CompressedId))
- {
- Log().warn("Missing output {} compressed {} uncompressed",
- CompressedId.ToHexString(),
- DecompressedId.ToHexString());
- AnyErrors = true;
- return;
- }
+ // Move Outputs from Horde to CbPackage
- CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId]));
+ std::unordered_map<IoHash, IoHash> CidToCompressedId;
+ CbPackage OutputPackage;
+ CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree);
+ int64_t TotalAttachmentBytes = 0;
+ int64_t TotalRawAttachmentBytes = 0;
- if (!AttachmentBuffer)
- {
- Log().warn("Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed",
- CompressedId.ToHexString(),
- DecompressedId.ToHexString());
- AnyErrors = true;
- return;
- }
+ for (auto& It : OutputDirectoryTreeObject["f"sv])
+ {
+ CbObjectView FileObject = It.AsObjectView();
+ // Name is the uncompressed hash
+ IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString());
+ // Hash is the compressed data hash, and how it is stored in Horde
+ IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment();
+
+ if (!BinaryData.contains(CompressedId))
+ {
+ Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId);
+ return {.Error{.ErrorCode = -1, .Reason = "Object attachment chunk not retrieved from Horde"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+ CidToCompressedId[DecompressedId] = CompressedId;
+ }
- TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize();
- TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize();
+ // Iterate attachments, verify all chunks exist, and add to CbPackage
+ bool AnyErrors = false;
+ CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput);
+ BuildOutputObject.IterateAttachments([&](CbFieldView Field) {
+ const IoHash DecompressedId = Field.AsHash();
+ if (!CidToCompressedId.contains(DecompressedId))
+ {
+ Log().warn("Attachment not found {}", DecompressedId);
+ AnyErrors = true;
+ return;
+ }
+ const IoHash& CompressedId = CidToCompressedId.at(DecompressedId);
+
+ if (!BinaryData.contains(CompressedId))
+ {
+ Log().warn("Missing output {} compressed {} uncompressed", CompressedId, DecompressedId);
+ AnyErrors = true;
+ return;
+ }
+
+ CompressedBuffer AttachmentBuffer =
+ CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId]));
+
+ if (!AttachmentBuffer)
+ {
+ Log().warn(
+ "Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed",
+ CompressedId,
+ DecompressedId);
+ AnyErrors = true;
+ return;
+ }
+
+ TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize();
+ TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize();
+
+ CbAttachment Attachment(AttachmentBuffer);
+ OutputPackage.AddAttachment(Attachment);
+ });
+
+ if (AnyErrors)
+ {
+ return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .StdOut = std::move(StdOut),
+ .StdErr = std::move(StdErr)};
+ }
- CbAttachment Attachment(AttachmentBuffer);
- OutputPackage.AddAttachment(Attachment);
- });
+ OutputPackage.SetObject(BuildOutputObject);
- if (AnyErrors)
- {
- return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .StdOut = std::move(StdOut),
- .StdErr = std::move(StdErr)};
+ return {.OutputPackage = std::move(OutputPackage),
+ .TotalAttachmentBytes = TotalAttachmentBytes,
+ .TotalRawAttachmentBytes = TotalRawAttachmentBytes,
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .StdOut = std::move(StdOut),
+ .StdErr = std::move(StdErr),
+ .Success = true};
+ }
+ break;
}
- OutputPackage.SetObject(BuildOutputObject);
-
- return {.OutputPackage = std::move(OutputPackage),
- .TotalAttachmentBytes = TotalAttachmentBytes,
- .TotalRawAttachmentBytes = TotalRawAttachmentBytes,
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .StdOut = std::move(StdOut),
- .StdErr = std::move(StdErr),
- .Success = true};
+ return {.Error{.ErrorCode = ExitCode, .Reason = "Unknown apply type"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .StdOut = std::move(StdOut),
+ .StdErr = std::move(StdErr)};
}
catch (std::exception& Err)
{
@@ -778,8 +891,11 @@ namespace detail {
[[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data)
{
std::string ExecutablePath;
+ std::string WorkingDirectory;
+ std::vector<std::string> Arguments;
std::map<std::string, std::string> Environment;
std::set<std::filesystem::path> InputFiles;
+ std::set<std::string> Outputs;
std::map<std::filesystem::path, IoHash> InputFileHashes;
ExecutablePath = ApplyRecord.WorkerDescriptor["path"sv].AsString();
@@ -790,6 +906,8 @@ namespace detail {
return false;
}
+ WorkingDirectory = ApplyRecord.WorkerDescriptor["workdir"sv].AsString();
+
for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv])
{
CbObjectView FileEntry = It.AsObjectView();
@@ -830,45 +948,67 @@ namespace detail {
Environment[std::string(Env.substr(0, Index))] = Env.substr(Index + 1);
}
+ switch (ApplyRecord.Type)
{
- static const std::filesystem::path BuildActionPath = "Build.action"sv;
- static const std::filesystem::path InputPath = "Inputs"sv;
- const IoHash ActionId = ApplyRecord.Action.GetHash();
-
- InputFiles.insert(BuildActionPath);
- InputFileHashes[BuildActionPath] = ActionId;
- Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(),
- ApplyRecord.Action.GetBuffer().GetSize());
-
- bool AnyErrors = false;
- ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) {
- const IoHash Cid = Field.AsHash();
- const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()};
- IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid);
-
- if (!DataBuffer)
+ case UpstreamApplyType::Simple:
{
- Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid);
- AnyErrors = true;
- return;
- }
+ for (auto& It : ApplyRecord.WorkerDescriptor["arguments"sv])
+ {
+ Arguments.push_back(std::string(It.AsString()));
+ }
- if (InputFiles.contains(FilePath))
- {
- return;
+ for (auto& It : ApplyRecord.WorkerDescriptor["outputs"sv])
+ {
+ Outputs.insert(std::string(It.AsString()));
+ }
}
+ break;
+ case UpstreamApplyType::Asset:
+ {
+ static const std::filesystem::path BuildActionPath = "Build.action"sv;
+ static const std::filesystem::path InputPath = "Inputs"sv;
+ const IoHash ActionId = ApplyRecord.Action.GetHash();
+
+ Arguments.push_back("-Build=build.action");
+ Outputs.insert("Build.output");
+ Outputs.insert("Outputs");
+
+ InputFiles.insert(BuildActionPath);
+ InputFileHashes[BuildActionPath] = ActionId;
+ Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(),
+ ApplyRecord.Action.GetBuffer().GetSize());
+
+ bool AnyErrors = false;
+ ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) {
+ const IoHash Cid = Field.AsHash();
+ const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()};
+ IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid);
+
+ if (!DataBuffer)
+ {
+ Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid);
+ AnyErrors = true;
+ return;
+ }
- const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize());
+ if (InputFiles.contains(FilePath))
+ {
+ return;
+ }
- InputFiles.insert(FilePath);
- InputFileHashes[FilePath] = CompressedId;
- Data.Blobs[CompressedId] = std::move(DataBuffer);
- });
+ const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize());
- if (AnyErrors)
- {
- return false;
- }
+ InputFiles.insert(FilePath);
+ InputFileHashes[FilePath] = CompressedId;
+ Data.Blobs[CompressedId] = std::move(DataBuffer);
+ });
+
+ if (AnyErrors)
+ {
+ return false;
+ }
+ }
+ break;
}
const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles);
@@ -912,13 +1052,7 @@ namespace detail {
Data.RequirementsId = RequirementsId;
}
- CbObject Task = BuildTask(ExecutablePath,
- {"-Build=build.action"},
- Environment,
- {},
- SandboxHash,
- Data.RequirementsId,
- {"Build.output", "Outputs"});
+ CbObject Task = BuildTask(ExecutablePath, Arguments, Environment, WorkingDirectory, SandboxHash, Data.RequirementsId, Outputs);
const IoHash TaskId = Task.GetHash();
Data.Objects[TaskId] = std::move(Task);
@@ -1059,6 +1193,32 @@ namespace detail {
return DirectoryTreeWriter.Save();
}
+ void ResolveMerkleTreeDirectory(const std::filesystem::path& ParentDirectory,
+ const IoHash& DirectoryHash,
+ const std::map<IoHash, IoBuffer>& Objects,
+ std::map<std::filesystem::path, IoHash>& OutputFiles)
+ {
+ CbObject Directory = LoadCompactBinaryObject(Objects.at(DirectoryHash));
+
+ for (auto& It : Directory["f"sv])
+ {
+ const CbObjectView FileObject = It.AsObjectView();
+ const std::filesystem::path Path = ParentDirectory / FileObject["n"sv].AsString();
+
+ OutputFiles[Path] = FileObject["h"sv].AsBinaryAttachment();
+ }
+
+ for (auto& It : Directory["d"sv])
+ {
+ const CbObjectView DirectoryObject = It.AsObjectView();
+
+ ResolveMerkleTreeDirectory(ParentDirectory / DirectoryObject["n"sv].AsString(),
+ DirectoryObject["h"sv].AsObjectAttachment(),
+ Objects,
+ OutputFiles);
+ }
+ }
+
[[nodiscard]] CbObject BuildRequirements(const std::string_view Condition,
const std::map<std::string_view, int64_t>& Resources,
const bool Exclusive)
@@ -1117,7 +1277,7 @@ namespace detail {
if (!WorkingDirectory.empty())
{
- TaskWriter.AddString("s"sv, WorkingDirectory);
+ TaskWriter.AddString("w"sv, WorkingDirectory);
}
TaskWriter.AddObjectAttachment("s"sv, SandboxHash);
@@ -1200,6 +1360,7 @@ public:
, m_CasStore(CasStore)
, m_CidStore(CidStore)
, m_Stats(Options.StatsEnabled)
+ , m_AsyncWorkPool(Options.ThreadCount)
{
}
@@ -1226,11 +1387,6 @@ public:
{
m_ShutdownEvent.Reset();
- for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
- {
- m_UpstreamThreads.emplace_back(&UpstreamApplyImpl::ProcessUpstreamQueue, this);
- }
-
m_UpstreamUpdatesThread = std::thread(&UpstreamApplyImpl::ProcessUpstreamUpdates, this);
m_EndpointMonitorThread = std::thread(&UpstreamApplyImpl::MonitorEndpoints, this);
@@ -1267,14 +1423,7 @@ public:
m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)};
}
- if (!m_UpstreamThreads.empty())
- {
- m_UpstreamQueue.Enqueue(std::move(ApplyRecord));
- }
- else
- {
- ProcessApplyRecord(std::move(ApplyRecord));
- }
+ m_AsyncWorkPool.ScheduleWork([this, ApplyRecord = std::move(ApplyRecord)]() mutable { ProcessApplyRecord(std::move(ApplyRecord)); }); // mutable: the capture is const otherwise, so std::move would only produce a const rvalue
return {.ApplyId = ActionId, .Success = true};
}
@@ -1299,7 +1448,7 @@ public:
virtual void GetStatus(CbObjectWriter& Status) override
{
Status << "worker_threads" << m_Options.ThreadCount;
- Status << "queue_count" << m_UpstreamQueue.Size();
+ Status << "queue_count" << m_AsyncWorkPool.PendingWork();
Status.BeginArray("endpoints");
for (const auto& Ep : m_Endpoints)
@@ -1375,6 +1524,8 @@ private:
}
}
+ Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId);
+
{
std::scoped_lock Lock(m_ApplyTasksMutex);
if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
@@ -1382,7 +1533,6 @@ private:
Status->State = UpstreamApplyState::Complete;
Status->Result = {.Error{.ErrorCode = -1, .Reason = "No available endpoint"}};
}
- Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId);
}
}
catch (std::exception& e)
@@ -1397,30 +1547,13 @@ private:
}
}
- void ProcessUpstreamQueue()
- {
- for (;;)
- {
- UpstreamApplyRecord ApplyRecord;
- if (m_UpstreamQueue.WaitAndDequeue(ApplyRecord))
- {
- ProcessApplyRecord(std::move(ApplyRecord));
- }
-
- if (!m_RunState.IsRunning)
- {
- break;
- }
- }
- }
-
void ProcessApplyUpdates()
{
for (auto& Endpoint : m_Endpoints)
{
if (Endpoint->IsHealthy())
{
- GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates();
+ GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_AsyncWorkPool);
m_Stats.Add(*Endpoint, Result);
if (!Result.Success)
@@ -1515,24 +1648,14 @@ private:
if (m_RunState.Stop())
{
m_ShutdownEvent.Set();
-
- m_UpstreamQueue.CompleteAdding();
- for (std::thread& Thread : m_UpstreamThreads)
- {
- Thread.join();
- }
-
m_EndpointMonitorThread.join();
m_UpstreamUpdatesThread.join();
- m_UpstreamThreads.clear();
m_Endpoints.clear();
}
}
spdlog::logger& Log() { return m_Log; }
- using UpstreamApplyQueue = BlockingQueue<UpstreamApplyRecord>;
-
struct RunState
{
std::mutex Mutex;
@@ -1558,13 +1681,12 @@ private:
UpstreamApplyOptions m_Options;
CasStore& m_CasStore;
CidStore& m_CidStore;
- UpstreamApplyQueue m_UpstreamQueue;
UpstreamApplyStats m_Stats;
UpstreamApplyTasks m_ApplyTasks;
std::mutex m_ApplyTasksMutex;
std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints;
Event m_ShutdownEvent;
- std::vector<std::thread> m_UpstreamThreads;
+ WorkerThreadPool m_AsyncWorkPool;
std::thread m_UpstreamUpdatesThread;
std::thread m_EndpointMonitorThread;
RunState m_RunState;