aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/sessionsclient.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil/sessionsclient.cpp')
-rw-r--r--src/zenutil/sessionsclient.cpp460
1 files changed, 232 insertions, 228 deletions
diff --git a/src/zenutil/sessionsclient.cpp b/src/zenutil/sessionsclient.cpp
index c62cc4099..6ba997a62 100644
--- a/src/zenutil/sessionsclient.cpp
+++ b/src/zenutil/sessionsclient.cpp
@@ -2,15 +2,12 @@
#include <zenutil/sessionsclient.h>
-#include <zencore/blockingqueue.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
#include <zencore/logging/logmsg.h>
#include <zencore/thread.h>
-#include <zenhttp/httpclient.h>
-#include <thread>
#include <vector>
ZEN_THIRD_PARTY_INCLUDES_START
@@ -21,232 +18,76 @@ namespace zen {
//////////////////////////////////////////////////////////////////////////
//
-// SessionLogSink — batching log sink that forwards to /sessions/{id}/log
+// SessionLogSink — thin enqueuer that posts log messages to the
+// SessionsServiceClient worker thread via its BlockingQueue.
//
-static const char*
-LogLevelToString(logging::LogLevel Level)
-{
- switch (Level)
- {
- case logging::Trace:
- return "trace";
- case logging::Debug:
- return "debug";
- case logging::Info:
- return "info";
- case logging::Warn:
- return "warn";
- case logging::Err:
- return "error";
- case logging::Critical:
- return "critical";
- default:
- return "info";
- }
-}
-
-struct BufferedLogEntry
-{
- enum class Type : uint8_t
- {
- Log,
- Flush,
- Shutdown
- };
-
- Type Type = Type::Log;
- std::string Level;
- std::string Message;
-};
-
class SessionLogSink final : public logging::Sink
{
public:
- SessionLogSink(std::string TargetUrl, std::string LogPath) : m_LogPath(std::move(LogPath))
- {
- HttpClientSettings Settings;
- Settings.ConnectTimeout = std::chrono::milliseconds(3000);
- m_Http = std::make_unique<HttpClient>(std::move(TargetUrl), Settings);
-
- SetLevel(logging::Info);
-
- m_WorkerThread = std::thread([this]() {
- zen::SetCurrentThreadName("SessionLog");
- WorkerLoop();
- });
- }
+ explicit SessionLogSink(BlockingQueue<SessionsServiceClient::SessionCommand>* Queue) : m_Queue(Queue) { SetLevel(logging::Info); }
- ~SessionLogSink() override
- {
- BufferedLogEntry ShutdownMsg;
- ShutdownMsg.Type = BufferedLogEntry::Type::Shutdown;
- m_Queue.Enqueue(std::move(ShutdownMsg));
-
- if (m_WorkerThread.joinable())
- {
- m_WorkerThread.join();
- }
- }
+ ~SessionLogSink() override = default;
void Log(const logging::LogMessage& Msg) override
{
- BufferedLogEntry Entry;
- Entry.Type = BufferedLogEntry::Type::Log;
- Entry.Level = LogLevelToString(Msg.GetLevel());
- Entry.Message = std::string(Msg.GetPayload());
- m_Queue.Enqueue(std::move(Entry));
+ SessionsServiceClient::SessionCommand Cmd;
+ Cmd.CommandType = SessionsServiceClient::SessionCommand::Type::Log;
+ Cmd.LogLevel = Msg.GetLevel();
+ Cmd.LogMessage = CompactString(Msg.GetPayload());
+ m_Queue->Enqueue(std::move(Cmd));
}
void Flush() override
{
- // Best-effort: enqueue a flush marker so the worker sends any pending entries
- BufferedLogEntry FlushMsg;
- FlushMsg.Type = BufferedLogEntry::Type::Flush;
- m_Queue.Enqueue(std::move(FlushMsg));
+ SessionsServiceClient::SessionCommand Cmd;
+ Cmd.CommandType = SessionsServiceClient::SessionCommand::Type::FlushLogs;
+ m_Queue->Enqueue(std::move(Cmd));
}
void SetFormatter(std::unique_ptr<logging::Formatter> /*InFormatter*/) override
{
- // No formatting needed — we send raw message text
+ // No formatting needed - we send raw message text
}
private:
- static constexpr size_t BatchSize = 50;
-
- void WorkerLoop()
- {
- std::vector<BufferedLogEntry> Batch;
- Batch.reserve(BatchSize);
-
- BufferedLogEntry Msg;
- while (m_Queue.WaitAndDequeue(Msg))
- {
- if (Msg.Type == BufferedLogEntry::Type::Shutdown)
- {
- // Drain remaining log entries
- BufferedLogEntry Remaining;
- while (m_Queue.WaitAndDequeue(Remaining))
- {
- if (Remaining.Type == BufferedLogEntry::Type::Log)
- {
- Batch.push_back(std::move(Remaining));
- }
- }
- if (!Batch.empty())
- {
- SendBatch(Batch);
- }
- return;
- }
-
- if (Msg.Type == BufferedLogEntry::Type::Flush)
- {
- if (!Batch.empty())
- {
- SendBatch(Batch);
- Batch.clear();
- }
- continue;
- }
-
- // Log entry
- Batch.push_back(std::move(Msg));
-
- if (Batch.size() >= BatchSize)
- {
- SendBatch(Batch);
- Batch.clear();
- }
- else
- {
- // Drain any additional queued entries without blocking
- while (Batch.size() < BatchSize && m_Queue.Size() > 0)
- {
- BufferedLogEntry Extra;
- if (m_Queue.WaitAndDequeue(Extra))
- {
- if (Extra.Type == BufferedLogEntry::Type::Shutdown)
- {
- if (!Batch.empty())
- {
- SendBatch(Batch);
- }
- // Drain remaining
- while (m_Queue.WaitAndDequeue(Extra))
- {
- if (Extra.Type == BufferedLogEntry::Type::Log)
- {
- Batch.push_back(std::move(Extra));
- }
- }
- if (!Batch.empty())
- {
- SendBatch(Batch);
- }
- return;
- }
- if (Extra.Type == BufferedLogEntry::Type::Log)
- {
- Batch.push_back(std::move(Extra));
- }
- else if (Extra.Type == BufferedLogEntry::Type::Flush)
- {
- break;
- }
- }
- }
-
- if (!Batch.empty())
- {
- SendBatch(Batch);
- Batch.clear();
- }
- }
- }
- }
-
- void SendBatch(const std::vector<BufferedLogEntry>& Batch)
- {
- try
- {
- CbObjectWriter Writer;
- Writer.BeginArray("entries");
- for (const BufferedLogEntry& Entry : Batch)
- {
- Writer.BeginObject();
- Writer << "level" << Entry.Level;
- Writer << "message" << Entry.Message;
- Writer.EndObject();
- }
- Writer.EndArray();
-
- HttpClient::Response Result = m_Http->Post(m_LogPath, Writer.Save());
- (void)Result; // Best-effort
- }
- catch (const std::exception&)
- {
- // Best-effort — silently discard on failure
- }
- }
-
- std::string m_LogPath;
- std::unique_ptr<HttpClient> m_Http;
- BlockingQueue<BufferedLogEntry> m_Queue;
- std::thread m_WorkerThread;
+ BlockingQueue<SessionsServiceClient::SessionCommand>* m_Queue;
};
+//////////////////////////////////////////////////////////////////////////
+//
+// SessionsServiceClient
+//
+
SessionsServiceClient::SessionsServiceClient(Options Opts)
: m_Log(logging::Get("sessionsclient"))
, m_Options(std::move(Opts))
-, m_SessionPath(fmt::format("sessions/{}", m_Options.SessionId))
+, m_SessionPath(fmt::format("/sessions/{}", m_Options.SessionId))
{
- HttpClientSettings Settings;
- Settings.ConnectTimeout = std::chrono::milliseconds(3000);
- m_Http = std::make_unique<HttpClient>(m_Options.TargetUrl, Settings);
+ // Strip trailing slash to avoid double-slash when appending paths like /sessions/{id}
+ while (m_Options.TargetUrl.ends_with('/'))
+ {
+ m_Options.TargetUrl.pop_back();
+ }
+
+ m_WorkerThread = std::thread([this]() {
+ zen::SetCurrentThreadName("SessionIO");
+ WorkerLoop();
+ });
}
-SessionsServiceClient::~SessionsServiceClient() = default;
+SessionsServiceClient::~SessionsServiceClient()
+{
+ SessionCommand ShutdownCmd;
+ ShutdownCmd.CommandType = SessionCommand::Type::Shutdown;
+ m_Queue.Enqueue(std::move(ShutdownCmd));
+ m_Queue.CompleteAdding();
+
+ if (m_WorkerThread.joinable())
+ {
+ m_WorkerThread.join();
+ }
+}
CbObject
SessionsServiceClient::BuildRequestBody(CbObjectView Metadata) const
@@ -261,21 +102,65 @@ SessionsServiceClient::BuildRequestBody(CbObjectView Metadata) const
{
Writer << "jobid" << m_Options.JobId;
}
- if (Metadata.GetSize() > 0)
+ if (Metadata)
{
Writer.AddObject("metadata", Metadata);
}
return Writer.Save();
}
-bool
+//////////////////////////////////////////////////////////////////////////
+// Public API — non-blocking enqueuers
+
+void
SessionsServiceClient::Announce(CbObjectView Metadata)
{
+ SessionCommand Cmd;
+ Cmd.CommandType = SessionCommand::Type::Announce;
+ if (Metadata)
+ {
+ Cmd.Metadata = CbObject::Clone(Metadata);
+ }
+ m_Queue.Enqueue(std::move(Cmd));
+}
+
+void
+SessionsServiceClient::UpdateMetadata(CbObjectView Metadata)
+{
+ SessionCommand Cmd;
+ Cmd.CommandType = SessionCommand::Type::UpdateMetadata;
+ if (Metadata)
+ {
+ Cmd.Metadata = CbObject::Clone(Metadata);
+ }
+ m_Queue.Enqueue(std::move(Cmd));
+}
+
+void
+SessionsServiceClient::Remove()
+{
+ SessionCommand Cmd;
+ Cmd.CommandType = SessionCommand::Type::Remove;
+ m_Queue.Enqueue(std::move(Cmd));
+}
+
+logging::SinkPtr
+SessionsServiceClient::CreateLogSink()
+{
+ return Ref(new SessionLogSink(&m_Queue));
+}
+
+//////////////////////////////////////////////////////////////////////////
+// Worker thread — processes all session HTTP I/O
+
+void
+SessionsServiceClient::DoAnnounce(HttpClient& Http, CbObjectView Metadata)
+{
try
{
CbObject Body = BuildRequestBody(Metadata);
- HttpClient::Response Result = m_Http->Post(m_SessionPath, std::move(Body));
+ HttpClient::Response Result = Http.Post(m_SessionPath, std::move(Body));
if (Result.Error)
{
@@ -283,26 +168,24 @@ SessionsServiceClient::Announce(CbObjectView Metadata)
m_Options.TargetUrl,
static_cast<int>(Result.Error->ErrorCode),
Result.Error->ErrorMessage);
- return false;
+ return;
}
if (!IsHttpOk(Result.StatusCode))
{
ZEN_WARN("sessions announce failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode));
- return false;
+ return;
}
- ZEN_INFO("session announced to '{}'", m_Options.TargetUrl);
- return true;
+ ZEN_DEBUG("session announced to '{}'", m_Options.TargetUrl);
}
catch (const std::exception& Ex)
{
ZEN_WARN("sessions announce failed for '{}': {}", m_Options.TargetUrl, Ex.what());
- return false;
}
}
-bool
-SessionsServiceClient::UpdateMetadata(CbObjectView Metadata)
+void
+SessionsServiceClient::DoUpdateMetadata(HttpClient& Http, CbObjectView Metadata)
{
try
{
@@ -311,7 +194,7 @@ SessionsServiceClient::UpdateMetadata(CbObjectView Metadata)
MemoryView View = Body.GetView();
IoBuffer Payload = IoBufferBuilder::MakeCloneFromMemory(View, ZenContentType::kCbObject);
- HttpClient::Response Result = m_Http->Put(m_SessionPath, Payload);
+ HttpClient::Response Result = Http.Put(m_SessionPath, Payload);
if (Result.Error)
{
@@ -319,29 +202,26 @@ SessionsServiceClient::UpdateMetadata(CbObjectView Metadata)
m_Options.TargetUrl,
static_cast<int>(Result.Error->ErrorCode),
Result.Error->ErrorMessage);
- return false;
+ return;
}
if (!IsHttpOk(Result.StatusCode))
{
ZEN_WARN("sessions update failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode));
- return false;
+ return;
}
-
- return true;
}
catch (const std::exception& Ex)
{
ZEN_WARN("sessions update failed for '{}': {}", m_Options.TargetUrl, Ex.what());
- return false;
}
}
-bool
-SessionsServiceClient::Remove()
+void
+SessionsServiceClient::DoRemove(HttpClient& Http)
{
try
{
- HttpClient::Response Result = m_Http->Delete(m_SessionPath);
+ HttpClient::Response Result = Http.Delete(m_SessionPath);
if (Result.Error)
{
@@ -349,29 +229,153 @@ SessionsServiceClient::Remove()
m_Options.TargetUrl,
static_cast<int>(Result.Error->ErrorCode),
Result.Error->ErrorMessage);
- return false;
+ return;
}
if (!IsHttpOk(Result.StatusCode))
{
ZEN_WARN("sessions remove failed for '{}': HTTP status {}", m_Options.TargetUrl, static_cast<int>(Result.StatusCode));
- return false;
+ return;
}
- ZEN_INFO("session removed from '{}'", m_Options.TargetUrl);
- return true;
+ ZEN_DEBUG("session removed from '{}'", m_Options.TargetUrl);
}
catch (const std::exception& Ex)
{
ZEN_WARN("sessions remove failed for '{}': {}", m_Options.TargetUrl, Ex.what());
- return false;
}
}
-logging::SinkPtr
-SessionsServiceClient::CreateLogSink()
+void
+SessionsServiceClient::SendLogBatch(HttpClient& Http, const std::string& LogPath, const std::vector<SessionCommand>& Batch)
{
+ try
+ {
+ CbObjectWriter Writer;
+ Writer.BeginArray("entries");
+ for (const SessionCommand& Entry : Batch)
+ {
+ Writer.BeginObject();
+ Writer << "level" << static_cast<int32_t>(Entry.LogLevel);
+ Writer << "message" << Entry.LogMessage.c_str();
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+
+ HttpClient::Response Result = Http.Post(LogPath, Writer.Save());
+ (void)Result; // Best-effort
+ }
+ catch (const std::exception&)
+ {
+ // Best-effort — silently discard on failure
+ }
+}
+
+void
+SessionsServiceClient::WorkerLoop()
+{
+ HttpClientSettings Settings = m_Options.ClientSettings;
+ Settings.ConnectTimeout = std::chrono::milliseconds(3000);
+ Settings.Timeout = std::chrono::milliseconds(5000);
+ HttpClient Http(m_Options.TargetUrl, Settings);
+
std::string LogPath = m_SessionPath + "/log";
- return Ref(new SessionLogSink(m_Options.TargetUrl, std::move(LogPath)));
+ bool Removed = false;
+
+ static constexpr size_t BatchSize = 50;
+
+ std::vector<SessionCommand> LogBatch;
+ LogBatch.reserve(BatchSize);
+
+ auto FlushLogBatch = [&]() {
+ if (!LogBatch.empty())
+ {
+ SendLogBatch(Http, LogPath, LogBatch);
+ LogBatch.clear();
+ }
+ };
+
+ // Returns false to signal loop exit (Shutdown received)
+ auto ProcessCommand = [&](SessionCommand& Cmd) -> bool {
+ switch (Cmd.CommandType)
+ {
+ case SessionCommand::Type::Log:
+ LogBatch.push_back(std::move(Cmd));
+ if (LogBatch.size() >= BatchSize)
+ {
+ FlushLogBatch();
+ }
+ return true;
+
+ case SessionCommand::Type::FlushLogs:
+ FlushLogBatch();
+ return true;
+
+ case SessionCommand::Type::Announce:
+ FlushLogBatch();
+ DoAnnounce(Http, Cmd.Metadata);
+ return true;
+
+ case SessionCommand::Type::UpdateMetadata:
+ FlushLogBatch();
+ DoUpdateMetadata(Http, Cmd.Metadata);
+ return true;
+
+ case SessionCommand::Type::Remove:
+ FlushLogBatch();
+ if (!Removed)
+ {
+ Removed = true;
+ DoRemove(Http);
+ }
+ return true;
+
+ case SessionCommand::Type::Shutdown:
+ {
+ // Drain remaining log entries from the queue
+ SessionCommand Remaining;
+ while (m_Queue.WaitAndDequeue(Remaining))
+ {
+ if (Remaining.CommandType == SessionCommand::Type::Log)
+ {
+ LogBatch.push_back(std::move(Remaining));
+ }
+ }
+ FlushLogBatch();
+
+ if (!Removed)
+ {
+ Removed = true;
+ DoRemove(Http);
+ }
+ return false;
+ }
+ }
+ return true;
+ };
+
+ SessionCommand Cmd;
+ while (m_Queue.WaitAndDequeue(Cmd))
+ {
+ if (!ProcessCommand(Cmd))
+ {
+ return;
+ }
+
+ // Drain additional queued entries without blocking (batching optimization)
+ while (LogBatch.size() < BatchSize && m_Queue.Size() > 0)
+ {
+ SessionCommand Extra;
+ if (m_Queue.WaitAndDequeue(Extra))
+ {
+ if (!ProcessCommand(Extra))
+ {
+ return;
+ }
+ }
+ }
+
+ FlushLogBatch();
+ }
}
} // namespace zen