// Copyright Epic Games, Inc. All Rights Reserved. #include "exec_cmd.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace std::literals; namespace eastl { template<> struct hash : public zen::IoHash::Hasher { }; } // namespace eastl #if ZEN_WITH_COMPUTE_SERVICES namespace zen { ExecCommand::ExecCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName), ""); m_Options.add_option("", "", "log", "Action log directory", cxxopts::value(m_RecordingLogPath), ""); m_Options.add_option("", "p", "path", "Recording path (directory or .actionlog file)", cxxopts::value(m_RecordingPath), ""); m_Options.add_option("", "", "offset", "Recording replay start offset", cxxopts::value(m_Offset), ""); m_Options.add_option("", "", "stride", "Recording replay stride", cxxopts::value(m_Stride), ""); m_Options.add_option("", "", "limit", "Recording replay limit", cxxopts::value(m_Limit), ""); m_Options.add_option("", "", "beacon", "Beacon path", cxxopts::value(m_BeaconPath), ""); m_Options.add_option("", "", "mode", "Select execution mode (http,inproc,dump,direct,beacon,buildlog)", cxxopts::value(m_Mode)->default_value("http"), ""); m_Options.add_option("", "", "quiet", "Quiet mode (less logging)", cxxopts::value(m_Quiet), ""); m_Options.parse_positional("mode"); } ExecCommand::~ExecCommand() { } void ExecCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { // Configure if (!ParseOptions(argc, argv)) { return; } m_HostName = ResolveTargetHostSpec(m_HostName); if (m_RecordingPath.empty()) { throw OptionParseException("replay path is required!", m_Options.help()); } m_VerboseLogging = GlobalOptions.IsVerbose; m_QuietLogging = m_Quiet && !m_VerboseLogging; enum ExecMode { kHttp, kDirect, kInproc, kDump, kBeacon, kBuildLog } Mode; if (m_Mode == "http"sv) { Mode = kHttp; } else if (m_Mode == "direct"sv) { Mode = kDirect; } else if (m_Mode == "inproc"sv) { Mode = kInproc; } else if (m_Mode == "dump"sv) { Mode = kDump; } else if (m_Mode == "beacon"sv) { Mode = kBeacon; } else if (m_Mode == "buildlog"sv) { Mode = kBuildLog; } else { throw OptionParseException("invalid mode specified!", m_Options.help()); } // Gather information from recording path std::unique_ptr Reader; std::unique_ptr UeReader; std::filesystem::path RecordingPath{m_RecordingPath}; if (!std::filesystem::is_directory(RecordingPath)) { throw OptionParseException("replay path should be a directory path!", m_Options.help()); } else { if (std::filesystem::is_directory(RecordingPath / "cid")) { Reader = std::make_unique(RecordingPath); m_WorkerMap = Reader->ReadWorkers(); m_ChunkResolver = Reader.get(); m_RecordingReader = Reader.get(); } else { UeReader = std::make_unique(RecordingPath); m_WorkerMap = UeReader->ReadWorkers(); m_ChunkResolver = UeReader.get(); m_RecordingReader = UeReader.get(); } } ZEN_CONSOLE("found {} workers, {} action items", m_WorkerMap.size(), m_RecordingReader->GetActionCount()); for (auto& Kv : m_WorkerMap) { CbObject WorkerDesc = Kv.second.GetObject(); const IoHash& WorkerId = Kv.first; RegisterWorkerFunctionsFromDescription(WorkerDesc, WorkerId); if (m_VerboseLogging) { zen::ExtendableStringBuilder<1024> ObjStr; # if 0 zen::CompactBinaryToJson(WorkerDesc, ObjStr); ZEN_CONSOLE("worker {}: {}", WorkerId, ObjStr); # else zen::CompactBinaryToYaml(WorkerDesc, ObjStr); ZEN_CONSOLE("worker {}:\n{}", WorkerId, ObjStr); # endif } } if (m_VerboseLogging) { EmitFunctionList(m_FunctionList); } // Iterate over work items and dispatch or log them int ReturnValue = 0; Stopwatch ExecTimer; switch (Mode) { case kHttp: // Forward requests to HTTP function service ReturnValue = HttpExecute(); break; case kDirect: // Not currently supported ReturnValue = LocalMessagingExecute(); break; case kInproc: // Handle execution in-core (by spawning child processes) ReturnValue = InProcessExecute(); break; case kDump: // Dump high level information about actions to console ReturnValue = DumpWorkItems(); break; case kBeacon: ReturnValue = BeaconExecute(); break; case kBuildLog: ReturnValue = BuildActionsLog(); break; default: ZEN_ERROR("Unknown operating mode! No work submitted"); ReturnValue = 1; } ZEN_CONSOLE("complete - took {}", NiceTimeSpanMs(ExecTimer.GetElapsedTimeMs())); if (!ReturnValue) { ZEN_CONSOLE("all work items completed successfully"); } else { ZEN_CONSOLE("some work items failed (code {})", ReturnValue); } } int ExecCommand::InProcessExecute() { ZEN_ASSERT(m_ChunkResolver); ChunkResolver& Resolver = *m_ChunkResolver; zen::compute::FunctionServiceSession FunctionSession(Resolver); std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); FunctionSession.AddLocalRunner(Resolver, TempPath); return ExecUsingSession(FunctionSession); } int ExecCommand::ExecUsingSession(zen::compute::FunctionServiceSession& FunctionSession) { struct JobTracker { public: inline void Insert(int LsnField) { RwLock::ExclusiveLockScope _(Lock); PendingJobs.insert(LsnField); } inline bool IsEmpty() const { RwLock::SharedLockScope _(Lock); return PendingJobs.empty(); } inline void Remove(int CompleteLsn) { RwLock::ExclusiveLockScope _(Lock); PendingJobs.erase(CompleteLsn); } inline size_t GetSize() const { RwLock::SharedLockScope _(Lock); return PendingJobs.size(); } private: mutable RwLock Lock; std::unordered_set PendingJobs; }; JobTracker PendingJobs; std::atomic IsDraining{0}; auto DrainCompletedJobs = [&] { if (IsDraining.exchange(1)) { return; } auto _ = MakeGuard([&] { IsDraining.store(0, std::memory_order_release); }); CbObjectWriter Cbo; FunctionSession.GetCompleted(Cbo); if (CbObject Completed = Cbo.Save()) { for (auto& It : Completed["completed"sv]) { int32_t CompleteLsn = It.AsInt32(); CbPackage ResultPackage; HttpResponseCode Response = FunctionSession.GetActionResult(CompleteLsn, /* out */ ResultPackage); if (Response == HttpResponseCode::OK) { PendingJobs.Remove(CompleteLsn); ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, PendingJobs.GetSize()); } } } }; // Describe workers ZEN_CONSOLE("describing {} workers", m_WorkerMap.size()); for (auto Kv : m_WorkerMap) { CbPackage WorkerDesc = Kv.second; FunctionSession.RegisterWorker(WorkerDesc); } // Then submit work items int FailedWorkCounter = 0; size_t RemainingWorkItems = m_RecordingReader->GetActionCount(); int SubmittedWorkItems = 0; ZEN_CONSOLE("submitting {} work items", RemainingWorkItems); int OffsetCounter = m_Offset; int StrideCounter = m_Stride; auto ShouldSchedule = [&]() -> bool { if (m_Limit && SubmittedWorkItems >= m_Limit) { // Limit reached, ignore return false; } if (OffsetCounter && OffsetCounter--) { // Still in offset, ignore return false; } if (--StrideCounter == 0) { StrideCounter = m_Stride; return true; } return false; }; m_RecordingReader->IterateActions( [&](CbObject ActionObject, const IoHash& ActionId) { // Enqueue job Stopwatch SubmitTimer; const int Priority = 0; if (ShouldSchedule()) { if (m_VerboseLogging) { int AttachmentCount = 0; uint64_t AttachmentBytes = 0; eastl::hash_set ReferencedChunks; ActionObject.IterateAttachments([&](CbFieldView Field) { IoHash AttachData = Field.AsAttachment(); ReferencedChunks.insert(AttachData); ++AttachmentCount; if (IoBuffer ChunkData = m_ChunkResolver->FindChunkByCid(AttachData)) { AttachmentBytes += ChunkData.GetSize(); } }); zen::ExtendableStringBuilder<1024> ObjStr; zen::CompactBinaryToJson(ActionObject, ObjStr); ZEN_CONSOLE("work item {} ({} attachments, {} bytes): {}", ActionId, AttachmentCount, NiceBytes(AttachmentBytes), ObjStr); } if (zen::compute::FunctionServiceSession::EnqueueResult EnqueueResult = FunctionSession.EnqueueAction(ActionObject, Priority)) { const int32_t LsnField = EnqueueResult.Lsn; --RemainingWorkItems; ++SubmittedWorkItems; if (!m_QuietLogging) { ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining", SubmittedWorkItems, LsnField, NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()), RemainingWorkItems); } PendingJobs.Insert(LsnField); } else { if (!m_QuietLogging) { std::string_view FunctionName = ActionObject["Function"sv].AsString(); const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid(); const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid(); ZEN_ERROR( "failed to resolve function for work with (Function:{},FunctionVersion:{},BuildSystemVersion:{}). Work " "descriptor " "at: 'file://{}'", std::string(FunctionName), FunctionVersion, BuildSystemVersion, ""); EmitFunctionListOnce(m_FunctionList); } ++FailedWorkCounter; } } // Check for completed work DrainCompletedJobs(); }, 8); // Wait until all pending work is complete while (!PendingJobs.IsEmpty()) { // TODO: improve this logic zen::Sleep(500); DrainCompletedJobs(); } if (FailedWorkCounter) { return 1; } return 0; } int ExecCommand::LocalMessagingExecute() { // Non-HTTP work submission path // To be reimplemented using final transport return 0; } ////////////////////////////////////////////////////////////////////////// int ExecCommand::HttpExecute() { ZEN_ASSERT(m_ChunkResolver); ChunkResolver& Resolver = *m_ChunkResolver; std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); zen::compute::FunctionServiceSession FunctionSession(Resolver); FunctionSession.AddRemoteRunner(Resolver, TempPath, m_HostName); return ExecUsingSession(FunctionSession); } int ExecCommand::BeaconExecute() { ZEN_ASSERT(m_ChunkResolver); ChunkResolver& Resolver = *m_ChunkResolver; std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); zen::compute::FunctionServiceSession FunctionSession(Resolver); FunctionSession.AddRemoteRunner(Resolver, TempPath, "http://localhost:8558"); // FunctionSession.AddRemoteRunner(Resolver, TempPath, "http://10.99.9.246:8558"); return ExecUsingSession(FunctionSession); } ////////////////////////////////////////////////////////////////////////// void ExecCommand::RegisterWorkerFunctionsFromDescription(const CbObject& WorkerDesc, const IoHash& WorkerId) { const Guid WorkerBuildSystemVersion = WorkerDesc["buildsystem_version"sv].AsUuid(); for (auto& Item : WorkerDesc["functions"sv]) { CbObjectView Function = Item.AsObjectView(); std::string_view FunctionName = Function["name"sv].AsString(); const Guid FunctionVersion = Function["version"sv].AsUuid(); m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName}, .FunctionVersion = FunctionVersion, .BuildSystemVersion = WorkerBuildSystemVersion, .WorkerId = WorkerId}); } } void ExecCommand::EmitFunctionListOnce(const std::vector& FunctionList) { if (m_FunctionListEmittedOnce == false) { EmitFunctionList(FunctionList); m_FunctionListEmittedOnce = true; } } int ExecCommand::DumpWorkItems() { std::atomic EmittedCount{0}; eastl::hash_map SeenAttachments; // Attachment CID -> count of references m_RecordingReader->IterateActions( [&](CbObject ActionObject, const IoHash& ActionId) { eastl::hash_map Attachments; uint64_t AttachmentBytes = 0; uint64_t UncompressedAttachmentBytes = 0; ActionObject.IterateAttachments([&](const zen::CbFieldView AttachmentField) { const IoHash AttachmentCid = AttachmentField.GetValue().AsHash(); IoBuffer AttachmentData = m_ChunkResolver->FindChunkByCid(AttachmentCid); IoHash RawHash; uint64_t RawSize = 0; CompressedBuffer CompressedData = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentData), RawHash, RawSize); Attachments[AttachmentCid] = CompressedData; AttachmentBytes += CompressedData.GetCompressedSize(); UncompressedAttachmentBytes += CompressedData.DecodeRawSize(); if (auto [Iter, Inserted] = SeenAttachments.insert({AttachmentCid, 1}); !Inserted) { ++Iter->second; } }); zen::ExtendableStringBuilder<1024> ObjStr; # if 0 zen::CompactBinaryToJson(ActionObject, ObjStr); ZEN_CONSOLE("work item {} ({} attachments): {}", ActionId, Attachments.size(), ObjStr); # else zen::CompactBinaryToYaml(ActionObject, ObjStr); ZEN_CONSOLE("work item {} ({} attachments, {}->{} bytes):\n{}", ActionId, Attachments.size(), AttachmentBytes, UncompressedAttachmentBytes, ObjStr); # endif ++EmittedCount; }, 1); ZEN_CONSOLE("emitted: {} actions", EmittedCount.load()); eastl::map> ReferenceHistogram; for (const auto& [K, V] : SeenAttachments) { if (V > 1) { ReferenceHistogram[V].push_back(K); } } for (const auto& [RefCount, Cids] : ReferenceHistogram) { ZEN_CONSOLE("{} attachments with {} references", Cids.size(), RefCount); } return 0; } ////////////////////////////////////////////////////////////////////////// int ExecCommand::BuildActionsLog() { ZEN_ASSERT(m_ChunkResolver); ChunkResolver& Resolver = *m_ChunkResolver; if (m_RecordingPath.empty()) { throw OptionParseException("need to specify recording path", m_Options.help()); } if (std::filesystem::exists(m_RecordingLogPath)) { throw OptionParseException(fmt::format("recording log directory '{}' already exists!", m_RecordingLogPath), m_Options.help()); } ZEN_NOT_IMPLEMENTED("build log generation not implemented yet!"); std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); zen::compute::FunctionServiceSession FunctionSession(Resolver); FunctionSession.StartRecording(Resolver, m_RecordingLogPath); return ExecUsingSession(FunctionSession); } void ExecCommand::EmitFunctionList(const std::vector& FunctionList) { ZEN_CONSOLE("=== Known functions:\n==========================="); ZEN_CONSOLE("{:30} {:36} {:36} {}", "function", "version", "build system", "worker id"); for (const FunctionDefinition& Func : FunctionList) { ZEN_CONSOLE("{:30} {:36} {:36} {}", Func.FunctionName, Func.FunctionVersion, Func.BuildSystemVersion, Func.WorkerId); } ZEN_CONSOLE("==========================="); } } // namespace zen #endif // ZEN_WITH_COMPUTE_SERVICES